Refactor async execution and add error handling
This commit is contained in:
parent
1934a77ffa
commit
4b9d402b71
1 changed files with 37 additions and 10 deletions
|
|
@ -2,8 +2,8 @@ from typing import Any, Optional, Union
|
|||
|
||||
from langchain_core.messages import BaseMessage
|
||||
from langchain_core.runnables import Runnable
|
||||
from langflow.services.deps import get_socket_service
|
||||
from langflow.utils.constants import PYTHON_BASIC_TYPES
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def is_basic_type(obj):
|
||||
|
|
@ -11,16 +11,43 @@ def is_basic_type(obj):
|
|||
|
||||
|
||||
async def invoke_lc_runnable(
|
||||
built_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None
|
||||
langchain_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs
|
||||
) -> Union[str, BaseMessage]:
|
||||
if has_external_output:
|
||||
socketio_service = get_socket_service()
|
||||
result = ""
|
||||
stream = built_object.astream(inputs)
|
||||
async for chunk in stream:
|
||||
await socketio_service.emit_token(session_id, chunk)
|
||||
result += chunk
|
||||
return await built_object.ainvoke(inputs)
|
||||
# Setup callbacks for asynchronous execution
|
||||
from langflow.processing.base import setup_callbacks
|
||||
|
||||
callbacks = setup_callbacks(sync=False, trace_id=session_id, **kwargs)
|
||||
|
||||
try:
|
||||
if has_external_output and hasattr(langchain_object, "astream"):
|
||||
# Asynchronous stream handling if supported and required
|
||||
output = ""
|
||||
async for chunk in langchain_object.astream(inputs, {"callbacks": callbacks}):
|
||||
output += chunk
|
||||
return output
|
||||
else:
|
||||
# Direct asynchronous invocation
|
||||
return await langchain_object.ainvoke(inputs, {"callbacks": callbacks})
|
||||
except Exception as async_exc:
|
||||
logger.debug(f"Async error, falling back to sync: {str(async_exc)}")
|
||||
|
||||
# Setup synchronous callbacks for the fallback
|
||||
sync_callbacks = setup_callbacks(sync=True, trace_id=session_id, **kwargs)
|
||||
try:
|
||||
# Synchronous fallback if asynchronous execution fails
|
||||
if has_external_output and hasattr(langchain_object, "stream"):
|
||||
# Synchronous stream handling if supported and required
|
||||
output = ""
|
||||
for chunk in langchain_object.stream(inputs, {"callbacks": sync_callbacks}):
|
||||
output += chunk
|
||||
return output
|
||||
else:
|
||||
# Direct synchronous invocation
|
||||
return langchain_object.invoke(inputs, {"callbacks": sync_callbacks})
|
||||
except Exception as sync_exc:
|
||||
logger.error(f"Sync error after async failure: {str(sync_exc)}")
|
||||
# Handle or re-raise exception as appropriate for your application
|
||||
raise sync_exc from async_exc
|
||||
|
||||
|
||||
async def generate_result(built_object: Any, inputs: dict, has_external_output: bool, session_id: Optional[str] = None):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue