From cb4cde0ed2fe1dd018742f8cec8d498183a7cbcb Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Wed, 1 Oct 2025 14:33:44 -0600 Subject: [PATCH] feat: stream challenge submissions and tidy challenge imports --- api/controllers/console/__init__.py | 12 +- api/controllers/console/challenges.py | 5 +- api/controllers/web/__init__.py | 12 +- api/controllers/web/challenges.py | 3 +- api/controllers/web/register.py | 4 +- api/core/workflow/nodes/node_mapping.py | 6 +- api/models/__init__.py | 14 +- api/services/challenge_scorer_service.py | 6 +- api/services/challenge_service.py | 2 +- web/__tests__/challenge-submission.test.ts | 58 +++++-- web/app/challenges/[id]/page.tsx | 88 +++++++--- web/i18n/en-US/challenges.ts | 4 + web/service/challenges.ts | 188 ++++++++++++++++++--- 13 files changed, 309 insertions(+), 93 deletions(-) diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index e44091d17..c0ab1cf03 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -44,6 +44,10 @@ from . import ( version, ) +# Import custom challenge controllers +from . import challenges as challenges +from . import red_blue_challenges as red_blue_challenges + # Import app controllers from .app import ( advanced_prompt_template, @@ -129,10 +133,6 @@ from .workspace import ( workspace, ) -# Import custom challenge controllers -from . import challenges as challenges -from . import red_blue_challenges as red_blue_challenges - api.add_namespace(console_ns) __all__ = [ @@ -149,6 +149,7 @@ __all__ = [ "audio", "billing", "bp", + "challenges", "completion", "compliance", "console_ns", @@ -193,6 +194,7 @@ __all__ = [ "rag_pipeline_import", "rag_pipeline_workflow", "recommended_app", + "red_blue_challenges", "saved_message", "setup", "site", @@ -208,6 +210,4 @@ __all__ = [ "workflow_run", "workflow_statistic", "workspace", - "challenges", - "red_blue_challenges", ] diff --git a/api/controllers/console/challenges.py b/api/controllers/console/challenges.py index 3d3dcff22..2b75c288b 100644 --- a/api/controllers/console/challenges.py +++ b/api/controllers/console/challenges.py @@ -7,9 +7,8 @@ from controllers.console.wraps import ( account_initialization_required, setup_required, ) -from libs.login import login_required from extensions.ext_database import db -from libs.login import current_user +from libs.login import current_user, login_required from models.challenge import Challenge @@ -72,7 +71,7 @@ class ChallengeListCreateApi(Resource): c.app_id = args["app_id"] # Convert empty string to None for UUID field workflow_id = args.get("workflow_id") - c.workflow_id = workflow_id if workflow_id else None + c.workflow_id = workflow_id or None c.name = args["name"] c.description = args.get("description") c.goal = args.get("goal") diff --git a/api/controllers/web/__init__.py b/api/controllers/web/__init__.py index 9e257fc70..1e0369985 100644 --- a/api/controllers/web/__init__.py +++ b/api/controllers/web/__init__.py @@ -18,21 +18,21 @@ web_ns = Namespace("web", description="Web application API operations", path="/" from . import ( app, audio, + challenges, completion, conversation, feature, files, forgot_password, login, - register, message, passport, + red_blue_challenges, + register, remote_files, saved_message, site, workflow, - challenges, - red_blue_challenges, ) api.add_namespace(web_ns) @@ -42,20 +42,20 @@ __all__ = [ "app", "audio", "bp", + "challenges", "completion", "conversation", "feature", "files", "forgot_password", "login", - "register", "message", "passport", + "red_blue_challenges", + "register", "remote_files", "saved_message", "site", "web_ns", "workflow", - "challenges", - "red_blue_challenges", ] diff --git a/api/controllers/web/challenges.py b/api/controllers/web/challenges.py index 7ca9ee2a6..ac7cd5e11 100644 --- a/api/controllers/web/challenges.py +++ b/api/controllers/web/challenges.py @@ -1,11 +1,10 @@ from __future__ import annotations from flask_restx import Resource +from sqlalchemy import select from controllers.web import web_ns from extensions.ext_database import db -from sqlalchemy import select - from models.challenge import Challenge, ChallengeAttempt from models.model import App, Site diff --git a/api/controllers/web/register.py b/api/controllers/web/register.py index d678498d5..447864ee8 100644 --- a/api/controllers/web/register.py +++ b/api/controllers/web/register.py @@ -16,7 +16,7 @@ class WebRegisterApi(Resource): name = payload.get('name') or 'Player' password = payload.get('password') if not email or not password: - return { 'result': 'bad_request' }, 400 + return {'result': 'bad_request'}, 400 account = RegisterService.register( email=email, name=name, @@ -25,6 +25,6 @@ class WebRegisterApi(Resource): create_workspace_required=False, ) db.session.commit() - return { 'result': 'success', 'data': { 'account_id': account.id } }, 201 + return {'result': 'success', 'data': {'account_id': account.id}}, 201 diff --git a/api/core/workflow/nodes/node_mapping.py b/api/core/workflow/nodes/node_mapping.py index 7fb2820cd..1656c0330 100644 --- a/api/core/workflow/nodes/node_mapping.py +++ b/api/core/workflow/nodes/node_mapping.py @@ -4,6 +4,7 @@ from core.workflow.enums import NodeType from core.workflow.nodes.agent.agent_node import AgentNode from core.workflow.nodes.answer.answer_node import AnswerNode from core.workflow.nodes.base.node import Node +from core.workflow.nodes.challenge_evaluator.node import ChallengeEvaluatorNode from core.workflow.nodes.code import CodeNode from core.workflow.nodes.datasource.datasource_node import DatasourceNode from core.workflow.nodes.document_extractor import DocumentExtractorNode @@ -11,6 +12,7 @@ from core.workflow.nodes.end.end_node import EndNode from core.workflow.nodes.http_request import HttpRequestNode from core.workflow.nodes.if_else import IfElseNode from core.workflow.nodes.iteration import IterationNode, IterationStartNode +from core.workflow.nodes.judging_llm.node import JudgingLLMNode from core.workflow.nodes.knowledge_index import KnowledgeIndexNode from core.workflow.nodes.knowledge_retrieval import KnowledgeRetrievalNode from core.workflow.nodes.list_operator import ListOperatorNode @@ -19,14 +21,12 @@ from core.workflow.nodes.loop import LoopEndNode, LoopNode, LoopStartNode from core.workflow.nodes.parameter_extractor import ParameterExtractorNode from core.workflow.nodes.question_classifier import QuestionClassifierNode from core.workflow.nodes.start import StartNode +from core.workflow.nodes.team_challenge.node import TeamChallengeNode from core.workflow.nodes.template_transform import TemplateTransformNode from core.workflow.nodes.tool import ToolNode from core.workflow.nodes.variable_aggregator import VariableAggregatorNode from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode as VariableAssignerNodeV1 from core.workflow.nodes.variable_assigner.v2 import VariableAssignerNode as VariableAssignerNodeV2 -from core.workflow.nodes.challenge_evaluator.node import ChallengeEvaluatorNode -from core.workflow.nodes.judging_llm.node import JudgingLLMNode -from core.workflow.nodes.team_challenge.node import TeamChallengeNode LATEST_VERSION = "latest" diff --git a/api/models/__init__.py b/api/models/__init__.py index 7f28a3603..30b70283d 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -9,6 +9,7 @@ from .account import ( TenantStatus, ) from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint +from .challenge import Challenge, ChallengeAttempt from .dataset import ( AppDatasetJoin, Dataset, @@ -68,6 +69,7 @@ from .provider import ( TenantDefaultModel, TenantPreferredModelProvider, ) +from .red_blue import RedBlueChallenge, TeamPairing, TeamSubmission from .source import DataSourceApiKeyAuthBinding, DataSourceOauthBinding from .task import CeleryTask, CeleryTaskSet from .tools import ( @@ -91,8 +93,6 @@ from .workflow import ( WorkflowRun, WorkflowType, ) -from .challenge import Challenge, ChallengeAttempt -from .red_blue import RedBlueChallenge, TeamSubmission, TeamPairing __all__ = [ "APIBasedExtension", @@ -113,6 +113,8 @@ __all__ = [ "BuiltinToolProvider", "CeleryTask", "CeleryTaskSet", + "Challenge", + "ChallengeAttempt", "Conversation", "ConversationVariable", "CreatorUserRole", @@ -154,10 +156,13 @@ __all__ = [ "ProviderQuotaType", "ProviderType", "RecommendedApp", + "RedBlueChallenge", "SavedMessage", "Site", "Tag", "TagBinding", + "TeamPairing", + "TeamSubmission", "Tenant", "TenantAccountJoin", "TenantAccountRole", @@ -183,9 +188,4 @@ __all__ = [ "WorkflowRunTriggeredFrom", "WorkflowToolProvider", "WorkflowType", - "Challenge", - "ChallengeAttempt", - "RedBlueChallenge", - "TeamSubmission", - "TeamPairing", ] diff --git a/api/services/challenge_scorer_service.py b/api/services/challenge_scorer_service.py index 73b7fcdea..acc819505 100644 --- a/api/services/challenge_scorer_service.py +++ b/api/services/challenge_scorer_service.py @@ -62,7 +62,7 @@ class ChallengeScorerService: raise ValueError("Scorer must return a dict with 'score' key") return result except Exception as e: - logger.error(f"Scorer plugin {scorer_plugin_id} failed: {e}", exc_info=True) + logger.error("Scorer plugin %s failed: %s", scorer_plugin_id, e, exc_info=True) raise ValueError(f"Scorer plugin execution failed: {e}") @classmethod @@ -99,11 +99,11 @@ class ChallengeScorerService: # Cache it cls._plugin_cache[cache_key] = scorer - logger.info(f"Loaded scorer plugin: {plugin_id} from {entrypoint}") + logger.info("Loaded scorer plugin: %s from %s", plugin_id, entrypoint) return scorer except Exception as e: - logger.error(f"Failed to load scorer plugin {plugin_id}:{entrypoint}: {e}", exc_info=True) + logger.error("Failed to load scorer plugin %s:%s: %s", plugin_id, entrypoint, e, exc_info=True) return None @classmethod diff --git a/api/services/challenge_service.py b/api/services/challenge_service.py index e2a484647..292c81467 100644 --- a/api/services/challenge_service.py +++ b/api/services/challenge_service.py @@ -7,7 +7,7 @@ from typing import Any from sqlalchemy.orm import Session from extensions.ext_database import db -from models.challenge import Challenge, ChallengeAttempt +from models.challenge import ChallengeAttempt class ChallengeService: diff --git a/web/__tests__/challenge-submission.test.ts b/web/__tests__/challenge-submission.test.ts index 6bc6f29e1..8b2c65a03 100644 --- a/web/__tests__/challenge-submission.test.ts +++ b/web/__tests__/challenge-submission.test.ts @@ -1,23 +1,33 @@ import { submitChallengeAttempt } from '@/service/challenges' -import { postPublic } from '@/service/base' import { PUBLIC_API_PREFIX } from '@/config' -jest.mock('@/service/base', () => ({ - getPublic: jest.fn(), - postPublic: jest.fn(), -})) - -const mockedPostPublic = postPublic as jest.MockedFunction const originalFetch = globalThis.fetch let fetchMock: jest.Mock +let ssePostMock: jest.SpyInstance + +jest.mock('@/service/base', () => { + const actual = jest.requireActual('@/service/base') + return { + ...actual, + ssePost: jest.fn(), + } +}) + +jest.mock('@/app/components/share/utils', () => ({ + ...jest.requireActual('@/app/components/share/utils'), + getInitialTokenV2: () => ({ version: 2 }), + isTokenV1: () => false, +})) describe('submitChallengeAttempt', () => { beforeEach(() => { fetchMock = jest.fn() globalThis.fetch = fetchMock as unknown as typeof fetch - mockedPostPublic.mockReset() - mockedPostPublic.mockResolvedValue({ result: 'success' } as any) + ssePostMock = jest.spyOn(require('@/service/base'), 'ssePost').mockImplementation((_url: string, _options: any, handlers: any) => { + handlers.onCompleted?.() + return Promise.resolve() + }) localStorage.clear() }) @@ -36,7 +46,7 @@ describe('submitChallengeAttempt', () => { ).rejects.toThrow('Challenge app is not published') expect(fetchMock).not.toHaveBeenCalled() - expect(mockedPostPublic).not.toHaveBeenCalled() + expect(ssePostMock).not.toHaveBeenCalled() }) it('requests a passport token and submits chat attempts through /chat-messages', async () => { @@ -46,6 +56,14 @@ describe('submitChallengeAttempt', () => { json: jest.fn().mockResolvedValue({ access_token: passportToken }), }) + ssePostMock.mockImplementation((_url, _options, handlers) => { + handlers.getAbortController?.(new AbortController()) + handlers.onData?.('Hello', true, { messageId: 'msg-1' } as any) + handlers.onMessageEnd?.({ metadata: { outputs: { challenge_succeeded: true }, answer: 'All good' } } as any) + handlers.onCompleted?.() + return Promise.resolve() + }) + await submitChallengeAttempt('challenge-123', 'app-abc', 'site-code-xyz', 'chat', 'solve this') expect(fetchMock).toHaveBeenCalledWith(`${PUBLIC_API_PREFIX}/passport`, { @@ -56,14 +74,14 @@ describe('submitChallengeAttempt', () => { credentials: 'include', }) - expect(mockedPostPublic).toHaveBeenCalledWith('/chat-messages', expect.objectContaining({ + expect(ssePostMock).toHaveBeenCalledWith('/chat-messages', expect.objectContaining({ body: { query: 'solve this', inputs: {}, - response_mode: 'blocking', + response_mode: 'streaming', conversation_id: '', }, - })) + }), expect.any(Object)) const storedToken = JSON.parse(localStorage.getItem('token') || '{}') expect(storedToken.version).toBe(2) @@ -77,6 +95,14 @@ describe('submitChallengeAttempt', () => { json: jest.fn().mockResolvedValue({ access_token: passportToken }), }) + ssePostMock.mockImplementation((_url, _options, handlers) => { + handlers.getAbortController?.(new AbortController()) + handlers.onTextChunk?.({ data: { text: 'partial' } } as any) + handlers.onWorkflowFinished?.({ data: { outputs: { challenge_succeeded: false, message: 'nope' } } } as any) + handlers.onCompleted?.() + return Promise.resolve() + }) + await submitChallengeAttempt('challenge-456', 'app-def', 'site-code-xyz', 'workflow', 'my answer') expect(fetchMock).toHaveBeenCalledWith(`${PUBLIC_API_PREFIX}/passport`, { @@ -87,14 +113,14 @@ describe('submitChallengeAttempt', () => { credentials: 'include', }) - expect(mockedPostPublic).toHaveBeenCalledWith('/workflows/run', expect.objectContaining({ + expect(ssePostMock).toHaveBeenCalledWith('/workflows/run', expect.objectContaining({ body: { inputs: { user_prompt: 'my answer', }, - response_mode: 'blocking', + response_mode: 'streaming', }, - })) + }), expect.any(Object)) const storedToken = JSON.parse(localStorage.getItem('token') || '{}') expect(storedToken['challenge-456'].DEFAULT).toBe(passportToken) diff --git a/web/app/challenges/[id]/page.tsx b/web/app/challenges/[id]/page.tsx index 41f32a47a..538bf5ed5 100644 --- a/web/app/challenges/[id]/page.tsx +++ b/web/app/challenges/[id]/page.tsx @@ -1,14 +1,14 @@ 'use client' -import { useEffect, useState } from 'react' +import { useCallback, useEffect, useRef, useState } from 'react' import { useTranslation } from 'react-i18next' import { useParams } from 'next/navigation' import { RiCheckLine, RiCloseLine, RiLoader4Line } from '@remixicon/react' -import { fetchChallengeDetail, fetchChallengeLeaderboard, submitChallengeAttempt } from '@/service/challenges' import Leaderboard from '@/app/components/challenge/leaderboard' import Button from '@/app/components/base/button' import Textarea from '@/app/components/base/textarea' import Toast from '@/app/components/base/toast' +import { fetchChallengeDetail, fetchChallengeLeaderboard, submitChallengeAttempt } from '@/service/challenges' export default function ChallengeDetailPage() { const { t } = useTranslation() @@ -21,6 +21,9 @@ export default function ChallengeDetailPage() { const [submitting, setSubmitting] = useState(false) const [userInput, setUserInput] = useState('') const [lastResult, setLastResult] = useState<{ success: boolean; message?: string; rating?: number } | null>(null) + const [streamingText, setStreamingText] = useState('') + const [hasStreamingResult, setHasStreamingResult] = useState(false) + const abortControllerRef = useRef(null) useEffect(() => { const load = async () => { @@ -43,6 +46,18 @@ export default function ChallengeDetailPage() { load() }, [id]) + const stopStreaming = useCallback(() => { + if (abortControllerRef.current) { + abortControllerRef.current.abort() + abortControllerRef.current = null + } + setHasStreamingResult(false) + }, []) + + useEffect(() => () => { + stopStreaming() + }, [stopStreaming]) + const handleSubmit = async () => { if (!userInput.trim()) { Toast.notify({ type: 'error', message: 'Please enter a response' }) @@ -53,40 +68,44 @@ export default function ChallengeDetailPage() { Toast.notify({ type: 'error', message: 'Challenge is not configured with an app' }) return } - + stopStreaming() setSubmitting(true) setLastResult(null) + setStreamingText('') + setHasStreamingResult(false) try { - // Execute the workflow with the user's input - // Endpoint varies by app type (chat vs workflow) const result = await submitChallengeAttempt( id, challenge.app_id, challenge.app_site_code, challenge.app_mode || 'workflow', userInput, + { + onStreamUpdate: (text) => { + setStreamingText(text) + setHasStreamingResult(true) + }, + onAbortController: (controller) => { + abortControllerRef.current = controller + }, + onError: (message) => { + setHasStreamingResult(false) + setStreamingText('') + Toast.notify({ type: 'error', message }) + }, + }, ) - // Extract challenge results from workflow output - // Response structure differs by app mode: - // - Chat apps: result.data.answer + result.data.metadata.outputs - // - Workflow apps: result.data (direct outputs) - const isChatApp = challenge.app_mode === 'chat' || challenge.app_mode === 'advanced-chat' - const workflowOutputs = isChatApp - ? (result.data?.metadata?.outputs || {}) - : (result.data || {}) - - const success = workflowOutputs.challenge_succeeded || false - const rating = workflowOutputs.judge_rating - const feedback = workflowOutputs.judge_feedback || workflowOutputs.message || result.data?.answer + setHasStreamingResult(false) + setStreamingText(result.rawText) setLastResult({ - success, - message: feedback || (success ? 'Challenge passed!' : 'Challenge not passed.'), - rating, + success: result.success, + message: result.message, + rating: result.rating, }) - if (success) { + if (result.success) { Toast.notify({ type: 'success', message: 'Challenge completed!' }) // Refresh leaderboard const leaders = await fetchChallengeLeaderboard(id) @@ -95,7 +114,12 @@ export default function ChallengeDetailPage() { } catch (e: any) { console.error('Submission error:', e) - Toast.notify({ type: 'error', message: e.message || 'Submission failed' }) + setHasStreamingResult(false) + setStreamingText('') + if (e?.name === 'AbortError') + return + if (!e?.__handled) + Toast.notify({ type: 'error', message: e.message || 'Submission failed' }) } finally { setSubmitting(false) @@ -156,19 +180,33 @@ export default function ChallengeDetailPage() { type='primary' onClick={handleSubmit} loading={submitting} - disabled={!userInput.trim()} + disabled={submitting || !userInput.trim()} className='w-full' > {submitting ? ( <> - {t('common.operation.processing')} + {t('challenges.player.processing', 'Processing…')} ) : ( - t('challenges.player.submit') + t('challenges.player.submitButton', 'Submit') )} + {(hasStreamingResult || streamingText) && ( +
+
+ {t('challenges.player.liveOutput')} + {hasStreamingResult && ( + + )} +
+
+ {streamingText || t('challenges.player.awaitingResponse')} +
+
+ )} + {lastResult && (
diff --git a/web/i18n/en-US/challenges.ts b/web/i18n/en-US/challenges.ts index e0374f696..af9dc99ab 100644 --- a/web/i18n/en-US/challenges.ts +++ b/web/i18n/en-US/challenges.ts @@ -50,6 +50,10 @@ export default { failed: 'Failed', pending: 'Pending', }, + liveOutput: 'Live output', + awaitingResponse: 'Waiting for the model to respond…', + processing: 'Processing…', + submitButton: 'Submit', }, leaderboard: { title: 'Leaderboard', diff --git a/web/service/challenges.ts b/web/service/challenges.ts index dd09781ba..5ff5992fe 100644 --- a/web/service/challenges.ts +++ b/web/service/challenges.ts @@ -1,7 +1,32 @@ -import { getPublic, postPublic } from './base' +import { getPublic, ssePost } from './base' import { PUBLIC_API_PREFIX } from '@/config' import { getInitialTokenV2, isTokenV1 } from '@/app/components/share/utils' import { CONVERSATION_ID_INFO } from '@/app/components/base/chat/constants' +import type { WorkflowFinishedResponse } from '@/types/workflow' + +type ChatMessageEnd = { + metadata?: { + outputs?: Record + answer?: string + message?: string + judge_feedback?: string + judge_rating?: number + } +} + +export type ChallengeAttemptResult = { + success: boolean + message: string + rating?: number + outputs: Record + rawText: string +} + +export type ChallengeAttemptCallbacks = { + onStreamUpdate?: (text: string) => void + onError?: (message: string) => void + onAbortController?: (abortController: AbortController | null) => void +} export type ChallengeListItem = { id: string @@ -31,11 +56,12 @@ export async function fetchChallengeLeaderboard(id: string) { export async function submitChallengeAttempt( challengeId: string, - appId: string, + _appId: string, appSiteCode: string | undefined, appMode: string, userInput: string, -) { + callbacks?: ChallengeAttemptCallbacks, +): Promise { if (!appSiteCode) throw new Error('Challenge app is not published. Please enable the app site for this challenge.') @@ -82,23 +108,147 @@ export async function submitChallengeAttempt( localStorage.setItem(storageKey, JSON.stringify(tokenStore)) localStorage.removeItem(CONVERSATION_ID_INFO) - if (appMode === 'chat' || appMode === 'advanced-chat' || appMode === 'agent-chat') { - return await postPublic('/chat-messages', { - body: { - query: userInput, - inputs: {}, - response_mode: 'blocking', - conversation_id: '', - }, - }) - } + const isChatApp = appMode === 'chat' || appMode === 'advanced-chat' || appMode === 'agent-chat' - return await postPublic('/workflows/run', { - body: { - inputs: { - user_prompt: userInput, + return await new Promise((resolve, reject) => { + let aggregatedText = '' + let finalOutputs: Record | undefined + let finalMessage: string | undefined + let isSettled = false + const releaseAbortController = () => { + callbacks?.onAbortController?.(null) + } + + const emitStreamUpdate = () => { + callbacks?.onStreamUpdate?.(aggregatedText) + } + + const settleError = (message: string) => { + if (isSettled) + return + isSettled = true + releaseAbortController() + const normalizedMessage = message || 'Submission failed' + const error = new Error(normalizedMessage) + if (callbacks?.onError) + (error as any).__handled = true + + callbacks?.onError?.(normalizedMessage) + reject(error) + } + + const buildResult = (): ChallengeAttemptResult => { + const outputs = finalOutputs || {} + const successFlag = Boolean(outputs.challenge_succeeded) + const rating = outputs.judge_rating ?? outputs.rating + const feedback = outputs.judge_feedback || outputs.message || finalMessage || aggregatedText + const message = feedback || (successFlag ? 'Challenge passed!' : 'Challenge not passed.') + return { + success: successFlag, + rating, + message, + outputs, + rawText: aggregatedText, + } + } + + const settleSuccess = () => { + if (isSettled) + return + isSettled = true + releaseAbortController() + resolve(buildResult()) + } + + const commonOptions = { + isPublicAPI: true, + getAbortController: (abortController: AbortController) => { + callbacks?.onAbortController?.(abortController) }, - response_mode: 'blocking', - }, + onError: (error: string) => { + const errorMessage = typeof error === 'string' ? error : 'Submission failed' + settleError(errorMessage) + }, + onCompleted: (hasError?: boolean, errorMessage?: string) => { + if (hasError) { + settleError(errorMessage || 'Submission failed') + return + } + settleSuccess() + }, + } + + if (isChatApp) { + ssePost( + '/chat-messages', + { + body: { + query: userInput, + inputs: {}, + response_mode: 'streaming', + conversation_id: '', + }, + }, + { + ...commonOptions, + onData: (message: string) => { + aggregatedText += message + emitStreamUpdate() + }, + onMessageReplace: (messageReplace) => { + aggregatedText = messageReplace.answer + emitStreamUpdate() + }, + onMessageEnd: (messageEnd) => { + const metadata = (messageEnd as ChatMessageEnd).metadata + if (metadata?.outputs) + finalOutputs = metadata.outputs + const endMessage = metadata?.answer || metadata?.message || metadata?.judge_feedback + if (endMessage) { + aggregatedText = endMessage + emitStreamUpdate() + } + if (metadata?.judge_feedback) + finalMessage = metadata.judge_feedback + else if (metadata?.answer || metadata?.message) + finalMessage = metadata.answer || metadata.message + }, + }, + ) + return + } + + ssePost( + '/workflows/run', + { + body: { + inputs: { + user_prompt: userInput, + }, + response_mode: 'streaming', + }, + }, + { + ...commonOptions, + onTextChunk: (chunk) => { + const text = (chunk as any)?.data?.text || '' + if (text) { + aggregatedText += text + emitStreamUpdate() + } + }, + onWorkflowFinished: ({ data }) => { + const resultData = (data as WorkflowFinishedResponse['data']) || {} + if (resultData.outputs) + finalOutputs = resultData.outputs + const message = resultData.outputs?.judge_feedback || resultData.outputs?.message + if (message) { + aggregatedText = message + emitStreamUpdate() + } + finalMessage = message || finalMessage + }, + }, + ) }) }