diff --git a/src/backend/base/langflow/base/agents/agent.py b/src/backend/base/langflow/base/agents/agent.py index e4e9a47bf..eeb48d60c 100644 --- a/src/backend/base/langflow/base/agents/agent.py +++ b/src/backend/base/langflow/base/agents/agent.py @@ -1,20 +1,26 @@ from abc import abstractmethod -from typing import List +from typing import List, Optional, Union, cast +from langchain.agents import AgentExecutor, BaseMultiActionAgent, BaseSingleActionAgent from langchain.agents.agent import RunnableAgent - -from langchain.agents import AgentExecutor +from langchain_core.messages import BaseMessage from langchain_core.runnables import Runnable +from langflow.base.agents.callback import AgentAsyncHandler +from langflow.base.agents.utils import data_to_messages from langflow.custom import Component -from langflow.inputs import BoolInput, IntInput, HandleInput -from langflow.inputs.inputs import InputTypes +from langflow.field_typing import Text, Tool +from langflow.inputs.inputs import DataInput, InputTypes +from langflow.io import BoolInput, HandleInput, IntInput, MessageTextInput +from langflow.schema import Data +from langflow.schema.message import Message from langflow.template import Output class LCAgentComponent(Component): trace_type = "agent" _base_inputs: List[InputTypes] = [ + MessageTextInput(name="input_value", display_name="Input"), BoolInput( name="handle_parsing_errors", display_name="Handle Parse Errors", @@ -37,8 +43,24 @@ class LCAgentComponent(Component): outputs = [ Output(display_name="Agent", name="agent", method="build_agent"), + Output(display_name="Response", name="response", method="message_response"), ] + async def message_response(self) -> Message: + agent = self.build_agent() + result = await self.run_agent( + agent=agent, + inputs=self.input_value, + tools=self.tools, + message_history=self.chat_history, + handle_parsing_errors=self.handle_parsing_errors, + ) + if isinstance(result, list): + result = "\n".join([result_dict["text"] for result_dict in result]) + message = Message(text=result, sender="Machine") + self.status = message + return message + def _validate_outputs(self): required_output_methods = ["build_agent"] output_names = [output.name for output in self.outputs] @@ -65,6 +87,33 @@ class LCAgentComponent(Component): } return {**base, "agent_executor_kwargs": agent_kwargs} + async def run_agent( + self, + agent: Union[Runnable, BaseSingleActionAgent, BaseMultiActionAgent, AgentExecutor], + inputs: str, + tools: List[Tool], + message_history: Optional[List[Data]] = None, + handle_parsing_errors: bool = True, + ) -> Text: + if isinstance(agent, AgentExecutor): + runnable = agent + else: + runnable = AgentExecutor.from_agent_and_tools( + agent=agent, # type: ignore + tools=tools, + verbose=True, + handle_parsing_errors=handle_parsing_errors, + ) + input_dict: dict[str, str | list[BaseMessage]] = {"input": inputs} + if message_history: + input_dict["chat_history"] = data_to_messages(message_history) + result = await runnable.ainvoke(input_dict, config={"callbacks": [AgentAsyncHandler(self.log)]}) + self.status = result + if "output" not in result: + raise ValueError("Output key not found in result. Tried 'output'.") + + return cast(str, result.get("output")) + class LCToolsAgentComponent(LCAgentComponent): _base_inputs = LCAgentComponent._base_inputs + [ @@ -74,7 +123,13 @@ class LCToolsAgentComponent(LCAgentComponent): input_types=["Tool", "BaseTool"], is_list=True, ), - HandleInput(name="llm", display_name="Language Model", input_types=["LanguageModel"], required=True), + HandleInput( + name="llm", + display_name="Language Model", + input_types=["LanguageModel", "ToolEnabledLanguageModel"], + required=True, + ), + DataInput(name="chat_history", display_name="Chat History", is_list=True), ] def build_agent(self) -> AgentExecutor: diff --git a/src/backend/base/langflow/base/agents/callback.py b/src/backend/base/langflow/base/agents/callback.py new file mode 100644 index 000000000..7f75e7745 --- /dev/null +++ b/src/backend/base/langflow/base/agents/callback.py @@ -0,0 +1,103 @@ +from typing import Any, Callable, Concatenate, Dict, List +from uuid import UUID + +from langchain.callbacks.base import AsyncCallbackHandler +from langchain_core.agents import AgentAction, AgentFinish + +from langflow.schema.log import LoggableType + + +class AgentAsyncHandler(AsyncCallbackHandler): + """Async callback handler that can be used to handle callbacks from langchain.""" + + def __init__(self, log_function: Callable[Concatenate[LoggableType | list[LoggableType], ...], None] | None = None): + self.log_function = log_function + + async def on_tool_start( + self, + serialized: Dict[str, Any], + input_str: str, + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: List[str] | None = None, + metadata: Dict[str, Any] | None = None, + inputs: Dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + if self.log_function is None: + return + self.log_function( + { + "type": "tool_start", + "serialized": serialized, + "input_str": input_str, + "run_id": run_id, + "parent_run_id": parent_run_id, + "tags": tags, + "metadata": metadata, + "inputs": inputs, + **kwargs, + }, + name="Tool Start", + ) + + async def on_tool_end(self, output: Any, *, run_id: UUID, parent_run_id: UUID | None = None, **kwargs: Any) -> None: + if self.log_function is None: + return + self.log_function( + { + "type": "tool_end", + "output": output, + "run_id": run_id, + "parent_run_id": parent_run_id, + **kwargs, + }, + name="Tool End", + ) + + async def on_agent_action( + self, + action: AgentAction, + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: List[str] | None = None, + **kwargs: Any, + ) -> None: + if self.log_function is None: + return + self.log_function( + { + "type": "agent_action", + "action": action, + "run_id": run_id, + "parent_run_id": parent_run_id, + "tags": tags, + **kwargs, + }, + name="Agent Action", + ) + + async def on_agent_finish( + self, + finish: AgentFinish, + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: List[str] | None = None, + **kwargs: Any, + ) -> None: + if self.log_function is None: + return + self.log_function( + { + "type": "agent_finish", + "finish": finish, + "run_id": run_id, + "parent_run_id": parent_run_id, + "tags": tags, + **kwargs, + }, + name="Agent Finish", + ) diff --git a/src/backend/base/langflow/field_typing/constants.py b/src/backend/base/langflow/field_typing/constants.py index cff143fc5..a5857ee12 100644 --- a/src/backend/base/langflow/field_typing/constants.py +++ b/src/backend/base/langflow/field_typing/constants.py @@ -21,6 +21,7 @@ from langflow.schema.message import Message NestedDict: TypeAlias = Dict[str, Union[str, Dict]] LanguageModel = TypeVar("LanguageModel", BaseLanguageModel, BaseLLM, BaseChatModel) +ToolEnabledLanguageModel = TypeVar("ToolEnabledLanguageModel", BaseLanguageModel, BaseLLM, BaseChatModel) Retriever = TypeVar( "Retriever", BaseRetriever, diff --git a/src/backend/base/langflow/services/tracing/langsmith.py b/src/backend/base/langflow/services/tracing/langsmith.py index 8501477d0..029e23056 100644 --- a/src/backend/base/langflow/services/tracing/langsmith.py +++ b/src/backend/base/langflow/services/tracing/langsmith.py @@ -120,7 +120,8 @@ class LangSmithTracer(BaseTracer): raw_outputs = outputs processed_outputs = self._convert_to_langchain_types(outputs) if logs: - child.add_metadata(self._convert_to_langchain_types({"logs": {log.get("name"): log for log in logs}})) + logs_dicts = [log if isinstance(log, dict) else log.model_dump() for log in logs] + child.add_metadata(self._convert_to_langchain_types({"logs": {log.get("name"): log for log in logs_dicts}})) child.add_metadata(self._convert_to_langchain_types({"outputs": raw_outputs})) child.end(outputs=processed_outputs, error=self._error_to_string(error)) if error: diff --git a/src/backend/base/langflow/services/tracing/schema.py b/src/backend/base/langflow/services/tracing/schema.py index 7519c974f..fab6728de 100644 --- a/src/backend/base/langflow/services/tracing/schema.py +++ b/src/backend/base/langflow/services/tracing/schema.py @@ -1,9 +1,27 @@ -from typing_extensions import TypedDict +from pydantic import BaseModel, field_serializer +from pydantic.v1 import BaseModel as V1BaseModel from langflow.schema.log import LoggableType -class Log(TypedDict): +class Log(BaseModel): name: str message: LoggableType type: str + + @field_serializer("message") + def serialize_message(self, value): + # We need to make sure everything inside the message has been serialized + if isinstance(value, dict): + return {key: self.serialize_message(value[key]) for key in value} + if isinstance(value, list): + return [self.serialize_message(item) for item in value] + # To json is for LangChain Serializable objects + if hasattr(value, "dict") and isinstance(value, V1BaseModel): + # This is for Pydantic V1 models + return value.dict() + if hasattr(value, "to_json"): + return value.to_json() + if isinstance(value, BaseModel): + return value.model_dump(exclude_none=True) + return value