diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 9ee4edb03..72575e99d 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -16,6 +16,7 @@ from langflow.api.utils import ( ) from langflow.api.v1.schemas import ( InputValueRequest, + Log, ResultDataResponse, StreamData, VertexBuildResponse, @@ -138,6 +139,7 @@ async def build_vertex( start_time = time.perf_counter() next_runnable_vertices = [] top_level_vertices = [] + messages = [] try: start_time = time.perf_counter() cache = await chat_service.get_cache(flow_id) @@ -157,9 +159,9 @@ async def build_vertex( next_runnable_vertices, top_level_vertices, result_dict, - params, + log_message, valid, - artifacts, + _, vertex, ) = await graph.build_vertex( lock=lock, @@ -172,14 +174,17 @@ async def build_vertex( except Exception as exc: logger.exception(f"Error building vertex: {exc}") - params = format_exception_message(exc) + log_message = format_exception_message(exc) valid = False result_data_response = ResultDataResponse(results={}) - artifacts = {} + # If there's an error building the vertex # we need to clear the cache await chat_service.clear_cache(flow_id) + log_object = Log(message=log_message) + result_data_response.logs.append(log_object) + # Log the vertex build if not vertex.will_stream: background_tasks.add_task( @@ -187,21 +192,21 @@ async def build_vertex( flow_id=flow_id, vertex_id=vertex_id, valid=valid, - params=params, + logs=result_data_response.logs, data=result_data_response, - artifacts=artifacts, ) timedelta = time.perf_counter() - start_time duration = format_elapsed_time(timedelta) result_data_response.duration = duration result_data_response.timedelta = timedelta - vertex.add_build_time(timedelta) - inactivated_vertices = None - inactivated_vertices = list(graph.inactivated_vertices) - graph.reset_inactivated_vertices() - graph.reset_activated_vertices() - await chat_service.set_cache(flow_id, graph) + async with chat_service._cache_locks[flow_id] as lock: + vertex.add_build_time(timedelta) + inactivated_vertices = None + inactivated_vertices = list(graph.inactivated_vertices) + graph.reset_inactivated_vertices() + graph.reset_activated_vertices() + await chat_service.set_cache(flow_id=flow_id, data=graph, lock=lock) # graph.stop_vertex tells us if the user asked # to stop the build of the graph at a certain vertex @@ -215,7 +220,6 @@ async def build_vertex( next_vertices_ids=next_runnable_vertices, top_level_vertices=top_level_vertices, valid=valid, - params=params, id=vertex.id, data=result_data_response, ) diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index f5d0b97d1..1f70519cf 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -1,7 +1,9 @@ -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Dict, List, Optional, Union +from langflow.utils.schemas import ChatOutputResponse +from typing_extensions import TypedDict from uuid import UUID from pydantic import BaseModel, ConfigDict, Field, field_validator, model_serializer @@ -233,9 +235,14 @@ class VerticesOrderResponse(BaseModel): vertices_to_run: List[str] +class Log(TypedDict): + message: str + + class ResultDataResponse(BaseModel): results: Optional[Any] = Field(default_factory=dict) - artifacts: Optional[Any] = Field(default_factory=dict) + logs: List[Log | None] = Field(default_factory=list) + messages: List[ChatOutputResponse | None] = Field(default_factory=list) timedelta: Optional[float] = None duration: Optional[str] = None @@ -246,11 +253,9 @@ class VertexBuildResponse(BaseModel): next_vertices_ids: Optional[List[str]] = None top_level_vertices: Optional[List[str]] = None valid: bool - params: Optional[Any] = Field(default_factory=dict) - """JSON string of the params.""" data: ResultDataResponse """Mapping of vertex ids to result dict containing the param name and result value.""" - timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow) + timestamp: Optional[datetime] = Field(default_factory=lambda: datetime.now(timezone.utc)) """Timestamp of the build.""" diff --git a/src/backend/base/langflow/graph/vertex/types.py b/src/backend/base/langflow/graph/vertex/types.py index d7a98df8a..cbebb14ea 100644 --- a/src/backend/base/langflow/graph/vertex/types.py +++ b/src/backend/base/langflow/graph/vertex/types.py @@ -1,6 +1,7 @@ import ast import json from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union + import yaml from langchain_core.messages import AIMessage from loguru import logger @@ -422,9 +423,9 @@ class ChatVertex(Vertex): flow_id=self.graph.flow_id, vertex_id=self.id, valid=True, - params=self._built_object_repr(), + logs=self._built_object_repr(), data=self.result, - artifacts=self.artifacts, + messages=self.artifacts, ) self._validate_built_object() diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index 80636a554..fefa44bcd 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -36,13 +36,13 @@ class MessageModel(BaseModel): sender_name: str session_id: str message: str - artifacts: dict + files: list[str] = [] class Config: from_attributes = True populate_by_name = True - @validator("artifacts", pre=True) + @validator("files", pre=True) def validate_target_args(cls, v): if isinstance(v, str): return json.loads(v) @@ -58,7 +58,7 @@ class MessageModel(BaseModel): sender_name=record.sender_name, message=record.text, session_id=record.session_id, - artifacts=record.artifacts or {}, + files=record.files or [], ) @@ -67,16 +67,15 @@ class VertexBuildModel(BaseModel): id: Optional[str] = Field(default=None, alias="id") flow_id: str valid: bool - params: Any + logs: Any data: dict - artifacts: dict timestamp: datetime = Field(default_factory=datetime.now) class Config: from_attributes = True populate_by_name = True - @field_serializer("data", "artifacts") + @field_serializer("data") def serialize_dict(v): if isinstance(v, dict): # check if the value of each key is a BaseModel or a list of BaseModels @@ -88,7 +87,7 @@ class VertexBuildModel(BaseModel): return json.dumps(v) return v - @validator("params", pre=True) + @validator("logs", pre=True) def validate_params(cls, v): if isinstance(v, str): try: @@ -97,7 +96,7 @@ class VertexBuildModel(BaseModel): return v return v - @field_serializer("params") + @field_serializer("logs") def serialize_params(v): if isinstance(v, list) and all(isinstance(i, BaseModel) for i in v): return json.dumps([i.model_dump() for i in v]) @@ -109,15 +108,11 @@ class VertexBuildModel(BaseModel): return json.loads(v) return v - @validator("artifacts", pre=True) - def validate_artifacts(cls, v): - if isinstance(v, str): - return json.loads(v) - return v - class VertexBuildResponseModel(VertexBuildModel): - @field_serializer("data", "artifacts") + messages: list[MessageModel] = [] + + @field_serializer("data") def serialize_dict(v): return v diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index e3d56db52..3bcaefe89 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -69,7 +69,7 @@ class MonitorService(Service): valid: Optional[bool] = None, order_by: Optional[str] = "timestamp", ): - query = "SELECT id, flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds" + query = "SELECT id, flow_id, valid, logs, data, timestamp FROM vertex_builds" conditions = [] if flow_id: conditions.append(f"flow_id = '{flow_id}'") diff --git a/src/backend/base/langflow/services/monitor/utils.py b/src/backend/base/langflow/services/monitor/utils.py index a71653e82..b746994b3 100644 --- a/src/backend/base/langflow/services/monitor/utils.py +++ b/src/backend/base/langflow/services/monitor/utils.py @@ -138,9 +138,9 @@ async def log_vertex_build( flow_id: str, vertex_id: str, valid: bool, - params: Any, + logs: Any, data: "ResultDataResponse", - artifacts: Optional[dict] = None, + messages: Optional[dict] = None, ): try: monitor_service = get_monitor_service() @@ -149,9 +149,9 @@ async def log_vertex_build( "flow_id": flow_id, "id": vertex_id, "valid": valid, - "params": params, + "logs": logs, "data": data.model_dump(), - "artifacts": artifacts or {}, + "messages": messages or {}, "timestamp": monitor_service.get_timestamp(), } monitor_service.add_row(table_name="vertex_builds", data=row) diff --git a/src/backend/base/langflow/services/socket/utils.py b/src/backend/base/langflow/services/socket/utils.py index c1f012e18..bed00e28f 100644 --- a/src/backend/base/langflow/services/socket/utils.py +++ b/src/backend/base/langflow/services/socket/utils.py @@ -90,9 +90,9 @@ async def build_vertex( flow_id=flow_id, vertex_id=vertex_id, valid=valid, - params=params, + logs=params, data=result_dict, - artifacts=artifacts, + messages=artifacts, ) # Emit the vertex build response diff --git a/src/backend/base/langflow/utils/schemas.py b/src/backend/base/langflow/utils/schemas.py index 3e6f17a5a..7d9535fff 100644 --- a/src/backend/base/langflow/utils/schemas.py +++ b/src/backend/base/langflow/utils/schemas.py @@ -14,6 +14,7 @@ class ChatOutputResponse(BaseModel): session_id: Optional[str] = None stream_url: Optional[str] = None component_id: Optional[str] = None + files: List[str] = [] @classmethod def from_message( diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index 29673fdb8..ed1281fee 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -204,16 +204,21 @@ export default function GenericNode({ }, [flowPool[data.id], data.id]); useEffect(() => { - if (validationStatus?.params) { + if (validationStatus?.logs) { // if it is not a string turn it into a string - let newValidationString = validationStatus.params; + let newValidationString = ""; + if (validationStatus?.logs && Array.isArray(validationStatus.logs)) { + newValidationString = validationStatus.logs + .map((log) => log.message) + .join("\n"); + } if (typeof newValidationString !== "string") { - newValidationString = JSON.stringify(validationStatus.params); + newValidationString = JSON.stringify(validationStatus.logs); } setValidationString(newValidationString); } - }, [validationStatus, validationStatus?.params]); + }, [validationStatus, validationStatus?.logs]); const [showNode, setShowNode] = useState(data.showNode ?? true); diff --git a/src/frontend/src/modals/IOModal/components/chatView/index.tsx b/src/frontend/src/modals/IOModal/components/chatView/index.tsx index 7c3614b0a..0a7a4532e 100644 --- a/src/frontend/src/modals/IOModal/components/chatView/index.tsx +++ b/src/frontend/src/modals/IOModal/components/chatView/index.tsx @@ -64,11 +64,11 @@ export default function ChatView({ const chatMessages: ChatMessageType[] = chatOutputResponses .sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp)) // - .filter((output) => output.data.artifacts?.message !== null) + .filter((output) => output.messages && output.messages.length > 0) .map((output, index) => { try { - const { sender, message, sender_name, stream_url } = output.data - .artifacts as ChatOutputType; + const { sender, message, sender_name, stream_url } = output + .messages[0] as ChatOutputType; const is_ai = sender === "Machine" || sender === null; return { @@ -202,7 +202,9 @@ export default function ChatView({ chatValue={chatValue} noInput={!inputTypes.includes("ChatInput")} lockChat={lockChat} - sendMessage={({repeat,files}) => sendMessage({repeat,files})} + sendMessage={({ repeat, files }) => + sendMessage({ repeat, files }) + } setChatValue={(value) => { setChatValue(value); }} diff --git a/src/frontend/src/types/api/index.ts b/src/frontend/src/types/api/index.ts index faef230a5..cb871895d 100644 --- a/src/frontend/src/types/api/index.ts +++ b/src/frontend/src/types/api/index.ts @@ -1,4 +1,6 @@ import { Edge, Node, Viewport } from "reactflow"; +import { ChatOutputType, chatInputType } from "../chat"; +import { Log } from "../components"; import { FlowType } from "../flow"; //kind and class are just representative names to represent the actual structure of the object received by the API export type APIDataType = { [key: string]: APIKindType }; @@ -150,7 +152,8 @@ export type VertexBuildTypeAPI = { top_level_vertices: Array; run_id: string; valid: boolean; - params: string; + logs: Log[]; + messages: ChatOutputType[] | chatInputType[]; data: VertexDataTypeAPI; timestamp: string; }; @@ -160,6 +163,7 @@ export type VertexBuildTypeAPI = { export type VertexDataTypeAPI = { results: { [key: string]: { [key: string]: string } }; artifacts: { [key: string]: string }; + messages: ChatOutputType[] | chatInputType[]; timedelta?: number; duration?: string; }; diff --git a/src/frontend/src/types/chat/index.ts b/src/frontend/src/types/chat/index.ts index e24c6e891..5b462234d 100644 --- a/src/frontend/src/types/chat/index.ts +++ b/src/frontend/src/types/chat/index.ts @@ -28,7 +28,8 @@ export type chatInputType = { export type FlowPoolObjectType = { timestamp: string; valid: boolean; - params: any; + // list of chat outputs or list of chat inputs + messages: Array | []; data: { artifacts: any; results: any | ChatOutputType | chatInputType }; id: string; }; diff --git a/src/frontend/src/types/components/index.ts b/src/frontend/src/types/components/index.ts index ef4c9ccf9..23aba789f 100644 --- a/src/frontend/src/types/components/index.ts +++ b/src/frontend/src/types/components/index.ts @@ -187,7 +187,13 @@ export type FloatComponentType = { id?: string; }; -export type FilePreviewType={loading:boolean,file:File,error:boolean,id:string,path?:string} +export type FilePreviewType = { + loading: boolean; + file: File; + error: boolean; + id: string; + path?: string; +}; export type TooltipComponentType = { children: ReactElement; @@ -475,7 +481,13 @@ export type chatInputType = { }; lockChat: boolean; noInput: boolean; - sendMessage: ({repeat,files}:{repeat:number,files?:string[]}) => void; + sendMessage: ({ + repeat, + files, + }: { + repeat: number; + files?: string[]; + }) => void; setChatValue: (value: string) => void; }; @@ -673,10 +685,14 @@ export type crashComponentPropsType = { resetErrorBoundary: (args) => void; }; +export type Log = { + message: string; +}; + export type validationStatusType = { id: string; data: object | any; - params: string; + logs: Log[]; progress?: number; valid: boolean; duration?: string; @@ -716,7 +732,13 @@ export type IOFieldViewProps = { export type UndrawCardComponentProps = { flow: FlowType }; export type chatViewProps = { - sendMessage: ({repeat,files}:{repeat:number,files?:string[]}) => void; + sendMessage: ({ + repeat, + files, + }: { + repeat: number; + files?: string[]; + }) => void; chatValue: string; setChatValue: (value: string) => void; lockChat: boolean; diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index adc982696..b6c099089 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -12,7 +12,7 @@ import { FlowState } from "../../tabs"; export type chatInputType = { result: string; - files?:string[]; + files?: string[]; }; export type ChatOutputType = { @@ -20,13 +20,13 @@ export type ChatOutputType = { sender: string; sender_name: string; stream_url?: string; - files?:string[]; + files?: string[]; }; export type FlowPoolObjectType = { timestamp: string; valid: boolean; - params: any; + messages: Array | []; data: { artifacts: any | ChatOutputType | chatInputType; results: any | ChatOutputType | chatInputType; @@ -44,7 +44,7 @@ export type FlowPoolObjectTypeNew = { timestamp: string; valid: boolean; data: { - logs?:any | ChatOutputType | chatInputType; + logs?: any | ChatOutputType | chatInputType; results: any | ChatOutputType | chatInputType; }; duration?: string; @@ -123,7 +123,7 @@ export type FlowStoreType = { startNodeId?: string; stopNodeId?: string; input_value?: string; - files?:string[]; + files?: string[]; }) => Promise; getFlow: () => { nodes: Node[]; edges: Edge[]; viewport: Viewport }; updateVerticesBuild: (