fix: Resolve event delivery configuration header passing between frontend and backend (#7514)

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Cristhian Zanforlin Lousa 2025-04-09 14:58:11 -03:00 committed by GitHub
commit 59a7440045
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 54 additions and 46 deletions

View file

@ -13,6 +13,7 @@ from sqlmodel import select
from langflow.api.disconnect import DisconnectHandlerStreamingResponse
from langflow.api.utils import (
CurrentActiveUser,
EventDeliveryType,
build_graph_from_data,
build_graph_from_db,
format_elapsed_time,
@ -84,12 +85,12 @@ async def get_flow_events_response(
*,
job_id: str,
queue_service: JobQueueService,
stream: bool = True,
event_delivery: EventDeliveryType,
):
"""Get events for a specific build job, either as a stream or single event."""
try:
main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id)
if stream:
if event_delivery in (EventDeliveryType.STREAMING, EventDeliveryType.DIRECT):
if event_task is None:
logger.error(f"No event task found for job {job_id}")
raise HTTPException(status_code=404, detail="No event task found for job")

View file

@ -2,6 +2,7 @@ from __future__ import annotations
import uuid
from datetime import timedelta
from enum import Enum
from typing import TYPE_CHECKING, Annotated, Any
from fastapi import Depends, HTTPException, Query
@ -34,6 +35,12 @@ CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]
DbSession = Annotated[AsyncSession, Depends(get_session)]
class EventDeliveryType(str, Enum):
STREAMING = "streaming"
DIRECT = "direct"
POLLING = "polling"
def has_api_terms(word: str):
return "api" in word and ("key" in word or ("token" in word and "tokens" not in word))

View file

