feat: event polling can process more than one event at a time (#7858)

* refactor: enhance event polling to retrieve all available events and return in NDJSON format

Updated the event polling logic to collect all available events from the queue instead of just one. The response format has been changed to NDJSON, allowing multiple events to be returned in a single response. Improved error handling for timeouts by returning an empty response instead of raising an error.

* refactor: improve NDJSON event processing in build polling

Enhanced the event polling logic to handle NDJSON responses by processing multiple events in a single fetch. Updated the response handling to read the response as text, split it into individual JSON objects, and process each event accordingly. This change improves the efficiency of event handling and maintains a consistent polling interval.

* fix: change event list type from list[str] to list in get_flow_events_response

* refactor: enhance event stream consumption with timeout and error handling

Updated the consume_and_assert_stream function to include a timeout for processing events and improved error handling for JSON parsing failures. Added logging for better debugging and tracking of processed events. Adjusted the event validation logic to ensure both the first and last expected events are present.

* refactor: enhance event polling logic with improved limits and JSON validation

Updated the event polling mechanism to include configurable limits for total events and empty polls, along with a timeout for requests. Enhanced JSON validation for individual events and added logging for better debugging of polling behavior. This refactor aims to improve robustness and maintainability of the event consumption process.

* refactor: update polling interval and streamline event handling

Modified the polling interval constant to improve responsiveness and adjusted event handling logic in the get_flow_events_response function to simplify the processing of event values. This change enhances the efficiency of event consumption and maintains a consistent approach to handling NDJSON responses.

* refactor: simplify event retrieval in get_flow_events_response

Removed unused event_id and put_time variables from the event retrieval logic in the get_flow_events_response function. This change streamlines the code and enhances readability while maintaining the functionality of event handling.

* refactor: update consume_and_assert_stream to use asyncio.wait_for for timeout handling

Modified the consume_and_assert_stream function to utilize asyncio.wait_for for managing timeouts in event processing. This change improves compatibility with Python 3.10 and enhances the robustness of the event consumption logic while maintaining existing functionality.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-05-05 19:20:04 -03:00 committed by GitHub
commit 304e28b48a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 240 additions and 93 deletions

View file

@ -5,8 +5,7 @@ import traceback
import uuid
from collections.abc import AsyncIterator
from fastapi import BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse
from fastapi import BackgroundTasks, HTTPException, Response
from loguru import logger
from sqlmodel import select
@ -100,22 +99,42 @@ async def get_flow_events_response(
event_task=event_task,
)
# Polling mode - get exactly one event
# Polling mode - get all available events
try:
_, value, _ = await main_queue.get()
if value is None:
# End of stream, trigger end event
if event_task is not None:
event_task.cancel()
event_manager.on_end(data={})
events: list = []
# Get all available events from the queue without blocking
while not main_queue.empty():
_, value, _ = await main_queue.get()
if value is None:
# End of stream, trigger end event
if event_task is not None:
event_task.cancel()
event_manager.on_end(data={})
# Include the end event
events.append(None)
break
events.append(value.decode("utf-8"))
return JSONResponse({"event": value.decode("utf-8") if value else None})
# If no events were available, wait for one (with timeout)
if not events:
_, value, _ = await main_queue.get()
if value is None:
# End of stream, trigger end event
if event_task is not None:
event_task.cancel()
event_manager.on_end(data={})
else:
events.append(value.decode("utf-8"))
# Return as NDJSON format - each line is a complete JSON object
content = "\n".join([event for event in events if event is not None])
return Response(content=content, media_type="application/x-ndjson")
except asyncio.CancelledError as exc:
logger.info(f"Event polling was cancelled for job {job_id}")
raise HTTPException(status_code=499, detail="Event polling was cancelled") from exc
except asyncio.TimeoutError as exc:
except asyncio.TimeoutError:
logger.warning(f"Timeout while waiting for events for job {job_id}")
raise HTTPException(status_code=408, detail="Timeout while waiting for events") from exc
return Response(content="", media_type="application/x-ndjson") # Return empty response instead of error
except JobQueueNotFoundError as exc:
logger.error(f"Job not found: {job_id}. Error: {exc!s}")

View file

@ -1,8 +1,10 @@
import asyncio
import json
from typing import Any
from uuid import UUID
from httpx import AsyncClient, codes
from loguru import logger
async def create_flow(client: AsyncClient, flow_data: str, headers: dict[str, str]) -> UUID:
@ -25,51 +27,103 @@ async def build_flow(
async def get_build_events(client: AsyncClient, job_id: str, headers: dict[str, str]):
"""Get events for a build job."""
return await client.get(f"api/v1/build/{job_id}/events", headers=headers)
# Add Accept header for NDJSON format
headers_with_accept = {**headers, "Accept": "application/x-ndjson"}
return await client.get(f"api/v1/build/{job_id}/events", headers=headers_with_accept)
async def consume_and_assert_stream(response, job_id):
"""Consume the event stream and assert the expected event structure."""
async def consume_and_assert_stream(response, job_id, timeout=10.0):
"""Consume the event stream and assert the expected event structure.
Args:
response: The response object with an aiter_lines method
job_id: The job ID to verify in events
timeout: Maximum time in seconds to wait for events (default: 10s)
"""
count = 0
lines = []
async for line in response.aiter_lines():
# Skip empty lines (ndjson uses double newlines)
if not line:
continue
first_event_seen = False
end_event_seen = False
lines.append(line)
parsed = json.loads(line)
if "job_id" in parsed:
assert parsed["job_id"] == job_id
continue
# Set a timeout for the entire consumption process
try:
# In Python 3.10, asyncio.timeout() is not available, so we use wait_for instead
async def process_events():
nonlocal count, lines, first_event_seen, end_event_seen
async for line in response.aiter_lines():
# Skip empty lines (ndjson uses double newlines)
if not line:
continue
if count == 0:
# First event should be vertices_sorted
assert parsed["event"] == "vertices_sorted", (
"Invalid first event. Expected 'vertices_sorted'. Full event stream:\n" + "\n".join(lines)
)
ids = parsed["data"]["ids"]
ids.sort()
assert ids == ["ChatInput-CIGht"], "Invalid ids in first event. Full event stream:\n" + "\n".join(lines)
lines.append(line)
try:
parsed = json.loads(line)
except json.JSONDecodeError:
logger.debug(f"ERROR: Failed to parse JSON: {line}")
raise
to_run = parsed["data"]["to_run"]
to_run.sort()
assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"], (
"Invalid to_run list in first event. Full event stream:\n" + "\n".join(lines)
)
elif count > 0 and count < 5:
# Next events should be end_vertex events
assert parsed["event"] == "end_vertex", (
f"Invalid event at position {count}. Expected 'end_vertex'. Full event stream:\n" + "\n".join(lines)
)
assert parsed["data"]["build_data"] is not None, (
f"Missing build_data at position {count}. Full event stream:\n" + "\n".join(lines)
)
elif count == 5:
# Final event should be end
assert parsed["event"] == "end", "Invalid final event. Expected 'end'. Full event stream:\n" + "\n".join(
lines
)
else:
raise ValueError(f"Unexpected event at position {count}. Full event stream:\n" + "\n".join(lines))
count += 1
if "job_id" in parsed:
assert parsed["job_id"] == job_id
continue
# First event should be vertices_sorted
if not first_event_seen:
assert parsed["event"] == "vertices_sorted", (
"Invalid first event. Expected 'vertices_sorted'. Full event stream:\n" + "\n".join(lines)
)
ids = parsed["data"]["ids"]
ids.sort()
assert ids == ["ChatInput-CIGht"], "Invalid ids in first event. Full event stream:\n" + "\n".join(
lines
)
to_run = parsed["data"]["to_run"]
to_run.sort()
assert to_run == ["ChatInput-CIGht", "ChatOutput-QA7ej", "Memory-amN4Z", "Prompt-iWbCC"], (
"Invalid to_run list in first event. Full event stream:\n" + "\n".join(lines)
)
first_event_seen = True
# Last event should be end
elif parsed["event"] == "end":
end_event_seen = True
# Middle events should be end_vertex
elif parsed["event"] == "end_vertex":
assert parsed["data"]["build_data"] is not None, (
f"Missing build_data at position {count}. Full event stream:\n" + "\n".join(lines)
)
# Other event types (like token or add_message) are allowed and ignored
else:
# Allow other event types to pass through without failing
pass
count += 1
# Debug output for verbose mode to track progress
if count % 10 == 0:
logger.debug(f"Processed {count} events so far")
await asyncio.wait_for(process_events(), timeout=timeout)
except asyncio.TimeoutError as e:
# If we timed out, logger.debug what we have so far and fail the test
events_summary = "\n".join(
f"{i}: {line[:80]}..." if len(line) > 80 else f"{i}: {line}" for i, line in enumerate(lines)
)
logger.debug(
f"ERROR: Test timed out after {timeout}s. Processed {count} events.\nEvents received:\n{events_summary}"
)
if first_event_seen and not end_event_seen:
msg = f"Test timed out after {timeout}s waiting for 'end' event"
raise TimeoutError(msg) from e
if not first_event_seen:
msg = f"Test timed out after {timeout}s waiting for 'vertices_sorted' event"
raise TimeoutError(msg) from e
msg = f"Test timed out after {timeout}s"
raise TimeoutError(msg) from e
# Verify we saw both the first and end events
assert first_event_seen, "Missing vertices_sorted event. Full event stream:\n" + "\n".join(lines)
assert end_event_seen, "Missing end event. Full event stream:\n" + "\n".join(lines)
# logger.debug summary of events processed
logger.debug(f"Successfully processed {count} events for job {job_id}")
return count

View file

@ -1,4 +1,5 @@
import asyncio
import json
import uuid
from uuid import UUID
@ -6,6 +7,7 @@ import pytest
from httpx import codes
from langflow.memory import aget_messages
from langflow.services.database.models.flow import FlowUpdate
from loguru import logger
from tests.unit.build_utils import build_flow, consume_and_assert_stream, create_flow, get_build_events
@ -160,33 +162,87 @@ async def test_build_flow_polling(client, json_memory_chatbot_no_llm, logged_in_
self.job_id = job_id
self.headers = headers
self.status_code = codes.OK
self.max_total_events = 50 # Limit to prevent infinite loops
self.max_empty_polls = 10 # Maximum number of empty polls before giving up
self.poll_timeout = 1.0 # Timeout for each polling request
async def aiter_lines(self):
try:
sleeps = 0
max_sleeps = 100
while True:
response = await self.client.get(
f"api/v1/build/{self.job_id}/events?event_delivery=polling", headers=self.headers
)
assert response.status_code == codes.OK
data = response.json()
empty_polls = 0
total_events = 0
end_event_found = False
if data["event"] is None:
# No event available, add delay to prevent tight polling
while (
empty_polls < self.max_empty_polls and total_events < self.max_total_events and not end_event_found
):
# Add Accept header for NDJSON
headers = {**self.headers, "Accept": "application/x-ndjson"}
# Set a timeout for the request
response = await asyncio.wait_for(
self.client.get(
f"api/v1/build/{self.job_id}/events?event_delivery=polling",
headers=headers,
),
timeout=self.poll_timeout,
)
assert response.status_code == codes.OK
# Get the NDJSON response as text
text = response.text
# Skip if response is empty
if not text.strip():
empty_polls += 1
await asyncio.sleep(0.1)
sleeps += 1
continue
yield data["event"]
# Reset empty polls counter since we got data
empty_polls = 0
# Process each line as an individual JSON object
line_count = 0
for line in text.splitlines():
if not line.strip():
continue
line_count += 1
total_events += 1
# Check for end event with multiple possible formats
if '"event":"end"' in line or '"event": "end"' in line:
end_event_found = True
# Validate it's proper JSON before yielding
try:
json.loads(line) # Test parse to ensure it's valid JSON
yield line
except json.JSONDecodeError as e:
logger.debug(f"WARNING: Skipping invalid JSON: {line}")
logger.debug(f"Error: {e}")
# Don't yield invalid JSON, but continue processing other lines
# If we had no events in this batch, count as empty poll
if line_count == 0:
empty_polls += 1
# Add a small delay to prevent tight polling
await asyncio.sleep(0.1)
# If we hit the limit without finding the end event, log a warning
if total_events >= self.max_total_events:
logger.debug(
f"WARNING: Reached maximum event limit ({self.max_total_events}) without finding end event"
)
if empty_polls >= self.max_empty_polls and not end_event_found:
logger.debug(
f"WARNING: Reached maximum empty polls ({self.max_empty_polls}) without finding end event"
)
# If this was the end event, stop polling
if '"end"' in data["event"]:
break
if sleeps > max_sleeps:
msg = "Build event polling timed out."
raise TimeoutError(msg)
except asyncio.TimeoutError as e:
logger.debug(f"ERROR: Polling request timed out after {self.poll_timeout}s")
msg = "Build event polling timed out."
raise TimeoutError(msg) from e

View file

@ -1024,7 +1024,7 @@ export const POLLING_MESSAGES = {
STREAMING_NOT_SUPPORTED: "Streaming not supported",
} as const;
export const POLLING_INTERVAL = 100;
export const BUILD_POLLING_INTERVAL = 25;
export const IS_AUTO_LOGIN =
!process?.env?.LANGFLOW_AUTO_LOGIN ||

View file

@ -1,7 +1,7 @@
import { MISSED_ERROR_ALERT } from "@/constants/alerts_constants";
import {
BASE_URL_API,
POLLING_INTERVAL,
BUILD_POLLING_INTERVAL,
POLLING_MESSAGES,
} from "@/constants/constants";
import { performStreamingRequest } from "@/controllers/API/api";
@ -184,6 +184,7 @@ async function pollBuildEvents(
method: "GET",
headers: {
"Content-Type": "application/json",
Accept: "application/x-ndjson",
},
signal: abortController.signal, // Add abort signal to fetch
},
@ -197,34 +198,51 @@ async function pollBuildEvents(
);
}
const data = await response.json();
if (!data.event) {
// No event in this request, try again
// Get the response text - will be NDJSON format (one JSON per line)
const responseText = await response.text();
// Skip if empty response
if (!responseText.trim()) {
await new Promise((resolve) => setTimeout(resolve, 100));
continue;
}
// Process the event
const event = JSON.parse(data.event);
const result = await onEvent(
event.event,
event.data,
buildResults,
verticesStartTimeMs,
callbacks,
);
if (!result) {
isDone = true;
abortController.abort();
// Split by newlines to get individual JSON objects
const eventLines = responseText.split("\n").filter((line) => line.trim());
// If no events, continue polling
if (eventLines.length === 0) {
await new Promise((resolve) => setTimeout(resolve, 100));
continue;
}
// Check if this was the end event or if we got a null value
if (event.event === "end" || data.event === null) {
isDone = true;
// Process all events in the NDJSON response
for (const eventStr of eventLines) {
// Process the event
const event = JSON.parse(eventStr);
const result = await onEvent(
event.event,
event.data,
buildResults,
verticesStartTimeMs,
callbacks,
);
if (!result) {
isDone = true;
abortController.abort();
break;
}
// Check if this was the end event
if (event.event === "end") {
isDone = true;
break;
}
}
// Add a small delay between polls to avoid overwhelming the server
await new Promise((resolve) => setTimeout(resolve, POLLING_INTERVAL));
// Add a small delay between polls
await new Promise((resolve) => setTimeout(resolve, BUILD_POLLING_INTERVAL));
}
}