diff --git a/src/backend/langflow/graph/vertex/utils.py b/src/backend/langflow/graph/vertex/utils.py index 3fa065e62..0964e7b0a 100644 --- a/src/backend/langflow/graph/vertex/utils.py +++ b/src/backend/langflow/graph/vertex/utils.py @@ -2,8 +2,8 @@ from typing import Any, Optional, Union from langchain_core.messages import BaseMessage from langchain_core.runnables import Runnable -from langflow.services.deps import get_socket_service from langflow.utils.constants import PYTHON_BASIC_TYPES +from loguru import logger def is_basic_type(obj): @@ -11,16 +11,43 @@ def is_basic_type(obj): async def invoke_lc_runnable( - built_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None + langchain_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs ) -> Union[str, BaseMessage]: - if has_external_output: - socketio_service = get_socket_service() - result = "" - stream = built_object.astream(inputs) - async for chunk in stream: - await socketio_service.emit_token(session_id, chunk) - result += chunk - return await built_object.ainvoke(inputs) + # Setup callbacks for asynchronous execution + from langflow.processing.base import setup_callbacks + + callbacks = setup_callbacks(sync=False, trace_id=session_id, **kwargs) + + try: + if has_external_output and hasattr(langchain_object, "astream"): + # Asynchronous stream handling if supported and required + output = "" + async for chunk in langchain_object.astream(inputs, {"callbacks": callbacks}): + output += chunk + return output + else: + # Direct asynchronous invocation + return await langchain_object.ainvoke(inputs, {"callbacks": callbacks}) + except Exception as async_exc: + logger.debug(f"Async error, falling back to sync: {str(async_exc)}") + + # Setup synchronous callbacks for the fallback + sync_callbacks = setup_callbacks(sync=True, trace_id=session_id, **kwargs) + try: + # Synchronous fallback if asynchronous execution fails + if has_external_output and hasattr(langchain_object, "stream"): + # Synchronous stream handling if supported and required + output = "" + for chunk in langchain_object.stream(inputs, {"callbacks": sync_callbacks}): + output += chunk + return output + else: + # Direct synchronous invocation + return langchain_object.invoke(inputs, {"callbacks": sync_callbacks}) + except Exception as sync_exc: + logger.error(f"Sync error after async failure: {str(sync_exc)}") + # Handle or re-raise exception as appropriate for your application + raise sync_exc from async_exc async def generate_result(built_object: Any, inputs: dict, has_external_output: bool, session_id: Optional[str] = None):