Refactor invoke_lc_runnable function parameters

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-02-07 14:07:02 -03:00
commit ee43952e42

View file

@ -11,7 +11,7 @@ def is_basic_type(obj):
async def invoke_lc_runnable(
langchain_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs
built_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs
) -> Union[str, BaseMessage]:
# Setup callbacks for asynchronous execution
from langflow.processing.base import setup_callbacks
@ -19,15 +19,15 @@ async def invoke_lc_runnable(
callbacks = setup_callbacks(sync=False, trace_id=session_id, **kwargs)
try:
if has_external_output and hasattr(langchain_object, "astream"):
if has_external_output and hasattr(built_object, "astream"):
# Asynchronous stream handling if supported and required
output = ""
async for chunk in langchain_object.astream(inputs, {"callbacks": callbacks}):
async for chunk in built_object.astream(inputs, {"callbacks": callbacks}):
output += chunk
return output
else:
# Direct asynchronous invocation
return await langchain_object.ainvoke(inputs, {"callbacks": callbacks})
return await built_object.ainvoke(inputs, {"callbacks": callbacks})
except Exception as async_exc:
logger.debug(f"Async error, falling back to sync: {str(async_exc)}")
@ -35,15 +35,15 @@ async def invoke_lc_runnable(
sync_callbacks = setup_callbacks(sync=True, trace_id=session_id, **kwargs)
try:
# Synchronous fallback if asynchronous execution fails
if has_external_output and hasattr(langchain_object, "stream"):
if has_external_output and hasattr(built_object, "stream"):
# Synchronous stream handling if supported and required
output = ""
for chunk in langchain_object.stream(inputs, {"callbacks": sync_callbacks}):
for chunk in built_object.stream(inputs, {"callbacks": sync_callbacks}):
output += chunk
return output
else:
# Direct synchronous invocation
return langchain_object.invoke(inputs, {"callbacks": sync_callbacks})
return built_object.invoke(inputs, {"callbacks": sync_callbacks})
except Exception as sync_exc:
logger.error(f"Sync error after async failure: {str(sync_exc)}")
# Handle or re-raise exception as appropriate for your application