diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index 731d0a3e9..41e505e4f 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -13,6 +13,7 @@ from langflow.api.v1 import ( mcp_projects_router, mcp_router, monitor_router, + openai_responses_router, projects_router, starter_projects_router, store_router, @@ -50,6 +51,7 @@ router_v1.include_router(knowledge_bases_router) router_v1.include_router(mcp_router) router_v1.include_router(voice_mode_router) router_v1.include_router(mcp_projects_router) +router_v1.include_router(openai_responses_router) router_v2.include_router(files_router_v2) router_v2.include_router(mcp_router_v2) diff --git a/src/backend/base/langflow/api/v1/__init__.py b/src/backend/base/langflow/api/v1/__init__.py index 9a86307c0..96ba29d16 100644 --- a/src/backend/base/langflow/api/v1/__init__.py +++ b/src/backend/base/langflow/api/v1/__init__.py @@ -9,6 +9,7 @@ from langflow.api.v1.login import router as login_router from langflow.api.v1.mcp import router as mcp_router from langflow.api.v1.mcp_projects import router as mcp_projects_router from langflow.api.v1.monitor import router as monitor_router +from langflow.api.v1.openai_responses import router as openai_responses_router from langflow.api.v1.projects import router as projects_router from langflow.api.v1.starter_projects import router as starter_projects_router from langflow.api.v1.store import router as store_router @@ -29,6 +30,7 @@ __all__ = [ "mcp_projects_router", "mcp_router", "monitor_router", + "openai_responses_router", "projects_router", "starter_projects_router", "store_router", diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 53c5ec7dc..9418a5ca0 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -116,6 +116,7 @@ async def simple_run_flow( stream: bool = False, api_key_user: User | None = None, event_manager: EventManager | None = None, + context: dict | None = None, ): validate_input_and_tweaks(input_request) try: @@ -127,7 +128,9 @@ async def simple_run_flow( raise ValueError(msg) graph_data = flow.data.copy() graph_data = process_tweaks(graph_data, input_request.tweaks or {}, stream=stream) - graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=str(user_id), flow_name=flow.name) + graph = Graph.from_payload( + graph_data, flow_id=flow_id_str, user_id=str(user_id), flow_name=flow.name, context=context + ) inputs = None if input_request.input_value is not None: inputs = [ @@ -228,6 +231,7 @@ async def run_flow_generator( api_key_user: User | None, event_manager: EventManager, client_consumed_queue: asyncio.Queue, + context: dict | None = None, ) -> None: """Executes a flow asynchronously and manages event streaming to the client. 
@@ -240,6 +244,7 @@ async def run_flow_generator( api_key_user (User | None): Optional authenticated user running the flow event_manager (EventManager): Manages the streaming of events to the client client_consumed_queue (asyncio.Queue): Tracks client consumption of events + context (dict | None): Optional context to pass to the flow Events Generated: - "add_message": Sent when new messages are added during flow execution @@ -260,6 +265,7 @@ async def run_flow_generator( stream=True, api_key_user=api_key_user, event_manager=event_manager, + context=context, ) event_manager.on_end(data={"result": result.model_dump()}) await client_consumed_queue.get() diff --git a/src/backend/base/langflow/api/v1/openai_responses.py b/src/backend/base/langflow/api/v1/openai_responses.py new file mode 100644 index 000000000..ca0c280b3 --- /dev/null +++ b/src/backend/base/langflow/api/v1/openai_responses.py @@ -0,0 +1,545 @@ +import asyncio +import json +import time +import uuid +from collections.abc import AsyncGenerator +from typing import Annotated, Any + +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request +from fastapi.responses import StreamingResponse +from loguru import logger + +from langflow.api.v1.endpoints import consume_and_yield, run_flow_generator, simple_run_flow +from langflow.api.v1.schemas import SimplifiedAPIRequest +from langflow.events.event_manager import create_stream_tokens_event_manager +from langflow.helpers.flow import get_flow_by_id_or_endpoint_name +from langflow.schema.content_types import ToolContent +from langflow.schema.openai_responses_schemas import ( + OpenAIErrorResponse, + OpenAIResponsesRequest, + OpenAIResponsesResponse, + OpenAIResponsesStreamChunk, + create_openai_error, +) +from langflow.services.auth.utils import api_key_security +from langflow.services.database.models.flow.model import FlowRead +from langflow.services.database.models.user.model import UserRead +from langflow.services.deps import get_telemetry_service +from langflow.services.telemetry.schema import RunPayload +from langflow.services.telemetry.service import TelemetryService + +router = APIRouter(tags=["OpenAI Responses API"]) + + +def has_chat_input(flow_data: dict | None) -> bool: + """Check if the flow has a chat input component.""" + if not flow_data or "nodes" not in flow_data: + return False + + return any(node.get("data", {}).get("type") in ["ChatInput", "Chat Input"] for node in flow_data["nodes"]) + + +def has_chat_output(flow_data: dict | None) -> bool: + """Check if the flow has a chat output component.""" + if not flow_data or "nodes" not in flow_data: + return False + + return any(node.get("data", {}).get("type") in ["ChatOutput", "Chat Output"] for node in flow_data["nodes"]) + + +async def run_flow_for_openai_responses( + flow: FlowRead, + request: OpenAIResponsesRequest, + api_key_user: UserRead, + *, + stream: bool = False, + variables: dict[str, str] | None = None, +) -> OpenAIResponsesResponse | StreamingResponse: + """Run a flow for OpenAI Responses API compatibility.""" + # Check if flow has chat input + if not has_chat_input(flow.data): + msg = "Flow must have a ChatInput component to be compatible with OpenAI Responses API" + raise ValueError(msg) + + if not has_chat_output(flow.data): + msg = "Flow must have a ChatOutput component to be compatible with OpenAI Responses API" + raise ValueError(msg) + + # Use previous_response_id as session_id for conversation continuity + # If no previous_response_id, create a new session_id + session_id = 
request.previous_response_id or str(uuid.uuid4()) + + # Store header variables in context for global variable override + context = {} + if variables: + context["request_variables"] = variables + logger.debug(f"Added request variables to context: {variables}") + + # Convert OpenAI request to SimplifiedAPIRequest + # Note: We're moving away from tweaks to a context-based approach + simplified_request = SimplifiedAPIRequest( + input_value=request.input, + input_type="chat", # Use chat input type for better compatibility + output_type="chat", # Use chat output type for better compatibility + tweaks={}, # Empty tweaks, using context instead + session_id=session_id, + ) + + # Context will be passed separately to simple_run_flow + + logger.debug(f"SimplifiedAPIRequest created with context: {context}") + + # Use session_id as response_id for OpenAI compatibility + response_id = session_id + created_timestamp = int(time.time()) + + if stream: + # Handle streaming response + asyncio_queue: asyncio.Queue = asyncio.Queue() + asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue() + event_manager = create_stream_tokens_event_manager(queue=asyncio_queue) + + async def openai_stream_generator() -> AsyncGenerator[str, None]: + """Convert Langflow events to OpenAI Responses API streaming format.""" + main_task = asyncio.create_task( + run_flow_generator( + flow=flow, + input_request=simplified_request, + api_key_user=api_key_user, + event_manager=event_manager, + client_consumed_queue=asyncio_queue_client_consumed, + context=context, + ) + ) + + try: + # Send initial chunk to establish connection + initial_chunk = OpenAIResponsesStreamChunk( + id=response_id, + created=created_timestamp, + model=request.model, + delta={"content": ""}, + ) + yield f"data: {initial_chunk.model_dump_json()}\n\n" + + tool_call_counter = 0 + processed_tools = set() # Track processed tool calls to avoid duplicates + previous_content = "" # Track content already sent to calculate deltas + + async for event_data in consume_and_yield(asyncio_queue, asyncio_queue_client_consumed): + if event_data is None: + break + + content = "" + + # Parse byte string events as JSON + if isinstance(event_data, bytes): + try: + import json + + event_str = event_data.decode("utf-8") + parsed_event = json.loads(event_str) + + if isinstance(parsed_event, dict): + event_type = parsed_event.get("event") + data = parsed_event.get("data", {}) + + # Handle add_message events + if event_type == "add_message": + sender_name = data.get("sender_name", "") + text = data.get("text", "") + sender = data.get("sender", "") + content_blocks = data.get("content_blocks", []) + + # Look for Agent Steps in content_blocks + for block in content_blocks: + if block.get("title") == "Agent Steps": + contents = block.get("contents", []) + for step in contents: + # Look for tool_use type items + if step.get("type") == "tool_use": + tool_name = step.get("name", "") + tool_input = step.get("tool_input", {}) + tool_output = step.get("output") + + # Only emit tool calls with explicit tool names and + # meaningful arguments + if tool_name and tool_input is not None and tool_output is not None: + # Create unique identifier for this tool call + tool_signature = ( + f"{tool_name}:{hash(str(sorted(tool_input.items())))}" + ) + + # Skip if we've already processed this tool call + if tool_signature in processed_tools: + continue + + processed_tools.add(tool_signature) + tool_call_counter += 1 + call_id = f"call_{tool_call_counter}" + tool_id = f"fc_{tool_call_counter}" + 
tool_call_event = { + "type": "response.output_item.added", + "item": { + "id": tool_id, + "type": "function_call", # OpenAI uses "function_call" + "status": "in_progress", # OpenAI includes status + "name": tool_name, + "arguments": "", # Start with empty, build via deltas + "call_id": call_id, + }, + } + yield ( + f"event: response.output_item.added\n" + f"data: {json.dumps(tool_call_event)}\n\n" + ) + + # Send function call arguments as delta events (like OpenAI) + arguments_str = json.dumps(tool_input) + arg_delta_event = { + "type": "response.function_call_arguments.delta", + "delta": arguments_str, + "item_id": tool_id, + "output_index": 0, + } + yield ( + f"event: response.function_call_arguments.delta\n" + f"data: {json.dumps(arg_delta_event)}\n\n" + ) + + # Send function call arguments done event + arg_done_event = { + "type": "response.function_call_arguments.done", + "arguments": arguments_str, + "item_id": tool_id, + "output_index": 0, + } + yield ( + f"event: response.function_call_arguments.done\n" + f"data: {json.dumps(arg_done_event)}\n\n" + ) + + # If there's output, send completion event + if tool_output is not None: + # Check if include parameter requests tool_call.results + include_results = ( + request.include + and "tool_call.results" in request.include + ) + + if include_results: + # Format with detailed results + tool_done_event = { + "type": "response.output_item.done", + "item": { + "id": f"{tool_name}_{tool_id}", + "inputs": tool_input, # Raw inputs as-is + "status": "completed", + "type": "tool_call", + "tool_name": f"{tool_name}", + "results": tool_output, # Raw output as-is + }, + "output_index": 0, + "sequence_number": tool_call_counter + 5, + } + else: + # Regular function call format + tool_done_event = { + "type": "response.output_item.done", + "item": { + "id": tool_id, + "type": "function_call", # Match OpenAI format + "status": "completed", + "arguments": arguments_str, + "call_id": call_id, + "name": tool_name, + }, + } + + yield ( + f"event: response.output_item.done\n" + f"data: {json.dumps(tool_done_event)}\n\n" + ) + + # Extract text content for streaming (only AI responses) + if ( + sender in ["Machine", "AI", "Agent"] + and text != request.input + and sender_name == "Agent" + ): + # Calculate delta: only send newly generated content + if text.startswith(previous_content): + content = text[len(previous_content) :] + previous_content = text + else: + # If text doesn't start with previous content, send full text + # This handles cases where the content might be reset + content = text + previous_content = text + + except (json.JSONDecodeError, UnicodeDecodeError): + continue + + # Only send chunks with actual content + if content: + chunk = OpenAIResponsesStreamChunk( + id=response_id, + created=created_timestamp, + model=request.model, + delta={"content": content}, + ) + yield f"data: {chunk.model_dump_json()}\n\n" + + # Send final completion chunk + final_chunk = OpenAIResponsesStreamChunk( + id=response_id, + created=created_timestamp, + model=request.model, + delta={}, + status="completed", + ) + yield f"data: {final_chunk.model_dump_json()}\n\n" + yield "data: [DONE]\n\n" + + except Exception as e: # noqa: BLE001 + logger.error(f"Error in stream generator: {e}") + error_response = create_openai_error( + message=str(e), + type_="processing_error", + ) + yield f"data: {json.dumps(error_response)}\n\n" + finally: + if not main_task.done(): + main_task.cancel() + + return StreamingResponse( + openai_stream_generator(), + media_type="text/event-stream", headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "Access-Control-Allow-Origin": "*", + }, + ) + + # Handle non-streaming response + result = await simple_run_flow( + flow=flow, + input_request=simplified_request, + stream=False, + api_key_user=api_key_user, + context=context, + ) + + # Extract output text and tool calls from result + output_text = "" + tool_calls: list[dict[str, Any]] = [] + + if result.outputs: + for run_output in result.outputs: + if run_output and run_output.outputs: + for component_output in run_output.outputs: + if component_output: + # Handle messages (final chat outputs) + if hasattr(component_output, "messages") and component_output.messages: + for msg in component_output.messages: + if hasattr(msg, "message"): + output_text = msg.message + break + # Handle results + if not output_text and hasattr(component_output, "results") and component_output.results: + for value in component_output.results.values(): + if hasattr(value, "get_text"): + output_text = value.get_text() + break + if isinstance(value, str): + output_text = value + break + + if hasattr(component_output, "results") and component_output.results: + for block in getattr(component_output.results.get("message"), "content_blocks", []): + tool_calls.extend( + { + "name": content.name, + "input": content.tool_input, + "output": content.output, + } + for content in block.contents + if isinstance(content, ToolContent) + ) + if output_text: + break + if output_text: + break + + # Build output array + output_items = [] + + # Add tool calls if includes parameter requests them + include_results = request.include and "tool_call.results" in request.include + + tool_call_id_counter = 1 + for tool_call in tool_calls: + if include_results: + # Format as detailed tool call with results (like file_search_call in sample) + tool_call_item = { + "id": f"{tool_call['name']}_{tool_call_id_counter}", + "queries": list(tool_call["input"].values()) + if isinstance(tool_call["input"], dict) + else [str(tool_call["input"])], + "status": "completed", + "tool_name": f"{tool_call['name']}", + "type": "tool_call", + "results": tool_call["output"] if tool_call["output"] is not None else [], + } + else: + # Format as basic function call + tool_call_item = { + "id": f"fc_{tool_call_id_counter}", + "type": "function_call", + "status": "completed", + "name": tool_call["name"], + "arguments": json.dumps(tool_call["input"]) if tool_call["input"] is not None else "{}", + } + + output_items.append(tool_call_item) + tool_call_id_counter += 1 + + # Add the message output + output_message = { + "type": "message", + "id": f"msg_{response_id}", + "status": "completed", + "role": "assistant", + "content": [{"type": "output_text", "text": output_text, "annotations": []}], + } + output_items.append(output_message) + + return OpenAIResponsesResponse( + id=response_id, + created_at=created_timestamp, + model=request.model, + output=output_items, + previous_response_id=request.previous_response_id, + ) + + +@router.post("/responses", response_model=None) +async def create_response( + request: OpenAIResponsesRequest, + background_tasks: BackgroundTasks, + api_key_user: Annotated[UserRead, Depends(api_key_security)], + telemetry_service: Annotated[TelemetryService, Depends(get_telemetry_service)], + http_request: Request, +) -> OpenAIResponsesResponse | StreamingResponse | OpenAIErrorResponse: + """Create a response using OpenAI Responses API format. 
+ + This endpoint accepts a flow_id in the model parameter and processes + the input through the specified Langflow flow. + + Args: + request: OpenAI Responses API request with model (flow_id) and input + background_tasks: FastAPI background task manager + api_key_user: Authenticated user from API key + http_request: The incoming HTTP request + telemetry_service: Telemetry service for logging + + Returns: + OpenAI-compatible response or streaming response + + Raises: + HTTPException: For validation errors or flow execution issues + """ + start_time = time.perf_counter() + + # Extract global variables from X-LANGFLOW-GLOBAL-VAR-* headers + variables = {} + header_prefix = "x-langflow-global-var-" + + logger.debug(f"All headers received: {list(http_request.headers.keys())}") + logger.debug(f"Looking for headers starting with: {header_prefix}") + + for header_name, header_value in http_request.headers.items(): + header_lower = header_name.lower() + logger.debug(f"Checking header: '{header_lower}' (original: '{header_name}')") + if header_lower.startswith(header_prefix): + # Extract variable name from header (remove prefix) and convert to uppercase + var_name_lower = header_lower[len(header_prefix) :] + var_name = var_name_lower.upper() # Default to uppercase + + variables[var_name] = header_value + logger.debug( + f"Found global variable: {var_name} = {header_value} " + f"(converted to uppercase from header: {header_name})" + ) + + logger.debug(f"Extracted global variables from headers: {list(variables.keys())}") + logger.debug(f"Variables dict: {variables}") + + # Validate tools parameter - error out if tools are provided + if request.tools is not None: + error_response = create_openai_error( + message="Tools are not supported yet", + type_="invalid_request_error", + code="tools_not_supported", + ) + return OpenAIErrorResponse(error=error_response["error"]) + + # Get flow using the model field (which contains flow_id) + try: + flow = await get_flow_by_id_or_endpoint_name(request.model, str(api_key_user.id)) + except HTTPException: + flow = None + + if flow is None: + error_response = create_openai_error( + message=f"Flow with id '{request.model}' not found", + type_="invalid_request_error", + code="flow_not_found", + ) + return OpenAIErrorResponse(error=error_response["error"]) + + try: + # Process the request + result = await run_flow_for_openai_responses( + flow=flow, + request=request, + api_key_user=api_key_user, + stream=request.stream, + variables=variables, + ) + + # Log telemetry for successful completion + if not request.stream: # Only log for non-streaming responses + end_time = time.perf_counter() + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload( + run_is_webhook=False, + run_seconds=int(end_time - start_time), + run_success=True, + run_error_message="", + ), + ) + + except Exception as exc: # noqa: BLE001 + logger.error(f"Error processing OpenAI Responses request: {exc}") + + # Log telemetry for failed completion + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload( + run_is_webhook=False, + run_seconds=int(time.perf_counter() - start_time), + run_success=False, + run_error_message=str(exc), + ), + ) + + # Return OpenAI-compatible error + error_response = create_openai_error( + message=str(exc), + type_="processing_error", + ) + return OpenAIErrorResponse(error=error_response["error"]) + return result diff --git a/src/backend/base/langflow/custom/custom_component/custom_component.py 
b/src/backend/base/langflow/custom/custom_component/custom_component.py index 3ddea976e..cf54f6f9a 100644 --- a/src/backend/base/langflow/custom/custom_component/custom_component.py +++ b/src/backend/base/langflow/custom/custom_component/custom_component.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any, ClassVar import yaml from cachetools import TTLCache from langchain_core.documents import Document +from loguru import logger from pydantic import BaseModel from langflow.custom.custom_component.base_component import BaseComponent @@ -421,6 +422,16 @@ class CustomComponent(BaseComponent): if hasattr(self, "_user_id") and not self.user_id: msg = f"User id is not set for {self.__class__.__name__}" raise ValueError(msg) + + # Check graph context for request-level variable overrides first + if hasattr(self, "graph") and self.graph and hasattr(self.graph, "context"): + context = self.graph.context + if context and "request_variables" in context: + request_variables = context["request_variables"] + if name in request_variables: + logger.debug(f"Found context override for variable '{name}': {request_variables[name]}") + return request_variables[name] + variable_service = get_variable_service() # Get service instance # Retrieve and decrypt the variable by name for the current user if isinstance(self.user_id, str): diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 44cd12aee..c53f21495 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1047,6 +1047,7 @@ class Graph: flow_id: str | None = None, flow_name: str | None = None, user_id: str | None = None, + context: dict | None = None, ) -> Graph: """Creates a graph from a payload. @@ -1055,6 +1056,7 @@ class Graph: flow_id: The ID of the flow. flow_name: The flow name. user_id: The user ID. + context: Optional context dictionary for request-specific data. Returns: Graph: The created graph. 
@@ -1064,7 +1066,7 @@ class Graph: try: vertices = payload["nodes"] edges = payload["edges"] - graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id) + graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id, context=context) graph.add_nodes_and_edges(vertices, edges) except KeyError as exc: logger.exception(exc) diff --git a/src/backend/base/langflow/schema/openai_responses_schemas.py b/src/backend/base/langflow/schema/openai_responses_schemas.py new file mode 100644 index 000000000..34665fb86 --- /dev/null +++ b/src/backend/base/langflow/schema/openai_responses_schemas.py @@ -0,0 +1,74 @@ +from typing import Any, Literal + +from pydantic import BaseModel, Field + + +class OpenAIResponsesRequest(BaseModel): + """OpenAI-compatible responses request with flow_id as model parameter.""" + + model: str = Field(..., description="The flow ID to execute (used instead of OpenAI model)") + input: str = Field(..., description="The input text to process") + stream: bool = Field(default=False, description="Whether to stream the response") + background: bool = Field(default=False, description="Whether to process in background") + tools: list[Any] | None = Field(default=None, description="Tools are not supported yet") + previous_response_id: str | None = Field( + default=None, description="ID of previous response to continue conversation" + ) + include: list[str] | None = Field( + default=None, description="Additional response data to include, e.g., ['tool_call.results']" + ) + + +class OpenAIResponsesResponse(BaseModel): + """OpenAI-compatible responses response.""" + + id: str + object: Literal["response"] = "response" + created_at: int + status: Literal["completed", "in_progress", "failed"] = "completed" + error: dict | None = None + incomplete_details: dict | None = None + instructions: str | None = None + max_output_tokens: int | None = None + model: str + output: list[dict] + parallel_tool_calls: bool = True + previous_response_id: str | None = None + reasoning: dict = Field(default_factory=lambda: {"effort": None, "summary": None}) + store: bool = True + temperature: float = 1.0 + text: dict = Field(default_factory=lambda: {"format": {"type": "text"}}) + tool_choice: str = "auto" + tools: list[dict] = Field(default_factory=list) + top_p: float = 1.0 + truncation: str = "disabled" + usage: dict | None = None + user: str | None = None + metadata: dict = Field(default_factory=dict) + + +class OpenAIResponsesStreamChunk(BaseModel): + """OpenAI-compatible responses stream chunk.""" + + id: str + object: Literal["response.chunk"] = "response.chunk" + created: int + model: str + delta: dict + status: Literal["completed", "in_progress", "failed"] | None = None + + +class OpenAIErrorResponse(BaseModel): + error: dict = Field(..., description="Error details") + + +def create_openai_error(message: str, type_: str = "invalid_request_error", code: str | None = None) -> dict: + """Create an OpenAI-compatible error response.""" + error_data = { + "message": message, + "type": type_, + } + if code: + error_data["code"] = code + + return {"error": error_data} diff --git a/src/backend/tests/integration/test_openai_responses_extended.py b/src/backend/tests/integration/test_openai_responses_extended.py new file mode 100644 index 000000000..2c5839ff9 --- /dev/null +++ b/src/backend/tests/integration/test_openai_responses_extended.py @@ -0,0 +1,503 @@ +import asyncio +import json +import os +import pathlib + +import pytest +from dotenv import load_dotenv +from httpx import AsyncClient +from 
loguru import logger + + +# Load environment variables from .env file +def load_env_vars(): + """Load environment variables from .env files.""" + # Try to find .env file in various locations + possible_paths = [ + pathlib.Path(".env"), # Current directory + pathlib.Path("../../.env"), # Project root + pathlib.Path("../../../.env"), # One level up from project root + ] + + for env_path in possible_paths: + if env_path.exists(): + logger.info(f"Loading environment variables from {env_path.absolute()}") + load_dotenv(env_path) + return True + + logger.warning("No .env file found. Using existing environment variables.") + return False + + +# Load environment variables at module import time +load_env_vars() + + +async def create_global_variable(client: AsyncClient, headers, name, value, variable_type="credential"): + """Create a global variable in Langflow.""" + payload = {"name": name, "value": value, "type": variable_type, "default_fields": []} + + response = await client.post("/api/v1/variables/", json=payload, headers=headers) + if response.status_code != 201: + logger.error(f"Failed to create global variable: {response.content}") + return False + + logger.info(f"Successfully created global variable: {name}") + return True + + +async def load_and_prepare_flow(client: AsyncClient, created_api_key): + """Load a flow template, create it, and wait for it to be ready.""" + # Set up headers + headers = {"x-api-key": created_api_key.api_key} + + # Create OPENAI_API_KEY global variable + openai_api_key = os.getenv("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set") + + await create_global_variable(client, headers, "OPENAI_API_KEY", openai_api_key) + + # Load the Basic Prompting template + template_path = ( + pathlib.Path(__file__).resolve().parent.parent.parent + / "base" + / "langflow" + / "initial_setup" + / "starter_projects" + / "Basic Prompting.json" + ) + + flow_data = await asyncio.to_thread(lambda: json.loads(pathlib.Path(template_path).read_text())) + + # Add the flow + response = await client.post("/api/v1/flows/", json=flow_data, headers=headers) + logger.info(f"Flow creation response: {response.status_code}") + + assert response.status_code == 201 + flow = response.json() + + # Poll for flow builds to complete + max_attempts = 10 + for attempt in range(max_attempts): + # Get the flow builds + builds_response = await client.get(f"/api/v1/monitor/builds?flow_id={flow['id']}", headers=headers) + + if builds_response.status_code == 200: + builds = builds_response.json().get("vertex_builds", {}) + # Check if builds are complete + all_valid = True + for build_list in builds.values(): + if not build_list or build_list[0].get("valid") is not True: + all_valid = False + break + + if all_valid and builds: + logger.info(f"Flow builds completed successfully after {attempt + 1} attempts") + break + + # Wait before polling again + if attempt < max_attempts - 1: + logger.info(f"Waiting for flow builds to complete (attempt {attempt + 1}/{max_attempts})...") + await asyncio.sleep(1) + else: + logger.warning("Flow builds polling timed out, proceeding anyway") + + return flow, headers + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_invalid_flow_id(client: AsyncClient, created_api_key): + """Test the OpenAI responses endpoint with an invalid flow ID.""" + headers = {"x-api-key": created_api_key.api_key} + + # Test with non-existent flow ID + payload = {"model": "non-existent-flow-id", "input": "Hello", 
"stream": False} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + assert response.status_code == 200 # OpenAI errors are still 200 status + data = response.json() + assert "error" in data + assert isinstance(data["error"], dict) + assert data["error"]["type"] == "invalid_request_error" + assert "not found" in data["error"]["message"].lower() + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_with_tools(client: AsyncClient, created_api_key): + """Test that tools parameter is rejected.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Test with tools parameter + payload = { + "model": flow["id"], + "input": "Hello", + "stream": False, + "tools": [{"type": "function", "function": {"name": "test", "parameters": {}}}], + } + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + assert response.status_code == 200 # OpenAI errors are still 200 status + data = response.json() + assert "error" in data + assert isinstance(data["error"], dict) + assert data["error"]["type"] == "invalid_request_error" + assert data["error"]["code"] == "tools_not_supported" + assert "tools are not supported" in data["error"]["message"].lower() + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_empty_input(client: AsyncClient, created_api_key): + """Test the OpenAI responses endpoint with empty input.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Test with empty input + payload = {"model": flow["id"], "input": "", "stream": False} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + logger.info(f"Empty input response status: {response.status_code}") + + # The flow might still process empty input, so we check for a valid response structure + data = response.json() + + if "error" not in data or data["error"] is None: + # Valid response even with empty input + assert "id" in data + assert "output" in data + assert "created_at" in data + assert data["object"] == "response" + else: + # Some flows might reject empty input + assert isinstance(data["error"], dict) + assert "message" in data["error"] + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_long_input(client: AsyncClient, created_api_key): + """Test the OpenAI responses endpoint with very long input.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Create a very long input + long_input = "Hello " * 1000 # ~6000 characters + payload = {"model": flow["id"], "input": long_input, "stream": False} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + assert response.status_code == 200 + data = response.json() + + if "error" not in data: + assert "id" in data + assert "output" in data + assert isinstance(data["output"], str) + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_streaming_error_handling(client: AsyncClient, created_api_key): + """Test streaming response error handling.""" + headers = {"x-api-key": created_api_key.api_key} + + # Test with invalid flow ID in streaming mode + payload = {"model": "invalid-flow-id", "input": "Hello", "stream": True} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + # For streaming errors, we should still get a 200 status but with error in the response + assert response.status_code == 200 + + # 
Read the response content + content = await response.aread() + text_content = content.decode("utf-8") + + # Should contain error information in JSON format, not SSE + data = json.loads(text_content) + assert "error" in data + assert isinstance(data["error"], dict) + assert data["error"]["type"] == "invalid_request_error" + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_concurrent_requests(client: AsyncClient, created_api_key): + """Test handling of concurrent requests to the same flow.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Create multiple concurrent requests + payloads = [{"model": flow["id"], "input": f"Request {i}", "stream": False} for i in range(5)] + + # Send all requests concurrently + tasks = [client.post("/api/v1/responses", json=payload, headers=headers) for payload in payloads] + + responses = await asyncio.gather(*tasks) + + # All requests should succeed + for i, response in enumerate(responses): + assert response.status_code == 200 + data = response.json() + + if "error" not in data: + assert "id" in data + assert "output" in data + # Each response should have a unique ID + assert all( + data["id"] != other.json()["id"] + for j, other in enumerate(responses) + if i != j and "error" not in other.json() + ) + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_unauthorized(client: AsyncClient): + """Test the OpenAI responses endpoint without authentication.""" + payload = {"model": "some-flow-id", "input": "Hello", "stream": False} + + # No headers = no authentication + response = await client.post("/api/v1/responses", json=payload) + + # Should get 403 Forbidden + assert response.status_code == 403 + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_invalid_api_key(client: AsyncClient): + """Test the OpenAI responses endpoint with invalid API key.""" + headers = {"x-api-key": "invalid-api-key-12345"} + payload = {"model": "some-flow-id", "input": "Hello", "stream": False} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + # Should get 403 Forbidden + assert response.status_code == 403 + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_malformed_request(client: AsyncClient, created_api_key): + """Test the OpenAI responses endpoint with malformed requests.""" + headers = {"x-api-key": created_api_key.api_key} + + # Missing required fields + test_cases = [ + {}, # Empty payload + {"model": "flow-id"}, # Missing input + {"input": "Hello"}, # Missing model + {"model": 123, "input": "Hello"}, # Wrong type for model + {"model": "flow-id", "input": 123}, # Wrong type for input + {"model": "flow-id", "input": "Hello", "stream": "yes"}, # Wrong type for stream + ] + + for payload in test_cases: + response = await client.post("/api/v1/responses", json=payload, headers=headers) + # OpenAI API returns validation errors as 200 with error in body or 422 + if response.status_code == 200: + data = response.json() + assert "error" in data + assert isinstance(data["error"], dict) + assert "message" in data["error"] + else: + # Should get 422 Unprocessable Entity for validation errors + assert response.status_code == 422 + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_stream_interruption(client: AsyncClient, created_api_key): + """Test behavior when streaming is interrupted.""" + flow, headers = await 
load_and_prepare_flow(client, created_api_key) + + payload = {"model": flow["id"], "input": "Tell me a long story", "stream": True} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + assert response.status_code == 200 + + # Read the full response body (httpx buffers the stream here; a true mid-stream disconnect is not simulated) + content = await response.aread() + text_content = content.decode("utf-8") + + # Should have received at least some data + assert len(content) > 0 + # Check for either data: or valid response content + assert "data:" in text_content or "id" in text_content + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_background_processing(client: AsyncClient, created_api_key): + """Test background processing parameter.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Test with background=True + payload = {"model": flow["id"], "input": "Hello", "background": True, "stream": False} + + response = await client.post("/api/v1/responses", json=payload, headers=headers) + assert response.status_code == 200 + + data = response.json() + if "error" not in data or data["error"] is None: + assert "id" in data + assert "status" in data + # Background processing might change the status + assert data["status"] in ["completed", "in_progress"] + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_previous_response_id(client: AsyncClient, created_api_key): + """Test previous_response_id parameter for conversation continuity.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # First request + payload1 = {"model": flow["id"], "input": "Hello", "stream": False} + response1 = await client.post("/api/v1/responses", json=payload1, headers=headers) + assert response1.status_code == 200 + + data1 = response1.json() + if "error" not in data1 or data1["error"] is None: + first_response_id = data1["id"] + + # Second request with previous_response_id + payload2 = { + "model": flow["id"], + "input": "Continue our conversation", + "previous_response_id": first_response_id, + "stream": False, + } + response2 = await client.post("/api/v1/responses", json=payload2, headers=headers) + assert response2.status_code == 200 + + data2 = response2.json() + if "error" not in data2 or data2["error"] is None: + # The previous_response_id might be preserved in the response; + # how it is surfaced depends on the implementation, + # so we just verify that the request was processed successfully + assert "id" in data2 + assert "output" in data2 + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_response_format(client: AsyncClient, created_api_key): + """Test OpenAI response format compliance.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + payload = {"model": flow["id"], "input": "Hello", "stream": False} + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + assert response.status_code == 200 + data = response.json() + + if "error" not in data or data["error"] is None: + # Check OpenAI response format compliance + required_fields = ["id", "object", "created_at", "status", "model", "output"] + for field in required_fields: + assert field in data, f"Missing required field: {field}" + + # Check field types and values + assert isinstance(data["id"], str) + assert data["object"] == "response" + assert isinstance(data["created_at"], int) + assert data["status"] in 
["completed", "in_progress", "failed"] + assert isinstance(data["model"], str) + assert isinstance(data["output"], list) + + # Check optional fields with expected defaults + assert data["parallel_tool_calls"] is True + assert data["store"] is True + assert data["temperature"] == 1.0 + assert data["top_p"] == 1.0 + assert data["truncation"] == "disabled" + assert data["tool_choice"] == "auto" + assert isinstance(data["tools"], list) + assert isinstance(data["reasoning"], dict) + assert isinstance(data["text"], dict) + assert isinstance(data["metadata"], dict) + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_stream_chunk_format(client: AsyncClient, created_api_key): + """Test OpenAI streaming response chunk format compliance.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + payload = {"model": flow["id"], "input": "Hello", "stream": True} + response = await client.post("/api/v1/responses", json=payload, headers=headers) + + assert response.status_code == 200 + + content = await response.aread() + text_content = content.decode("utf-8") + + # Parse the events + events = text_content.strip().split("\n\n") + data_events = [evt for evt in events if evt.startswith("data:") and not evt.startswith("data: [DONE]")] + + if data_events: + # Check first chunk format + first_chunk_json = data_events[0].replace("data: ", "") + try: + first_chunk = json.loads(first_chunk_json) + + # Basic checks for streaming response + assert "id" in first_chunk + assert "delta" in first_chunk + assert isinstance(first_chunk["id"], str) + assert isinstance(first_chunk["delta"], dict) + + # Check OpenAI stream chunk format compliance if fields exist + if "object" in first_chunk: + assert first_chunk["object"] == "response.chunk" + if "created" in first_chunk: + assert isinstance(first_chunk["created"], int) + if "model" in first_chunk: + assert isinstance(first_chunk["model"], str) + + # Status is optional in chunks and can be None + if "status" in first_chunk and first_chunk["status"] is not None: + assert first_chunk["status"] in ["completed", "in_progress", "failed"] + except json.JSONDecodeError: + # If streaming format is different or not JSON, just ensure we have data + assert len(data_events) > 0 + else: + # If no streaming chunks, ensure we have the [DONE] marker or valid response + assert "data: [DONE]" in text_content or len(text_content) > 0 + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_rate_limiting_simulation(client: AsyncClient, created_api_key): + """Test behavior under rapid successive requests.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Send 10 rapid requests + rapid_requests = [] + for i in range(10): + payload = {"model": flow["id"], "input": f"Rapid request {i}", "stream": False} + rapid_requests.append(client.post("/api/v1/responses", json=payload, headers=headers)) + + # Wait for all requests to complete + responses = await asyncio.gather(*rapid_requests, return_exceptions=True) + + # Check that most requests succeeded (allowing for some potential failures) + successful_responses = [r for r in responses if not isinstance(r, Exception) and r.status_code == 200] + + # At least 50% should succeed + assert len(successful_responses) >= 5 + + # Check that successful responses have unique IDs + response_ids = [] + for response in successful_responses: + data = response.json() + if "error" not in data or data["error"] is None: + 
response_ids.append(data["id"]) + + # All response IDs should be unique + assert len(response_ids) == len(set(response_ids)) diff --git a/src/backend/tests/integration/test_openai_responses_integration.py b/src/backend/tests/integration/test_openai_responses_integration.py new file mode 100644 index 000000000..9e4aa9956 --- /dev/null +++ b/src/backend/tests/integration/test_openai_responses_integration.py @@ -0,0 +1,167 @@ +import asyncio +import json +import os +import pathlib + +import pytest +from dotenv import find_dotenv, load_dotenv +from httpx import AsyncClient +from loguru import logger + +load_dotenv(find_dotenv()) + + +async def create_global_variable(client: AsyncClient, headers, name, value, variable_type="credential"): + """Create a global variable in Langflow.""" + payload = {"name": name, "value": value, "type": variable_type, "default_fields": []} + + response = await client.post("/api/v1/variables/", json=payload, headers=headers) + if response.status_code != 201: + logger.error(f"Failed to create global variable: {response.content}") + return False + + logger.info(f"Successfully created global variable: {name}") + return True + + +async def load_and_prepare_flow(client: AsyncClient, created_api_key): + """Load a flow template, create it, and wait for it to be ready.""" + # Set up headers + headers = {"x-api-key": created_api_key.api_key} + + # Create OPENAI_API_KEY global variable + openai_api_key = os.getenv("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set") + + await create_global_variable(client, headers, "OPENAI_API_KEY", openai_api_key) + + # Load the Basic Prompting template + template_path = ( + pathlib.Path(__file__).resolve().parent.parent.parent + / "base" + / "langflow" + / "initial_setup" + / "starter_projects" + / "Basic Prompting.json" + ) + + flow_data = await asyncio.to_thread(lambda: json.loads(pathlib.Path(template_path).read_text())) + + # Add the flow + response = await client.post("/api/v1/flows/", json=flow_data, headers=headers) + logger.info(f"Flow creation response: {response.status_code}") + + assert response.status_code == 201 + flow = response.json() + + # Poll for flow builds to complete + max_attempts = 10 + for attempt in range(max_attempts): + # Get the flow builds + builds_response = await client.get(f"/api/v1/monitor/builds?flow_id={flow['id']}", headers=headers) + + if builds_response.status_code == 200: + builds = builds_response.json().get("vertex_builds", {}) + # Check if builds are complete + all_valid = True + for build_list in builds.values(): + if not build_list or build_list[0].get("valid") is not True: + all_valid = False + break + + if all_valid and builds: + logger.info(f"Flow builds completed successfully after {attempt + 1} attempts") + break + + # Wait before polling again + if attempt < max_attempts - 1: + logger.info(f"Waiting for flow builds to complete (attempt {attempt + 1}/{max_attempts})...") + await asyncio.sleep(1) + else: + logger.warning("Flow builds polling timed out, proceeding anyway") + + return flow, headers + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_non_streaming(client: AsyncClient, created_api_key): + """Test the OpenAI-compatible non-streaming responses endpoint directly.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Now test the OpenAI-compatible endpoint + payload = {"model": flow["id"], "input": "Hello, Langflow!", "stream": False} + + # Make the request + response 
= await client.post("/api/v1/responses", json=payload, headers=headers) + logger.info(f"Response status: {response.status_code}") + logger.debug(f"Response content: {response.content}") + + # Handle potential errors + if response.status_code != 200: + logger.error(f"Error response: {response.content}") + pytest.fail(f"Request failed with status {response.status_code}") + + try: + data = response.json() + if "error" in data and data["error"] is not None: + logger.error(f"Error in response: {data['error']}") + # Don't fail immediately, log more details for debugging + logger.error(f"Full error details: {data}") + error_msg = "Unknown error" + if isinstance(data.get("error"), dict): + error_msg = data["error"].get("message", "Unknown error") + elif data.get("error"): + error_msg = str(data["error"]) + pytest.fail(f"Error in response: {error_msg}") + + # Validate the response + assert "id" in data + assert "output" in data + except Exception as exc: + logger.exception("Exception parsing response") + pytest.fail(f"Failed to parse response: {exc}") + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_responses_streaming(client: AsyncClient, created_api_key): + """Test the OpenAI-compatible streaming responses endpoint directly.""" + flow, headers = await load_and_prepare_flow(client, created_api_key) + + # Now test the OpenAI-compatible streaming endpoint + payload = {"model": flow["id"], "input": "Hello, stream!", "stream": True} + + # Make the request + response = await client.post("/api/v1/responses", json=payload, headers=headers) + logger.info(f"Response status: {response.status_code}") + + # Handle potential errors + if response.status_code != 200: + logger.error(f"Error response: {response.content}") + pytest.fail(f"Request failed with status {response.status_code}") + + # For streaming, we should get a stream of server-sent events + content = await response.aread() + text_content = content.decode("utf-8") + logger.debug(f"Response content (first 200 chars): {text_content[:200]}") + + # Check that we got some SSE data events + assert "data:" in text_content + + # Parse the events to validate structure and final [DONE] marker + events = text_content.strip().split("\n\n") + # The stream must end with the OpenAI '[DONE]' sentinel + assert events, "No events in stream" + assert events[-1].strip() == "data: [DONE]", "Stream did not end with [DONE] marker" + + # Filter out the [DONE] marker to inspect JSON data events + data_events = [evt for evt in events if evt.startswith("data:") and not evt.startswith("data: [DONE]")] + assert data_events, "No streaming events were received" + + # Parse the first and last JSON events to check their structure + first_event = json.loads(data_events[0].replace("data: ", "")) + last_event = json.loads(data_events[-1].replace("data: ", "")) + assert "delta" in first_event + assert "delta" in last_event diff --git a/src/backend/tests/integration/test_openai_streaming_comparison.py b/src/backend/tests/integration/test_openai_streaming_comparison.py new file mode 100644 index 000000000..fff69cd48 --- /dev/null +++ b/src/backend/tests/integration/test_openai_streaming_comparison.py @@ -0,0 +1,343 @@ +import asyncio +import json +import os +import pathlib + +import httpx +import pytest +from dotenv import load_dotenv +from httpx import AsyncClient +from loguru import logger + + +# Load environment variables from .env file +def load_env_vars(): + """Load environment variables from .env files.""" + possible_paths = [ + 
pathlib.Path(".env"), + pathlib.Path("../../.env"), + pathlib.Path("../../../.env"), + ] + + for env_path in possible_paths: + if env_path.exists(): + logger.info(f"Loading environment variables from {env_path.absolute()}") + load_dotenv(env_path) + return True + + logger.warning("No .env file found. Using existing environment variables.") + return False + + +# Load environment variables at module import time +load_env_vars() + + +async def create_global_variable(client: AsyncClient, headers, name, value, variable_type="credential"): + """Create a global variable in Langflow.""" + payload = {"name": name, "value": value, "type": variable_type, "default_fields": []} + + response = await client.post("/api/v1/variables/", json=payload, headers=headers) + if response.status_code != 201: + logger.error(f"Failed to create global variable: {response.content}") + return False + + logger.info(f"Successfully created global variable: {name}") + return True + + +async def load_and_prepare_flow(client: AsyncClient, created_api_key): + """Load Simple Agent flow and wait for it to be ready.""" + headers = {"x-api-key": created_api_key.api_key} + + # Create OPENAI_API_KEY global variable + openai_api_key = os.getenv("OPENAI_API_KEY") + if not openai_api_key or openai_api_key == "dummy": + pytest.skip("OPENAI_API_KEY environment variable not set") + + await create_global_variable(client, headers, "OPENAI_API_KEY", openai_api_key) + + # Load the Simple Agent template + template_path = ( + pathlib.Path(__file__).resolve().parent.parent.parent + / "base" + / "langflow" + / "initial_setup" + / "starter_projects" + / "Simple Agent.json" + ) + + flow_data = await asyncio.to_thread(lambda: json.loads(pathlib.Path(template_path).read_text())) + + # Add the flow + response = await client.post("/api/v1/flows/", json=flow_data, headers=headers) + assert response.status_code == 201 + flow = response.json() + + # Poll for flow builds to complete + max_attempts = 10 + for attempt in range(max_attempts): + builds_response = await client.get(f"/api/v1/monitor/builds?flow_id={flow['id']}", headers=headers) + + if builds_response.status_code == 200: + builds = builds_response.json().get("vertex_builds", {}) + all_valid = True + for build_list in builds.values(): + if not build_list or build_list[0].get("valid") is not True: + all_valid = False + break + + if all_valid and builds: + break + + if attempt < max_attempts - 1: + await asyncio.sleep(1) + + return flow, headers + + +@pytest.mark.api_key_required +@pytest.mark.integration +async def test_openai_streaming_format_comparison(client: AsyncClient, created_api_key): + """Compare raw HTTP streaming formats between OpenAI and our API.""" + # Test input + input_msg = "What is 25 + 17? Use your calculator tool." 
+ + # Tools definition + tools = [ + { + "type": "function", + "name": "evaluate_expression", + "description": "Perform basic arithmetic operations on a given expression.", + "parameters": { + "type": "object", + "properties": { + "expression": { + "type": "string", + "description": "The arithmetic expression to evaluate (e.g., '4*4*(33/22)+12-20').", + } + }, + "required": ["expression"], + }, + } + ] + + # Get OpenAI API key + openai_api_key = os.getenv("OPENAI_API_KEY") + if not openai_api_key: + pytest.skip("OPENAI_API_KEY environment variable not set") + + # === Test OpenAI's raw HTTP streaming format === + logger.info("=== Testing OpenAI API Raw HTTP Format ===") + + async with httpx.AsyncClient() as openai_client: + openai_payload = {"model": "gpt-4o-mini", "input": input_msg, "tools": tools, "stream": True} + + openai_response = await openai_client.post( + "https://api.openai.com/v1/responses", + headers={"Authorization": f"Bearer {openai_api_key}", "Content-Type": "application/json"}, + json=openai_payload, + ) + + logger.info(f"OpenAI status: {openai_response.status_code}") + if openai_response.status_code != 200: + logger.error(f"OpenAI error: {openai_response.text}") + pytest.skip("OpenAI API request failed") + + # Parse OpenAI's raw SSE stream + openai_content = await openai_response.aread() + openai_text = openai_content.decode("utf-8") + + openai_events = openai_text.strip().split("\n\n") + openai_data_events = [evt for evt in openai_events if "data: " in evt and not evt.startswith("data: [DONE]")] + + # === Test Our API's streaming format === + logger.info("=== Testing Our API Format ===") + + flow, headers = await load_and_prepare_flow(client, created_api_key) + + our_payload = {"model": flow["id"], "input": input_msg, "stream": True, "include": ["tool_call.results"]} + + our_response = await client.post("/api/v1/responses", json=our_payload, headers=headers) + assert our_response.status_code == 200 + + our_content = await our_response.aread() + our_text = our_content.decode("utf-8") + + our_events = our_text.strip().split("\n\n") + our_data_events = [evt for evt in our_events if "data: " in evt and not evt.startswith("data: [DONE]")] + + # === Parse and compare events === + + # Extract JSON data from OpenAI events + openai_parsed = [] + for event_block in openai_data_events: + lines = event_block.strip().split("\n") + for line in lines: + if line.startswith("data: "): + try: + json_str = line.replace("data: ", "", 1) + event_data = json.loads(json_str) + openai_parsed.append(event_data) + break + except json.JSONDecodeError: + continue + + # Extract JSON data from our events + our_parsed = [] + for event_block in our_data_events: + lines = event_block.strip().split("\n") + for line in lines: + if line.startswith("data: "): + try: + json_str = line.replace("data: ", "", 1) + event_data = json.loads(json_str) + our_parsed.append(event_data) + break + except json.JSONDecodeError: + continue + + # === Analysis === + logger.info("Event counts:") + logger.info(f" OpenAI: {len(openai_parsed)} events") + logger.info(f" Our API: {len(our_parsed)} events") + + # Check for tool call events with detailed logging + logger.info("Detailed OpenAI event analysis:") + output_item_added_events = [e for e in openai_parsed if e.get("type") == "response.output_item.added"] + logger.info(f" Found {len(output_item_added_events)} 'response.output_item.added' events") + + for i, event in enumerate(output_item_added_events): + item = event.get("item", {}) + item_type = item.get("type", "unknown") + 
logger.info(f" Event {i}: item.type = '{item_type}'") + logger.info(f" Event {i}: item keys = {list(item.keys())}") + if "name" in item: + logger.info(f" Event {i}: item.name = '{item.get('name')}'") + logger.debug(f" Event {i}: full item = {json.dumps(item, indent=6)}") + + openai_tool_events = [ + e + for e in openai_parsed + if e.get("type") == "response.output_item.added" and e.get("item", {}).get("type") == "tool_call" + ] + openai_function_events = [ + e + for e in openai_parsed + if e.get("type") == "response.output_item.added" and e.get("item", {}).get("type") == "function_call" + ] + + logger.info("Detailed Our API event analysis:") + our_output_item_added_events = [e for e in our_parsed if e.get("type") == "response.output_item.added"] + logger.info(f" Found {len(our_output_item_added_events)} 'response.output_item.added' events") + + for i, event in enumerate(our_output_item_added_events): + item = event.get("item", {}) + item_type = item.get("type", "unknown") + logger.info(f" Event {i}: item.type = '{item_type}'") + logger.info(f" Event {i}: item keys = {list(item.keys())}") + if "name" in item: + logger.info(f" Event {i}: item.name = '{item.get('name')}'") + logger.debug(f" Event {i}: full item = {json.dumps(item, indent=6)}") + + our_function_events = [ + e + for e in our_parsed + if e.get("type") == "response.output_item.added" and e.get("item", {}).get("type") == "function_call" + ] + + logger.info("Tool call detection results:") + logger.info(f" OpenAI tool_call events: {len(openai_tool_events)}") + logger.info(f" OpenAI function_call events: {len(openai_function_events)}") + logger.info(f" Our function_call events: {len(our_function_events)}") + + # Use the correct event type for OpenAI (function_call vs tool_call) + openai_actual_tool_events = openai_function_events if openai_function_events else openai_tool_events + + logger.info("Function call events:") + logger.info(f" OpenAI: {len(openai_actual_tool_events)} function call events") + logger.info(f" Our API: {len(our_function_events)} function call events") + + # Show event types + openai_types = {e.get("type", e.get("object", "unknown")) for e in openai_parsed} + our_types = {e.get("type", e.get("object", "unknown")) for e in our_parsed} + + logger.info("Event types:") + logger.info(f" OpenAI: {sorted(openai_types)}") + logger.info(f" Our API: {sorted(our_types)}") + + # Print sample events for debugging + logger.info("Sample OpenAI events:") + for i, event in enumerate(openai_parsed[:3]): + logger.debug(f" {i}: {json.dumps(event, indent=2)[:200]}...") + + logger.info("Sample Our events:") + for i, event in enumerate(our_parsed[:3]): + logger.debug(f" {i}: {json.dumps(event, indent=2)[:200]}...") + + # Check delta content for duplicates/accumulation + logger.info("Checking delta content for proper streaming:") + delta_contents = [] + for i, event in enumerate(our_parsed): + if event.get("object") == "response.chunk" and "delta" in event: + delta_content = event["delta"].get("content", "") + if delta_content: # Only track non-empty content + delta_contents.append(delta_content) + logger.info(f" Delta {i}: '{delta_content}'") + + # Check for accumulated content (bad) vs incremental content (good) + if len(delta_contents) > 1: + logger.info("Analyzing delta content patterns:") + accumulated_pattern = True + for i in range(1, len(delta_contents)): + if not delta_contents[i].startswith(delta_contents[i - 1]): + accumulated_pattern = False + break + + if accumulated_pattern: + logger.error("❌ DETECTED ACCUMULATED CONTENT 
PATTERN (BAD)") + logger.error("Each delta contains the full accumulated message instead of just new content") + logger.error("Example:") + for i, content in enumerate(delta_contents[:3]): + logger.error(f" Delta {i}: '{content}'") + else: + logger.success("✅ DETECTED INCREMENTAL CONTENT PATTERN (GOOD)") + logger.success("Each delta contains only new content") + else: + logger.info("Not enough delta content to analyze pattern") + + if openai_actual_tool_events: + logger.info("OpenAI tool call example:") + logger.debug(f" {json.dumps(openai_actual_tool_events[0], indent=2)}") + + if our_function_events: + logger.info("Our function call example:") + logger.debug(f" {json.dumps(our_function_events[0], indent=2)}") + + # === Validation === + + # Basic validation + assert len(openai_parsed) > 0, "No OpenAI events received" + assert len(our_parsed) > 0, "No events from our API" + + # Check if both APIs produced function call events + if len(openai_actual_tool_events) > 0: + logger.success("✅ OpenAI produced function call events") + if len(our_function_events) > 0: + logger.success("✅ Our API also produced function call events") + logger.success("✅ Both APIs support function call streaming") + else: + logger.error("❌ Our API did not produce function call events") + pytest.fail("Our API should produce function call events when OpenAI does") + else: + logger.info("No function calls were made by OpenAI") + + logger.info("📊 Test Summary:") + logger.info(f" OpenAI events: {len(openai_parsed)}") + logger.info(f" Our events: {len(our_parsed)}") + logger.info(f" OpenAI function events: {len(openai_actual_tool_events)}") + logger.info(f" Our function events: {len(our_function_events)}") + compatibility_result = ( + "✅ PASS" if len(our_function_events) > 0 or len(openai_actual_tool_events) == 0 else "❌ FAIL" + ) + logger.info(f" Format compatibility: {compatibility_result}")