* feat: Add dual output support to Agent component with structured JSON parsing

  ## Summary
  - Add "Structured Response" output alongside existing "Response" output
  - Filter out conflicting json_mode field from OpenAI inputs
  - Implement robust JSON parsing with fallback handling

  ## Changes Made

  ### Agent Component (agent.py)
  - Add second output: "Structured Response" (Data type) with tool_mode=False
  - Filter json_mode from OpenAI inputs to prevent UI conflicts
  - Add json_response() method with multi-stage JSON parsing:
    - Direct JSON parsing for valid responses
    - Regex extraction for embedded JSON in text
    - Graceful error handling with diagnostic info
  - Share execution between outputs (no duplicate agent runs)
  - Fix model building to handle missing json_mode attribute

  ### Tests (test_agent_component.py)
  - Add 9 comprehensive test cases covering:
    - Dual output structure validation
    - Input filtering verification
    - JSON parsing (valid, embedded, error cases)
    - Model building without json_mode
    - Shared execution efficiency
    - Frontend node structure
    - Component initialization

  ## Benefits
  - Users get both Message and Data output types to choose from
  - Clean UI without confusing duplicate JSON toggles
  - Robust JSON parsing handles various response formats
  - Efficient single-execution approach
  - Maintains backward compatibility

  🤖 Generated with [Claude Code](https://claude.ai/code)

  Co-Authored-By: Claude <noreply@anthropic.com>

* [autofix.ci] apply automated fixes
* update to templates with model list update
* [autofix.ci] apply automated fixes
* Update test_agent_component.py
* update to the test and update to templates
* [autofix.ci] apply automated fixes

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
359 lines
15 KiB
Python
359 lines
15 KiB
Python
import json
|
|
import re
|
|
|
|
from langchain_core.tools import StructuredTool
|
|
|
|
from langflow.base.agents.agent import LCToolsAgentComponent
|
|
from langflow.base.agents.events import ExceptionWithMessageError
|
|
from langflow.base.models.model_input_constants import (
|
|
ALL_PROVIDER_FIELDS,
|
|
MODEL_DYNAMIC_UPDATE_FIELDS,
|
|
MODEL_PROVIDERS,
|
|
MODEL_PROVIDERS_DICT,
|
|
MODELS_METADATA,
|
|
)
|
|
from langflow.base.models.model_utils import get_model_name
|
|
from langflow.components.helpers.current_date import CurrentDateComponent
|
|
from langflow.components.helpers.memory import MemoryComponent
|
|
from langflow.components.langchain_utilities.tool_calling import ToolCallingAgentComponent
|
|
from langflow.custom.custom_component.component import _get_component_toolkit
|
|
from langflow.custom.utils import update_component_build_config
|
|
from langflow.field_typing import Tool
|
|
from langflow.io import BoolInput, DropdownInput, IntInput, MultilineInput, Output
|
|
from langflow.logging import logger
|
|
from langflow.schema.data import Data
|
|
from langflow.schema.dotdict import dotdict
|
|
from langflow.schema.message import Message
|
|
|
|
|
|
def set_advanced_true(component_input):
    """Mark an input as advanced and return that same input.

    The object is modified in place: its ``advanced`` attribute is set to
    ``True``. Returning it lets the helper be used inside expressions such
    as list comprehensions.
    """
    component_input.advanced = True
    return component_input
|
|
|
|
|
|
# Providers offered in the agent's "Model Provider" dropdown, in display order.
# Each entry is also used as a key into MODELS_METADATA when the dropdown's
# options_metadata is built, so every name listed here must exist there.
MODEL_PROVIDERS_LIST = ["Anthropic", "Google Generative AI", "Groq", "OpenAI"]
|
|
|
|
|
|
class AgentComponent(ToolCallingAgentComponent):
    """Tool-calling agent whose language model is chosen from a provider dropdown.

    The component declares two outputs: ``response`` (built by
    ``message_response``) and ``structured_response`` (built by
    ``json_response``).
    """

    display_name: str = "Agent"
    description: str = "Define the agent's instructions, then enter a task to complete using tools."
    documentation: str = "https://docs.langflow.org/agents"
    icon = "bot"
    beta = False
    name = "Agent"

    # Inputs of the memory component, each flagged as advanced. They are not
    # spliced into `inputs` any more (see the commented-out entry below).
    memory_inputs = [set_advanced_true(component_input) for component_input in MemoryComponent().inputs]

    # Filter out json_mode from OpenAI inputs since we handle structured output differently
    openai_inputs_filtered = [
        input_field
        for input_field in MODEL_PROVIDERS_DICT["OpenAI"]["inputs"]
        if not (hasattr(input_field, "name") and input_field.name == "json_mode")
    ]

    inputs = [
        # Provider selector; "Custom" is appended after the built-in providers
        # and gets the generic "brain" icon in options_metadata.
        DropdownInput(
            name="agent_llm",
            display_name="Model Provider",
            info="The provider of the language model that the agent will use to generate responses.",
            options=[*MODEL_PROVIDERS_LIST, "Custom"],
            value="OpenAI",
            real_time_refresh=True,
            input_types=[],
            options_metadata=[MODELS_METADATA[key] for key in MODEL_PROVIDERS_LIST] + [{"icon": "brain"}],
        ),
        # OpenAI is the default provider, so its fields (minus json_mode) are
        # part of the initial input list.
        *openai_inputs_filtered,
        MultilineInput(
            name="system_prompt",
            display_name="Agent Instructions",
            info="System Prompt: Initial instructions and context provided to guide the agent's behavior.",
            value="You are a helpful assistant that can use tools to answer questions and perform tasks.",
            advanced=False,
        ),
        IntInput(
            name="n_messages",
            display_name="Number of Chat History Messages",
            value=100,
            info="Number of chat history messages to retrieve.",
            advanced=True,
            show=True,
        ),
        *LCToolsAgentComponent._base_inputs,
        # removed memory inputs from agent component
        # *memory_inputs,
        BoolInput(
            name="add_current_date_tool",
            display_name="Current Date",
            advanced=True,
            info="If true, will add a tool to the agent that returns the current date.",
            value=True,
        ),
    ]
    outputs = [
        Output(name="response", display_name="Response", method="message_response"),
        # The structured output is declared with tool_mode=False.
        Output(name="structured_response", display_name="Structured Response", method="json_response", tool_mode=False),
    ]
|
|
|
|
async def message_response(self) -> Message:
    """Run the agent on the current input and return its reply.

    The method resolves the language model, loads the chat history,
    optionally appends the current-date tool, configures the component
    through ``set`` and runs the agent runnable. The reply is also kept on
    ``self._agent_result``, which ``json_response`` reads.

    Returns:
        The value produced by ``run_agent``.

    Raises:
        ValueError: If ``get_llm`` yields no model.
        TypeError: If the current-date toolkit entry is not a
            ``StructuredTool``.

    Every exception raised along the way is logged and re-raised unchanged.
    """
    try:
        # Get LLM model and validate
        llm_model, display_name = self.get_llm()
        if llm_model is None:
            msg = "No language model selected. Please choose a model to proceed."
            raise ValueError(msg)
        self.model_name = get_model_name(llm_model, display_name=display_name)

        # Get memory data; a lone Message is wrapped into a one-element list
        self.chat_history = await self.get_memory_data()
        if isinstance(self.chat_history, Message):
            self.chat_history = [self.chat_history]

        # Add current date tool if enabled
        if self.add_current_date_tool:
            if not isinstance(self.tools, list):  # type: ignore[has-type]
                self.tools = []
            date_toolkit = await CurrentDateComponent(**self.get_base_args()).to_toolkit()
            current_date_tool = date_toolkit.pop(0)
            if not isinstance(current_date_tool, StructuredTool):
                msg = "CurrentDateComponent must be converted to a StructuredTool"
                raise TypeError(msg)
            self.tools.append(current_date_tool)
        # Tools are optional: the agent is allowed to run without any.

        # Set up and run agent
        agent_settings = {
            "llm": llm_model,
            "tools": self.tools or [],
            "chat_history": self.chat_history,
            "input_value": self.input_value,
            "system_prompt": self.system_prompt,
        }
        self.set(**agent_settings)
        agent = self.create_agent_runnable()
        result = await self.run_agent(agent)

        # Store result for potential JSON output
        self._agent_result = result
        return result
    except Exception as e:
        # Pick the log line by exception family, then propagate unchanged.
        if isinstance(e, (ValueError, TypeError, KeyError)):
            logger.error(f"{type(e).__name__}: {e!s}")
        elif isinstance(e, ExceptionWithMessageError):
            logger.error(f"ExceptionWithMessageError occurred: {e}")
        else:
            logger.error(f"Unexpected error: {e!s}")
        raise
|
|
|
|
async def json_response(self) -> Data:
    """Convert the agent reply into a structured ``Data`` output.

    The reply is taken from ``self._agent_result``; when that attribute is
    not set yet, ``message_response`` is awaited first to produce it.

    Parsing stages:
        1. The whole reply is parsed as JSON.
        2. Otherwise the text is scanned for the first position at which a
           JSON object can be decoded (``{`` onwards), so an object embedded
           in prose is found even when more braces follow it.
        3. Otherwise the raw text is returned together with an ``error`` key.

    Returns:
        ``Data`` whose payload is the parsed JSON object. A parsed value
        that is not an object (list, number, string, boolean, null) is
        wrapped as ``{"content": value}``. If nothing can be parsed the
        payload is ``{"content": <text>, "error": "Could not parse as JSON"}``.
    """
    # Run the regular message response first to get the result
    if not hasattr(self, "_agent_result"):
        await self.message_response()

    result = self._agent_result

    # Extract content from result
    if hasattr(result, "content"):
        content = result.content
    elif hasattr(result, "text"):
        content = result.text
    else:
        content = str(result)

    # json.loads raises TypeError (not JSONDecodeError) for non-text input,
    # so normalise the content to str before any parsing is attempted.
    if isinstance(content, (bytes, bytearray)):
        content = content.decode("utf-8", errors="replace")
    elif not isinstance(content, str):
        content = str(content)

    parsed = None
    found = False  # separate flag: a successfully parsed value may be None
    try:
        parsed = json.loads(content)
        found = True
    except json.JSONDecodeError:
        # Try every "{" as a potential start of an embedded JSON object and
        # stop at the first one that decodes cleanly.
        decoder = json.JSONDecoder()
        start = content.find("{")
        while start != -1 and not found:
            try:
                parsed, _ = decoder.raw_decode(content, start)
                found = True
            except json.JSONDecodeError:
                start = content.find("{", start + 1)

    if not found:
        # If we can't extract JSON, return the raw content as data
        return Data(data={"content": content, "error": "Could not parse as JSON"})
    if isinstance(parsed, dict):
        return Data(data=parsed)
    # NOTE(review): Data payloads are expected to be mappings (confirm against
    # langflow.schema.data.Data), so non-object JSON values are wrapped.
    return Data(data={"content": parsed})
|
|
|
|
async def get_memory_data(self):
    """Fetch the session's stored messages, leaving out the current input.

    A ``MemoryComponent`` is configured with the graph's session id,
    ascending order and ``self.n_messages``; its retrieved messages are
    returned as a list, minus any message whose ``id`` equals the ``id`` of
    ``self.input_value`` (a missing ``id`` on either side counts as ``None``).
    """
    # TODO: This is a temporary fix to avoid message duplication. We should develop a function for this.
    memory = MemoryComponent(**self.get_base_args())
    configured_memory = memory.set(
        session_id=self.graph.session_id,
        order="Ascending",
        n_messages=self.n_messages,
    )
    stored_messages = await configured_memory.retrieve_messages()
    return [
        stored
        for stored in stored_messages
        if getattr(stored, "id", None) != getattr(self.input_value, "id", None)
    ]
|
|
|
|
def get_llm(self):
    """Resolve the language model the agent should use.

    Returns:
        A ``(model, display_name)`` pair. When ``self.agent_llm`` is not a
        string it is returned as the model with ``None`` as display name.
        Otherwise the string is looked up in ``MODEL_PROVIDERS_DICT`` and
        the model is built from that provider's component class.

    Raises:
        ValueError: If the provider name is unknown or building the model
            fails for any reason; the original exception is chained.
    """
    provider = self.agent_llm
    if not isinstance(provider, str):
        # An already-built model was supplied instead of a provider name.
        return provider, None

    try:
        provider_info = MODEL_PROVIDERS_DICT.get(provider)
        if not provider_info:
            msg = f"Invalid model provider: {provider}"
            raise ValueError(msg)

        component_class = provider_info.get("component_class")
        display_name = component_class.display_name
        llm = self._build_llm_model(
            component_class,
            provider_info.get("inputs"),
            provider_info.get("prefix", ""),
        )
    except Exception as e:
        logger.error(f"Error building {provider} language model: {e!s}")
        msg = f"Failed to initialize language model: {e!s}"
        raise ValueError(msg) from e
    return llm, display_name
|
|
|
|
def _build_llm_model(self, component, inputs, prefix=""):
    """Configure ``component`` from this agent's attributes and build the model.

    For every entry in ``inputs`` the attribute named ``prefix + input.name``
    is looked up on ``self``; attributes that exist are passed to
    ``component.set`` under the unprefixed input name. Inputs without a
    matching attribute are skipped.

    Returns:
        Whatever ``build_model()`` returns on the configured component.
    """
    # Map each input name to the (possibly prefixed) attribute that holds it.
    attribute_for = {input_.name: f"{prefix}{input_.name}" for input_ in inputs}
    model_kwargs = {
        input_name: getattr(self, attribute)
        for input_name, attribute in attribute_for.items()
        if hasattr(self, attribute)
    }
    configured = component.set(**model_kwargs)
    return configured.build_model()
|
|
|
|
def set_component_params(self, component):
    """Copy this agent's provider-specific values onto ``component``.

    The provider is looked up by ``self.agent_llm`` in
    ``MODEL_PROVIDERS_DICT``. For each of the provider's inputs, the
    attribute named ``prefix + input.name`` is read from ``self`` when it
    exists and passed to ``component.set`` under the unprefixed name.
    Inputs with no matching attribute on this component are skipped.

    Returns:
        The result of ``component.set(...)`` for a known provider, or
        ``component`` unchanged when the provider is unknown.
    """
    provider_info = MODEL_PROVIDERS_DICT.get(self.agent_llm)
    if not provider_info:
        return component

    # A missing "inputs" entry means there is nothing to copy.
    inputs = provider_info.get("inputs") or []
    # Default to an empty prefix, as get_llm does: formatting a None prefix
    # would produce names such as "Nonemodel_name" that never match.
    prefix = provider_info.get("prefix") or ""
    model_kwargs = {
        input_.name: getattr(self, f"{prefix}{input_.name}")
        for input_ in inputs
        if hasattr(self, f"{prefix}{input_.name}")
    }
    return component.set(**model_kwargs)
|
|
|
|
def delete_fields(self, build_config: dotdict, fields: dict | list[str]) -> None:
    """Remove the given field names from ``build_config`` in place.

    ``fields`` is iterated directly, so passing a dict removes its keys.
    Names that are not present in ``build_config`` are ignored.
    """
    for field_name in fields:
        build_config.pop(field_name, None)
|
|
|
|
def update_input_types(self, build_config: dotdict) -> dotdict:
    """Replace missing ``input_types`` with an empty list on every field.

    Dict-valued entries get ``input_types = []`` when the key is absent or
    ``None``. Other entries are updated only when they expose an
    ``input_types`` attribute that is ``None``. The same ``build_config``
    object is modified in place and returned.
    """
    for field_key, field_value in build_config.items():
        if isinstance(field_value, dict):
            if field_value.get("input_types") is None:
                build_config[field_key]["input_types"] = []
            continue
        if hasattr(field_value, "input_types") and field_value.input_types is None:
            field_value.input_types = []
    return build_config
|
|
|
|
async def update_build_config(
    self, build_config: dotdict, field_value: str, field_name: str | None = None
) -> dotdict:
    """Refresh the build configuration after a field change.

    Two independent steps run, depending on ``field_name``:

    1. ``field_name == "agent_llm"`` (provider switched): the new value is
       stored on the dropdown, the provider component's own
       ``update_build_config`` is invoked when available, fields of all
       other providers are removed and the selected provider's fields are
       added. For ``"Custom"`` every provider field is removed and the
       dropdown is replaced by one accepting ``LanguageModel`` inputs.
       Input types are then normalised and required keys are validated.
    2. ``field_name`` is one of ``MODEL_DYNAMIC_UPDATE_FIELDS`` while the
       current ``agent_llm`` is a known provider: the provider component is
       primed via ``set_component_params`` and its ``update_build_config``
       is invoked.

    Returns:
        A ``dotdict`` copy of the configuration in which every value that
        offers ``to_dict`` has been converted with it.

    Raises:
        ValueError: If required keys are missing after a provider switch.
    """
    # Step 1: the provider dropdown itself changed.
    if field_name in ("agent_llm",):
        build_config["agent_llm"]["value"] = field_value
        provider_info = MODEL_PROVIDERS_DICT.get(field_value)
        if provider_info:
            component_class = provider_info.get("component_class")
            if component_class and hasattr(component_class, "update_build_config"):
                # Call the component class's update_build_config method
                build_config = await update_component_build_config(
                    component_class, build_config, field_value, "model_name"
                )

        # For each provider: (its own fields, list of every other provider's fields).
        provider_configs: dict[str, tuple[dict, list[dict]]] = {
            provider: (
                MODEL_PROVIDERS_DICT[provider]["fields"],
                [
                    MODEL_PROVIDERS_DICT[other_provider]["fields"]
                    for other_provider in MODEL_PROVIDERS_DICT
                    if other_provider != provider
                ],
            )
            for provider in MODEL_PROVIDERS_DICT
        }
        if field_value in provider_configs:
            fields_to_add, fields_to_delete = provider_configs[field_value]

            # Delete fields from other providers
            for fields in fields_to_delete:
                self.delete_fields(build_config, fields)

            # Add provider-specific fields
            # NOTE(review): both branches below perform the same update, so
            # the OpenAI-specific condition currently has no effect.
            if field_value == "OpenAI" and not any(field in build_config for field in fields_to_add):
                build_config.update(fields_to_add)
            else:
                build_config.update(fields_to_add)
            # Reset input types for agent_llm
            build_config["agent_llm"]["input_types"] = []
        elif field_value == "Custom":
            # Delete all provider fields
            self.delete_fields(build_config, ALL_PROVIDER_FIELDS)
            # Update with custom component
            custom_component = DropdownInput(
                name="agent_llm",
                display_name="Language Model",
                options=[*sorted(MODEL_PROVIDERS), "Custom"],
                value="Custom",
                real_time_refresh=True,
                input_types=["LanguageModel"],
                options_metadata=[MODELS_METADATA[key] for key in sorted(MODELS_METADATA.keys())]
                + [{"icon": "brain"}],
            )
            build_config.update({"agent_llm": custom_component.to_dict()})
        # Update input types for all fields
        build_config = self.update_input_types(build_config)

        # Validate required keys
        default_keys = [
            "code",
            "_type",
            "agent_llm",
            "tools",
            "input_value",
            "add_current_date_tool",
            "system_prompt",
            "agent_description",
            "max_iterations",
            "handle_parsing_errors",
            "verbose",
        ]
        missing_keys = [key for key in default_keys if key not in build_config]
        if missing_keys:
            msg = f"Missing required keys in build_config: {missing_keys}"
            raise ValueError(msg)
    # Step 2: a dynamically refreshed provider field changed.
    if (
        isinstance(self.agent_llm, str)
        and self.agent_llm in MODEL_PROVIDERS_DICT
        and field_name in MODEL_DYNAMIC_UPDATE_FIELDS
    ):
        provider_info = MODEL_PROVIDERS_DICT.get(self.agent_llm)
        if provider_info:
            component_class = provider_info.get("component_class")
            component_class = self.set_component_params(component_class)
            prefix = provider_info.get("prefix")
            if component_class and hasattr(component_class, "update_build_config"):
                # Call each component class's update_build_config method
                # remove the prefix from the field_name
                # NOTE(review): the stripped field_name is not used afterwards;
                # the call below always passes the literal "model_name".
                if isinstance(field_name, str) and isinstance(prefix, str):
                    field_name = field_name.replace(prefix, "")
                build_config = await update_component_build_config(
                    component_class, build_config, field_value, "model_name"
                )
    # Convert any remaining input objects to plain dicts before returning.
    return dotdict({k: v.to_dict() if hasattr(v, "to_dict") else v for k, v in build_config.items()})
|
|
|
|
async def _get_tools(self) -> list[Tool]:
    """Expose this agent as a tool named ``Call_Agent``.

    The tool description is the agent's tool description followed by the
    names of its tools. When the component has a ``tools_metadata``
    attribute, the tools are passed through ``update_tools_metadata``
    before being returned.
    """
    component_toolkit = _get_component_toolkit()
    tools_names = self._build_tools_names()
    agent_description = self.get_tool_description()
    # TODO: Agent Description Deprecated Feature to be removed
    description = f"{agent_description}{tools_names}"

    toolkit = component_toolkit(component=self)
    tools = toolkit.get_tools(
        tool_name="Call_Agent",
        tool_description=description,
        callbacks=self.get_langchain_callbacks(),
    )
    if not hasattr(self, "tools_metadata"):
        return tools

    metadata_toolkit = component_toolkit(component=self, metadata=self.tools_metadata)
    return metadata_toolkit.update_tools_metadata(tools=tools)
|