feat: Add async output resolution with caching and ordered processing (#7487)

* feat: enhance output processing to maintain order

* feat: add async output resolution method with caching support

* test: Update component outputs in test_component_events.py

Enhance the test for component build results by adding output definitions for 'text_output' and 'tool_output' to ensure comprehensive coverage of output handling during the build process.

* 📝 Add docstrings to `order-outputs` (#8280)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* [autofix.ci] apply automated fixes

* fix: Update output retrieval in Component class to handle missing outputs gracefully

Modified the output retrieval logic in the Component class to use `get` method for accessing `_outputs_map`, providing a default value of a deepcopy of the output. This change enhances robustness by preventing KeyError exceptions when an output is not found in the map.

* refactor: Enhance output processing logic in Component class

Updated the _get_outputs_to_process method to first process outputs in the order defined by self.outputs, followed by any remaining outputs from _outputs_map. This change improves the output handling logic and ensures that all relevant outputs are considered for processing.

* refactor: Improve docstring clarity in test_component_events.py

Updated the docstring for the test_component_build_results function to enhance clarity and readability. The changes ensure that the purpose and expectations of the test are clearly communicated, improving documentation quality.

---------

Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-05-30 12:10:56 -03:00 committed by GitHub
commit aaf36c4316
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 72 additions and 2 deletions

View file

@ -963,14 +963,53 @@ class Component(CustomComponent):
self._append_tool_to_outputs_map()
def _should_process_output(self, output):
"""Determines whether a given output should be processed based on vertex edge configuration.
Returns True if the component has no vertex or outgoing edges, or if the output's name is among
the vertex's source edge names.
"""
if not self._vertex or not self._vertex.outgoing_edges:
return True
return output.name in self._vertex.edges_source_names
def _get_outputs_to_process(self):
return (output for output in self._outputs_map.values() if self._should_process_output(output))
"""Returns a list of outputs to process, ordered according to self.outputs.
Outputs are included only if they should be processed, as determined by _should_process_output.
First processes outputs in the order defined by self.outputs, then processes any remaining outputs
from _outputs_map that weren't in self.outputs.
Returns:
list: Outputs to be processed in the defined order.
Raises:
ValueError: If an output name in self.outputs is not present in _outputs_map.
"""
result = []
processed_names = set()
# First process outputs in the order defined by self.outputs
for output in self.outputs:
output_obj = self._outputs_map.get(output.name, deepcopy(output))
if self._should_process_output(output_obj):
result.append(output_obj)
processed_names.add(output_obj.name)
# Then process any remaining outputs from _outputs_map
for name, output_obj in self._outputs_map.items():
if name not in processed_names and self._should_process_output(output_obj):
result.append(output_obj)
return result
async def _get_output_result(self, output):
"""Computes and returns the result for a given output, applying caching and output options.
If the output is cached and a value is already defined, returns the cached value. Otherwise,
invokes the associated output method asynchronously, applies output options, updates the cache,
and returns the result. Raises a ValueError if the output method is not defined, or a TypeError
if the method invocation fails.
"""
if output.cache and output.value != UNDEFINED:
return output.value
@ -997,7 +1036,29 @@ class Component(CustomComponent):
return result
async def resolve_output(self, output_name: str) -> Any:
"""Resolves and returns the value for a specified output by name.
If output caching is enabled and a value is already available, returns the cached value;
otherwise, computes and returns the output result. Raises a KeyError if the output name
does not exist.
"""
output = self._outputs_map.get(output_name)
if output is None:
msg = (
f"Sorry, an output named '{output_name}' could not be found. "
"Please ensure that the output is correctly configured and try again."
)
raise KeyError(msg)
if output.cache and output.value != UNDEFINED:
return output.value
return await self._get_output_result(output)
def _build_artifact(self, result):
"""Builds an artifact dictionary containing a string representation, raw data, and type for a result.
The artifact includes a human-readable representation, the processed raw result, and its determined type.
"""
custom_repr = self.custom_repr()
if custom_repr is None and isinstance(result, dict | Data | str):
custom_repr = result

View file

@ -134,7 +134,11 @@ async def test_component_error_handling():
@pytest.mark.usefixtures("client")
async def test_component_build_results():
"""Test component's build_results functionality."""
"""Test that build_results correctly generates output results and artifacts for defined outputs.
Test that the results dictionary contains the correct output keys and values,
and that the artifacts dictionary includes the correct types for each output.
"""
# Create event queue and manager
queue = asyncio.Queue()
event_manager = EventManager(queue)
@ -149,6 +153,11 @@ async def test_component_build_results():
"tool_output": Output(name="tool_output", method="get_tool"),
}
component.outputs = [
Output(name="text_output", method="get_text"),
Output(name="tool_output", method="get_tool"),
]
# Build results
results, artifacts = await component._build_results()