From d403ca7a6c972c6377e47657941287f6e4a2f229 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 15:17:57 -0300 Subject: [PATCH] Add session_id and component_id to ChatOutputResponse schema --- src/backend/langflow/api/v1/schemas.py | 2 +- .../langflow/components/utilities/RunFlow.py | 58 +++++++++++++++++++ src/backend/langflow/graph/schema.py | 3 +- src/backend/langflow/graph/vertex/base.py | 29 ++++++++++ .../custom_component/custom_component.py | 2 +- src/backend/langflow/schema/schema.py | 10 ++-- src/backend/langflow/utils/schemas.py | 2 + .../src/CustomNodes/GenericNode/index.tsx | 49 +++++++++++++--- 8 files changed, 138 insertions(+), 17 deletions(-) create mode 100644 src/backend/langflow/components/utilities/RunFlow.py diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 9968e839c..433d57a37 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -232,7 +232,7 @@ class VertexBuildResponse(BaseModel): inactivated_vertices: Optional[List[str]] = None next_vertices_ids: Optional[List[str]] = None valid: bool - params: Optional[str] + params: Optional[Any] = Field(default_factory=dict) """JSON string of the params.""" data: ResultDataResponse """Mapping of vertex ids to result dict containing the param name and result value.""" diff --git a/src/backend/langflow/components/utilities/RunFlow.py b/src/backend/langflow/components/utilities/RunFlow.py new file mode 100644 index 000000000..d0e49ac90 --- /dev/null +++ b/src/backend/langflow/components/utilities/RunFlow.py @@ -0,0 +1,58 @@ +from typing import List, Optional + +from langflow import CustomComponent +from langflow.field_typing import NestedDict, Text +from langflow.graph.schema import ResultData +from langflow.schema import Record + + +class RunFlowComponent(CustomComponent): + display_name = "Run Flow" + description = "A component to run a flow." + + def get_flow_names(self) -> List[str]: + flow_records = self.list_flows() + return [flow_record.data["name"] for flow_record in flow_records] + + def build_config(self): + return { + "input_value": { + "display_name": "Input Value", + "multiline": True, + }, + "flow_name": { + "display_name": "Flow Name", + "info": "The name of the flow to run.", + "options": self.get_flow_names, + }, + "tweaks": { + "display_name": "Tweaks", + "info": "Tweaks to apply to the flow.", + }, + } + + def build_records_from_result_data(self, result_data: ResultData) -> Record: + messages = result_data.messages + records = [] + for message in messages: + record = Record(text=message.get("text", ""), data={"result": result_data}) + records.append(record) + return records + + async def build( + self, input_value: Text, flow_name: str, tweaks: NestedDict + ) -> Record: + + results: List[Optional[ResultData]] = await self.run_flow( + input_value=input_value, flow_name=flow_name, tweaks=tweaks + ) + if isinstance(results, list): + records = [] + for result in results: + if result: + records.extend(self.build_records_from_result_data(result)) + else: + records = self.build_records_from_result_data(results) + + self.status = records + return records diff --git a/src/backend/langflow/graph/schema.py b/src/backend/langflow/graph/schema.py index f53a0833f..a9f06ac1e 100644 --- a/src/backend/langflow/graph/schema.py +++ b/src/backend/langflow/graph/schema.py @@ -4,12 +4,13 @@ from typing import Any, Optional from pydantic import BaseModel, Field, field_serializer from langflow.graph.utils import serialize_field -from langflow.utils.schemas import ContainsEnumMeta +from langflow.utils.schemas import ChatOutputResponse, ContainsEnumMeta class ResultData(BaseModel): results: Optional[Any] = Field(default_factory=dict) artifacts: Optional[Any] = Field(default_factory=dict) + messages: Optional[list[ChatOutputResponse]] = Field(default_factory=list) timedelta: Optional[float] = None duration: Optional[str] = None diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 186030729..635b6f7f3 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -28,6 +28,7 @@ from langflow.interface.initialize import loading from langflow.interface.listing import lazy_load_dict from langflow.services.deps import get_storage_service from langflow.utils.constants import DIRECT_TYPES +from langflow.utils.schemas import ChatOutputResponse from langflow.utils.util import sync_to_async if TYPE_CHECKING: @@ -411,15 +412,43 @@ class Vertex: self._built = True + def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[str]: + """ + Extracts messages from the artifacts. + + Args: + artifacts (Dict[str, Any]): The artifacts to extract messages from. + + Returns: + List[str]: The extracted messages. + """ + messages = [] + for key, artifact in artifacts.items(): + if not isinstance(artifact, dict): + continue + if "message" in artifact: + chat_output_response = ChatOutputResponse( + message=artifact["message"], + sender=artifact.get("sender"), + sender_name=artifact.get("sender_name"), + session_id=artifact.get("session_id"), + component_id=self.id, + ) + messages.append(chat_output_response.model_dump(exclude_none=True)) + + return messages + def _finalize_build(self): result_dict = self.get_built_result() # We need to set the artifacts to pass information # to the frontend self.set_artifacts() artifacts = self.artifacts + messages = self.extract_messages_from_artifacts(artifacts) result_dict = ResultData( results=result_dict, artifacts=artifacts, + messages=messages, ) self.set_result(result_dict) diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index 1596533a2..ced2c7a78 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -132,7 +132,7 @@ class CustomComponent(Component): return yaml.dump(self.repr_value) if isinstance(self.repr_value, str): return self.repr_value - return str(self.repr_value) + return self.repr_value def build_config(self): return self.field_config diff --git a/src/backend/langflow/schema/schema.py b/src/backend/langflow/schema/schema.py index c99aaf127..639c9da96 100644 --- a/src/backend/langflow/schema/schema.py +++ b/src/backend/langflow/schema/schema.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Optional from langchain_core.documents import Document from pydantic import BaseModel @@ -13,7 +13,7 @@ class Record(BaseModel): data (dict, optional): Additional data associated with the record. """ - text: str + text: Optional[str] = "" data: dict = {} @classmethod @@ -52,8 +52,6 @@ class Record(BaseModel): Returns the text of the record. Returns: - str: The text of the record. + str: The text and data of the record. """ - return self.text - - + return self.model_dump_json(indent=2) diff --git a/src/backend/langflow/utils/schemas.py b/src/backend/langflow/utils/schemas.py index 354cb5949..3e6f17a5a 100644 --- a/src/backend/langflow/utils/schemas.py +++ b/src/backend/langflow/utils/schemas.py @@ -11,7 +11,9 @@ class ChatOutputResponse(BaseModel): message: Union[str, List[Union[str, Dict]]] sender: Optional[str] = "Machine" sender_name: Optional[str] = "AI" + session_id: Optional[str] = None stream_url: Optional[str] = None + component_id: Optional[str] = None @classmethod def from_message( diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index a7126fff7..72bedfcb3 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -9,6 +9,7 @@ import Loading from "../../components/ui/loading"; import { Textarea } from "../../components/ui/textarea"; import Xmark from "../../components/ui/xmark"; import { + RUN_TIMESTAMP_PREFIX, STATUS_BUILD, STATUS_BUILDING, priorityFields, @@ -59,6 +60,8 @@ export default function GenericNode({ useState(null); const [handles, setHandles] = useState(0); + const [validationString, setValidationString] = useState(""); + const takeSnapshot = useFlowsManagerStore((state) => state.takeSnapshot); function countHandles(): void { @@ -131,6 +134,18 @@ export default function GenericNode({ } }, [flowPool[data.id], data.id]); + useEffect(() => { + if (validationStatus?.params) { + // if it is not a string turn it into a string + let newValidationString = validationStatus.params; + if (typeof newValidationString !== "string") { + newValidationString = JSON.stringify(validationStatus.params); + } + + setValidationString(newValidationString); + } + }, [validationStatus, validationStatus?.params]); + const showNode = data.showNode ?? true; const nameEditable = true; @@ -493,14 +508,32 @@ export default function GenericNode({ ) : !validationStatus ? ( {STATUS_BUILD} ) : ( -
- {typeof validationStatus.params === "string" - ? `${durationString}\n${validationStatus.params}` - .split("\n") - .map((line, index) => ( -
{line}
- )) - : durationString} +
+
+ {lastRunTime && ( +
+
{RUN_TIMESTAMP_PREFIX}
+
+ {lastRunTime} +
+
+ )} +
+
+
Duration:
+
+ {validationStatus?.data.duration} +
+
+
+ + Output + +
+ {validationString.split("\n").map((line, index) => ( +
{line}
+ ))} +
) }