feat: Implement streaming support and EventManager integration in flow execution (#5460)

* refactor: add create_stream_tokens_event_manager for handling streaming events

* feat: integrate EventManager into run_graph_internal for enhanced event handling

- Added EventManager import and parameter to run_graph_internal function.
- Updated function call to include event_manager for improved event management during graph execution.

* feat: enhance Graph class with event_manager parameter

- Added event_manager parameter to multiple methods in the Graph class to facilitate better event management during graph execution.
- Updated process and run methods to include event_manager, ensuring it is passed through to relevant function calls.
- Improved documentation for methods to reflect the new event_manager parameter.

* feat: implement streaming support in flow execution with EventManager integration

- Added support for streaming responses in the simplified_run_flow endpoint, allowing real-time event handling during flow execution.
- Introduced consume_and_yield and run_flow_generator functions to manage event consumption and client communication.
- Integrated EventManager for enhanced event tracking, including success and error notifications.
- Updated endpoint documentation to reflect new streaming capabilities and parameters.
- Improved error handling and logging for better debugging and client disconnection management.

* refactor: remove request logging from simplified_run_flow endpoint

- Removed the logging of the request object in the simplified_run_flow function to streamline logging and reduce verbosity.
- This change enhances the clarity of logs by focusing on essential information during flow execution.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-01-03 11:21:14 -03:00 committed by GitHub
commit 3e23800199
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 171 additions and 61 deletions

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import time
from collections.abc import AsyncGenerator
from http import HTTPStatus
from typing import TYPE_CHECKING, Annotated
from uuid import UUID
@ -9,6 +10,7 @@ from uuid import UUID
import sqlalchemy as sa
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from loguru import logger
from sqlmodel import select
@ -26,6 +28,7 @@ from langflow.api.v1.schemas import (
)
from langflow.custom.custom_component.component import Component
from langflow.custom.utils import build_custom_component_template, get_instance_name, update_component_build_config
from langflow.events.event_manager import create_stream_tokens_event_manager
from langflow.exceptions.api import APIException, InvalidChatInputError
from langflow.exceptions.serialization import SerializationError
from langflow.graph.graph.base import Graph
@ -47,6 +50,7 @@ from langflow.services.telemetry.schema import RunPayload
from langflow.utils.version import get_version_info
if TYPE_CHECKING:
from langflow.services.event_manager import EventManager
from langflow.services.settings.service import SettingsService
router = APIRouter(tags=["Base"])
@ -92,6 +96,7 @@ async def simple_run_flow(
*,
stream: bool = False,
api_key_user: User | None = None,
event_manager: EventManager | None = None,
):
if input_request.input_value is not None and input_request.tweaks is not None:
validate_input_and_tweaks(input_request)
@ -131,6 +136,7 @@ async def simple_run_flow(
inputs=inputs,
outputs=outputs,
stream=stream,
event_manager=event_manager,
)
return RunResponse(outputs=task_result, session_id=session_id)
@ -145,6 +151,7 @@ async def simple_run_flow_task(
*,
stream: bool = False,
api_key_user: User | None = None,
event_manager: EventManager | None = None,
):
"""Run a flow task as a BackgroundTask, therefore it should not throw exceptions."""
try:
@ -153,12 +160,96 @@ async def simple_run_flow_task(
input_request=input_request,
stream=stream,
api_key_user=api_key_user,
event_manager=event_manager,
)
except Exception: # noqa: BLE001
logger.exception(f"Error running flow {flow.id} task")
async def consume_and_yield(queue: asyncio.Queue, client_consumed_queue: asyncio.Queue) -> AsyncGenerator:
    """Stream event values from *queue* to the caller while recording timing metrics.

    Repeatedly pulls ``(event_id, value, put_time)`` tuples off *queue* and yields
    each ``value`` to the client. Once a value has been yielded, the event id is
    pushed onto *client_consumed_queue* so the producer can track consumption, and
    queue-residency / client-processing durations are logged at debug level.

    Args:
        queue (asyncio.Queue): Source of ``(event_id, value, put_time)`` event tuples.
        client_consumed_queue (asyncio.Queue): Receives each event id after the
            client has consumed the corresponding value.

    Yields:
        The ``value`` component of each event tuple.

    Notes:
        A tuple whose ``value`` is ``None`` is the termination sentinel: the loop
        exits without yielding it.
    """
    while True:
        event_id, value, put_time = await queue.get()
        if value is None:
            # Sentinel: producer signalled completion.
            break
        dequeued_at = time.time()
        yield value
        yielded_at = time.time()
        client_consumed_queue.put_nowait(event_id)
        logger.debug(
            f"consumed event {event_id} "
            f"(time in queue, {dequeued_at - put_time:.4f}, "
            f"client {yielded_at - dequeued_at:.4f})"
        )
async def run_flow_generator(
    flow: Flow,
    input_request: SimplifiedAPIRequest,
    api_key_user: User | None,
    event_manager: EventManager,
    client_consumed_queue: asyncio.Queue,
) -> None:
    """Execute a flow asynchronously and manage event streaming to the client.

    Runs the flow with streaming enabled and handles the event lifecycle,
    including success completion and error scenarios.

    Args:
        flow (Flow): The flow to execute.
        input_request (SimplifiedAPIRequest): The input parameters for the flow.
        api_key_user (User | None): Optional authenticated user running the flow.
        event_manager (EventManager): Manages the streaming of events to the client.
        client_consumed_queue (asyncio.Queue): Tracks client consumption of events.

    Events Generated:
        - "add_message": Sent when new messages are added during flow execution
        - "token": Sent for each token generated during streaming
        - "end": Sent when flow execution completes, includes final result
        - "error": Sent if an error occurs during execution

    Notes:
        - On success, sends the final result via ``event_manager.on_end()`` and
          waits for the client to acknowledge consumption.
        - On expected errors, logs and forwards them via ``event_manager.on_error()``.
        - Always enqueues a final ``None`` sentinel so the consumer loop terminates.
    """
    try:
        result = await simple_run_flow(
            flow=flow,
            input_request=input_request,
            stream=True,
            api_key_user=api_key_user,
            event_manager=event_manager,
        )
        event_manager.on_end(data={"result": result.model_dump()})
        # Wait for the client to consume the final "end" event before closing.
        await client_consumed_queue.get()
    except (ValueError, InvalidChatInputError, SerializationError) as e:
        logger.error(f"Error running flow: {e}")
        event_manager.on_error(data={"error": str(e)})
    finally:
        # Termination sentinel for consume_and_yield. Bug fix: the put_time slot
        # must be a float timestamp (time.time()), not the bare time.time
        # function object, to keep the (event_id, value, put_time) tuple shape
        # consistent with real events.
        await event_manager.queue.put((None, None, time.time()))
@router.post("/run/{flow_id_or_name}", response_model_exclude_none=True) # noqa: RUF100, FAST003
async def simplified_run_flow(
*,
@ -167,76 +258,68 @@ async def simplified_run_flow(
input_request: SimplifiedAPIRequest | None = None,
stream: bool = False,
api_key_user: Annotated[UserRead, Depends(api_key_security)],
) -> RunResponse:
"""Executes a specified flow by ID.
):
"""Executes a specified flow by ID with support for streaming and telemetry.
Executes a specified flow by ID with input customization, performance enhancements through caching,
and optional data streaming.
This endpoint executes a flow identified by ID or name, with options for streaming the response
and tracking execution metrics. It handles both streaming and non-streaming execution modes.
### Parameters:
- `db` (Session): Database session for executing queries.
- `flow_id_or_name` (str): ID or endpoint name of the flow to run.
- `input_request` (SimplifiedAPIRequest): Request object containing input values, types, output selection, tweaks,
and session ID.
- `api_key_user` (User): User object derived from the provided API key, used for authentication.
- `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching.
Args:
background_tasks (BackgroundTasks): FastAPI background task manager
flow (FlowRead | None): The flow to execute, loaded via dependency
input_request (SimplifiedAPIRequest | None): Input parameters for the flow
stream (bool): Whether to stream the response
api_key_user (UserRead): Authenticated user from API key
request (Request): The incoming HTTP request
### SimplifiedAPIRequest:
- `input_value` (Optional[str], default=""): Input value to pass to the flow.
- `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value,
determining how the input is interpreted.
- `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output,
affecting which components' outputs are included in the response. If set to "debug", all outputs are returned.
- `output_component` (Optional[str], default=None): Specific component output to retrieve. If provided,
only the output of the specified component is returned. This overrides the `output_type` parameter.
- `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution
parameters.
- `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for
subsequent requests.
Returns:
Union[StreamingResponse, RunResponse]: Either a streaming response for real-time results
or a RunResponse with the complete execution results
Raises:
HTTPException: For flow not found (404) or invalid input (400)
APIException: For internal execution errors (500)
### Tweaks
A dictionary of tweaks to customize the flow execution.
The tweaks can be used to modify the flow's parameters and components.
Tweaks can be overridden by the input values.
You can use Component's `id` or Display Name as key to tweak a specific component
(e.g., `{"Component Name": {"parameter_name": "value"}}`).
You can also use the parameter name as key to tweak all components with that parameter
(e.g., `{"parameter_name": "value"}`).
### Returns:
- A `RunResponse` object containing the execution results, including selected (or all, based on `output_type`)
outputs of the flow and the session ID, facilitating result retrieval and further interactions in a session
context.
### Raises:
- HTTPException: 404 if the specified flow ID curl -X 'POST' \
### Example:
```bash
curl -X 'POST' \
'http://<your_server>/run/{flow_id}' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H 'x-api-key: YOU_API_KEY' \
-H '
-d '{
"input_value": "Sample input",
"input_type": "chat",
"output_type": "chat",
"tweaks": {},
}'
```
This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency,
supporting a wide range of applications by allowing for dynamic input and output configuration along with
performance optimizations through session management and caching.
Notes:
- Supports both streaming and non-streaming execution modes
- Tracks execution time and success/failure via telemetry
- Handles graceful client disconnection in streaming mode
- Provides detailed error handling with appropriate HTTP status codes
- In streaming mode, uses EventManager to handle events:
- "add_message": New messages during execution
- "token": Individual tokens during streaming
- "end": Final execution result
"""
telemetry_service = get_telemetry_service()
input_request = input_request if input_request is not None else SimplifiedAPIRequest()
if flow is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Flow not found")
start_time = time.perf_counter()
if stream:
asyncio_queue: asyncio.Queue = asyncio.Queue()
asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue()
event_manager = create_stream_tokens_event_manager(queue=asyncio_queue)
main_task = asyncio.create_task(
run_flow_generator(
flow=flow,
input_request=input_request,
api_key_user=api_key_user,
event_manager=event_manager,
client_consumed_queue=asyncio_queue_client_consumed,
)
)
async def on_disconnect() -> None:
logger.debug("Client disconnected, closing tasks")
main_task.cancel()
return StreamingResponse(
consume_and_yield(asyncio_queue, asyncio_queue_client_consumed),
background=on_disconnect,
media_type="text/event-stream",
)
try:
result = await simple_run_flow(
flow=flow,

View file

@ -93,3 +93,11 @@ def create_default_event_manager(queue):
manager.register_event("on_build_start", "build_start")
manager.register_event("on_build_end", "build_end")
return manager
def create_stream_tokens_event_manager(queue):
    """Build an EventManager wired for token-streaming flow execution.

    Registers the three event types the streaming endpoint emits:
    message additions, individual tokens, and the final end event.

    Args:
        queue: The asyncio queue the manager pushes events onto.

    Returns:
        EventManager: The configured manager instance.
    """
    manager = EventManager(queue)
    stream_events = (
        ("on_message", "add_message"),
        ("on_token", "token"),
        ("on_end", "end"),
    )
    for handler_name, event_type in stream_events:
        manager.register_event(handler_name, event_type)
    return manager

View file

@ -726,6 +726,7 @@ class Graph:
stream: bool,
session_id: str,
fallback_to_env_vars: bool,
event_manager: EventManager | None = None,
) -> list[ResultData | None]:
"""Runs the graph with the given inputs.
@ -737,6 +738,7 @@ class Graph:
stream (bool): Whether to stream the results or not.
session_id (str): The session ID for the graph.
fallback_to_env_vars (bool): Whether to fallback to environment variables.
event_manager (EventManager | None): The event manager for the graph.
Returns:
List[Optional["ResultData"]]: The outputs of the graph.
@ -770,7 +772,11 @@ class Graph:
try:
# Prioritize the webhook component if it exists
start_component_id = find_start_component_id(self._is_input_vertices)
await self.process(start_component_id=start_component_id, fallback_to_env_vars=fallback_to_env_vars)
await self.process(
start_component_id=start_component_id,
fallback_to_env_vars=fallback_to_env_vars,
event_manager=event_manager,
)
self.increment_run_count()
except Exception as exc:
self._end_all_traces_async(error=exc)
@ -804,6 +810,7 @@ class Graph:
session_id: str | None = None,
stream: bool = False,
fallback_to_env_vars: bool = False,
event_manager: EventManager | None = None,
) -> list[RunOutputs]:
"""Runs the graph with the given inputs.
@ -815,6 +822,7 @@ class Graph:
session_id (Optional[str], optional): The session ID for the graph. Defaults to None.
stream (bool, optional): Whether to stream the results or not. Defaults to False.
fallback_to_env_vars (bool, optional): Whether to fallback to environment variables. Defaults to False.
event_manager (EventManager | None): The event manager for the graph.
Returns:
List[RunOutputs]: The outputs of the graph.
@ -847,6 +855,7 @@ class Graph:
stream=stream,
session_id=session_id or "",
fallback_to_env_vars=fallback_to_env_vars,
event_manager=event_manager,
)
run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs)
logger.debug(f"Run outputs: {run_output_object}")
@ -1495,7 +1504,13 @@ class Graph:
vertices.append(vertex)
return vertices
async def process(self, *, fallback_to_env_vars: bool, start_component_id: str | None = None) -> Graph:
async def process(
self,
*,
fallback_to_env_vars: bool,
start_component_id: str | None = None,
event_manager: EventManager | None = None,
) -> Graph:
"""Processes the graph with vertices in each layer run in parallel."""
first_layer = self.sort_vertices(start_component_id=start_component_id)
vertex_task_run_count: dict[str, int] = {}
@ -1521,6 +1536,7 @@ class Graph:
fallback_to_env_vars=fallback_to_env_vars,
get_cache=chat_service.get_cache,
set_cache=chat_service.set_cache,
event_manager=event_manager,
),
name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}",
)

View file

@ -15,6 +15,7 @@ if TYPE_CHECKING:
from langflow.api.v1.schemas import InputValueRequest
from langflow.graph.graph.base import Graph
from langflow.graph.schema import RunOutputs
from langflow.services.event_manager import EventManager
class Result(BaseModel):
@ -30,6 +31,7 @@ async def run_graph_internal(
session_id: str | None = None,
inputs: list[InputValueRequest] | None = None,
outputs: list[str] | None = None,
event_manager: EventManager | None = None,
) -> tuple[list[RunOutputs], str]:
"""Run the graph and generate the result."""
inputs = inputs or []
@ -55,6 +57,7 @@ async def run_graph_internal(
stream=stream,
session_id=effective_session_id or "",
fallback_to_env_vars=fallback_to_env_vars,
event_manager=event_manager,
)
return run_outputs, effective_session_id