fix: add direct event delivery option but keep polling as default (#7435)

* feat: add new event delivery method "direct"

- Updated event_delivery options in ConfigResponse and Settings to include "direct".
- Modified build_flow function to utilize settings_service for conditional event handling.
- Improved flow processing by returning job_id or event responses based on delivery method.

* feat: implement direct event delivery method on the frontend

- Updated buildFlow function to include eventDelivery parameter, allowing for "direct" event delivery mode.
- Modified NodeStatus and IOModal components to utilize the new eventDelivery setting.
- Expanded flowStore and buildUtils to support event delivery options, improving flexibility in event handling.
- Added "direct" option to event delivery types in enums for better configurability.
- Updated tests to cover new event delivery modes.

* fix: change default event delivery method to "polling"

- Updated the default value of event_delivery in Settings from "direct" to "polling" to align with intended behavior.
- Ensured documentation reflects the change in default settings for clarity on event delivery options.

* Update src/frontend/src/controllers/API/api.tsx

* Update src/frontend/src/utils/buildUtils.ts

* Update src/frontend/src/utils/buildUtils.ts

* Update src/frontend/src/utils/buildUtils.ts

* feat: add event delivery validation for multi-worker environments

- Introduced a new field validator for the `event_delivery` setting to enforce "direct" delivery when the number of workers exceeds one.
- This change ensures compatibility with multi-worker setups, improving the robustness of event handling in the application.

* feat: add warning for multi-worker event delivery

- Added a warning log in the Settings class to notify users when a multi-worker environment is detected, indicating that "direct" event delivery will be used. This enhances visibility and understanding of event handling behavior in such setups.

* refactor: remove "direct" from event delivery modes in tests

- Updated the event delivery modes in the withEventDeliveryModes utility to exclude "direct", aligning with recent changes in event handling behavior.
- This change simplifies the testing setup and ensures consistency with the current application logic.

---------

Co-authored-by: Cristhian Zanforlin Lousa <cristhian.lousa@gmail.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-04-03 18:05:39 -03:00 committed by GitHub
commit d9ab4458ad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 104 additions and 10 deletions

View file

@ -55,10 +55,12 @@ from langflow.services.deps import (
get_chat_service,
get_queue_service,
get_session,
get_settings_service,
get_telemetry_service,
session_scope,
)
from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService
from langflow.services.settings.service import SettingsService
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload
if TYPE_CHECKING:
@ -152,6 +154,7 @@ async def build_flow(
current_user: CurrentActiveUser,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
flow_name: str | None = None,
settings_service: Annotated[SettingsService, Depends(get_settings_service)],
):
"""Build and process a flow, returning a job ID for event polling.
@ -170,6 +173,7 @@ async def build_flow(
current_user: The authenticated user
queue_service: Queue service for job management
flow_name: Optional name for the flow
settings_service: Settings service
Returns:
Dict with job_id that can be used to poll for build status
@ -193,7 +197,13 @@ async def build_flow(
queue_service=queue_service,
flow_name=flow_name,
)
return {"job_id": job_id}
if settings_service.settings.event_delivery != "direct":
return {"job_id": job_id}
return await get_flow_events_response(
job_id=job_id,
queue_service=queue_service,
stream=True,
)
@router.get("/build/{job_id}/events")

View file

@ -389,7 +389,7 @@ class ConfigResponse(BaseModel):
webhook_polling_interval: int
public_flow_cleanup_interval: int
public_flow_expiration: int
event_delivery: Literal["polling", "streaming"]
event_delivery: Literal["polling", "streaming", "direct"]
class CancelFlowResponse(BaseModel):

View file

@ -236,12 +236,23 @@ class Settings(BaseSettings):
public_flow_expiration: int = Field(default=86400, gt=600)
"""The time in seconds after which a public temporary flow will be considered expired and eligible for cleanup.
Default is 24 hours (86400 seconds). Minimum is 600 seconds (10 minutes)."""
event_delivery: Literal["polling", "streaming"] = "polling"
"""How to deliver build events to the frontend. Can be 'polling' or 'streaming'."""
event_delivery: Literal["polling", "streaming", "direct"] = "polling"
"""How to deliver build events to the frontend. Can be 'polling', 'streaming' or 'direct'."""
lazy_load_components: bool = False
"""If set to True, Langflow will only partially load components at startup and fully load them on demand.
This significantly reduces startup time but may cause a slight delay when a component is first used."""
@field_validator("event_delivery", mode="before")
@classmethod
def set_event_delivery(cls, value, info):
# If workers > 1, we need to use direct delivery
# because polling and streaming are not supported
# in multi-worker environments
if info.data.get("workers", 1) > 1:
logger.warning("Multi-worker environment detected, using direct event delivery")
return "direct"
return value
@field_validator("dev")
@classmethod
def set_dev(cls, value):

View file

@ -167,7 +167,11 @@ export default function NodeStatus({
function handlePlayWShortcut() {
if (buildStatus === BuildStatus.BUILDING || isBuilding || !selected) return;
setValidationStatus(null);
buildFlow({ stopNodeId: nodeId, stream: shouldStreamEvents() });
buildFlow({
stopNodeId: nodeId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
});
}
const play = useShortcutsStore((state) => state.play);
@ -259,7 +263,11 @@ export default function NodeStatus({
return;
}
if (buildStatus === BuildStatus.BUILDING || isBuilding) return;
buildFlow({ stopNodeId: nodeId, stream: shouldStreamEvents() });
buildFlow({
stopNodeId: nodeId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
});
track("Flow Build - Clicked", { stopNodeId: nodeId });
};

View file

@ -42,4 +42,5 @@ export enum IOOutputTypes {
export enum EventDeliveryType {
STREAMING = "streaming",
POLLING = "polling",
DIRECT = "direct",
}

View file

@ -180,6 +180,7 @@ export default function IOModal({
silent: true,
session: sessionId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
}).catch((err) => {
console.error(err);
});

View file

@ -23,7 +23,7 @@ import {
FLOW_BUILD_SUCCESS_ALERT,
MISSED_ERROR_ALERT,
} from "../constants/alerts_constants";
import { BuildStatus } from "../constants/enums";
import { BuildStatus, EventDeliveryType } from "../constants/enums";
import { LogsLogType, VertexBuildTypeAPI } from "../types/api";
import { ChatInputType, ChatOutputType } from "../types/chat";
import {
@ -599,6 +599,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
silent,
session,
stream = true,
eventDelivery = EventDeliveryType.STREAMING,
}: {
startNodeId?: string;
stopNodeId?: string;
@ -607,6 +608,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
silent?: boolean;
session?: string;
stream?: boolean;
eventDelivery?: EventDeliveryType;
}) => {
const playgroundPage = get().playgroundPage;
get().setIsBuilding(true);
@ -834,6 +836,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
logBuilds: get().onFlowPage,
playgroundPage,
stream,
eventDelivery,
});
get().setIsBuilding(false);
get().revertBuiltStatusFromBuilding();

View file

@ -7,7 +7,7 @@ import {
ReactFlowInstance,
Viewport,
} from "@xyflow/react";
import { BuildStatus } from "../../../constants/enums";
import { BuildStatus, EventDeliveryType } from "../../../constants/enums";
import { VertexBuildTypeAPI } from "../../api";
import { ChatInputType, ChatOutputType } from "../../chat";
import { FlowState } from "../../tabs";
@ -150,6 +150,7 @@ export type FlowStoreType = {
silent,
session,
stream,
eventDelivery,
}: {
startNodeId?: string;
stopNodeId?: string;
@ -158,6 +159,7 @@ export type FlowStoreType = {
silent?: boolean;
session?: string;
stream?: boolean;
eventDelivery?: EventDeliveryType;
}) => Promise<void>;
getFlow: () => { nodes: Node[]; edges: EdgeType[]; viewport: Viewport };
updateVerticesBuild: (

View file

@ -9,7 +9,7 @@ import { useMessagesStore } from "@/stores/messagesStore";
import { Edge, Node } from "@xyflow/react";
import { AxiosError } from "axios";
import { flushSync } from "react-dom";
import { BuildStatus } from "../constants/enums";
import { BuildStatus, EventDeliveryType } from "../constants/enums";
import { getVerticesOrder, postBuildVertex } from "../controllers/API";
import useAlertStore from "../stores/alertStore";
import useFlowStore from "../stores/flowStore";
@ -41,6 +41,7 @@ type BuildVerticesParams = {
session?: string;
playgroundPage?: boolean;
stream?: boolean;
eventDelivery?: EventDeliveryType;
};
function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI {
@ -139,7 +140,7 @@ export async function buildFlowVerticesWithFallback(
) {
logFlowLoad("Starting flow load");
try {
// Use shouldUsePolling() to determine stream mode
// Use the event_delivery parameter directly
return await buildFlowVertices({ ...params });
} catch (e: any) {
if (
@ -241,6 +242,7 @@ export async function buildFlowVertices({
session,
playgroundPage,
stream = true,
eventDelivery,
}: BuildVerticesParams) {
const inputs = {};
@ -258,6 +260,11 @@ export async function buildFlowVertices({
queryParams.append("log_builds", logBuilds.toString());
}
// Add stream parameter when using direct event delivery
if (eventDelivery === EventDeliveryType.DIRECT) {
queryParams.append("stream", "true");
}
if (queryParams.toString()) {
buildUrl = `${buildUrl}?${queryParams.toString()}`;
}
@ -283,6 +290,57 @@ export async function buildFlowVertices({
}
try {
// If event_delivery is direct, we'll stream from the build endpoint directly
if (eventDelivery === EventDeliveryType.DIRECT) {
const buildController = new AbortController();
buildController.signal.addEventListener("abort", () => {
onBuildStopped && onBuildStopped();
});
useFlowStore.getState().setBuildController(buildController);
const buildResults: Array<boolean> = [];
const verticesStartTimeMs: Map<string, number> = new Map();
return performStreamingRequest({
method: "POST",
url: buildUrl,
body: postData,
onData: async (event) => {
const type = event["event"];
const data = event["data"];
return await onEvent(type, data, buildResults, verticesStartTimeMs, {
onBuildStart,
onBuildUpdate,
onBuildComplete,
onBuildError,
onGetOrderSuccess,
onValidateNodes,
});
},
onError: (statusCode) => {
if (statusCode === 404) {
throw new Error("Flow not found");
}
throw new Error("Error processing build events");
},
onNetworkError: (error: Error) => {
if (error.name === "AbortError") {
onBuildStopped && onBuildStopped();
return;
}
onBuildError!("Error Building Component", [
"Network error. Please check the connection to the server.",
]);
},
buildController,
});
}
} catch (e) {
console.log(e);
}
try {
// Otherwise, use the existing two-step process (job_id + events endpoint)
// First, start the build and get the job ID
const buildResponse = await fetch(buildUrl, {
method: "POST",