diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index dde85cdc8..87d2b004d 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -13,6 +13,7 @@ from sqlmodel import select from langflow.api.disconnect import DisconnectHandlerStreamingResponse from langflow.api.utils import ( CurrentActiveUser, + EventDeliveryType, build_graph_from_data, build_graph_from_db, format_elapsed_time, @@ -84,12 +85,12 @@ async def get_flow_events_response( *, job_id: str, queue_service: JobQueueService, - stream: bool = True, + event_delivery: EventDeliveryType, ): """Get events for a specific build job, either as a stream or single event.""" try: main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id) - if stream: + if event_delivery in (EventDeliveryType.STREAMING, EventDeliveryType.DIRECT): if event_task is None: logger.error(f"No event task found for job {job_id}") raise HTTPException(status_code=404, detail="No event task found for job") diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index 6a8258f0a..652b0e27e 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -2,6 +2,7 @@ from __future__ import annotations import uuid from datetime import timedelta +from enum import Enum from typing import TYPE_CHECKING, Annotated, Any from fastapi import Depends, HTTPException, Query @@ -34,6 +35,12 @@ CurrentActiveUser = Annotated[User, Depends(get_current_active_user)] DbSession = Annotated[AsyncSession, Depends(get_session)] +class EventDeliveryType(str, Enum): + STREAMING = "streaming" + DIRECT = "direct" + POLLING = "polling" + + def has_api_terms(word: str): return "api" in word and ("key" in word or ("token" in word and "tokens" not in word)) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index c90e5bd9a..d58a3fab9 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -27,6 +27,7 @@ from langflow.api.limited_background_tasks import LimitVertexBuildBackgroundTask from langflow.api.utils import ( CurrentActiveUser, DbSession, + EventDeliveryType, build_and_cache_graph_from_data, build_graph_from_db, format_elapsed_time, @@ -55,12 +56,10 @@ from langflow.services.deps import ( get_chat_service, get_queue_service, get_session, - get_settings_service, get_telemetry_service, session_scope, ) from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService -from langflow.services.settings.service import SettingsService from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload if TYPE_CHECKING: @@ -154,7 +153,7 @@ async def build_flow( current_user: CurrentActiveUser, queue_service: Annotated[JobQueueService, Depends(get_queue_service)], flow_name: str | None = None, - settings_service: Annotated[SettingsService, Depends(get_settings_service)], + event_delivery: EventDeliveryType = EventDeliveryType.POLLING, ): """Build and process a flow, returning a job ID for event polling. @@ -174,6 +173,7 @@ async def build_flow( queue_service: Queue service for job management flow_name: Optional name for the flow settings_service: Settings service + event_delivery: Optional event delivery type - default is streaming Returns: Dict with job_id that can be used to poll for build status @@ -197,12 +197,14 @@ async def build_flow( queue_service=queue_service, flow_name=flow_name, ) - if settings_service.settings.event_delivery != "direct": + + # This is required to support FE tests - we need to be able to set the event delivery to direct + if event_delivery == EventDeliveryType.DIRECT: return {"job_id": job_id} return await get_flow_events_response( job_id=job_id, queue_service=queue_service, - stream=True, + event_delivery=event_delivery, ) @@ -211,13 +213,13 @@ async def get_build_events( job_id: str, queue_service: Annotated[JobQueueService, Depends(get_queue_service)], *, - stream: bool = True, + event_delivery: EventDeliveryType = EventDeliveryType.STREAMING, ): """Get events for a specific build job.""" return await get_flow_events_response( job_id=job_id, queue_service=queue_service, - stream=stream, + event_delivery=event_delivery, ) diff --git a/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx b/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx index 5150f9158..cd2b1786f 100644 --- a/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/components/NodeStatus/index.tsx @@ -87,7 +87,7 @@ export default function NodeStatus({ const isBuilding = useFlowStore((state) => state.isBuilding); const setNode = useFlowStore((state) => state.setNode); const version = useDarkStore((state) => state.version); - const config = useGetConfig(); + const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery); const setErrorData = useAlertStore((state) => state.setErrorData); const postTemplateValue = usePostTemplateValue({ @@ -96,10 +96,6 @@ export default function NodeStatus({ node: data.node, }); - const shouldStreamEvents = () => { - return config.data?.event_delivery === EventDeliveryType.STREAMING; - }; - // Start polling when connection is initiated const startPolling = () => { window.open(connectionLink, "_blank"); @@ -169,8 +165,7 @@ export default function NodeStatus({ setValidationStatus(null); buildFlow({ stopNodeId: nodeId, - stream: shouldStreamEvents(), - eventDelivery: config.data?.event_delivery, + eventDelivery: eventDeliveryConfig, }); } @@ -265,8 +260,7 @@ export default function NodeStatus({ if (buildStatus === BuildStatus.BUILDING || isBuilding) return; buildFlow({ stopNodeId: nodeId, - stream: shouldStreamEvents(), - eventDelivery: config.data?.event_delivery, + eventDelivery: eventDeliveryConfig, }); track("Flow Build - Clicked", { stopNodeId: nodeId }); }; diff --git a/src/frontend/src/controllers/API/api.tsx b/src/frontend/src/controllers/API/api.tsx index 596424345..e013a739c 100644 --- a/src/frontend/src/controllers/API/api.tsx +++ b/src/frontend/src/controllers/API/api.tsx @@ -6,7 +6,7 @@ import axios, { AxiosError, AxiosInstance, AxiosRequestConfig } from "axios"; import * as fetchIntercept from "fetch-intercept"; import { useEffect } from "react"; import { Cookies } from "react-cookie"; -import { BuildStatus } from "../../constants/enums"; +import { BuildStatus, EventDeliveryType } from "../../constants/enums"; import useAlertStore from "../../stores/alertStore"; import useFlowStore from "../../stores/flowStore"; import { checkDuplicateRequestAndStoreRequest } from "./helpers/check-duplicate-requests"; @@ -263,6 +263,7 @@ export type StreamingRequestParams = { onError?: (statusCode: number) => void; onNetworkError?: (error: Error) => void; buildController: AbortController; + eventDeliveryConfig?: EventDeliveryType; }; async function performStreamingRequest({ diff --git a/src/frontend/src/controllers/API/queries/config/use-get-config.ts b/src/frontend/src/controllers/API/queries/config/use-get-config.ts index f6a911921..4a21634b8 100644 --- a/src/frontend/src/controllers/API/queries/config/use-get-config.ts +++ b/src/frontend/src/controllers/API/queries/config/use-get-config.ts @@ -39,7 +39,7 @@ export const useGetConfig: useQueryFunctionType = ( const setWebhookPollingInterval = useUtilityStore( (state) => state.setWebhookPollingInterval, ); - + const setEventDelivery = useUtilityStore((state) => state.setEventDelivery); const { query } = UseRequestProcessor(); const getConfigFn = async () => { @@ -59,6 +59,7 @@ export const useGetConfig: useQueryFunctionType = ( setWebhookPollingInterval( data.webhook_polling_interval ?? DEFAULT_POLLING_INTERVAL, ); + setEventDelivery(data.event_delivery ?? EventDeliveryType.POLLING); } return data; }; diff --git a/src/frontend/src/modals/IOModal/new-modal.tsx b/src/frontend/src/modals/IOModal/new-modal.tsx index 462a2c470..0608e42fa 100644 --- a/src/frontend/src/modals/IOModal/new-modal.tsx +++ b/src/frontend/src/modals/IOModal/new-modal.tsx @@ -155,11 +155,7 @@ export default function IOModal({ const chatValue = useUtilityStore((state) => state.chatValueStore); const setChatValue = useUtilityStore((state) => state.setChatValueStore); - const config = useGetConfig(); - - function shouldStreamEvents() { - return config.data?.event_delivery === EventDeliveryType.STREAMING; - } + const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery); const sendMessage = useCallback( async ({ @@ -178,8 +174,7 @@ export default function IOModal({ files: files, silent: true, session: sessionId, - stream: shouldStreamEvents(), - eventDelivery: config.data?.event_delivery, + eventDelivery: eventDeliveryConfig, }).catch((err) => { console.error(err); }); diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 99f5b1c3b..b4be016b0 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -835,7 +835,6 @@ const useFlowStore = create((set, get) => ({ edges: get().edges || undefined, logBuilds: get().onFlowPage, playgroundPage, - stream, eventDelivery, }); get().setIsBuilding(false); diff --git a/src/frontend/src/stores/utilityStore.ts b/src/frontend/src/stores/utilityStore.ts index 71c434e86..43a0ee8d0 100644 --- a/src/frontend/src/stores/utilityStore.ts +++ b/src/frontend/src/stores/utilityStore.ts @@ -1,3 +1,4 @@ +import { EventDeliveryType } from "@/constants/enums"; import { Pagination, Tag } from "@/types/utils/types"; import { UtilityStoreType } from "@/types/zustand/utility"; import { create } from "zustand"; @@ -43,4 +44,7 @@ export const useUtilityStore = create((set, get) => ({ currentSessionId: "", setCurrentSessionId: (sessionId: string) => set({ currentSessionId: sessionId }), + eventDelivery: EventDeliveryType.POLLING, + setEventDelivery: (eventDelivery: EventDeliveryType) => + set({ eventDelivery }), })); diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index 8eba24dee..5c757fc38 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -149,7 +149,6 @@ export type FlowStoreType = { files, silent, session, - stream, eventDelivery, }: { startNodeId?: string; @@ -158,7 +157,6 @@ export type FlowStoreType = { files?: string[]; silent?: boolean; session?: string; - stream?: boolean; eventDelivery?: EventDeliveryType; }) => Promise; getFlow: () => { nodes: Node[]; edges: EdgeType[]; viewport: Viewport }; diff --git a/src/frontend/src/types/zustand/utility/index.ts b/src/frontend/src/types/zustand/utility/index.ts index 3ab489364..4244dbbfe 100644 --- a/src/frontend/src/types/zustand/utility/index.ts +++ b/src/frontend/src/types/zustand/utility/index.ts @@ -1,3 +1,4 @@ +import { EventDeliveryType } from "@/constants/enums"; import { Pagination, Tag } from "@/types/utils/types"; export type UtilityStoreType = { @@ -25,4 +26,6 @@ export type UtilityStoreType = { setCurrentSessionId: (sessionId: string) => void; setClientId: (clientId: string) => void; clientId: string; + eventDelivery: EventDeliveryType; + setEventDelivery: (eventDelivery: EventDeliveryType) => void; }; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index a71139e4c..4dc5fcc95 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -40,8 +40,7 @@ type BuildVerticesParams = { logBuilds?: boolean; session?: string; playgroundPage?: boolean; - stream?: boolean; - eventDelivery?: EventDeliveryType; + eventDelivery: EventDeliveryType; }; function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI { @@ -148,7 +147,10 @@ export async function buildFlowVerticesWithFallback( e.message === POLLING_MESSAGES.STREAMING_NOT_SUPPORTED ) { // Fallback to polling - return await buildFlowVertices({ ...params, stream: false }); + return await buildFlowVertices({ + ...params, + eventDelivery: EventDeliveryType.POLLING, + }); } throw e; } @@ -176,13 +178,16 @@ async function pollBuildEvents( ): Promise { let isDone = false; while (!isDone) { - const response = await fetch(`${url}?stream=false`, { - method: "GET", - headers: { - "Content-Type": "application/json", + const response = await fetch( + `${url}?event_delivery=${EventDeliveryType.POLLING}`, + { + method: "GET", + headers: { + "Content-Type": "application/json", + }, + signal: abortController.signal, // Add abort signal to fetch }, - signal: abortController.signal, // Add abort signal to fetch - }); + ); if (!response.ok) { const errorData = await response.json().catch(() => ({})); @@ -241,7 +246,6 @@ export async function buildFlowVertices({ logBuilds, session, playgroundPage, - stream = true, eventDelivery, }: BuildVerticesParams) { const inputs = {}; @@ -260,10 +264,10 @@ export async function buildFlowVertices({ queryParams.append("log_builds", logBuilds.toString()); } - // Add stream parameter when using direct event delivery - if (eventDelivery === EventDeliveryType.DIRECT) { - queryParams.append("stream", "true"); - } + queryParams.append( + "event_delivery", + eventDelivery ?? EventDeliveryType.POLLING, + ); if (queryParams.toString()) { buildUrl = `${buildUrl}?${queryParams.toString()}`; @@ -376,13 +380,12 @@ export async function buildFlowVertices({ } }); useFlowStore.getState().setBuildController(buildController); - // Then stream the events const eventsUrl = `${BASE_URL_API}build/${job_id}/events`; const buildResults: Array = []; const verticesStartTimeMs: Map = new Map(); - if (stream) { + if (eventDelivery === EventDeliveryType.STREAMING) { return performStreamingRequest({ method: "GET", url: eventsUrl,