feat: enhance structured output handling with new input fields (#9483)

* feat(agent): enhance structured output handling with new input fields and validation

- Added `format_instructions` and `output_schema` inputs to the AgentComponent for improved structured output formatting.
- Introduced `get_agent_requirements` method to streamline agent setup and memory data retrieval.
- Enhanced `json_response` method to support structured output validation against a defined schema.
- Implemented error handling for JSON parsing and validation, ensuring robust output processing.

This update improves the flexibility and reliability of the agent's structured response capabilities.
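The validation flow these bullets describe can be sketched in pure Python (the real component builds a Pydantic model via `build_model_from_schema`; `preprocess` and `validate` here are hypothetical stand-ins that mirror the same normalization and type checks):

```python
TYPES = {"str": str, "int": int, "float": float, "bool": bool, "dict": dict}

def preprocess(schema):
    # Normalize the "multiple" flag, which the table UI may deliver
    # as a string such as "true" or "False".
    out = []
    for field in schema:
        multiple = field.get("multiple", False)
        if isinstance(multiple, str):
            multiple = multiple.lower() in ("true", "1", "t", "y", "yes")
        out.append({"name": str(field.get("name", "field")),
                    "type": str(field.get("type", "str")),
                    "multiple": multiple})
    return out

def validate(item, schema):
    # Return the names of fields whose values do not match the schema.
    errors = []
    for field in preprocess(schema):
        expected = TYPES[field["type"]]
        value = item.get(field["name"])
        if field["multiple"]:
            ok = isinstance(value, list) and all(isinstance(v, expected) for v in value)
        else:
            ok = isinstance(value, expected)
        if not ok:
            errors.append(field["name"])
    return errors

schema = [{"name": "name", "type": "str", "multiple": False},
          {"name": "age", "type": "int", "multiple": "False"}]
print(validate({"name": "John", "age": 25}, schema))    # []
print(validate({"name": "John", "age": "25"}, schema))  # ['age']
```

Unlike this sketch, the component keeps invalid items alongside a `validation_error` message instead of just naming the failing fields.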

* feat(agent): add new input fields for enhanced agent configuration

- Introduced `agent_llm`, `system_prompt`, and `n_messages` inputs to the AgentComponent for improved agent configuration and interaction.
- Updated the handling of combined instructions to ensure clarity in agent behavior and output formatting.
- Enhanced JSON schema extraction process with clearer instructions for better structured output.

This update enhances the flexibility and usability of the agent component, allowing for more tailored interactions.
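The combined-instructions handling mentioned above amounts to joining the non-empty prompt components in a fixed order. A simplified sketch (the function name and labels are assumptions; the component builds the same string inline in `json_response`):

```python
def combine_instructions(system_prompt="", format_instructions="", schema_info=""):
    """Join the non-empty prompt components in the order the agent
    assembles them: agent instructions, format instructions, schema info."""
    parts = [system_prompt,
             f"Format instructions: {format_instructions}" if format_instructions else "",
             schema_info]
    return "\n\n".join(p for p in parts if p)

print(combine_instructions("Be helpful.", "Return JSON."))
# Be helpful.
#
# Format instructions: Return JSON.
```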

* template update

* test update

* refactor(tests): streamline mocking of get_agent_requirements in test_agent_component

- Consolidated the mocking of the `get_agent_requirements` method in multiple test cases for improved readability and consistency.
- Simplified the instantiation of `MockResult` objects to enhance clarity in test setup.

This refactor enhances the maintainability of the test code by reducing redundancy.
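The consolidated mocking might look like this helper (hypothetical name; it mirrors the three-mock setup each `json_response` test otherwise repeats):

```python
import asyncio
from unittest.mock import AsyncMock

def mock_agent(component, content):
    # Stub out everything json_response needs so no LLM is called:
    # the requirements tuple, the runnable agent, and the agent result.
    component.get_agent_requirements = AsyncMock(return_value=(object(), [], []))
    component.create_agent_runnable = AsyncMock(return_value=None)
    result = type("MockResult", (), {"content": content})()
    component.run_agent = AsyncMock(return_value=result)

class DummyComponent:
    pass

component = DummyComponent()
mock_agent(component, '{"status": "success"}')
print(asyncio.run(component.run_agent(None)).content)  # {"status": "success"}
```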

* [autofix.ci] apply automated fixes

* add new logging

* [autofix.ci] apply automated fixes

* update templates

* Update test_agent_component.py

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Carlos Coelho <80289056+carlosrcoelho@users.noreply.github.com>
Committed by Edwin Jose on 2025-08-25 15:01:30 -04:00 (via GitHub)
commit 749768fdb7
17 changed files with 388 additions and 93 deletions


@@ -2,6 +2,7 @@ import json
import re
from langchain_core.tools import StructuredTool
from pydantic import ValidationError
from langflow.base.agents.agent import LCToolsAgentComponent
from langflow.base.agents.events import ExceptionWithMessageError
@@ -19,11 +20,13 @@ from langflow.components.langchain_utilities.tool_calling import ToolCallingAgen
from langflow.custom.custom_component.component import _get_component_toolkit
from langflow.custom.utils import update_component_build_config
from langflow.field_typing import Tool
from langflow.io import BoolInput, DropdownInput, IntInput, MultilineInput, Output
from langflow.helpers.base_model import build_model_from_schema
from langflow.io import BoolInput, DropdownInput, IntInput, MultilineInput, Output, TableInput
from langflow.logging import logger
from langflow.schema.data import Data
from langflow.schema.dotdict import dotdict
from langflow.schema.message import Message
from langflow.schema.table import EditMode
def set_advanced_true(component_input):
@@ -78,6 +81,67 @@ class AgentComponent(ToolCallingAgentComponent):
advanced=True,
show=True,
),
MultilineInput(
name="format_instructions",
display_name="Output Format Instructions",
info="Generic Template for structured output formatting. Valid only with Structured response.",
value=(
"You are an AI that extracts structured JSON objects from unstructured text. "
"Use a predefined schema with expected types (str, int, float, bool, dict). "
"Extract ALL relevant instances that match the schema - if multiple patterns exist, capture them all. "
"Fill missing or ambiguous values with defaults: null for missing values. "
"Remove exact duplicates but keep variations that have different field values. "
"Always return valid JSON in the expected format, never throw errors. "
"If multiple objects can be extracted, return them all in the structured format."
),
advanced=True,
),
TableInput(
name="output_schema",
display_name="Output Schema",
info=(
"Schema Validation: Define the structure and data types for structured output. "
"No validation if no output schema."
),
advanced=True,
required=False,
value=[],
table_schema=[
{
"name": "name",
"display_name": "Name",
"type": "str",
"description": "Specify the name of the output field.",
"default": "field",
"edit_mode": EditMode.INLINE,
},
{
"name": "description",
"display_name": "Description",
"type": "str",
"description": "Describe the purpose of the output field.",
"default": "description of field",
"edit_mode": EditMode.POPOVER,
},
{
"name": "type",
"display_name": "Type",
"type": "str",
"edit_mode": EditMode.INLINE,
"description": ("Indicate the data type of the output field (e.g., str, int, float, bool, dict)."),
"options": ["str", "int", "float", "bool", "dict"],
"default": "str",
},
{
"name": "multiple",
"display_name": "As List",
"type": "boolean",
"description": "Set to True if this output field should be a list of the specified type.",
"default": "False",
"edit_mode": EditMode.INLINE,
},
],
),
*LCToolsAgentComponent._base_inputs,
# removed memory inputs from agent component
# *memory_inputs,
@@ -94,31 +158,33 @@ class AgentComponent(ToolCallingAgentComponent):
Output(name="structured_response", display_name="Structured Response", method="json_response", tool_mode=False),
]
async def get_agent_requirements(self):
"""Get the agent requirements for the agent."""
llm_model, display_name = await self.get_llm()
if llm_model is None:
msg = "No language model selected. Please choose a model to proceed."
raise ValueError(msg)
self.model_name = get_model_name(llm_model, display_name=display_name)
# Get memory data
self.chat_history = await self.get_memory_data()
if isinstance(self.chat_history, Message):
self.chat_history = [self.chat_history]
# Add current date tool if enabled
if self.add_current_date_tool:
if not isinstance(self.tools, list): # type: ignore[has-type]
self.tools = []
current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
if not isinstance(current_date_tool, StructuredTool):
msg = "CurrentDateComponent must be converted to a StructuredTool"
raise TypeError(msg)
self.tools.append(current_date_tool)
return llm_model, self.chat_history, self.tools
async def message_response(self) -> Message:
try:
# Get LLM model and validate
llm_model, display_name = self.get_llm()
if llm_model is None:
msg = "No language model selected. Please choose a model to proceed."
raise ValueError(msg)
self.model_name = get_model_name(llm_model, display_name=display_name)
# Get memory data
self.chat_history = await self.get_memory_data()
if isinstance(self.chat_history, Message):
self.chat_history = [self.chat_history]
# Add current date tool if enabled
if self.add_current_date_tool:
if not isinstance(self.tools, list): # type: ignore[has-type]
self.tools = []
current_date_tool = (await CurrentDateComponent(**self.get_base_args()).to_toolkit()).pop(0)
if not isinstance(current_date_tool, StructuredTool):
msg = "CurrentDateComponent must be converted to a StructuredTool"
raise TypeError(msg)
self.tools.append(current_date_tool)
# note the tools are not required to run the agent, hence the validation removed.
llm_model, self.chat_history, self.tools = await self.get_agent_requirements()
# Set up and run agent
self.set(
llm=llm_model,
@@ -132,7 +198,6 @@ class AgentComponent(ToolCallingAgentComponent):
# Store result for potential JSON output
self._agent_result = result
# return result
except (ValueError, TypeError, KeyError) as e:
await logger.aerror(f"{type(e).__name__}: {e!s}")
@@ -140,44 +205,173 @@
except ExceptionWithMessageError as e:
await logger.aerror(f"ExceptionWithMessageError occurred: {e}")
raise
# Avoid catching blind Exception; let truly unexpected exceptions propagate
except Exception as e:
await logger.aerror(f"Unexpected error: {e!s}")
raise
else:
return result
async def json_response(self) -> Data:
"""Convert agent response to structured JSON Data output."""
# Run the regular message response first to get the result
if not hasattr(self, "_agent_result"):
await self.message_response()
def _preprocess_schema(self, schema):
"""Preprocess schema to ensure correct data types for build_model_from_schema."""
processed_schema = []
for field in schema:
processed_field = {
"name": str(field.get("name", "field")),
"type": str(field.get("type", "str")),
"description": str(field.get("description", "")),
"multiple": field.get("multiple", False),
}
# Ensure multiple is handled correctly
if isinstance(processed_field["multiple"], str):
processed_field["multiple"] = processed_field["multiple"].lower() in ["true", "1", "t", "y", "yes"]
processed_schema.append(processed_field)
return processed_schema
result = self._agent_result
async def build_structured_output_base(self, content: str):
"""Build structured output with optional BaseModel validation."""
json_pattern = r"\{.*\}"
schema_error_msg = "Try setting an output schema"
# Extract content from result
if hasattr(result, "content"):
content = result.content
elif hasattr(result, "text"):
content = result.text
else:
content = str(result)
# Try to parse as JSON
# Try to parse content as JSON first
json_data = None
try:
json_data = json.loads(content)
return Data(data=json_data)
except json.JSONDecodeError:
# If it's not valid JSON, try to extract JSON from the content
json_match = re.search(r"\{.*\}", content, re.DOTALL)
json_match = re.search(json_pattern, content, re.DOTALL)
if json_match:
try:
json_data = json.loads(json_match.group())
return Data(data=json_data)
except json.JSONDecodeError:
pass
return {"content": content, "error": schema_error_msg}
else:
return {"content": content, "error": schema_error_msg}
# If we can't extract JSON, return the raw content as data
return Data(data={"content": content, "error": "Could not parse as JSON"})
# If no output schema provided, return parsed JSON without validation
if not hasattr(self, "output_schema") or not self.output_schema or len(self.output_schema) == 0:
return json_data
# Use BaseModel validation with schema
try:
processed_schema = self._preprocess_schema(self.output_schema)
output_model = build_model_from_schema(processed_schema)
# Validate against the schema
if isinstance(json_data, list):
# Multiple objects
validated_objects = []
for item in json_data:
try:
validated_obj = output_model.model_validate(item)
validated_objects.append(validated_obj.model_dump())
except ValidationError as e:
await logger.aerror(f"Validation error for item: {e}")
# Include invalid items with error info
validated_objects.append({"data": item, "validation_error": str(e)})
return validated_objects
# Single object
try:
validated_obj = output_model.model_validate(json_data)
return [validated_obj.model_dump()] # Return as list for consistency
except ValidationError as e:
await logger.aerror(f"Validation error: {e}")
return [{"data": json_data, "validation_error": str(e)}]
except (TypeError, ValueError) as e:
await logger.aerror(f"Error building structured output: {e}")
# Fallback to parsed JSON without validation
return json_data
async def json_response(self) -> Data:
"""Convert agent response to structured JSON Data output with schema validation."""
# Always use structured chat agent for JSON response mode for better JSON formatting
try:
system_components = []
# 1. Agent Instructions (system_prompt)
agent_instructions = getattr(self, "system_prompt", "") or ""
if agent_instructions:
system_components.append(f"{agent_instructions}")
# 2. Format Instructions
format_instructions = getattr(self, "format_instructions", "") or ""
if format_instructions:
system_components.append(f"Format instructions: {format_instructions}")
# 3. Schema Information from BaseModel
if hasattr(self, "output_schema") and self.output_schema and len(self.output_schema) > 0:
try:
processed_schema = self._preprocess_schema(self.output_schema)
output_model = build_model_from_schema(processed_schema)
schema_dict = output_model.model_json_schema()
schema_info = (
"You are given some text that may include format instructions, "
"explanations, or other content alongside a JSON schema.\n\n"
"Your task:\n"
"- Extract only the JSON schema.\n"
"- Return it as valid JSON.\n"
"- Do not include format instructions, explanations, or extra text.\n\n"
"Input:\n"
f"{json.dumps(schema_dict, indent=2)}\n\n"
"Output (only JSON schema):"
)
system_components.append(schema_info)
except (ValidationError, ValueError, TypeError, KeyError) as e:
await logger.aerror(f"Could not build schema for prompt: {e}", exc_info=True)
# Combine all components
combined_instructions = "\n\n".join(system_components) if system_components else ""
llm_model, self.chat_history, self.tools = await self.get_agent_requirements()
self.set(
llm=llm_model,
tools=self.tools or [],
chat_history=self.chat_history,
input_value=self.input_value,
system_prompt=combined_instructions,
)
# Create and run structured chat agent
try:
structured_agent = self.create_agent_runnable()
except (NotImplementedError, ValueError, TypeError) as e:
await logger.aerror(f"Error with structured chat agent: {e}")
raise
try:
result = await self.run_agent(structured_agent)
except (ExceptionWithMessageError, ValueError, TypeError, RuntimeError) as e:
await logger.aerror(f"Error with structured agent result: {e}")
raise
# Extract content from structured agent result
if hasattr(result, "content"):
content = result.content
elif hasattr(result, "text"):
content = result.text
else:
content = str(result)
except (ExceptionWithMessageError, ValueError, TypeError, NotImplementedError, AttributeError) as e:
await logger.aerror(f"Error with structured chat agent: {e}")
# Fallback to regular agent
content_str = "No content returned from agent"
return Data(data={"content": content_str, "error": str(e)})
# Process with structured output validation
try:
structured_output = await self.build_structured_output_base(content)
# Handle different output formats
if isinstance(structured_output, list) and structured_output:
if len(structured_output) == 1:
return Data(data=structured_output[0])
return Data(data={"results": structured_output})
if isinstance(structured_output, dict):
return Data(data=structured_output)
return Data(data={"content": content})
except (ValueError, TypeError) as e:
await logger.aerror(f"Error in structured output processing: {e}")
return Data(data={"content": content, "error": str(e)})
async def get_memory_data(self):
# TODO: This is a temporary fix to avoid message duplication. We should develop a function for this.
@@ -190,7 +384,7 @@ class AgentComponent(ToolCallingAgentComponent):
message for message in messages if getattr(message, "id", None) != getattr(self.input_value, "id", None)
]
def get_llm(self):
async def get_llm(self):
if not isinstance(self.agent_llm, str):
return self.agent_llm, None
@@ -207,8 +401,8 @@
return self._build_llm_model(component_class, inputs, prefix), display_name
except Exception as e:
logger.error(f"Error building {self.agent_llm} language model: {e!s}")
except (AttributeError, ValueError, TypeError, RuntimeError) as e:
await logger.aerror(f"Error building {self.agent_llm} language model: {e!s}")
msg = f"Failed to initialize language model: {e!s}"
raise ValueError(msg) from e
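The parse-or-extract fallback used by `build_structured_output_base` in the diff above can be sketched standalone (the function name is hypothetical; the regex and error payload match the diff):

```python
import json
import re

def parse_json_loose(content: str):
    """Parse content as JSON; on failure, fall back to extracting
    the first {...} span, as build_structured_output_base does."""
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        match = re.search(r"\{.*\}", content, re.DOTALL)
        if match:
            try:
                return json.loads(match.group())
            except json.JSONDecodeError:
                pass
    return {"content": content, "error": "Could not parse as JSON"}

print(parse_json_loose('Here is the result: {"status": "success"} - done!'))
# {'status': 'success'}
```

Note the greedy `\{.*\}` with `re.DOTALL` grabs from the first `{` to the last `}`, so it handles a single embedded object but not several separate ones.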

File diff suppressed because one or more lines are too long (15 files)


@@ -49,6 +49,9 @@ class TestAgentComponent(ComponentTestBaseWithoutClient):
"system_prompt": "You are a helpful assistant.",
"tools": [],
"verbose": True,
"n_messages": 100,
"format_instructions": "You are an AI that extracts structured JSON objects from unstructured text.",
"output_schema": [],
}
async def test_build_config_update(self, component_class, default_kwargs):
@@ -129,10 +132,13 @@ class TestAgentComponent(ComponentTestBaseWithoutClient):
async def test_json_response_parsing_valid_json(self, component_class, default_kwargs):
"""Test that json_response correctly parses JSON from agent response."""
component = await self.component_setup(component_class, default_kwargs)
# Mock the get_agent_requirements method to avoid actual LLM calls
from unittest.mock import AsyncMock
# Mock a response with valid JSON
component.get_agent_requirements = AsyncMock(return_value=(MockLanguageModel(), [], []))
component.create_agent_runnable = AsyncMock(return_value=None)
mock_result = type("MockResult", (), {"content": '{"name": "test", "value": 123}'})()
component._agent_result = mock_result
component.run_agent = AsyncMock(return_value=mock_result)
result = await component.json_response()
@@ -144,10 +150,13 @@ class TestAgentComponent(ComponentTestBaseWithoutClient):
async def test_json_response_parsing_embedded_json(self, component_class, default_kwargs):
"""Test that json_response handles text containing JSON."""
component = await self.component_setup(component_class, default_kwargs)
# Mock the get_agent_requirements method to avoid actual LLM calls
from unittest.mock import AsyncMock
# Mock a response with text containing JSON
component.get_agent_requirements = AsyncMock(return_value=(MockLanguageModel(), [], []))
component.create_agent_runnable = AsyncMock(return_value=None)
mock_result = type("MockResult", (), {"content": 'Here is the result: {"status": "success"} - done!'})()
component._agent_result = mock_result
component.run_agent = AsyncMock(return_value=mock_result)
result = await component.json_response()
@@ -159,10 +168,13 @@ class TestAgentComponent(ComponentTestBaseWithoutClient):
async def test_json_response_error_handling(self, component_class, default_kwargs):
"""Test that json_response handles completely non-JSON responses."""
component = await self.component_setup(component_class, default_kwargs)
# Mock the get_agent_requirements method to avoid actual LLM calls
from unittest.mock import AsyncMock
# Mock a response with no JSON
component.get_agent_requirements = AsyncMock(return_value=(MockLanguageModel(), [], []))
component.create_agent_runnable = AsyncMock(return_value=None)
mock_result = type("MockResult", (), {"content": "This is just plain text with no JSON"})()
component._agent_result = mock_result
component.run_agent = AsyncMock(return_value=mock_result)
result = await component.json_response()
@@ -190,30 +202,28 @@ class TestAgentComponent(ComponentTestBaseWithoutClient):
# Verify set was called (meaning no AttributeError occurred)
mock_component.set.assert_called_once()
async def test_shared_execution_between_outputs(self, component_class, default_kwargs):
"""Test that both outputs use the same agent execution."""
async def test_json_response_with_schema_validation(self, component_class, default_kwargs):
"""Test that json_response validates against provided schema."""
# Set up component with output schema
default_kwargs["output_schema"] = [
{"name": "name", "type": "str", "description": "Name field", "multiple": False},
{"name": "age", "type": "int", "description": "Age field", "multiple": False},
]
component = await self.component_setup(component_class, default_kwargs)
# Mock the message_response method
# Mock the get_agent_requirements method
from unittest.mock import AsyncMock
mock_result = type("MockResult", (), {"content": '{"shared": "result"}'})()
component.get_agent_requirements = AsyncMock(return_value=(MockLanguageModel(), [], []))
component.create_agent_runnable = AsyncMock(return_value=None)
mock_result = type("MockResult", (), {"content": '{"name": "John", "age": 25}'})()
component.run_agent = AsyncMock(return_value=mock_result)
async def mock_message_response_side_effect():
component._agent_result = mock_result
return mock_result
result = await component.json_response()
component.message_response = AsyncMock(side_effect=mock_message_response_side_effect)
from langflow.schema.data import Data
# Call json_response first
json_result = await component.json_response()
# message_response should have been called once
component.message_response.assert_called_once()
# Verify the result was stored and reused
assert hasattr(component, "_agent_result")
assert json_result.data == {"shared": "result"}
assert isinstance(result, Data)
assert result.data == {"name": "John", "age": 25}
async def test_agent_component_initialization(self, component_class, default_kwargs):
"""Test that Agent component initializes correctly with filtered inputs."""
@@ -240,6 +250,97 @@ class TestAgentComponent(ComponentTestBaseWithoutClient):
assert "system_prompt" in build_config
assert "add_current_date_tool" in build_config
async def test_preprocess_schema(self, component_class, default_kwargs):
"""Test that _preprocess_schema correctly handles schema validation."""
component = await self.component_setup(component_class, default_kwargs)
# Test schema preprocessing
raw_schema = [
{"name": "field1", "type": "str", "description": "Test field", "multiple": "true"},
{"name": "field2", "type": "int", "description": "Another field", "multiple": False},
]
processed = component._preprocess_schema(raw_schema)
assert len(processed) == 2
assert processed[0]["multiple"] is True # String "true" should be converted to bool
assert processed[1]["multiple"] is False
async def test_build_structured_output_base_with_validation(self, component_class, default_kwargs):
"""Test build_structured_output_base with schema validation."""
default_kwargs["output_schema"] = [
{"name": "name", "type": "str", "description": "Name field", "multiple": False},
{"name": "count", "type": "int", "description": "Count field", "multiple": False},
]
component = await self.component_setup(component_class, default_kwargs)
# Test valid JSON that matches schema
valid_content = '{"name": "test", "count": 42}'
result = await component.build_structured_output_base(valid_content)
assert result == [{"name": "test", "count": 42}]
async def test_build_structured_output_base_without_schema(self, component_class, default_kwargs):
"""Test build_structured_output_base without schema validation."""
component = await self.component_setup(component_class, default_kwargs)
# Test with no output_schema
content = '{"any": "data", "number": 123}'
result = await component.build_structured_output_base(content)
assert result == {"any": "data", "number": 123}
async def test_build_structured_output_base_embedded_json(self, component_class, default_kwargs):
"""Test extraction of JSON from embedded text."""
component = await self.component_setup(component_class, default_kwargs)
content = 'Here is some text with {"embedded": "json"} inside it.'
result = await component.build_structured_output_base(content)
assert result == {"embedded": "json"}
async def test_build_structured_output_base_no_json(self, component_class, default_kwargs):
"""Test handling of content with no JSON."""
component = await self.component_setup(component_class, default_kwargs)
content = "This is just plain text with no JSON at all."
result = await component.build_structured_output_base(content)
assert "error" in result
assert result["content"] == content
async def test_new_input_fields_present(self, component_class, default_kwargs):
"""Test that new input fields are present in the component."""
component = await self.component_setup(component_class, default_kwargs)
input_names = [inp.name for inp in component.inputs if hasattr(inp, "name")]
# Test for new fields
assert "format_instructions" in input_names
assert "output_schema" in input_names
assert "n_messages" in input_names
# Verify default values
assert hasattr(component, "format_instructions")
assert hasattr(component, "output_schema")
assert hasattr(component, "n_messages")
assert component.n_messages == 100
async def test_agent_has_correct_outputs(self, component_class, default_kwargs):
"""Test that Agent component has the correct output configuration."""
component = await self.component_setup(component_class, default_kwargs)
assert len(component.outputs) == 2
# Test response output
response_output = component.outputs[0]
assert response_output.name == "response"
assert response_output.display_name == "Response"
assert response_output.method == "message_response"
# Test structured response output
structured_output = component.outputs[1]
assert structured_output.name == "structured_response"
assert structured_output.display_name == "Structured Response"
assert structured_output.method == "json_response"
assert structured_output.tool_mode is False
class TestAgentComponentWithClient(ComponentTestBaseWithClient):
@pytest.fixture