From 21f9182a82cb18fb3a2f465ee38cb532f912c39c Mon Sep 17 00:00:00 2001 From: Bar Nuri Date: Tue, 6 May 2025 01:16:07 +0300 Subject: [PATCH] fix: flow_runner better run_id and support stream (#7892) Co-authored-by: Gabriel Luiz Freitas Almeida --- src/backend/base/langflow/graph/graph/base.py | 2 +- src/backend/base/langflow/processing/process.py | 4 +++- .../base/langflow/services/flow/flow_runner.py | 12 ++++++++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 4498143a9..33d33ab8a 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -641,7 +641,7 @@ class Graph: raise ValueError(msg) return self._run_id - def set_run_id(self, run_id: uuid.UUID | None = None) -> None: + def set_run_id(self, run_id: uuid.UUID | str | None = None) -> None: """Sets the ID of the current run. Args: diff --git a/src/backend/base/langflow/processing/process.py b/src/backend/base/langflow/processing/process.py index 3cc4bc72d..f93094528 100644 --- a/src/backend/base/langflow/processing/process.py +++ b/src/backend/base/langflow/processing/process.py @@ -71,6 +71,7 @@ async def run_graph( session_id: str | None = None, fallback_to_env_vars: bool = False, output_component: str | None = None, + stream: bool = False, ) -> list[RunOutputs]: """Runs the given Langflow Graph with the specified input and returns the outputs. @@ -83,6 +84,7 @@ async def run_graph( fallback_to_env_vars (bool, optional): Whether to fallback to environment variables. Defaults to False. output_component (Optional[str], optional): The specific output component to retrieve. Defaults to None. + stream (bool, optional): Whether to stream the results or not. Defaults to False. Returns: List[RunOutputs]: A list of RunOutputs objects representing the outputs of the graph. @@ -113,7 +115,7 @@ async def run_graph( inputs_components=components, types=types, outputs=outputs or [], - stream=False, + stream=stream, session_id=session_id, fallback_to_env_vars=fallback_to_env_vars, ) diff --git a/src/backend/base/langflow/services/flow/flow_runner.py b/src/backend/base/langflow/services/flow/flow_runner.py index a578a00cf..2e84fd0b9 100644 --- a/src/backend/base/langflow/services/flow/flow_runner.py +++ b/src/backend/base/langflow/services/flow/flow_runner.py @@ -43,8 +43,10 @@ class LangflowRunnerExperimental: session_id: str, # UUID required currently flow: Path | str | dict, input_value: str, + *, input_type: str = "chat", output_type: str = "chat", + stream: bool = False, ): logger.info(f"Start Handling {session_id=}") await self.init_db_if_needed() @@ -56,7 +58,7 @@ class LangflowRunnerExperimental: await self.add_flow_to_db(session_id, flow_dict) graph = await self.create_graph_from_flow(session_id, flow_dict) try: - result = await self.run_graph(input_value, input_type, output_type, session_id, graph) + result = await self.run_graph(input_value, input_type, output_type, session_id, graph, stream=stream) finally: await self.clear_flow_state(session_id, flow_dict) logger.info(f"Finish Handling {session_id=}") @@ -74,7 +76,9 @@ class LangflowRunnerExperimental: await session.commit() @staticmethod - async def run_graph(input_value: str, input_type: str, output_type: str, session_id: str, graph: Graph): + async def run_graph( + input_value: str, input_type: str, output_type: str, session_id: str, graph: Graph, *, stream: bool + ): return await run_graph( graph=graph, session_id=session_id, @@ -82,13 +86,17 @@ class LangflowRunnerExperimental: fallback_to_env_vars=True, input_type=input_type, output_type=output_type, + stream=stream, ) @staticmethod async def create_graph_from_flow(session_id: str, flow_dict: dict): graph = await aload_flow_from_json(flow=flow_dict, disable_logs=False) graph.flow_id = flow_dict["id"] + graph.flow_name = flow_dict.get("name") graph.session_id = session_id + graph.set_run_id(session_id) + await graph.initialize_run() return graph @staticmethod