From 59f375b61a2b3b0640ebcfe6e09d0a787b55c30c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=8Dtalo=20Johnny?= Date: Fri, 17 Jan 2025 16:55:21 -0300 Subject: [PATCH] refactor: decompose complex function into smaller ones for readability (#5517) * refactor: decompose complex function into smaller ones for readability * chore: optimize output processing logic for efficiency * Update src/backend/base/langflow/custom/custom_component/component.py Co-authored-by: Gabriel Luiz Freitas Almeida --------- Co-authored-by: Gabriel Luiz Freitas Almeida --- .../custom/custom_component/component.py | 142 ++++++++++-------- 1 file changed, 79 insertions(+), 63 deletions(-) diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index 2a1f51375..20e09ed4c 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -868,78 +868,94 @@ class Component(CustomComponent): raise async def _build_results(self) -> tuple[dict, dict]: - results = {} - artifacts = {} + results, artifacts = {}, {} + + self._pre_run_setup_if_needed() + self._handle_tool_mode() + + for output in self._get_outputs_to_process(): + self._current_output = output.name + result = await self._get_output_result(output) + results[output.name] = result + artifacts[output.name] = self._build_artifact(result) + self._log_output(output) + + self._finalize_results(results, artifacts) + return results, artifacts + + def _pre_run_setup_if_needed(self): if hasattr(self, "_pre_run_setup"): self._pre_run_setup() - if hasattr(self, "outputs"): - if any(getattr(_input, "tool_mode", False) for _input in self.inputs): - self._append_tool_to_outputs_map() - for output in self._outputs_map.values(): - # Build the output if it's connected to some other vertex - # or if it's not connected to any vertex - if ( - not self._vertex - or not self._vertex.outgoing_edges - or output.name in self._vertex.edges_source_names - ): - if output.method is None: - msg = f"Output {output.name} does not have a method defined." - raise ValueError(msg) - self._current_output = output.name - method: Callable = getattr(self, output.method) - if output.cache and output.value != UNDEFINED: - results[output.name] = output.value - result = output.value - else: - # If the method is asynchronous, we need to await it - if inspect.iscoroutinefunction(method): - result = await method() - else: - result = await asyncio.to_thread(method) - if ( - self._vertex is not None - and isinstance(result, Message) - and result.flow_id is None - and self._vertex.graph.flow_id is not None - ): - result.set_flow_id(self._vertex.graph.flow_id) - results[output.name] = result - output.value = result - custom_repr = self.custom_repr() - if custom_repr is None and isinstance(result, dict | Data | str): - custom_repr = result - if not isinstance(custom_repr, str): - custom_repr = str(custom_repr) - raw = result - if self.status is None: - artifact_value = raw - else: - artifact_value = self.status - raw = self.status + def _handle_tool_mode(self): + if hasattr(self, "outputs") and any(getattr(_input, "tool_mode", False) for _input in self.inputs): + self._append_tool_to_outputs_map() - if hasattr(raw, "data") and raw is not None: - raw = raw.data - if raw is None: - raw = custom_repr + def _should_process_output(self, output): + if not self._vertex or not self._vertex.outgoing_edges: + return True + return output.name in self._vertex.edges_source_names - elif hasattr(raw, "model_dump") and raw is not None: - raw = raw.model_dump() - if raw is None and isinstance(result, dict | Data | str): - raw = result.data if isinstance(result, Data) else result - artifact_type = get_artifact_type(artifact_value, result) - raw, artifact_type = post_process_raw(raw, artifact_type) - artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} - artifacts[output.name] = artifact - self._output_logs[output.name] = self._logs - self._logs = [] - self._current_output = "" + def _get_outputs_to_process(self): + return (output for output in self._outputs_map.values() if self._should_process_output(output)) + + async def _get_output_result(self, output): + if output.cache and output.value != UNDEFINED: + return output.value + + if output.method is None: + msg = f'Output "{output.name}" does not have a method defined.' + raise ValueError(msg) + + method = getattr(self, output.method) + try: + result = await method() if inspect.iscoroutinefunction(method) else await asyncio.to_thread(method) + except TypeError as e: + msg = f'Error running method "{output.method}": {e}' + raise TypeError(msg) from e + + if ( + self._vertex is not None + and isinstance(result, Message) + and result.flow_id is None + and self._vertex.graph.flow_id is not None + ): + result.set_flow_id(self._vertex.graph.flow_id) + + output.value = result + return result + + def _build_artifact(self, result): + custom_repr = self.custom_repr() or (result if isinstance(result, dict | Data | str) else str(result)) + + raw = self._process_raw_result(result) + artifact_type = get_artifact_type(self.status or raw, result) + raw, artifact_type = post_process_raw(raw, artifact_type) + return {"repr": custom_repr, "raw": raw, "type": artifact_type} + + def _process_raw_result(self, result): + if self.status: + raw = self.status + elif hasattr(result, "data"): + raw = result.data + elif hasattr(result, "model_dump"): + raw = result.model_dump() + elif isinstance(result, dict | Data | str): + raw = result.data if isinstance(result, Data) else result + else: + raw = result + return raw + + def _log_output(self, output): + self._output_logs[output.name] = self._logs + self._logs = [] + self._current_output = "" + + def _finalize_results(self, results, artifacts): self._artifacts = artifacts self._results = results if self._tracing_service: self._tracing_service.set_outputs(self.trace_name, results) - return results, artifacts def custom_repr(self): if self.repr_value == "":