From 304e28b48ae05e291d00f08f8859d2053eaa261b Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 5 May 2025 19:20:04 -0300 Subject: [PATCH] feat: event polling can process more than one event at a time (#7858) * refactor: enhance event polling to retrieve all available events and return in NDJSON format Updated the event polling logic to collect all available events from the queue instead of just one. The response format has been changed to NDJSON, allowing multiple events to be returned in a single response. Improved error handling for timeouts by returning an empty response instead of raising an error. * refactor: improve NDJSON event processing in build polling Enhanced the event polling logic to handle NDJSON responses by processing multiple events in a single fetch. Updated the response handling to read the response as text, split it into individual JSON objects, and process each event accordingly. This change improves the efficiency of event handling and maintains a consistent polling interval. * fix: change event list type from list[str] to list in get_flow_events_response * refactor: enhance event stream consumption with timeout and error handling Updated the consume_and_assert_stream function to include a timeout for processing events and improved error handling for JSON parsing failures. Added logging for better debugging and tracking of processed events. Adjusted the event validation logic to ensure both the first and last expected events are present. * refactor: enhance event polling logic with improved limits and JSON validation Updated the event polling mechanism to include configurable limits for total events and empty polls, along with a timeout for requests. Enhanced JSON validation for individual events and added logging for better debugging of polling behavior. This refactor aims to improve robustness and maintainability of the event consumption process. * refactor: update polling interval and streamline event handling Modified the polling interval constant to improve responsiveness and adjusted event handling logic in the get_flow_events_response function to simplify the processing of event values. This change enhances the efficiency of event consumption and maintains a consistent approach to handling NDJSON responses. * refactor: simplify event retrieval in get_flow_events_response Removed unused event_id and put_time variables from the event retrieval logic in the get_flow_events_response function. This change streamlines the code and enhances readability while maintaining the functionality of event handling. * refactor: update consume_and_assert_stream to use asyncio.wait_for for timeout handling Modified the consume_and_assert_stream function to utilize asyncio.wait_for for managing timeouts in event processing. This change improves compatibility with Python 3.10 and enhances the robustness of the event consumption logic while maintaining existing functionality. --- src/backend/base/langflow/api/build.py | 43 ++++-- src/backend/tests/unit/build_utils.py | 136 +++++++++++++------ src/backend/tests/unit/test_chat_endpoint.py | 92 ++++++++++--- src/frontend/src/constants/constants.ts | 2 +- src/frontend/src/utils/buildUtils.ts | 60 +++++--- 5 files changed, 240 insertions(+), 93 deletions(-) diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index 87d2b004d..9ae6f8bcb 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -5,8 +5,7 @@ import traceback import uuid from collections.abc import AsyncIterator -from fastapi import BackgroundTasks, HTTPException -from fastapi.responses import JSONResponse +from fastapi import BackgroundTasks, HTTPException, Response from loguru import logger from sqlmodel import select @@ -100,22 +99,42 @@ async def get_flow_events_response( event_task=event_task, ) - # Polling mode - get exactly one event + # Polling mode - get all available events try: - _, value, _ = await main_queue.get() - if value is None: - # End of stream, trigger end event - if event_task is not None: - event_task.cancel() - event_manager.on_end(data={}) + events: list = [] + # Get all available events from the queue without blocking + while not main_queue.empty(): + _, value, _ = await main_queue.get() + if value is None: + # End of stream, trigger end event + if event_task is not None: + event_task.cancel() + event_manager.on_end(data={}) + # Include the end event + events.append(None) + break + events.append(value.decode("utf-8")) - return JSONResponse({"event": value.decode("utf-8") if value else None}) + # If no events were available, wait for one (with timeout) + if not events: + _, value, _ = await main_queue.get() + if value is None: + # End of stream, trigger end event + if event_task is not None: + event_task.cancel() + event_manager.on_end(data={}) + else: + events.append(value.decode("utf-8")) + + # Return as NDJSON format - each line is a complete JSON object + content = "\n".join([event for event in events if event is not None]) + return Response(content=content, media_type="application/x-ndjson") except asyncio.CancelledError as exc: logger.info(f"Event polling was cancelled for job {job_id}") raise HTTPException(status_code=499, detail="Event polling was cancelled") from exc - except asyncio.TimeoutError as exc: + except asyncio.TimeoutError: logger.warning(f"Timeout while waiting for events for job {job_id}") - raise HTTPException(status_code=408, detail="Timeout while waiting for events") from exc + return Response(content="", media_type="application/x-ndjson") # Return empty response instead of error except JobQueueNotFoundError as exc: logger.error(f"Job not found: {job_id}. Error: {exc!s}") diff --git a/src/backend/tests/unit/build_utils.py b/src/backend/tests/unit/build_utils.py index abad2d769..694507e0d 100644 --- a/src/backend/tests/unit/build_utils.py +++ b/src/backend/tests/unit/build_utils.py @@ -1,8 +1,10 @@ +import asyncio import json from typing import Any from uuid import UUID from httpx import AsyncClient, codes +from loguru import logger async def create_flow(client: AsyncClient, flow_data: str, headers: dict[str, str]) -> UUID: @@ -25,51 +27,103 @@ async def build_flow( async def get_build_events(client: AsyncClient, job_id: str, headers: dict[str, str]): """Get events for a build job.""" - return await client.get(f"api/v1/build/{job_id}/events", headers=headers) + # Add Accept header for NDJSON format + headers_with_accept = {**headers, "Accept": "application/x-ndjson"} + return await client.get(f"api/v1/build/{job_id}/events", headers=headers_with_accept) -async def consume_and_assert_stream(response, job_id): - """Consume the event stream and assert the expected event structure.""" +async def consume_and_assert_stream(response, job_id, timeout=10.0): + """Consume the event stream and assert the expected event structure. + + Args: + response: The response object with an aiter_lines method + job_id: The job ID to verify in events + timeout: Maximum time in seconds to wait for events (default: 10s) + """ count = 0 lines = [] - async for line in response.aiter_lines(): - # Skip empty lines (ndjson uses double newlines) - if not line: - continue + first_event_seen = False + end_event_seen = False - lines.append(line) - parsed = json.loads(line) - if "job_id" in parsed: - assert parsed["job_id"] == job_id - continue + # Set a timeout for the entire consumption process + try: + # In Python 3.10, asyncio.timeout() is not available, so we use wait_for instead + async def process_events(): + nonlocal count, lines, first_event_seen, end_event_seen + async for line in response.aiter_lines(): + # Skip empty lines (ndjson uses double newlines) + if not line: + continue - if count == 0: - # First event should be vertices_sorted - assert parsed["event"] == "vertices_sorted", ( - "Invalid first event. Expected 'vertices_sorted'. Full event stream:\n" + "\n".join(lines) - ) - ids = parsed["data"]["ids"] - ids.sort() - assert ids == ["ChatInput-CIGht"], "Invalid ids in first event. Full event stream:\n" + "\n".join(lines) + lines.append(line) + try: + parsed = json.loads(line) + except json.JSONDecodeError: + logger.debug(f"ERROR: Failed to parse JSON: {line}") + raise - to_run = parsed["data"]["to_run"] - to_run.sort() - assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"], ( - "Invalid to_run list in first event. Full event stream:\n" + "\n".join(lines) - ) - elif count > 0 and count < 5: - # Next events should be end_vertex events - assert parsed["event"] == "end_vertex", ( - f"Invalid event at position {count}. Expected 'end_vertex'. Full event stream:\n" + "\n".join(lines) - ) - assert parsed["data"]["build_data"] is not None, ( - f"Missing build_data at position {count}. Full event stream:\n" + "\n".join(lines) - ) - elif count == 5: - # Final event should be end - assert parsed["event"] == "end", "Invalid final event. Expected 'end'. Full event stream:\n" + "\n".join( - lines - ) - else: - raise ValueError(f"Unexpected event at position {count}. Full event stream:\n" + "\n".join(lines)) - count += 1 + if "job_id" in parsed: + assert parsed["job_id"] == job_id + continue + + # First event should be vertices_sorted + if not first_event_seen: + assert parsed["event"] == "vertices_sorted", ( + "Invalid first event. Expected 'vertices_sorted'. Full event stream:\n" + "\n".join(lines) + ) + ids = parsed["data"]["ids"] + ids.sort() + assert ids == ["ChatInput-CIGht"], "Invalid ids in first event. Full event stream:\n" + "\n".join( + lines + ) + + to_run = parsed["data"]["to_run"] + to_run.sort() + assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"], ( + "Invalid to_run list in first event. Full event stream:\n" + "\n".join(lines) + ) + first_event_seen = True + # Last event should be end + elif parsed["event"] == "end": + end_event_seen = True + # Middle events should be end_vertex + elif parsed["event"] == "end_vertex": + assert parsed["data"]["build_data"] is not None, ( + f"Missing build_data at position {count}. Full event stream:\n" + "\n".join(lines) + ) + # Other event types (like token or add_message) are allowed and ignored + else: + # Allow other event types to pass through without failing + pass + + count += 1 + + # Debug output for verbose mode to track progress + if count % 10 == 0: + logger.debug(f"Processed {count} events so far") + + await asyncio.wait_for(process_events(), timeout=timeout) + except asyncio.TimeoutError as e: + # If we timed out, logger.debug what we have so far and fail the test + events_summary = "\n".join( + f"{i}: {line[:80]}..." if len(line) > 80 else f"{i}: {line}" for i, line in enumerate(lines) + ) + logger.debug( + f"ERROR: Test timed out after {timeout}s. Processed {count} events.\nEvents received:\n{events_summary}" + ) + if first_event_seen and not end_event_seen: + msg = f"Test timed out after {timeout}s waiting for 'end' event" + raise TimeoutError(msg) from e + if not first_event_seen: + msg = f"Test timed out after {timeout}s waiting for 'vertices_sorted' event" + raise TimeoutError(msg) from e + msg = f"Test timed out after {timeout}s" + raise TimeoutError(msg) from e + + # Verify we saw both the first and end events + assert first_event_seen, "Missing vertices_sorted event. Full event stream:\n" + "\n".join(lines) + assert end_event_seen, "Missing end event. Full event stream:\n" + "\n".join(lines) + + # logger.debug summary of events processed + logger.debug(f"Successfully processed {count} events for job {job_id}") + return count diff --git a/src/backend/tests/unit/test_chat_endpoint.py b/src/backend/tests/unit/test_chat_endpoint.py index 9e97c7689..1ec5bb929 100644 --- a/src/backend/tests/unit/test_chat_endpoint.py +++ b/src/backend/tests/unit/test_chat_endpoint.py @@ -1,4 +1,5 @@ import asyncio +import json import uuid from uuid import UUID @@ -6,6 +7,7 @@ import pytest from httpx import codes from langflow.memory import aget_messages from langflow.services.database.models.flow import FlowUpdate +from loguru import logger from tests.unit.build_utils import build_flow, consume_and_assert_stream, create_flow, get_build_events @@ -160,33 +162,87 @@ async def test_build_flow_polling(client, json_memory_chatbot_no_llm, logged_in_ self.job_id = job_id self.headers = headers self.status_code = codes.OK + self.max_total_events = 50 # Limit to prevent infinite loops + self.max_empty_polls = 10 # Maximum number of empty polls before giving up + self.poll_timeout = 1.0 # Timeout for each polling request async def aiter_lines(self): try: - sleeps = 0 - max_sleeps = 100 - while True: - response = await self.client.get( - f"api/v1/build/{self.job_id}/events?event_delivery=polling", headers=self.headers - ) - assert response.status_code == codes.OK - data = response.json() + empty_polls = 0 + total_events = 0 + end_event_found = False - if data["event"] is None: - # No event available, add delay to prevent tight polling + while ( + empty_polls < self.max_empty_polls and total_events < self.max_total_events and not end_event_found + ): + # Add Accept header for NDJSON + headers = {**self.headers, "Accept": "application/x-ndjson"} + + # Set a timeout for the request + response = await asyncio.wait_for( + self.client.get( + f"api/v1/build/{self.job_id}/events?event_delivery=polling", + headers=headers, + ), + timeout=self.poll_timeout, + ) + + assert response.status_code == codes.OK + + # Get the NDJSON response as text + text = response.text + + # Skip if response is empty + if not text.strip(): + empty_polls += 1 await asyncio.sleep(0.1) - sleeps += 1 continue - yield data["event"] + # Reset empty polls counter since we got data + empty_polls = 0 + + # Process each line as an individual JSON object + line_count = 0 + for line in text.splitlines(): + if not line.strip(): + continue + + line_count += 1 + total_events += 1 + + # Check for end event with multiple possible formats + if '"event":"end"' in line or '"event": "end"' in line: + end_event_found = True + + # Validate it's proper JSON before yielding + try: + json.loads(line) # Test parse to ensure it's valid JSON + yield line + except json.JSONDecodeError as e: + logger.debug(f"WARNING: Skipping invalid JSON: {line}") + logger.debug(f"Error: {e}") + # Don't yield invalid JSON, but continue processing other lines + + # If we had no events in this batch, count as empty poll + if line_count == 0: + empty_polls += 1 + + # Add a small delay to prevent tight polling + await asyncio.sleep(0.1) + + # If we hit the limit without finding the end event, log a warning + if total_events >= self.max_total_events: + logger.debug( + f"WARNING: Reached maximum event limit ({self.max_total_events}) without finding end event" + ) + + if empty_polls >= self.max_empty_polls and not end_event_found: + logger.debug( + f"WARNING: Reached maximum empty polls ({self.max_empty_polls}) without finding end event" + ) - # If this was the end event, stop polling - if '"end"' in data["event"]: - break - if sleeps > max_sleeps: - msg = "Build event polling timed out." - raise TimeoutError(msg) except asyncio.TimeoutError as e: + logger.debug(f"ERROR: Polling request timed out after {self.poll_timeout}s") msg = "Build event polling timed out." raise TimeoutError(msg) from e diff --git a/src/frontend/src/constants/constants.ts b/src/frontend/src/constants/constants.ts index ce7e8360e..94b021ffd 100644 --- a/src/frontend/src/constants/constants.ts +++ b/src/frontend/src/constants/constants.ts @@ -1024,7 +1024,7 @@ export const POLLING_MESSAGES = { STREAMING_NOT_SUPPORTED: "Streaming not supported", } as const; -export const POLLING_INTERVAL = 100; +export const BUILD_POLLING_INTERVAL = 25; export const IS_AUTO_LOGIN = !process?.env?.LANGFLOW_AUTO_LOGIN || diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 4dc5fcc95..df3b97755 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -1,7 +1,7 @@ import { MISSED_ERROR_ALERT } from "@/constants/alerts_constants"; import { BASE_URL_API, - POLLING_INTERVAL, + BUILD_POLLING_INTERVAL, POLLING_MESSAGES, } from "@/constants/constants"; import { performStreamingRequest } from "@/controllers/API/api"; @@ -184,6 +184,7 @@ async function pollBuildEvents( method: "GET", headers: { "Content-Type": "application/json", + Accept: "application/x-ndjson", }, signal: abortController.signal, // Add abort signal to fetch }, @@ -197,34 +198,51 @@ async function pollBuildEvents( ); } - const data = await response.json(); - if (!data.event) { - // No event in this request, try again + // Get the response text - will be NDJSON format (one JSON per line) + const responseText = await response.text(); + + // Skip if empty response + if (!responseText.trim()) { await new Promise((resolve) => setTimeout(resolve, 100)); continue; } - // Process the event - const event = JSON.parse(data.event); - const result = await onEvent( - event.event, - event.data, - buildResults, - verticesStartTimeMs, - callbacks, - ); - if (!result) { - isDone = true; - abortController.abort(); + // Split by newlines to get individual JSON objects + const eventLines = responseText.split("\n").filter((line) => line.trim()); + + // If no events, continue polling + if (eventLines.length === 0) { + await new Promise((resolve) => setTimeout(resolve, 100)); + continue; } - // Check if this was the end event or if we got a null value - if (event.event === "end" || data.event === null) { - isDone = true; + // Process all events in the NDJSON response + for (const eventStr of eventLines) { + // Process the event + const event = JSON.parse(eventStr); + const result = await onEvent( + event.event, + event.data, + buildResults, + verticesStartTimeMs, + callbacks, + ); + + if (!result) { + isDone = true; + abortController.abort(); + break; + } + + // Check if this was the end event + if (event.event === "end") { + isDone = true; + break; + } } - // Add a small delay between polls to avoid overwhelming the server - await new Promise((resolve) => setTimeout(resolve, POLLING_INTERVAL)); + // Add a small delay between polls + await new Promise((resolve) => setTimeout(resolve, BUILD_POLLING_INTERVAL)); } }