diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index 87d2b004d..9ae6f8bcb 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -5,8 +5,7 @@ import traceback import uuid from collections.abc import AsyncIterator -from fastapi import BackgroundTasks, HTTPException -from fastapi.responses import JSONResponse +from fastapi import BackgroundTasks, HTTPException, Response from loguru import logger from sqlmodel import select @@ -100,22 +99,42 @@ async def get_flow_events_response( event_task=event_task, ) - # Polling mode - get exactly one event + # Polling mode - get all available events try: - _, value, _ = await main_queue.get() - if value is None: - # End of stream, trigger end event - if event_task is not None: - event_task.cancel() - event_manager.on_end(data={}) + events: list = [] + # Get all available events from the queue without blocking + while not main_queue.empty(): + _, value, _ = await main_queue.get() + if value is None: + # End of stream, trigger end event + if event_task is not None: + event_task.cancel() + event_manager.on_end(data={}) + # Include the end event + events.append(None) + break + events.append(value.decode("utf-8")) - return JSONResponse({"event": value.decode("utf-8") if value else None}) + # If no events were available, wait for one (with timeout) + if not events: + _, value, _ = await main_queue.get() + if value is None: + # End of stream, trigger end event + if event_task is not None: + event_task.cancel() + event_manager.on_end(data={}) + else: + events.append(value.decode("utf-8")) + + # Return as NDJSON format - each line is a complete JSON object + content = "\n".join([event for event in events if event is not None]) + return Response(content=content, media_type="application/x-ndjson") except asyncio.CancelledError as exc: logger.info(f"Event polling was cancelled for job {job_id}") raise HTTPException(status_code=499, detail="Event polling was cancelled") from exc - except asyncio.TimeoutError as exc: + except asyncio.TimeoutError: logger.warning(f"Timeout while waiting for events for job {job_id}") - raise HTTPException(status_code=408, detail="Timeout while waiting for events") from exc + return Response(content="", media_type="application/x-ndjson") # Return empty response instead of error except JobQueueNotFoundError as exc: logger.error(f"Job not found: {job_id}. Error: {exc!s}") diff --git a/src/backend/tests/unit/build_utils.py b/src/backend/tests/unit/build_utils.py index abad2d769..694507e0d 100644 --- a/src/backend/tests/unit/build_utils.py +++ b/src/backend/tests/unit/build_utils.py @@ -1,8 +1,10 @@ +import asyncio import json from typing import Any from uuid import UUID from httpx import AsyncClient, codes +from loguru import logger async def create_flow(client: AsyncClient, flow_data: str, headers: dict[str, str]) -> UUID: @@ -25,51 +27,103 @@ async def build_flow( async def get_build_events(client: AsyncClient, job_id: str, headers: dict[str, str]): """Get events for a build job.""" - return await client.get(f"api/v1/build/{job_id}/events", headers=headers) + # Add Accept header for NDJSON format + headers_with_accept = {**headers, "Accept": "application/x-ndjson"} + return await client.get(f"api/v1/build/{job_id}/events", headers=headers_with_accept) -async def consume_and_assert_stream(response, job_id): - """Consume the event stream and assert the expected event structure.""" +async def consume_and_assert_stream(response, job_id, timeout=10.0): + """Consume the event stream and assert the expected event structure. + + Args: + response: The response object with an aiter_lines method + job_id: The job ID to verify in events + timeout: Maximum time in seconds to wait for events (default: 10s) + """ count = 0 lines = [] - async for line in response.aiter_lines(): - # Skip empty lines (ndjson uses double newlines) - if not line: - continue + first_event_seen = False + end_event_seen = False - lines.append(line) - parsed = json.loads(line) - if "job_id" in parsed: - assert parsed["job_id"] == job_id - continue + # Set a timeout for the entire consumption process + try: + # In Python 3.10, asyncio.timeout() is not available, so we use wait_for instead + async def process_events(): + nonlocal count, lines, first_event_seen, end_event_seen + async for line in response.aiter_lines(): + # Skip empty lines (ndjson uses double newlines) + if not line: + continue - if count == 0: - # First event should be vertices_sorted - assert parsed["event"] == "vertices_sorted", ( - "Invalid first event. Expected 'vertices_sorted'. Full event stream:\n" + "\n".join(lines) - ) - ids = parsed["data"]["ids"] - ids.sort() - assert ids == ["ChatInput-CIGht"], "Invalid ids in first event. Full event stream:\n" + "\n".join(lines) + lines.append(line) + try: + parsed = json.loads(line) + except json.JSONDecodeError: + logger.debug(f"ERROR: Failed to parse JSON: {line}") + raise - to_run = parsed["data"]["to_run"] - to_run.sort() - assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"], ( - "Invalid to_run list in first event. Full event stream:\n" + "\n".join(lines) - ) - elif count > 0 and count < 5: - # Next events should be end_vertex events - assert parsed["event"] == "end_vertex", ( - f"Invalid event at position {count}. Expected 'end_vertex'. Full event stream:\n" + "\n".join(lines) - ) - assert parsed["data"]["build_data"] is not None, ( - f"Missing build_data at position {count}. Full event stream:\n" + "\n".join(lines) - ) - elif count == 5: - # Final event should be end - assert parsed["event"] == "end", "Invalid final event. Expected 'end'. Full event stream:\n" + "\n".join( - lines - ) - else: - raise ValueError(f"Unexpected event at position {count}. Full event stream:\n" + "\n".join(lines)) - count += 1 + if "job_id" in parsed: + assert parsed["job_id"] == job_id + continue + + # First event should be vertices_sorted + if not first_event_seen: + assert parsed["event"] == "vertices_sorted", ( + "Invalid first event. Expected 'vertices_sorted'. Full event stream:\n" + "\n".join(lines) + ) + ids = parsed["data"]["ids"] + ids.sort() + assert ids == ["ChatInput-CIGht"], "Invalid ids in first event. Full event stream:\n" + "\n".join( + lines + ) + + to_run = parsed["data"]["to_run"] + to_run.sort() + assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"], ( + "Invalid to_run list in first event. Full event stream:\n" + "\n".join(lines) + ) + first_event_seen = True + # Last event should be end + elif parsed["event"] == "end": + end_event_seen = True + # Middle events should be end_vertex + elif parsed["event"] == "end_vertex": + assert parsed["data"]["build_data"] is not None, ( + f"Missing build_data at position {count}. Full event stream:\n" + "\n".join(lines) + ) + # Other event types (like token or add_message) are allowed and ignored + else: + # Allow other event types to pass through without failing + pass + + count += 1 + + # Debug output for verbose mode to track progress + if count % 10 == 0: + logger.debug(f"Processed {count} events so far") + + await asyncio.wait_for(process_events(), timeout=timeout) + except asyncio.TimeoutError as e: + # If we timed out, logger.debug what we have so far and fail the test + events_summary = "\n".join( + f"{i}: {line[:80]}..." if len(line) > 80 else f"{i}: {line}" for i, line in enumerate(lines) + ) + logger.debug( + f"ERROR: Test timed out after {timeout}s. Processed {count} events.\nEvents received:\n{events_summary}" + ) + if first_event_seen and not end_event_seen: + msg = f"Test timed out after {timeout}s waiting for 'end' event" + raise TimeoutError(msg) from e + if not first_event_seen: + msg = f"Test timed out after {timeout}s waiting for 'vertices_sorted' event" + raise TimeoutError(msg) from e + msg = f"Test timed out after {timeout}s" + raise TimeoutError(msg) from e + + # Verify we saw both the first and end events + assert first_event_seen, "Missing vertices_sorted event. Full event stream:\n" + "\n".join(lines) + assert end_event_seen, "Missing end event. Full event stream:\n" + "\n".join(lines) + + # logger.debug summary of events processed + logger.debug(f"Successfully processed {count} events for job {job_id}") + return count diff --git a/src/backend/tests/unit/test_chat_endpoint.py b/src/backend/tests/unit/test_chat_endpoint.py index 9e97c7689..1ec5bb929 100644 --- a/src/backend/tests/unit/test_chat_endpoint.py +++ b/src/backend/tests/unit/test_chat_endpoint.py @@ -1,4 +1,5 @@ import asyncio +import json import uuid from uuid import UUID @@ -6,6 +7,7 @@ import pytest from httpx import codes from langflow.memory import aget_messages from langflow.services.database.models.flow import FlowUpdate +from loguru import logger from tests.unit.build_utils import build_flow, consume_and_assert_stream, create_flow, get_build_events @@ -160,33 +162,87 @@ async def test_build_flow_polling(client, json_memory_chatbot_no_llm, logged_in_ self.job_id = job_id self.headers = headers self.status_code = codes.OK + self.max_total_events = 50 # Limit to prevent infinite loops + self.max_empty_polls = 10 # Maximum number of empty polls before giving up + self.poll_timeout = 1.0 # Timeout for each polling request async def aiter_lines(self): try: - sleeps = 0 - max_sleeps = 100 - while True: - response = await self.client.get( - f"api/v1/build/{self.job_id}/events?event_delivery=polling", headers=self.headers - ) - assert response.status_code == codes.OK - data = response.json() + empty_polls = 0 + total_events = 0 + end_event_found = False - if data["event"] is None: - # No event available, add delay to prevent tight polling + while ( + empty_polls < self.max_empty_polls and total_events < self.max_total_events and not end_event_found + ): + # Add Accept header for NDJSON + headers = {**self.headers, "Accept": "application/x-ndjson"} + + # Set a timeout for the request + response = await asyncio.wait_for( + self.client.get( + f"api/v1/build/{self.job_id}/events?event_delivery=polling", + headers=headers, + ), + timeout=self.poll_timeout, + ) + + assert response.status_code == codes.OK + + # Get the NDJSON response as text + text = response.text + + # Skip if response is empty + if not text.strip(): + empty_polls += 1 await asyncio.sleep(0.1) - sleeps += 1 continue - yield data["event"] + # Reset empty polls counter since we got data + empty_polls = 0 + + # Process each line as an individual JSON object + line_count = 0 + for line in text.splitlines(): + if not line.strip(): + continue + + line_count += 1 + total_events += 1 + + # Check for end event with multiple possible formats + if '"event":"end"' in line or '"event": "end"' in line: + end_event_found = True + + # Validate it's proper JSON before yielding + try: + json.loads(line) # Test parse to ensure it's valid JSON + yield line + except json.JSONDecodeError as e: + logger.debug(f"WARNING: Skipping invalid JSON: {line}") + logger.debug(f"Error: {e}") + # Don't yield invalid JSON, but continue processing other lines + + # If we had no events in this batch, count as empty poll + if line_count == 0: + empty_polls += 1 + + # Add a small delay to prevent tight polling + await asyncio.sleep(0.1) + + # If we hit the limit without finding the end event, log a warning + if total_events >= self.max_total_events: + logger.debug( + f"WARNING: Reached maximum event limit ({self.max_total_events}) without finding end event" + ) + + if empty_polls >= self.max_empty_polls and not end_event_found: + logger.debug( + f"WARNING: Reached maximum empty polls ({self.max_empty_polls}) without finding end event" + ) - # If this was the end event, stop polling - if '"end"' in data["event"]: - break - if sleeps > max_sleeps: - msg = "Build event polling timed out." - raise TimeoutError(msg) except asyncio.TimeoutError as e: + logger.debug(f"ERROR: Polling request timed out after {self.poll_timeout}s") msg = "Build event polling timed out." raise TimeoutError(msg) from e diff --git a/src/frontend/src/constants/constants.ts b/src/frontend/src/constants/constants.ts index ce7e8360e..94b021ffd 100644 --- a/src/frontend/src/constants/constants.ts +++ b/src/frontend/src/constants/constants.ts @@ -1024,7 +1024,7 @@ export const POLLING_MESSAGES = { STREAMING_NOT_SUPPORTED: "Streaming not supported", } as const; -export const POLLING_INTERVAL = 100; +export const BUILD_POLLING_INTERVAL = 25; export const IS_AUTO_LOGIN = !process?.env?.LANGFLOW_AUTO_LOGIN || diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 4dc5fcc95..df3b97755 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -1,7 +1,7 @@ import { MISSED_ERROR_ALERT } from "@/constants/alerts_constants"; import { BASE_URL_API, - POLLING_INTERVAL, + BUILD_POLLING_INTERVAL, POLLING_MESSAGES, } from "@/constants/constants"; import { performStreamingRequest } from "@/controllers/API/api"; @@ -184,6 +184,7 @@ async function pollBuildEvents( method: "GET", headers: { "Content-Type": "application/json", + Accept: "application/x-ndjson", }, signal: abortController.signal, // Add abort signal to fetch }, @@ -197,34 +198,51 @@ async function pollBuildEvents( ); } - const data = await response.json(); - if (!data.event) { - // No event in this request, try again + // Get the response text - will be NDJSON format (one JSON per line) + const responseText = await response.text(); + + // Skip if empty response + if (!responseText.trim()) { await new Promise((resolve) => setTimeout(resolve, 100)); continue; } - // Process the event - const event = JSON.parse(data.event); - const result = await onEvent( - event.event, - event.data, - buildResults, - verticesStartTimeMs, - callbacks, - ); - if (!result) { - isDone = true; - abortController.abort(); + // Split by newlines to get individual JSON objects + const eventLines = responseText.split("\n").filter((line) => line.trim()); + + // If no events, continue polling + if (eventLines.length === 0) { + await new Promise((resolve) => setTimeout(resolve, 100)); + continue; } - // Check if this was the end event or if we got a null value - if (event.event === "end" || data.event === null) { - isDone = true; + // Process all events in the NDJSON response + for (const eventStr of eventLines) { + // Process the event + const event = JSON.parse(eventStr); + const result = await onEvent( + event.event, + event.data, + buildResults, + verticesStartTimeMs, + callbacks, + ); + + if (!result) { + isDone = true; + abortController.abort(); + break; + } + + // Check if this was the end event + if (event.event === "end") { + isDone = true; + break; + } } - // Add a small delay between polls to avoid overwhelming the server - await new Promise((resolve) => setTimeout(resolve, POLLING_INTERVAL)); + // Add a small delay between polls + await new Promise((resolve) => setTimeout(resolve, BUILD_POLLING_INTERVAL)); } }