feat: OpenAI compatibility (#9069)

* take0

* previous response id and streaming fix

* add tool calls to streaming responses

* integration test

* integration test

* better response output -- works with the OpenAI SDK

* [autofix.ci] apply automated fixes

* Refactor logging in integration tests to use loguru for improved clarity and consistency. Replace print statements with appropriate logging levels, enhancing error handling and debugging capabilities.

* Refactor `has_chat_input` function to use `any()` for improved readability. Enhance error messaging in `run_flow_for_openai_responses` for clarity. Update response yielding format for better readability. Add noqa comments for linting compliance.

* Add comprehensive OpenAI compatibility tests for edge cases and error handling

Extend integration test coverage with new test file containing validation for empty inputs, invalid models, tools parameter rejection, timeout scenarios, and concurrent request handling. Update existing integration tests to improve error handling and response validation.

* streaming

* [autofix.ci] apply automated fixes

* format

* loguru

* [autofix.ci] apply automated fixes

* ruff

* delta fix

* support include=["tool_call.results"] in response.output_item.done

* [autofix.ci] apply automated fixes

* include results in non-streaming responses

* [autofix.ci] apply automated fixes

* support global variable override via HTTP headers in the OpenAI Responses API

* [autofix.ci] apply automated fixes

* ruff

* ruff

* ruff

* ruff

* ruff

* refactor: use Depends for telemetry service

* [autofix.ci] apply automated fixes

* load_dotenv(find_dotenv())

* check for chat output

* openai key dummy check

* [autofix.ci] apply automated fixes

* mypy fix

* fix: specify type for tool_calls in openai_responses (#9530)

fix: specify type for tool_calls in openai_responses.py

Updated the type annotation for the tool_calls variable to explicitly define it as a list of dictionaries with string keys and Any values, enhancing type safety and clarity in the code.

Co-authored-by: Sebastián Estévez <estevezsebastian@gmail.com>

* import fix

* [autofix.ci] apply automated fixes

* response_model=None is required because create_response returns 3 different types

---------

Co-authored-by: phact <estevez.sebastian@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Co-authored-by: Carlos Coelho <80289056+carlosrcoelho@users.noreply.github.com>
Co-authored-by: Jordan Frazier <122494242+jordanrfrazier@users.noreply.github.com>
Sebastián Estévez authored 2025-08-26 22:00:37 -04:00, committed by GitHub
commit 4e92f75939
10 changed files with 1657 additions and 2 deletions

View file

@@ -13,6 +13,7 @@ from langflow.api.v1 import (
mcp_projects_router,
mcp_router,
monitor_router,
openai_responses_router,
projects_router,
starter_projects_router,
store_router,
@@ -50,6 +51,7 @@ router_v1.include_router(knowledge_bases_router)
router_v1.include_router(mcp_router)
router_v1.include_router(voice_mode_router)
router_v1.include_router(mcp_projects_router)
router_v1.include_router(openai_responses_router)
router_v2.include_router(files_router_v2)
router_v2.include_router(mcp_router_v2)

View file

@@ -9,6 +9,7 @@ from langflow.api.v1.login import router as login_router
from langflow.api.v1.mcp import router as mcp_router
from langflow.api.v1.mcp_projects import router as mcp_projects_router
from langflow.api.v1.monitor import router as monitor_router
from langflow.api.v1.openai_responses import router as openai_responses_router
from langflow.api.v1.projects import router as projects_router
from langflow.api.v1.starter_projects import router as starter_projects_router
from langflow.api.v1.store import router as store_router
@@ -29,6 +30,7 @@ __all__ = [
"mcp_projects_router",
"mcp_router",
"monitor_router",
"openai_responses_router",
"projects_router",
"starter_projects_router",
"store_router",

View file

@@ -116,6 +116,7 @@ async def simple_run_flow(
stream: bool = False,
api_key_user: User | None = None,
event_manager: EventManager | None = None,
context: dict | None = None,
):
validate_input_and_tweaks(input_request)
try:
@@ -127,7 +128,9 @@
raise ValueError(msg)
graph_data = flow.data.copy()
graph_data = process_tweaks(graph_data, input_request.tweaks or {}, stream=stream)
graph = Graph.from_payload(graph_data, flow_id=flow_id_str, user_id=str(user_id), flow_name=flow.name)
graph = Graph.from_payload(
graph_data, flow_id=flow_id_str, user_id=str(user_id), flow_name=flow.name, context=context
)
inputs = None
if input_request.input_value is not None:
inputs = [
@@ -228,6 +231,7 @@ async def run_flow_generator(
api_key_user: User | None,
event_manager: EventManager,
client_consumed_queue: asyncio.Queue,
context: dict | None = None,
) -> None:
"""Executes a flow asynchronously and manages event streaming to the client.
@@ -240,6 +244,7 @@
api_key_user (User | None): Optional authenticated user running the flow
event_manager (EventManager): Manages the streaming of events to the client
client_consumed_queue (asyncio.Queue): Tracks client consumption of events
context (dict | None): Optional context to pass to the flow
Events Generated:
- "add_message": Sent when new messages are added during flow execution
@@ -260,6 +265,7 @@
stream=True,
api_key_user=api_key_user,
event_manager=event_manager,
context=context,
)
event_manager.on_end(data={"result": result.model_dump()})
await client_consumed_queue.get()

View file

@@ -0,0 +1,545 @@
import asyncio
import json
import time
import uuid
from collections.abc import AsyncGenerator
from typing import Annotated, Any
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from loguru import logger
from langflow.api.v1.endpoints import consume_and_yield, run_flow_generator, simple_run_flow
from langflow.api.v1.schemas import SimplifiedAPIRequest
from langflow.events.event_manager import create_stream_tokens_event_manager
from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
from langflow.schema.content_types import ToolContent
from langflow.schema.openai_responses_schemas import (
OpenAIErrorResponse,
OpenAIResponsesRequest,
OpenAIResponsesResponse,
OpenAIResponsesStreamChunk,
create_openai_error,
)
from langflow.services.auth.utils import api_key_security
from langflow.services.database.models.flow.model import FlowRead
from langflow.services.database.models.user.model import UserRead
from langflow.services.deps import get_telemetry_service
from langflow.services.telemetry.schema import RunPayload
from langflow.services.telemetry.service import TelemetryService
router = APIRouter(tags=["OpenAI Responses API"])
def has_chat_input(flow_data: dict | None) -> bool:
"""Check if the flow has a chat input component."""
if not flow_data or "nodes" not in flow_data:
return False
return any(node.get("data", {}).get("type") in ["ChatInput", "Chat Input"] for node in flow_data["nodes"])
def has_chat_output(flow_data: dict | None) -> bool:
"""Check if the flow has a chat input component."""
if not flow_data or "nodes" not in flow_data:
return False
return any(node.get("data", {}).get("type") in ["ChatOutput", "Chat Output"] for node in flow_data["nodes"])
async def run_flow_for_openai_responses(
flow: FlowRead,
request: OpenAIResponsesRequest,
api_key_user: UserRead,
*,
stream: bool = False,
variables: dict[str, str] | None = None,
) -> OpenAIResponsesResponse | StreamingResponse:
"""Run a flow for OpenAI Responses API compatibility."""
# Check if flow has chat input
if not has_chat_input(flow.data):
msg = "Flow must have a ChatInput component to be compatible with OpenAI Responses API"
raise ValueError(msg)
if not has_chat_output(flow.data):
msg = "Flow must have a ChatOutput component to be compatible with OpenAI Responses API"
raise ValueError(msg)
# Use previous_response_id as session_id for conversation continuity
# If no previous_response_id, create a new session_id
session_id = request.previous_response_id or str(uuid.uuid4())
# Store header variables in context for global variable override
context = {}
if variables:
context["request_variables"] = variables
logger.debug(f"Added request variables to context: {variables}")
# Convert OpenAI request to SimplifiedAPIRequest
# Note: We're moving away from tweaks to a context-based approach
simplified_request = SimplifiedAPIRequest(
input_value=request.input,
input_type="chat", # Use chat input type for better compatibility
output_type="chat", # Use chat output type for better compatibility
tweaks={}, # Empty tweaks, using context instead
session_id=session_id,
)
# Context will be passed separately to simple_run_flow
logger.debug(f"SimplifiedAPIRequest created with context: {context}")
# Use session_id as response_id for OpenAI compatibility
response_id = session_id
created_timestamp = int(time.time())
if stream:
# Handle streaming response
asyncio_queue: asyncio.Queue = asyncio.Queue()
asyncio_queue_client_consumed: asyncio.Queue = asyncio.Queue()
event_manager = create_stream_tokens_event_manager(queue=asyncio_queue)
async def openai_stream_generator() -> AsyncGenerator[str, None]:
"""Convert Langflow events to OpenAI Responses API streaming format."""
main_task = asyncio.create_task(
run_flow_generator(
flow=flow,
input_request=simplified_request,
api_key_user=api_key_user,
event_manager=event_manager,
client_consumed_queue=asyncio_queue_client_consumed,
context=context,
)
)
try:
# Send initial chunk to establish connection
initial_chunk = OpenAIResponsesStreamChunk(
id=response_id,
created=created_timestamp,
model=request.model,
delta={"content": ""},
)
yield f"data: {initial_chunk.model_dump_json()}\n\n"
tool_call_counter = 0
processed_tools = set() # Track processed tool calls to avoid duplicates
previous_content = "" # Track content already sent to calculate deltas
async for event_data in consume_and_yield(asyncio_queue, asyncio_queue_client_consumed):
if event_data is None:
break
content = ""
# Parse byte string events as JSON
if isinstance(event_data, bytes):
try:
event_str = event_data.decode("utf-8")
parsed_event = json.loads(event_str)
if isinstance(parsed_event, dict):
event_type = parsed_event.get("event")
data = parsed_event.get("data", {})
# Handle add_message events
if event_type == "add_message":
sender_name = data.get("sender_name", "")
text = data.get("text", "")
sender = data.get("sender", "")
content_blocks = data.get("content_blocks", [])
# Look for Agent Steps in content_blocks
for block in content_blocks:
if block.get("title") == "Agent Steps":
contents = block.get("contents", [])
for step in contents:
# Look for tool_use type items
if step.get("type") == "tool_use":
tool_name = step.get("name", "")
tool_input = step.get("tool_input", {})
tool_output = step.get("output")
# Only emit tool calls with explicit tool names and
# meaningful arguments
if tool_name and tool_input is not None and tool_output is not None:
# Create unique identifier for this tool call
tool_signature = (
f"{tool_name}:{hash(str(sorted(tool_input.items())))}"
)
# Skip if we've already processed this tool call
if tool_signature in processed_tools:
continue
processed_tools.add(tool_signature)
tool_call_counter += 1
call_id = f"call_{tool_call_counter}"
tool_id = f"fc_{tool_call_counter}"
tool_call_event = {
"type": "response.output_item.added",
"item": {
"id": tool_id,
"type": "function_call", # OpenAI uses "function_call"
"status": "in_progress", # OpenAI includes status
"name": tool_name,
"arguments": "", # Start with empty, build via deltas
"call_id": call_id,
},
}
yield (
f"event: response.output_item.added\n"
f"data: {json.dumps(tool_call_event)}\n\n"
)
# Send function call arguments as delta events (like OpenAI)
arguments_str = json.dumps(tool_input)
arg_delta_event = {
"type": "response.function_call_arguments.delta",
"delta": arguments_str,
"item_id": tool_id,
"output_index": 0,
}
yield (
f"event: response.function_call_arguments.delta\n"
f"data: {json.dumps(arg_delta_event)}\n\n"
)
# Send function call arguments done event
arg_done_event = {
"type": "response.function_call_arguments.done",
"arguments": arguments_str,
"item_id": tool_id,
"output_index": 0,
}
yield (
f"event: response.function_call_arguments.done\n"
f"data: {json.dumps(arg_done_event)}\n\n"
)
# If there's output, send completion event
if tool_output is not None:
# Check if include parameter requests tool_call.results
include_results = (
request.include
and "tool_call.results" in request.include
)
if include_results:
# Format with detailed results
tool_done_event = {
"type": "response.output_item.done",
"item": {
"id": f"{tool_name}_{tool_id}",
"inputs": tool_input, # Raw inputs as-is
"status": "completed",
"type": "tool_call",
"tool_name": f"{tool_name}",
"results": tool_output, # Raw output as-is
},
"output_index": 0,
"sequence_number": tool_call_counter + 5,
}
else:
# Regular function call format
tool_done_event = {
"type": "response.output_item.done",
"item": {
"id": tool_id,
"type": "function_call", # Match OpenAI format
"status": "completed",
"arguments": arguments_str,
"call_id": call_id,
"name": tool_name,
},
}
yield (
f"event: response.output_item.done\n"
f"data: {json.dumps(tool_done_event)}\n\n"
)
# Extract text content for streaming (only AI responses)
if (
sender in ["Machine", "AI", "Agent"]
and text != request.input
and sender_name == "Agent"
):
# Calculate delta: only send newly generated content
if text.startswith(previous_content):
content = text[len(previous_content) :]
previous_content = text
else:
# If text doesn't start with previous content, send full text
# This handles cases where the content might be reset
content = text
previous_content = text
except (json.JSONDecodeError, UnicodeDecodeError):
continue
# Only send chunks with actual content
if content:
chunk = OpenAIResponsesStreamChunk(
id=response_id,
created=created_timestamp,
model=request.model,
delta={"content": content},
)
yield f"data: {chunk.model_dump_json()}\n\n"
# Send final completion chunk
final_chunk = OpenAIResponsesStreamChunk(
id=response_id,
created=created_timestamp,
model=request.model,
delta={},
status="completed",
)
yield f"data: {final_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
except Exception as e: # noqa: BLE001
logger.error(f"Error in stream generator: {e}")
error_response = create_openai_error(
message=str(e),
type_="processing_error",
)
yield f"data: {error_response}\n\n"
finally:
if not main_task.done():
main_task.cancel()
return StreamingResponse(
openai_stream_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
},
)
# Handle non-streaming response
result = await simple_run_flow(
flow=flow,
input_request=simplified_request,
stream=False,
api_key_user=api_key_user,
context=context,
)
# Extract output text and tool calls from result
output_text = ""
tool_calls: list[dict[str, Any]] = []
if result.outputs:
for run_output in result.outputs:
if run_output and run_output.outputs:
for component_output in run_output.outputs:
if component_output:
# Handle messages (final chat outputs)
if hasattr(component_output, "messages") and component_output.messages:
for msg in component_output.messages:
if hasattr(msg, "message"):
output_text = msg.message
break
# Handle results
if not output_text and hasattr(component_output, "results") and component_output.results:
for value in component_output.results.values():
if hasattr(value, "get_text"):
output_text = value.get_text()
break
if isinstance(value, str):
output_text = value
break
if hasattr(component_output, "results") and component_output.results:
for blocks in component_output.results.get("message", {}).content_blocks:
tool_calls.extend(
{
"name": content.name,
"input": content.tool_input,
"output": content.output,
}
for content in blocks.contents
if isinstance(content, ToolContent)
)
if output_text:
break
if output_text:
break
# Build output array
output_items = []
# Add tool calls if includes parameter requests them
include_results = request.include and "tool_call.results" in request.include
tool_call_id_counter = 1
for tool_call in tool_calls:
if include_results:
# Format as detailed tool call with results (like file_search_call in sample)
tool_call_item = {
"id": f"{tool_call['name']}_{tool_call_id_counter}",
"queries": list(tool_call["input"].values())
if isinstance(tool_call["input"], dict)
else [str(tool_call["input"])],
"status": "completed",
"tool_name": f"{tool_call['name']}",
"type": "tool_call",
"results": tool_call["output"] if tool_call["output"] is not None else [],
}
else:
# Format as basic function call
tool_call_item = {
"id": f"fc_{tool_call_id_counter}",
"type": "function_call",
"status": "completed",
"name": tool_call["name"],
"arguments": json.dumps(tool_call["input"]) if tool_call["input"] is not None else "{}",
}
output_items.append(tool_call_item)
tool_call_id_counter += 1
# Add the message output
output_message = {
"type": "message",
"id": f"msg_{response_id}",
"status": "completed",
"role": "assistant",
"content": [{"type": "output_text", "text": output_text, "annotations": []}],
}
output_items.append(output_message)
return OpenAIResponsesResponse(
id=response_id,
created_at=created_timestamp,
model=request.model,
output=output_items,
previous_response_id=request.previous_response_id,
)
@router.post("/responses", response_model=None)
async def create_response(
request: OpenAIResponsesRequest,
background_tasks: BackgroundTasks,
api_key_user: Annotated[UserRead, Depends(api_key_security)],
telemetry_service: Annotated[TelemetryService, Depends(get_telemetry_service)],
http_request: Request,
) -> OpenAIResponsesResponse | StreamingResponse | OpenAIErrorResponse:
"""Create a response using OpenAI Responses API format.
This endpoint accepts a flow_id in the model parameter and processes
the input through the specified Langflow flow.
Args:
request: OpenAI Responses API request with model (flow_id) and input
background_tasks: FastAPI background task manager
api_key_user: Authenticated user from API key
http_request: The incoming HTTP request
telemetry_service: Telemetry service for logging
Returns:
OpenAI-compatible response or streaming response
Raises:
HTTPException: For validation errors or flow execution issues
"""
start_time = time.perf_counter()
# Extract global variables from X-LANGFLOW-GLOBAL-VAR-* headers
variables = {}
header_prefix = "x-langflow-global-var-"
logger.debug(f"All headers received: {list(http_request.headers.keys())}")
logger.debug(f"Looking for headers starting with: {header_prefix}")
for header_name, header_value in http_request.headers.items():
header_lower = header_name.lower()
logger.debug(f"Checking header: '{header_lower}' (original: '{header_name}')")
if header_lower.startswith(header_prefix):
# Extract variable name from header (remove prefix) and convert to uppercase
var_name_lower = header_lower[len(header_prefix) :]
var_name = var_name_lower.upper() # Default to uppercase
variables[var_name] = header_value
logger.debug(
f"Found global variable: {var_name} = {header_value} "
f"(converted to uppercase from header: {header_name})"
)
logger.debug(f"Extracted global variables from headers: {list(variables.keys())}")
logger.debug(f"Variables dict: {variables}")
# Validate tools parameter - error out if tools are provided
if request.tools is not None:
error_response = create_openai_error(
message="Tools are not supported yet",
type_="invalid_request_error",
code="tools_not_supported",
)
return OpenAIErrorResponse(error=error_response["error"])
# Get flow using the model field (which contains flow_id)
try:
flow = await get_flow_by_id_or_endpoint_name(request.model, str(api_key_user.id))
except HTTPException:
flow = None
if flow is None:
error_response = create_openai_error(
message=f"Flow with id '{request.model}' not found",
type_="invalid_request_error",
code="flow_not_found",
)
return OpenAIErrorResponse(error=error_response["error"])
try:
# Process the request
result = await run_flow_for_openai_responses(
flow=flow,
request=request,
api_key_user=api_key_user,
stream=request.stream,
variables=variables,
)
# Log telemetry for successful completion
if not request.stream: # Only log for non-streaming responses
end_time = time.perf_counter()
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(
run_is_webhook=False,
run_seconds=int(end_time - start_time),
run_success=True,
run_error_message="",
),
)
except Exception as exc: # noqa: BLE001
logger.error(f"Error processing OpenAI Responses request: {exc}")
# Log telemetry for failed completion
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(
run_is_webhook=False,
run_seconds=int(time.perf_counter() - start_time),
run_success=False,
run_error_message=str(exc),
),
)
# Return OpenAI-compatible error
error_response = create_openai_error(
message=str(exc),
type_="processing_error",
)
return OpenAIErrorResponse(error=error_response["error"])
return result
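For orientation only (not part of the diff), a minimal client-side sketch of calling the new endpoint. It assumes a local Langflow instance at http://localhost:7860, a Langflow API key, and a flow id — all placeholders — and mirrors the integration tests further down: the flow id is passed as `model`, auth goes in the `x-api-key` header, and `X-LANGFLOW-GLOBAL-VAR-*` headers override global variables for that one request.

```python
# Hypothetical client call; URL, API key, and flow id are placeholders.
import httpx

LANGFLOW_URL = "http://localhost:7860"  # assumption: local Langflow instance
API_KEY = "lf-..."                      # a Langflow API key, not an OpenAI key
FLOW_ID = "your-flow-id"                # the flow to run, passed as `model`

payload = {
    "model": FLOW_ID,
    "input": "Hello, Langflow!",
    "stream": False,
    "include": ["tool_call.results"],   # optional: attach tool results to output items
}
headers = {
    "x-api-key": API_KEY,
    # Per-request global variable override (name is upper-cased server-side):
    "X-LANGFLOW-GLOBAL-VAR-OPENAI_API_KEY": "sk-openai-...",
}

response = httpx.post(f"{LANGFLOW_URL}/api/v1/responses", json=payload, headers=headers, timeout=60)
data = response.json()
if data.get("error"):
    print("error:", data["error"]["message"])
else:
    # `output` is a list of items; the assistant message is appended last
    message = data["output"][-1]
    print(message["content"][0]["text"])
```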

View file

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Any, ClassVar
import yaml
from cachetools import TTLCache
from langchain_core.documents import Document
from loguru import logger
from pydantic import BaseModel
from langflow.custom.custom_component.base_component import BaseComponent
@@ -421,6 +422,16 @@ class CustomComponent(BaseComponent):
if hasattr(self, "_user_id") and not self.user_id:
msg = f"User id is not set for {self.__class__.__name__}"
raise ValueError(msg)
# Check graph context for request-level variable overrides first
if hasattr(self, "graph") and self.graph and hasattr(self.graph, "context"):
context = self.graph.context
if context and "request_variables" in context:
request_variables = context["request_variables"]
if name in request_variables:
logger.debug(f"Found context override for variable '{name}': {request_variables[name]}")
return request_variables[name]
variable_service = get_variable_service() # Get service instance
# Retrieve and decrypt the variable by name for the current user
if isinstance(self.user_id, str):

View file

@@ -1047,6 +1047,7 @@ class Graph:
flow_id: str | None = None,
flow_name: str | None = None,
user_id: str | None = None,
context: dict | None = None,
) -> Graph:
"""Creates a graph from a payload.
@@ -1055,6 +1056,7 @@
flow_id: The ID of the flow.
flow_name: The flow name.
user_id: The user ID.
context: Optional context dictionary for request-specific data.
Returns:
Graph: The created graph.
@@ -1064,7 +1066,7 @@
try:
vertices = payload["nodes"]
edges = payload["edges"]
graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id)
graph = cls(flow_id=flow_id, flow_name=flow_name, user_id=user_id, context=context)
graph.add_nodes_and_edges(vertices, edges)
except KeyError as exc:
logger.exception(exc)

View file

@@ -0,0 +1,74 @@
from typing import Any, Literal
from pydantic import BaseModel, Field
class OpenAIResponsesRequest(BaseModel):
"""OpenAI-compatible responses request with flow_id as model parameter."""
model: str = Field(..., description="The flow ID to execute (used instead of OpenAI model)")
input: str = Field(..., description="The input text to process")
stream: bool = Field(default=False, description="Whether to stream the response")
background: bool = Field(default=False, description="Whether to process in background")
tools: list[Any] | None = Field(default=None, description="Tools are not supported yet")
previous_response_id: str | None = Field(
default=None, description="ID of previous response to continue conversation"
)
include: list[str] | None = Field(
default=None, description="Additional response data to include, e.g., ['tool_call.results']"
)
class OpenAIResponsesResponse(BaseModel):
"""OpenAI-compatible responses response."""
id: str
object: Literal["response"] = "response"
created_at: int
status: Literal["completed", "in_progress", "failed"] = "completed"
error: dict | None = None
incomplete_details: dict | None = None
instructions: str | None = None
max_output_tokens: int | None = None
model: str
output: list[dict]
parallel_tool_calls: bool = True
previous_response_id: str | None = None
reasoning: dict = Field(default_factory=lambda: {"effort": None, "summary": None})
store: bool = True
temperature: float = 1.0
text: dict = Field(default_factory=lambda: {"format": {"type": "text"}})
tool_choice: str = "auto"
tools: list[dict] = Field(default_factory=list)
top_p: float = 1.0
truncation: str = "disabled"
usage: dict | None = None
user: str | None = None
metadata: dict = Field(default_factory=dict)
class OpenAIResponsesStreamChunk(BaseModel):
"""OpenAI-compatible responses stream chunk."""
id: str
object: Literal["response.chunk"] = "response.chunk"
created: int
model: str
delta: dict
status: Literal["completed", "in_progress", "failed"] | None = None
class OpenAIErrorResponse(BaseModel):
error: dict = Field(..., description="Error details")
def create_openai_error(message: str, type_: str = "invalid_request_error", code: str | None = None) -> dict:
"""Create an OpenAI-compatible error response."""
error_data = {
"message": message,
"type": type_,
}
if code:
error_data["code"] = code
return {"error": error_data}

View file

@@ -0,0 +1,503 @@
import asyncio
import json
import os
import pathlib
import pytest
from dotenv import load_dotenv
from httpx import AsyncClient
from loguru import logger
# Load environment variables from .env file
def load_env_vars():
"""Load environment variables from .env files."""
# Try to find .env file in various locations
possible_paths = [
pathlib.Path(".env"), # Current directory
pathlib.Path("../../.env"), # Project root
pathlib.Path("../../../.env"), # One level up from project root
]
for env_path in possible_paths:
if env_path.exists():
logger.info(f"Loading environment variables from {env_path.absolute()}")
load_dotenv(env_path)
return True
logger.warning("No .env file found. Using existing environment variables.")
return False
# Load environment variables at module import time
load_env_vars()
async def create_global_variable(client: AsyncClient, headers, name, value, variable_type="credential"):
"""Create a global variable in Langflow."""
payload = {"name": name, "value": value, "type": variable_type, "default_fields": []}
response = await client.post("/api/v1/variables/", json=payload, headers=headers)
if response.status_code != 201:
logger.error(f"Failed to create global variable: {response.content}")
return False
logger.info(f"Successfully created global variable: {name}")
return True
async def load_and_prepare_flow(client: AsyncClient, created_api_key):
"""Load a flow template, create it, and wait for it to be ready."""
# Set up headers
headers = {"x-api-key": created_api_key.api_key}
# Create OPENAI_API_KEY global variable
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
pytest.skip("OPENAI_API_KEY environment variable not set")
await create_global_variable(client, headers, "OPENAI_API_KEY", openai_api_key)
# Load the Basic Prompting template
template_path = (
pathlib.Path(__file__).resolve().parent.parent.parent
/ "base"
/ "langflow"
/ "initial_setup"
/ "starter_projects"
/ "Basic Prompting.json"
)
flow_data = await asyncio.to_thread(lambda: json.loads(pathlib.Path(template_path).read_text()))
# Add the flow
response = await client.post("/api/v1/flows/", json=flow_data, headers=headers)
logger.info(f"Flow creation response: {response.status_code}")
assert response.status_code == 201
flow = response.json()
# Poll for flow builds to complete
max_attempts = 10
for attempt in range(max_attempts):
# Get the flow builds
builds_response = await client.get(f"/api/v1/monitor/builds?flow_id={flow['id']}", headers=headers)
if builds_response.status_code == 200:
builds = builds_response.json().get("vertex_builds", {})
# Check if builds are complete
all_valid = True
for build_list in builds.values():
if not build_list or build_list[0].get("valid") is not True:
all_valid = False
break
if all_valid and builds:
logger.info(f"Flow builds completed successfully after {attempt + 1} attempts")
break
# Wait before polling again
if attempt < max_attempts - 1:
logger.info(f"Waiting for flow builds to complete (attempt {attempt + 1}/{max_attempts})...")
await asyncio.sleep(1)
else:
logger.warning("Flow builds polling timed out, proceeding anyway")
return flow, headers
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_invalid_flow_id(client: AsyncClient, created_api_key):
"""Test the OpenAI responses endpoint with an invalid flow ID."""
headers = {"x-api-key": created_api_key.api_key}
# Test with non-existent flow ID
payload = {"model": "non-existent-flow-id", "input": "Hello", "stream": False}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200 # OpenAI errors are still 200 status
data = response.json()
assert "error" in data
assert isinstance(data["error"], dict)
assert data["error"]["type"] == "invalid_request_error"
assert "not found" in data["error"]["message"].lower()
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_with_tools(client: AsyncClient, created_api_key):
"""Test that tools parameter is rejected."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Test with tools parameter
payload = {
"model": flow["id"],
"input": "Hello",
"stream": False,
"tools": [{"type": "function", "function": {"name": "test", "parameters": {}}}],
}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200 # OpenAI errors are still 200 status
data = response.json()
assert "error" in data
assert isinstance(data["error"], dict)
assert data["error"]["type"] == "invalid_request_error"
assert data["error"]["code"] == "tools_not_supported"
assert "tools are not supported" in data["error"]["message"].lower()
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_empty_input(client: AsyncClient, created_api_key):
"""Test the OpenAI responses endpoint with empty input."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Test with empty input
payload = {"model": flow["id"], "input": "", "stream": False}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
logger.info(f"Empty input response status: {response.status_code}")
# The flow might still process empty input, so we check for a valid response structure
data = response.json()
if "error" not in data or data["error"] is None:
# Valid response even with empty input
assert "id" in data
assert "output" in data
assert "created_at" in data
assert data["object"] == "response"
else:
# Some flows might reject empty input
assert isinstance(data["error"], dict)
assert "message" in data["error"]
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_long_input(client: AsyncClient, created_api_key):
"""Test the OpenAI responses endpoint with very long input."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Create a very long input
long_input = "Hello " * 1000 # ~6000 characters
payload = {"model": flow["id"], "input": long_input, "stream": False}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200
data = response.json()
if "error" not in data:
assert "id" in data
assert "output" in data
assert isinstance(data["output"], str)
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_streaming_error_handling(client: AsyncClient, created_api_key):
"""Test streaming response error handling."""
headers = {"x-api-key": created_api_key.api_key}
# Test with invalid flow ID in streaming mode
payload = {"model": "invalid-flow-id", "input": "Hello", "stream": True}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
# For streaming errors, we should still get a 200 status but with error in the response
assert response.status_code == 200
# Read the response content
content = await response.aread()
text_content = content.decode("utf-8")
# Should contain error information in JSON format, not SSE
data = json.loads(text_content)
assert "error" in data
assert isinstance(data["error"], dict)
assert data["error"]["type"] == "invalid_request_error"
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_concurrent_requests(client: AsyncClient, created_api_key):
"""Test handling of concurrent requests to the same flow."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Create multiple concurrent requests
payloads = [{"model": flow["id"], "input": f"Request {i}", "stream": False} for i in range(5)]
# Send all requests concurrently
tasks = [client.post("/api/v1/responses", json=payload, headers=headers) for payload in payloads]
responses = await asyncio.gather(*tasks)
# All requests should succeed
for i, response in enumerate(responses):
assert response.status_code == 200
data = response.json()
if "error" not in data:
assert "id" in data
assert "output" in data
# Each response should have a unique ID
assert all(
data["id"] != other.json()["id"]
for j, other in enumerate(responses)
if i != j and "error" not in other.json()
)
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_unauthorized(client: AsyncClient):
"""Test the OpenAI responses endpoint without authentication."""
payload = {"model": "some-flow-id", "input": "Hello", "stream": False}
# No headers = no authentication
response = await client.post("/api/v1/responses", json=payload)
# Should get 403 Forbidden
assert response.status_code == 403
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_invalid_api_key(client: AsyncClient):
"""Test the OpenAI responses endpoint with invalid API key."""
headers = {"x-api-key": "invalid-api-key-12345"}
payload = {"model": "some-flow-id", "input": "Hello", "stream": False}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
# Should get 403 Forbidden
assert response.status_code == 403
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_malformed_request(client: AsyncClient, created_api_key):
"""Test the OpenAI responses endpoint with malformed requests."""
headers = {"x-api-key": created_api_key.api_key}
# Missing required fields
test_cases = [
{}, # Empty payload
{"model": "flow-id"}, # Missing input
{"input": "Hello"}, # Missing model
{"model": 123, "input": "Hello"}, # Wrong type for model
{"model": "flow-id", "input": 123}, # Wrong type for input
{"model": "flow-id", "input": "Hello", "stream": "yes"}, # Wrong type for stream
]
for payload in test_cases:
response = await client.post("/api/v1/responses", json=payload, headers=headers)
# OpenAI API returns validation errors as 200 with error in body or 422
if response.status_code == 200:
data = response.json()
assert "error" in data
assert isinstance(data["error"], dict)
assert "message" in data["error"]
else:
# Should get 422 Unprocessable Entity for validation errors
assert response.status_code == 422
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_stream_interruption(client: AsyncClient, created_api_key):
"""Test behavior when streaming is interrupted."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
payload = {"model": flow["id"], "input": "Tell me a long story", "stream": True}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200
# Read the streamed body in full; we only need to confirm that some data arrived
content = await response.aread()
text_content = content.decode("utf-8")
# Should have received at least some data
assert len(content) > 0
# Check for either data: or valid response content
assert "data:" in text_content or "id" in text_content
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_background_processing(client: AsyncClient, created_api_key):
"""Test background processing parameter."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Test with background=True
payload = {"model": flow["id"], "input": "Hello", "background": True, "stream": False}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200
data = response.json()
if "error" not in data or data["error"] is None:
assert "id" in data
assert "status" in data
# Background processing might change the status
assert data["status"] in ["completed", "in_progress"]
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_previous_response_id(client: AsyncClient, created_api_key):
"""Test previous_response_id parameter for conversation continuity."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# First request
payload1 = {"model": flow["id"], "input": "Hello", "stream": False}
response1 = await client.post("/api/v1/responses", json=payload1, headers=headers)
assert response1.status_code == 200
data1 = response1.json()
if "error" not in data1 or data1["error"] is None:
first_response_id = data1["id"]
# Second request with previous_response_id
payload2 = {
"model": flow["id"],
"input": "Continue our conversation",
"previous_response_id": first_response_id,
"stream": False,
}
response2 = await client.post("/api/v1/responses", json=payload2, headers=headers)
assert response2.status_code == 200
data2 = response2.json()
if "error" not in data2 or data2["error"] is None:
# The previous_response_id might be preserved in the response
# This depends on the implementation, so we just check it doesn't error
# We'll just verify that the request was processed successfully
assert "id" in data2
assert "output" in data2
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_response_format(client: AsyncClient, created_api_key):
"""Test OpenAI response format compliance."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
payload = {"model": flow["id"], "input": "Hello", "stream": False}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200
data = response.json()
if "error" not in data or data["error"] is None:
# Check OpenAI response format compliance
required_fields = ["id", "object", "created_at", "status", "model", "output"]
for field in required_fields:
assert field in data, f"Missing required field: {field}"
# Check field types and values
assert isinstance(data["id"], str)
assert data["object"] == "response"
assert isinstance(data["created_at"], int)
assert data["status"] in ["completed", "in_progress", "failed"]
assert isinstance(data["model"], str)
assert isinstance(data["output"], list)
# Check optional fields with expected defaults
assert data["parallel_tool_calls"] is True
assert data["store"] is True
assert data["temperature"] == 1.0
assert data["top_p"] == 1.0
assert data["truncation"] == "disabled"
assert data["tool_choice"] == "auto"
assert isinstance(data["tools"], list)
assert isinstance(data["reasoning"], dict)
assert isinstance(data["text"], dict)
assert isinstance(data["metadata"], dict)
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_stream_chunk_format(client: AsyncClient, created_api_key):
"""Test OpenAI streaming response chunk format compliance."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
payload = {"model": flow["id"], "input": "Hello", "stream": True}
response = await client.post("/api/v1/responses", json=payload, headers=headers)
assert response.status_code == 200
content = await response.aread()
text_content = content.decode("utf-8")
# Parse the events
events = text_content.strip().split("\n\n")
data_events = [evt for evt in events if evt.startswith("data:") and not evt.startswith("data: [DONE]")]
if data_events:
# Check first chunk format
first_chunk_json = data_events[0].replace("data: ", "")
try:
first_chunk = json.loads(first_chunk_json)
# Basic checks for streaming response
assert "id" in first_chunk
assert "delta" in first_chunk
assert isinstance(first_chunk["id"], str)
assert isinstance(first_chunk["delta"], dict)
# Check OpenAI stream chunk format compliance if fields exist
if "object" in first_chunk:
assert first_chunk["object"] == "response.chunk"
if "created" in first_chunk:
assert isinstance(first_chunk["created"], int)
if "model" in first_chunk:
assert isinstance(first_chunk["model"], str)
# Status is optional in chunks and can be None
if "status" in first_chunk and first_chunk["status"] is not None:
assert first_chunk["status"] in ["completed", "in_progress", "failed"]
except json.JSONDecodeError:
# If streaming format is different or not JSON, just ensure we have data
assert len(data_events) > 0
else:
# If no streaming chunks, ensure we have the [DONE] marker or valid response
assert "data: [DONE]" in text_content or len(text_content) > 0
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_rate_limiting_simulation(client: AsyncClient, created_api_key):
"""Test behavior under rapid successive requests."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Send 10 rapid requests
rapid_requests = []
for i in range(10):
payload = {"model": flow["id"], "input": f"Rapid request {i}", "stream": False}
rapid_requests.append(client.post("/api/v1/responses", json=payload, headers=headers))
# Wait for all requests to complete
responses = await asyncio.gather(*rapid_requests, return_exceptions=True)
# Check that most requests succeeded (allowing for some potential failures)
successful_responses = [r for r in responses if not isinstance(r, Exception) and r.status_code == 200]
# At least 50% should succeed
assert len(successful_responses) >= 5
# Check that successful responses have unique IDs
response_ids = []
for response in successful_responses:
data = response.json()
if "error" not in data or data["error"] is None:
response_ids.append(data["id"])
# All response IDs should be unique
assert len(response_ids) == len(set(response_ids))

View file

@@ -0,0 +1,167 @@
import asyncio
import json
import os
import pathlib
import pytest
from dotenv import find_dotenv, load_dotenv
from httpx import AsyncClient
from loguru import logger
load_dotenv(find_dotenv())
async def create_global_variable(client: AsyncClient, headers, name, value, variable_type="credential"):
"""Create a global variable in Langflow."""
payload = {"name": name, "value": value, "type": variable_type, "default_fields": []}
response = await client.post("/api/v1/variables/", json=payload, headers=headers)
if response.status_code != 201:
logger.error(f"Failed to create global variable: {response.content}")
return False
logger.info(f"Successfully created global variable: {name}")
return True
async def load_and_prepare_flow(client: AsyncClient, created_api_key):
"""Load a flow template, create it, and wait for it to be ready."""
# Set up headers
headers = {"x-api-key": created_api_key.api_key}
# Create OPENAI_API_KEY global variable
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
pytest.skip("OPENAI_API_KEY environment variable not set")
await create_global_variable(client, headers, "OPENAI_API_KEY", openai_api_key)
# Load the Basic Prompting template
template_path = (
pathlib.Path(__file__).resolve().parent.parent.parent
/ "base"
/ "langflow"
/ "initial_setup"
/ "starter_projects"
/ "Basic Prompting.json"
)
flow_data = await asyncio.to_thread(lambda: json.loads(pathlib.Path(template_path).read_text()))
# Add the flow
response = await client.post("/api/v1/flows/", json=flow_data, headers=headers)
logger.info(f"Flow creation response: {response.status_code}")
assert response.status_code == 201
flow = response.json()
# Poll for flow builds to complete
max_attempts = 10
for attempt in range(max_attempts):
# Get the flow builds
builds_response = await client.get(f"/api/v1/monitor/builds?flow_id={flow['id']}", headers=headers)
if builds_response.status_code == 200:
builds = builds_response.json().get("vertex_builds", {})
# Check if builds are complete
all_valid = True
for build_list in builds.values():
if not build_list or build_list[0].get("valid") is not True:
all_valid = False
break
if all_valid and builds:
logger.info(f"Flow builds completed successfully after {attempt + 1} attempts")
break
# Wait before polling again
if attempt < max_attempts - 1:
logger.info(f"Waiting for flow builds to complete (attempt {attempt + 1}/{max_attempts})...")
await asyncio.sleep(1)
else:
logger.warning("Flow builds polling timed out, proceeding anyway")
return flow, headers
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_non_streaming(client: AsyncClient, created_api_key):
"""Test the OpenAI-compatible non-streaming responses endpoint directly."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Now test the OpenAI-compatible endpoint
payload = {"model": flow["id"], "input": "Hello, Langflow!", "stream": False}
# Make the request
response = await client.post("/api/v1/responses", json=payload, headers=headers)
logger.info(f"Response status: {response.status_code}")
logger.debug(f"Response content: {response.content}")
# Handle potential errors
if response.status_code != 200:
logger.error(f"Error response: {response.content}")
pytest.fail(f"Request failed with status {response.status_code}")
try:
data = response.json()
if "error" in data and data["error"] is not None:
logger.error(f"Error in response: {data['error']}")
# Don't fail immediately, log more details for debugging
logger.error(f"Full error details: {data}")
error_msg = "Unknown error"
if isinstance(data.get("error"), dict):
error_msg = data["error"].get("message", "Unknown error")
elif data.get("error"):
error_msg = str(data["error"])
pytest.fail(f"Error in response: {error_msg}")
# Validate the response
assert "id" in data
assert "output" in data
except Exception as exc:
logger.exception("Exception parsing response")
pytest.fail(f"Failed to parse response: {exc}")
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_responses_streaming(client: AsyncClient, created_api_key):
"""Test the OpenAI-compatible streaming responses endpoint directly."""
flow, headers = await load_and_prepare_flow(client, created_api_key)
# Now test the OpenAI-compatible streaming endpoint
payload = {"model": flow["id"], "input": "Hello, stream!", "stream": True}
# Make the request
response = await client.post("/api/v1/responses", json=payload, headers=headers)
logger.info(f"Response status: {response.status_code}")
# Handle potential errors
if response.status_code != 200:
logger.error(f"Error response: {response.content}")
pytest.fail(f"Request failed with status {response.status_code}")
# For streaming, we should get a stream of server-sent events
content = await response.aread()
text_content = content.decode("utf-8")
logger.debug(f"Response content (first 200 chars): {text_content[:200]}")
# Check that we got some SSE data events
assert "data:" in text_content
# Parse the events to validate structure and final [DONE] marker
events = text_content.strip().split("\n\n")
# The stream must end with the OpenAI '[DONE]' sentinel
assert events, "No events in stream"
assert events[-1].strip() == "data: [DONE]", "Stream did not end with [DONE] marker"
# Filter out the [DONE] marker to inspect JSON data events
data_events = [evt for evt in events if evt.startswith("data:") and not evt.startswith("data: [DONE]")]
assert data_events, "No streaming events were received"
# Parse the first and last JSON events to check their structure
first_event = json.loads(data_events[0].replace("data: ", ""))
last_event = json.loads(data_events[-1].replace("data: ", ""))
assert "delta" in first_event
assert "delta" in last_event

View file

@@ -0,0 +1,343 @@
import asyncio
import json
import os
import pathlib
import httpx
import pytest
from dotenv import load_dotenv
from httpx import AsyncClient
from loguru import logger
# Load environment variables from .env file
def load_env_vars():
"""Load environment variables from .env files."""
possible_paths = [
pathlib.Path(".env"),
pathlib.Path("../../.env"),
pathlib.Path("../../../.env"),
]
for env_path in possible_paths:
if env_path.exists():
logger.info(f"Loading environment variables from {env_path.absolute()}")
load_dotenv(env_path)
return True
logger.warning("No .env file found. Using existing environment variables.")
return False
# Load environment variables at module import time
load_env_vars()
async def create_global_variable(client: AsyncClient, headers, name, value, variable_type="credential"):
"""Create a global variable in Langflow."""
payload = {"name": name, "value": value, "type": variable_type, "default_fields": []}
response = await client.post("/api/v1/variables/", json=payload, headers=headers)
if response.status_code != 201:
logger.error(f"Failed to create global variable: {response.content}")
return False
logger.info(f"Successfully created global variable: {name}")
return True
async def load_and_prepare_flow(client: AsyncClient, created_api_key):
"""Load Simple Agent flow and wait for it to be ready."""
headers = {"x-api-key": created_api_key.api_key}
# Create OPENAI_API_KEY global variable
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key or openai_api_key == "dummy":
pytest.skip("OPENAI_API_KEY environment variable not set")
await create_global_variable(client, headers, "OPENAI_API_KEY", openai_api_key)
# Load the Simple Agent template
template_path = (
pathlib.Path(__file__).resolve().parent.parent.parent
/ "base"
/ "langflow"
/ "initial_setup"
/ "starter_projects"
/ "Simple Agent.json"
)
flow_data = await asyncio.to_thread(lambda: json.loads(pathlib.Path(template_path).read_text()))
# Add the flow
response = await client.post("/api/v1/flows/", json=flow_data, headers=headers)
assert response.status_code == 201
flow = response.json()
# Poll for flow builds to complete
max_attempts = 10
for attempt in range(max_attempts):
builds_response = await client.get(f"/api/v1/monitor/builds?flow_id={flow['id']}", headers=headers)
if builds_response.status_code == 200:
builds = builds_response.json().get("vertex_builds", {})
all_valid = True
for build_list in builds.values():
if not build_list or build_list[0].get("valid") is not True:
all_valid = False
break
if all_valid and builds:
break
if attempt < max_attempts - 1:
await asyncio.sleep(1)
return flow, headers
@pytest.mark.api_key_required
@pytest.mark.integration
async def test_openai_streaming_format_comparison(client: AsyncClient, created_api_key):
"""Compare raw HTTP streaming formats between OpenAI and our API."""
# Test input
input_msg = "What is 25 + 17? Use your calculator tool."
# Tools definition
tools = [
{
"type": "function",
"name": "evaluate_expression",
"description": "Perform basic arithmetic operations on a given expression.",
"parameters": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "The arithmetic expression to evaluate (e.g., '4*4*(33/22)+12-20').",
}
},
"required": ["expression"],
},
}
]
# Get OpenAI API key
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
pytest.skip("OPENAI_API_KEY environment variable not set")
# === Test OpenAI's raw HTTP streaming format ===
logger.info("=== Testing OpenAI API Raw HTTP Format ===")
async with httpx.AsyncClient() as openai_client:
openai_payload = {"model": "gpt-4o-mini", "input": input_msg, "tools": tools, "stream": True}
openai_response = await openai_client.post(
"https://api.openai.com/v1/responses",
headers={"Authorization": f"Bearer {openai_api_key}", "Content-Type": "application/json"},
json=openai_payload,
)
logger.info(f"OpenAI status: {openai_response.status_code}")
if openai_response.status_code != 200:
logger.error(f"OpenAI error: {openai_response.text}")
pytest.skip("OpenAI API request failed")
# Parse OpenAI's raw SSE stream
openai_content = await openai_response.aread()
openai_text = openai_content.decode("utf-8")
openai_events = openai_text.strip().split("\n\n")
openai_data_events = [evt for evt in openai_events if "data: " in evt and not evt.startswith("data: [DONE]")]
# === Test Our API's streaming format ===
logger.info("=== Testing Our API Format ===")
flow, headers = await load_and_prepare_flow(client, created_api_key)
our_payload = {"model": flow["id"], "input": input_msg, "stream": True, "include": ["tool_call.results"]}
our_response = await client.post("/api/v1/responses", json=our_payload, headers=headers)
assert our_response.status_code == 200
our_content = await our_response.aread()
our_text = our_content.decode("utf-8")
our_events = our_text.strip().split("\n\n")
our_data_events = [evt for evt in our_events if "data: " in evt and not evt.startswith("data: [DONE]")]
# === Parse and compare events ===
# Extract JSON data from OpenAI events
openai_parsed = []
for event_block in openai_data_events:
lines = event_block.strip().split("\n")
for line in lines:
if line.startswith("data: "):
try:
json_str = line.replace("data: ", "", 1)
event_data = json.loads(json_str)
openai_parsed.append(event_data)
break
except json.JSONDecodeError:
continue
# Extract JSON data from our events
our_parsed = []
for event_block in our_data_events:
lines = event_block.strip().split("\n")
for line in lines:
if line.startswith("data: "):
try:
json_str = line.replace("data: ", "", 1)
event_data = json.loads(json_str)
our_parsed.append(event_data)
break
except json.JSONDecodeError:
continue
# === Analysis ===
logger.info("Event counts:")
logger.info(f" OpenAI: {len(openai_parsed)} events")
logger.info(f" Our API: {len(our_parsed)} events")
# Check for tool call events with detailed logging
logger.info("Detailed OpenAI event analysis:")
output_item_added_events = [e for e in openai_parsed if e.get("type") == "response.output_item.added"]
logger.info(f" Found {len(output_item_added_events)} 'response.output_item.added' events")
for i, event in enumerate(output_item_added_events):
item = event.get("item", {})
item_type = item.get("type", "unknown")
logger.info(f" Event {i}: item.type = '{item_type}'")
logger.info(f" Event {i}: item keys = {list(item.keys())}")
if "name" in item:
logger.info(f" Event {i}: item.name = '{item.get('name')}'")
logger.debug(f" Event {i}: full item = {json.dumps(item, indent=6)}")
openai_tool_events = [
e
for e in openai_parsed
if e.get("type") == "response.output_item.added" and e.get("item", {}).get("type") == "tool_call"
]
openai_function_events = [
e
for e in openai_parsed
if e.get("type") == "response.output_item.added" and e.get("item", {}).get("type") == "function_call"
]
logger.info("Detailed Our API event analysis:")
our_output_item_added_events = [e for e in our_parsed if e.get("type") == "response.output_item.added"]
logger.info(f" Found {len(our_output_item_added_events)} 'response.output_item.added' events")
for i, event in enumerate(our_output_item_added_events):
item = event.get("item", {})
item_type = item.get("type", "unknown")
logger.info(f" Event {i}: item.type = '{item_type}'")
logger.info(f" Event {i}: item keys = {list(item.keys())}")
if "name" in item:
logger.info(f" Event {i}: item.name = '{item.get('name')}'")
logger.debug(f" Event {i}: full item = {json.dumps(item, indent=6)}")
our_function_events = [
e
for e in our_parsed
if e.get("type") == "response.output_item.added" and e.get("item", {}).get("type") == "function_call"
]
logger.info("Tool call detection results:")
logger.info(f" OpenAI tool_call events: {len(openai_tool_events)}")
logger.info(f" OpenAI function_call events: {len(openai_function_events)}")
logger.info(f" Our function_call events: {len(our_function_events)}")
# Use the correct event type for OpenAI (function_call vs tool_call)
openai_actual_tool_events = openai_function_events if openai_function_events else openai_tool_events
logger.info("Function call events:")
logger.info(f" OpenAI: {len(openai_actual_tool_events)} function call events")
logger.info(f" Our API: {len(our_function_events)} function call events")
# Show event types
openai_types = {e.get("type", e.get("object", "unknown")) for e in openai_parsed}
our_types = {e.get("type", e.get("object", "unknown")) for e in our_parsed}
logger.info("Event types:")
logger.info(f" OpenAI: {sorted(openai_types)}")
logger.info(f" Our API: {sorted(our_types)}")
# Print sample events for debugging
logger.info("Sample OpenAI events:")
for i, event in enumerate(openai_parsed[:3]):
logger.debug(f" {i}: {json.dumps(event, indent=2)[:200]}...")
logger.info("Sample Our events:")
for i, event in enumerate(our_parsed[:3]):
logger.debug(f" {i}: {json.dumps(event, indent=2)[:200]}...")
# Check delta content for duplicates/accumulation
logger.info("Checking delta content for proper streaming:")
delta_contents = []
for i, event in enumerate(our_parsed):
if event.get("object") == "response.chunk" and "delta" in event:
delta_content = event["delta"].get("content", "")
if delta_content: # Only track non-empty content
delta_contents.append(delta_content)
logger.info(f" Delta {i}: '{delta_content}'")
# Check for accumulated content (bad) vs incremental content (good)
if len(delta_contents) > 1:
logger.info("Analyzing delta content patterns:")
accumulated_pattern = True
for i in range(1, len(delta_contents)):
if not delta_contents[i].startswith(delta_contents[i - 1]):
accumulated_pattern = False
break
if accumulated_pattern:
logger.error("❌ DETECTED ACCUMULATED CONTENT PATTERN (BAD)")
logger.error("Each delta contains the full accumulated message instead of just new content")
logger.error("Example:")
for i, content in enumerate(delta_contents[:3]):
logger.error(f" Delta {i}: '{content}'")
else:
logger.success("✅ DETECTED INCREMENTAL CONTENT PATTERN (GOOD)")
logger.success("Each delta contains only new content")
else:
logger.info("Not enough delta content to analyze pattern")
if openai_actual_tool_events:
logger.info("OpenAI tool call example:")
logger.debug(f" {json.dumps(openai_actual_tool_events[0], indent=2)}")
if our_function_events:
logger.info("Our function call example:")
logger.debug(f" {json.dumps(our_function_events[0], indent=2)}")
# === Validation ===
# Basic validation
assert len(openai_parsed) > 0, "No OpenAI events received"
assert len(our_parsed) > 0, "No events from our API"
# Check if both APIs produced function call events
if len(openai_actual_tool_events) > 0:
logger.success("✅ OpenAI produced function call events")
if len(our_function_events) > 0:
logger.success("✅ Our API also produced function call events")
logger.success("✅ Both APIs support function call streaming")
else:
logger.error("❌ Our API did not produce function call events")
pytest.fail("Our API should produce function call events when OpenAI does")
else:
logger.info("No function calls were made by OpenAI")
logger.info("📊 Test Summary:")
logger.info(f" OpenAI events: {len(openai_parsed)}")
logger.info(f" Our events: {len(our_parsed)}")
logger.info(f" OpenAI function events: {len(openai_actual_tool_events)}")
logger.info(f" Our function events: {len(our_function_events)}")
compatibility_result = (
"✅ PASS" if len(our_function_events) > 0 or len(openai_actual_tool_events) == 0 else "❌ FAIL"
)
logger.info(f" Format compatibility: {compatibility_result}")