@ -27,6 +27,7 @@ from langflow.api.limited_background_tasks import LimitVertexBuildBackgroundTask
from langflow.api.utils import (
CurrentActiveUser,
DbSession,
EventDeliveryType,
build_and_cache_graph_from_data,
build_graph_from_db,
format_elapsed_time,
@ -55,12 +56,10 @@ from langflow.services.deps import (
get_chat_service,
get_queue_service,
get_session,
get_settings_service,
get_telemetry_service,
session_scope,
)
from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService
from langflow.services.settings.service import SettingsService
from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload
if TYPE_CHECKING:
@ -154,7 +153,7 @@ async def build_flow(
current_user: CurrentActiveUser,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
flow_name: str | None = None,
settings_service: Annotated[SettingsService, Depends(get_settings_service)],
event_delivery: EventDeliveryType = EventDeliveryType.POLLING,
):
"""Build and process a flow, returning a job ID for event polling.
@ -174,6 +173,7 @@ async def build_flow(
queue_service: Queue service for job management
flow_name: Optional name for the flow
settings_service: Settings service
event_delivery: Optional event delivery type - default is streaming
Returns:
Dict with job_id that can be used to poll for build status
@ -197,12 +197,14 @@ async def build_flow(
queue_service=queue_service,
flow_name=flow_name,
)
if settings_service.settings.event_delivery != "direct":
# This is required to support FE tests - we need to be able to set the event delivery to direct
if event_delivery == EventDeliveryType.DIRECT:
return {"job_id": job_id}
return await get_flow_events_response(
job_id=job_id,
queue_service=queue_service,
stream=True,
event_delivery=event_delivery,
)
@ -211,13 +213,13 @@ async def get_build_events(
job_id: str,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
*,
stream: bool = True,
event_delivery: EventDeliveryType = EventDeliveryType.STREAMING,
):
"""Get events for a specific build job."""
return await get_flow_events_response(
job_id=job_id,
queue_service=queue_service,
stream=stream,
event_delivery=event_delivery,
)

View file

@ -87,7 +87,7 @@ export default function NodeStatus({
const isBuilding = useFlowStore((state) => state.isBuilding);
const setNode = useFlowStore((state) => state.setNode);
const version = useDarkStore((state) => state.version);
const config = useGetConfig();
const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery);
const setErrorData = useAlertStore((state) => state.setErrorData);
const postTemplateValue = usePostTemplateValue({
@ -96,10 +96,6 @@ export default function NodeStatus({
node: data.node,
});
const shouldStreamEvents = () => {
return config.data?.event_delivery === EventDeliveryType.STREAMING;
};
// Start polling when connection is initiated
const startPolling = () => {
window.open(connectionLink, "_blank");
@ -169,8 +165,7 @@ export default function NodeStatus({
setValidationStatus(null);
buildFlow({
stopNodeId: nodeId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
eventDelivery: eventDeliveryConfig,
});
}
@ -265,8 +260,7 @@ export default function NodeStatus({
if (buildStatus === BuildStatus.BUILDING || isBuilding) return;
buildFlow({
stopNodeId: nodeId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
eventDelivery: eventDeliveryConfig,
});
track("Flow Build - Clicked", { stopNodeId: nodeId });
};

View file

@ -6,7 +6,7 @@ import axios, { AxiosError, AxiosInstance, AxiosRequestConfig } from "axios";
import * as fetchIntercept from "fetch-intercept";
import { useEffect } from "react";
import { Cookies } from "react-cookie";
import { BuildStatus } from "../../constants/enums";
import { BuildStatus, EventDeliveryType } from "../../constants/enums";
import useAlertStore from "../../stores/alertStore";
import useFlowStore from "../../stores/flowStore";
import { checkDuplicateRequestAndStoreRequest } from "./helpers/check-duplicate-requests";
@ -263,6 +263,7 @@ export type StreamingRequestParams = {
onError?: (statusCode: number) => void;
onNetworkError?: (error: Error) => void;
buildController: AbortController;
eventDeliveryConfig?: EventDeliveryType;
};
async function performStreamingRequest({

View file

@ -39,7 +39,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
const setWebhookPollingInterval = useUtilityStore(
(state) => state.setWebhookPollingInterval,
);
const setEventDelivery = useUtilityStore((state) => state.setEventDelivery);
const { query } = UseRequestProcessor();
const getConfigFn = async () => {
@ -59,6 +59,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
setWebhookPollingInterval(
data.webhook_polling_interval ?? DEFAULT_POLLING_INTERVAL,
);
setEventDelivery(data.event_delivery ?? EventDeliveryType.POLLING);
}
return data;
};

View file

@ -155,11 +155,7 @@ export default function IOModal({
const chatValue = useUtilityStore((state) => state.chatValueStore);
const setChatValue = useUtilityStore((state) => state.setChatValueStore);
const config = useGetConfig();
function shouldStreamEvents() {
return config.data?.event_delivery === EventDeliveryType.STREAMING;
}
const eventDeliveryConfig = useUtilityStore((state) => state.eventDelivery);
const sendMessage = useCallback(
async ({
@ -178,8 +174,7 @@ export default function IOModal({
files: files,
silent: true,
session: sessionId,
stream: shouldStreamEvents(),
eventDelivery: config.data?.event_delivery,
eventDelivery: eventDeliveryConfig,
}).catch((err) => {
console.error(err);
});

View file

@ -835,7 +835,6 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
edges: get().edges || undefined,
logBuilds: get().onFlowPage,
playgroundPage,
stream,
eventDelivery,
});
get().setIsBuilding(false);

View file

@ -1,3 +1,4 @@
import { EventDeliveryType } from "@/constants/enums";
import { Pagination, Tag } from "@/types/utils/types";
import { UtilityStoreType } from "@/types/zustand/utility";
import { create } from "zustand";
@ -43,4 +44,7 @@ export const useUtilityStore = create<UtilityStoreType>((set, get) => ({
currentSessionId: "",
setCurrentSessionId: (sessionId: string) =>
set({ currentSessionId: sessionId }),
eventDelivery: EventDeliveryType.POLLING,
setEventDelivery: (eventDelivery: EventDeliveryType) =>
set({ eventDelivery }),
}));

View file

@ -149,7 +149,6 @@ export type FlowStoreType = {
files,
silent,
session,
stream,
eventDelivery,
}: {
startNodeId?: string;
@ -158,7 +157,6 @@ export type FlowStoreType = {
files?: string[];
silent?: boolean;
session?: string;
stream?: boolean;
eventDelivery?: EventDeliveryType;
}) => Promise<void>;
getFlow: () => { nodes: Node[]; edges: EdgeType[]; viewport: Viewport };

View file

@ -1,3 +1,4 @@
import { EventDeliveryType } from "@/constants/enums";
import { Pagination, Tag } from "@/types/utils/types";
export type UtilityStoreType = {
@ -25,4 +26,6 @@ export type UtilityStoreType = {
setCurrentSessionId: (sessionId: string) => void;
setClientId: (clientId: string) => void;
clientId: string;
eventDelivery: EventDeliveryType;
setEventDelivery: (eventDelivery: EventDeliveryType) => void;
};

View file

@ -40,8 +40,7 @@ type BuildVerticesParams = {
logBuilds?: boolean;
session?: string;
playgroundPage?: boolean;
stream?: boolean;
eventDelivery?: EventDeliveryType;
eventDelivery: EventDeliveryType;
};
function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI {
@ -148,7 +147,10 @@ export async function buildFlowVerticesWithFallback(
e.message === POLLING_MESSAGES.STREAMING_NOT_SUPPORTED
) {
// Fallback to polling
return await buildFlowVertices({ ...params, stream: false });
return await buildFlowVertices({
...params,
eventDelivery: EventDeliveryType.POLLING,
});
}
throw e;
}
@ -176,13 +178,16 @@ async function pollBuildEvents(
): Promise<void> {
let isDone = false;
while (!isDone) {
const response = await fetch(`${url}?stream=false`, {
method: "GET",
headers: {
"Content-Type": "application/json",
const response = await fetch(
`${url}?event_delivery=${EventDeliveryType.POLLING}`,
{
method: "GET",
headers: {
"Content-Type": "application/json",
},
signal: abortController.signal, // Add abort signal to fetch
},
signal: abortController.signal, // Add abort signal to fetch
});
);
if (!response.ok) {
const errorData = await response.json().catch(() => ({}));
@ -241,7 +246,6 @@ export async function buildFlowVertices({
logBuilds,
session,
playgroundPage,
stream = true,
eventDelivery,
}: BuildVerticesParams) {
const inputs = {};
@ -260,10 +264,10 @@ export async function buildFlowVertices({
queryParams.append("log_builds", logBuilds.toString());
}
// Add stream parameter when using direct event delivery
if (eventDelivery === EventDeliveryType.DIRECT) {
queryParams.append("stream", "true");
}
queryParams.append(
"event_delivery",
eventDelivery ?? EventDeliveryType.POLLING,
);
if (queryParams.toString()) {
buildUrl = `${buildUrl}?${queryParams.toString()}`;
@ -376,13 +380,12 @@ export async function buildFlowVertices({
}
});
useFlowStore.getState().setBuildController(buildController);
// Then stream the events
const eventsUrl = `${BASE_URL_API}build/${job_id}/events`;
const buildResults: Array<boolean> = [];
const verticesStartTimeMs: Map<string, number> = new Map();
if (stream) {
if (eventDelivery === EventDeliveryType.STREAMING) {
return performStreamingRequest({
method: "GET",
url: eventsUrl,