diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 7f21116e2..c90e5bd9a 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -55,10 +55,12 @@ from langflow.services.deps import ( get_chat_service, get_queue_service, get_session, + get_settings_service, get_telemetry_service, session_scope, ) from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService +from langflow.services.settings.service import SettingsService from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload if TYPE_CHECKING: @@ -152,6 +154,7 @@ async def build_flow( current_user: CurrentActiveUser, queue_service: Annotated[JobQueueService, Depends(get_queue_service)], flow_name: str | None = None, + settings_service: Annotated[SettingsService, Depends(get_settings_service)], ): """Build and process a flow, returning a job ID for event polling. @@ -170,6 +173,7 @@ async def build_flow( current_user: The authenticated user queue_service: Queue service for job management flow_name: Optional name for the flow + settings_service: Settings service Returns: Dict with job_id that can be used to poll for build status @@ -193,7 +197,13 @@ async def build_flow( queue_service=queue_service, flow_name=flow_name, ) - return {"job_id": job_id} + if settings_service.settings.event_delivery != "direct": + return {"job_id": job_id} + return await get_flow_events_response( + job_id=job_id, + queue_service=queue_service, + stream=True, + ) @router.get("/build/{job_id}/events") diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index 69f0e1456..b2ec6e103 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -389,7 +389,7 @@ class ConfigResponse(BaseModel): webhook_polling_interval: int public_flow_cleanup_interval: int public_flow_expiration: int - event_delivery: Literal["polling", "streaming"] + event_delivery: Literal["polling", "streaming", "direct"] class CancelFlowResponse(BaseModel): diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 77dec1817..3b9fbfb8d 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -236,12 +236,23 @@ class Settings(BaseSettings): public_flow_expiration: int = Field(default=86400, gt=600) """The time in seconds after which a public temporary flow will be considered expired and eligible for cleanup. Default is 24 hours (86400 seconds). Minimum is 600 seconds (10 minutes).""" - event_delivery: Literal["polling", "streaming"] = "polling" - """How to deliver build events to the frontend. Can be 'polling' or 'streaming'.""" + event_delivery: Literal["polling", "streaming", "direct"] = "polling" + """How to deliver build events to the frontend. Can be 'polling', 'streaming' or 'direct'.""" lazy_load_components: bool = False """If set to True, Langflow will only partially load components at startup and fully load them on demand. This significantly reduces startup time but may cause a slight delay when a component is first used.""" + @field_validator("event_delivery", mode="before") + @classmethod + def set_event_delivery(cls, value, info): + # If workers > 1, we need to use direct delivery + # because polling and streaming are not supported + # in multi-worker environments + if info.data.get("workers", 1) > 1: + logger.warning("Multi-worker environment detected, using direct event delivery") + return "direct" + return value + @field_validator("dev") @classmethod def set_dev(cls, value): diff --git a/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx b/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx index 59015daed..5150f9158 100644 --- a/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx @@ -167,7 +167,11 @@ export default function NodeStatus({ function handlePlayWShortcut() { if (buildStatus === BuildStatus.BUILDING || isBuilding || !selected) return; setValidationStatus(null); - buildFlow({ stopNodeId: nodeId, stream: shouldStreamEvents() }); + buildFlow({ + stopNodeId: nodeId, + stream: shouldStreamEvents(), + eventDelivery: config.data?.event_delivery, + }); } const play = useShortcutsStore((state) => state.play); @@ -259,7 +263,11 @@ export default function NodeStatus({ return; } if (buildStatus === BuildStatus.BUILDING || isBuilding) return; - buildFlow({ stopNodeId: nodeId, stream: shouldStreamEvents() }); + buildFlow({ + stopNodeId: nodeId, + stream: shouldStreamEvents(), + eventDelivery: config.data?.event_delivery, + }); track("Flow Build - Clicked", { stopNodeId: nodeId }); }; diff --git a/src/frontend/src/constants/enums.ts b/src/frontend/src/constants/enums.ts index 915ccf4fb..72fce7804 100644 --- a/src/frontend/src/constants/enums.ts +++ b/src/frontend/src/constants/enums.ts @@ -42,4 +42,5 @@ export enum IOOutputTypes { export enum EventDeliveryType { STREAMING = "streaming", POLLING = "polling", + DIRECT = "direct", } diff --git a/src/frontend/src/modals/IOModal/new-modal.tsx b/src/frontend/src/modals/IOModal/new-modal.tsx index 8519ab4da..08e72223c 100644 --- a/src/frontend/src/modals/IOModal/new-modal.tsx +++ b/src/frontend/src/modals/IOModal/new-modal.tsx @@ -180,6 +180,7 @@ export default function IOModal({ silent: true, session: sessionId, stream: shouldStreamEvents(), + eventDelivery: config.data?.event_delivery, }).catch((err) => { console.error(err); }); diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index c89767699..99f5b1c3b 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -23,7 +23,7 @@ import { FLOW_BUILD_SUCCESS_ALERT, MISSED_ERROR_ALERT, } from "../constants/alerts_constants"; -import { BuildStatus } from "../constants/enums"; +import { BuildStatus, EventDeliveryType } from "../constants/enums"; import { LogsLogType, VertexBuildTypeAPI } from "../types/api"; import { ChatInputType, ChatOutputType } from "../types/chat"; import { @@ -599,6 +599,7 @@ const useFlowStore = create((set, get) => ({ silent, session, stream = true, + eventDelivery = EventDeliveryType.STREAMING, }: { startNodeId?: string; stopNodeId?: string; @@ -607,6 +608,7 @@ const useFlowStore = create((set, get) => ({ silent?: boolean; session?: string; stream?: boolean; + eventDelivery?: EventDeliveryType; }) => { const playgroundPage = get().playgroundPage; get().setIsBuilding(true); @@ -834,6 +836,7 @@ const useFlowStore = create((set, get) => ({ logBuilds: get().onFlowPage, playgroundPage, stream, + eventDelivery, }); get().setIsBuilding(false); get().revertBuiltStatusFromBuilding(); diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index 9f34e2e99..8eba24dee 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -7,7 +7,7 @@ import { ReactFlowInstance, Viewport, } from "@xyflow/react"; -import { BuildStatus } from "../../../constants/enums"; +import { BuildStatus, EventDeliveryType } from "../../../constants/enums"; import { VertexBuildTypeAPI } from "../../api"; import { ChatInputType, ChatOutputType } from "../../chat"; import { FlowState } from "../../tabs"; @@ -150,6 +150,7 @@ export type FlowStoreType = { silent, session, stream, + eventDelivery, }: { startNodeId?: string; stopNodeId?: string; @@ -158,6 +159,7 @@ export type FlowStoreType = { silent?: boolean; session?: string; stream?: boolean; + eventDelivery?: EventDeliveryType; }) => Promise; getFlow: () => { nodes: Node[]; edges: EdgeType[]; viewport: Viewport }; updateVerticesBuild: ( diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 1a101adc4..a71139e4c 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -9,7 +9,7 @@ import { useMessagesStore } from "@/stores/messagesStore"; import { Edge, Node } from "@xyflow/react"; import { AxiosError } from "axios"; import { flushSync } from "react-dom"; -import { BuildStatus } from "../constants/enums"; +import { BuildStatus, EventDeliveryType } from "../constants/enums"; import { getVerticesOrder, postBuildVertex } from "../controllers/API"; import useAlertStore from "../stores/alertStore"; import useFlowStore from "../stores/flowStore"; @@ -41,6 +41,7 @@ type BuildVerticesParams = { session?: string; playgroundPage?: boolean; stream?: boolean; + eventDelivery?: EventDeliveryType; }; function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI { @@ -139,7 +140,7 @@ export async function buildFlowVerticesWithFallback( ) { logFlowLoad("Starting flow load"); try { - // Use shouldUsePolling() to determine stream mode + // Use the event_delivery parameter directly return await buildFlowVertices({ ...params }); } catch (e: any) { if ( @@ -241,6 +242,7 @@ export async function buildFlowVertices({ session, playgroundPage, stream = true, + eventDelivery, }: BuildVerticesParams) { const inputs = {}; @@ -258,6 +260,11 @@ export async function buildFlowVertices({ queryParams.append("log_builds", logBuilds.toString()); } + // Add stream parameter when using direct event delivery + if (eventDelivery === EventDeliveryType.DIRECT) { + queryParams.append("stream", "true"); + } + if (queryParams.toString()) { buildUrl = `${buildUrl}?${queryParams.toString()}`; } @@ -283,6 +290,57 @@ export async function buildFlowVertices({ } try { + // If event_delivery is direct, we'll stream from the build endpoint directly + if (eventDelivery === EventDeliveryType.DIRECT) { + const buildController = new AbortController(); + buildController.signal.addEventListener("abort", () => { + onBuildStopped && onBuildStopped(); + }); + useFlowStore.getState().setBuildController(buildController); + + const buildResults: Array = []; + const verticesStartTimeMs: Map = new Map(); + + return performStreamingRequest({ + method: "POST", + url: buildUrl, + body: postData, + onData: async (event) => { + const type = event["event"]; + const data = event["data"]; + return await onEvent(type, data, buildResults, verticesStartTimeMs, { + onBuildStart, + onBuildUpdate, + onBuildComplete, + onBuildError, + onGetOrderSuccess, + onValidateNodes, + }); + }, + onError: (statusCode) => { + if (statusCode === 404) { + throw new Error("Flow not found"); + } + throw new Error("Error processing build events"); + }, + onNetworkError: (error: Error) => { + if (error.name === "AbortError") { + onBuildStopped && onBuildStopped(); + return; + } + onBuildError!("Error Building Component", [ + "Network error. Please check the connection to the server.", + ]); + }, + buildController, + }); + } + } catch (e) { + console.log(e); + } + + try { + // Otherwise, use the existing two-step process (job_id + events endpoint) // First, start the build and get the job ID const buildResponse = await fetch(buildUrl, { method: "POST",