refactor: decompose complex function into smaller ones for readability (#5517)
* refactor: decompose complex function into smaller ones for readability * chore: optimize output processing logic for efficiency * Update src/backend/base/langflow/custom/custom_component/component.py Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org> --------- Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
parent
a47dee3065
commit
59f375b61a
1 changed files with 79 additions and 63 deletions
|
|
@ -868,78 +868,94 @@ class Component(CustomComponent):
|
|||
raise
|
||||
|
||||
async def _build_results(self) -> tuple[dict, dict]:
|
||||
results = {}
|
||||
artifacts = {}
|
||||
results, artifacts = {}, {}
|
||||
|
||||
self._pre_run_setup_if_needed()
|
||||
self._handle_tool_mode()
|
||||
|
||||
for output in self._get_outputs_to_process():
|
||||
self._current_output = output.name
|
||||
result = await self._get_output_result(output)
|
||||
results[output.name] = result
|
||||
artifacts[output.name] = self._build_artifact(result)
|
||||
self._log_output(output)
|
||||
|
||||
self._finalize_results(results, artifacts)
|
||||
return results, artifacts
|
||||
|
||||
def _pre_run_setup_if_needed(self):
|
||||
if hasattr(self, "_pre_run_setup"):
|
||||
self._pre_run_setup()
|
||||
if hasattr(self, "outputs"):
|
||||
if any(getattr(_input, "tool_mode", False) for _input in self.inputs):
|
||||
self._append_tool_to_outputs_map()
|
||||
for output in self._outputs_map.values():
|
||||
# Build the output if it's connected to some other vertex
|
||||
# or if it's not connected to any vertex
|
||||
if (
|
||||
not self._vertex
|
||||
or not self._vertex.outgoing_edges
|
||||
or output.name in self._vertex.edges_source_names
|
||||
):
|
||||
if output.method is None:
|
||||
msg = f"Output {output.name} does not have a method defined."
|
||||
raise ValueError(msg)
|
||||
self._current_output = output.name
|
||||
method: Callable = getattr(self, output.method)
|
||||
if output.cache and output.value != UNDEFINED:
|
||||
results[output.name] = output.value
|
||||
result = output.value
|
||||
else:
|
||||
# If the method is asynchronous, we need to await it
|
||||
if inspect.iscoroutinefunction(method):
|
||||
result = await method()
|
||||
else:
|
||||
result = await asyncio.to_thread(method)
|
||||
if (
|
||||
self._vertex is not None
|
||||
and isinstance(result, Message)
|
||||
and result.flow_id is None
|
||||
and self._vertex.graph.flow_id is not None
|
||||
):
|
||||
result.set_flow_id(self._vertex.graph.flow_id)
|
||||
results[output.name] = result
|
||||
output.value = result
|
||||
|
||||
custom_repr = self.custom_repr()
|
||||
if custom_repr is None and isinstance(result, dict | Data | str):
|
||||
custom_repr = result
|
||||
if not isinstance(custom_repr, str):
|
||||
custom_repr = str(custom_repr)
|
||||
raw = result
|
||||
if self.status is None:
|
||||
artifact_value = raw
|
||||
else:
|
||||
artifact_value = self.status
|
||||
raw = self.status
|
||||
def _handle_tool_mode(self):
|
||||
if hasattr(self, "outputs") and any(getattr(_input, "tool_mode", False) for _input in self.inputs):
|
||||
self._append_tool_to_outputs_map()
|
||||
|
||||
if hasattr(raw, "data") and raw is not None:
|
||||
raw = raw.data
|
||||
if raw is None:
|
||||
raw = custom_repr
|
||||
def _should_process_output(self, output):
|
||||
if not self._vertex or not self._vertex.outgoing_edges:
|
||||
return True
|
||||
return output.name in self._vertex.edges_source_names
|
||||
|
||||
elif hasattr(raw, "model_dump") and raw is not None:
|
||||
raw = raw.model_dump()
|
||||
if raw is None and isinstance(result, dict | Data | str):
|
||||
raw = result.data if isinstance(result, Data) else result
|
||||
artifact_type = get_artifact_type(artifact_value, result)
|
||||
raw, artifact_type = post_process_raw(raw, artifact_type)
|
||||
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type}
|
||||
artifacts[output.name] = artifact
|
||||
self._output_logs[output.name] = self._logs
|
||||
self._logs = []
|
||||
self._current_output = ""
|
||||
def _get_outputs_to_process(self):
|
||||
return (output for output in self._outputs_map.values() if self._should_process_output(output))
|
||||
|
||||
async def _get_output_result(self, output):
|
||||
if output.cache and output.value != UNDEFINED:
|
||||
return output.value
|
||||
|
||||
if output.method is None:
|
||||
msg = f'Output "{output.name}" does not have a method defined.'
|
||||
raise ValueError(msg)
|
||||
|
||||
method = getattr(self, output.method)
|
||||
try:
|
||||
result = await method() if inspect.iscoroutinefunction(method) else await asyncio.to_thread(method)
|
||||
except TypeError as e:
|
||||
msg = f'Error running method "{output.method}": {e}'
|
||||
raise TypeError(msg) from e
|
||||
|
||||
if (
|
||||
self._vertex is not None
|
||||
and isinstance(result, Message)
|
||||
and result.flow_id is None
|
||||
and self._vertex.graph.flow_id is not None
|
||||
):
|
||||
result.set_flow_id(self._vertex.graph.flow_id)
|
||||
|
||||
output.value = result
|
||||
return result
|
||||
|
||||
def _build_artifact(self, result):
|
||||
custom_repr = self.custom_repr() or (result if isinstance(result, dict | Data | str) else str(result))
|
||||
|
||||
raw = self._process_raw_result(result)
|
||||
artifact_type = get_artifact_type(self.status or raw, result)
|
||||
raw, artifact_type = post_process_raw(raw, artifact_type)
|
||||
return {"repr": custom_repr, "raw": raw, "type": artifact_type}
|
||||
|
||||
def _process_raw_result(self, result):
|
||||
if self.status:
|
||||
raw = self.status
|
||||
elif hasattr(result, "data"):
|
||||
raw = result.data
|
||||
elif hasattr(result, "model_dump"):
|
||||
raw = result.model_dump()
|
||||
elif isinstance(result, dict | Data | str):
|
||||
raw = result.data if isinstance(result, Data) else result
|
||||
else:
|
||||
raw = result
|
||||
return raw
|
||||
|
||||
def _log_output(self, output):
|
||||
self._output_logs[output.name] = self._logs
|
||||
self._logs = []
|
||||
self._current_output = ""
|
||||
|
||||
def _finalize_results(self, results, artifacts):
|
||||
self._artifacts = artifacts
|
||||
self._results = results
|
||||
if self._tracing_service:
|
||||
self._tracing_service.set_outputs(self.trace_name, results)
|
||||
return results, artifacts
|
||||
|
||||
def custom_repr(self):
|
||||
if self.repr_value == "":
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue