fix: flow_runner better run_id and support stream (#7892)

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Bar Nuri 2025-05-06 01:16:07 +03:00 committed by GitHub
commit 21f9182a82
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 14 additions and 4 deletions

View file

@ -641,7 +641,7 @@ class Graph:
raise ValueError(msg)
return self._run_id
def set_run_id(self, run_id: uuid.UUID | None = None) -> None:
def set_run_id(self, run_id: uuid.UUID | str | None = None) -> None:
"""Sets the ID of the current run.
Args:

View file

@ -71,6 +71,7 @@ async def run_graph(
session_id: str | None = None,
fallback_to_env_vars: bool = False,
output_component: str | None = None,
stream: bool = False,
) -> list[RunOutputs]:
"""Runs the given Langflow Graph with the specified input and returns the outputs.
@ -83,6 +84,7 @@ async def run_graph(
fallback_to_env_vars (bool, optional): Whether to fallback to environment variables.
Defaults to False.
output_component (Optional[str], optional): The specific output component to retrieve. Defaults to None.
stream (bool, optional): Whether to stream the results or not. Defaults to False.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the outputs of the graph.
@ -113,7 +115,7 @@ async def run_graph(
inputs_components=components,
types=types,
outputs=outputs or [],
stream=False,
stream=stream,
session_id=session_id,
fallback_to_env_vars=fallback_to_env_vars,
)

View file

@ -43,8 +43,10 @@ class LangflowRunnerExperimental:
session_id: str, # UUID required currently
flow: Path | str | dict,
input_value: str,
*,
input_type: str = "chat",
output_type: str = "chat",
stream: bool = False,
):
logger.info(f"Start Handling {session_id=}")
await self.init_db_if_needed()
@ -56,7 +58,7 @@ class LangflowRunnerExperimental:
await self.add_flow_to_db(session_id, flow_dict)
graph = await self.create_graph_from_flow(session_id, flow_dict)
try:
result = await self.run_graph(input_value, input_type, output_type, session_id, graph)
result = await self.run_graph(input_value, input_type, output_type, session_id, graph, stream=stream)
finally:
await self.clear_flow_state(session_id, flow_dict)
logger.info(f"Finish Handling {session_id=}")
@ -74,7 +76,9 @@ class LangflowRunnerExperimental:
await session.commit()
@staticmethod
async def run_graph(input_value: str, input_type: str, output_type: str, session_id: str, graph: Graph):
async def run_graph(
input_value: str, input_type: str, output_type: str, session_id: str, graph: Graph, *, stream: bool
):
return await run_graph(
graph=graph,
session_id=session_id,
@ -82,13 +86,17 @@ class LangflowRunnerExperimental:
fallback_to_env_vars=True,
input_type=input_type,
output_type=output_type,
stream=stream,
)
@staticmethod
async def create_graph_from_flow(session_id: str, flow_dict: dict):
graph = await aload_flow_from_json(flow=flow_dict, disable_logs=False)
graph.flow_id = flow_dict["id"]
graph.flow_name = flow_dict.get("name")
graph.session_id = session_id
graph.set_run_id(session_id)
await graph.initialize_run()
return graph
@staticmethod