From ee43952e420992f5fbc2554c8a74f106bb47faf8 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 7 Feb 2024 14:07:02 -0300 Subject: [PATCH] Refactor invoke_lc_runnable function parameters --- src/backend/langflow/graph/vertex/utils.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/backend/langflow/graph/vertex/utils.py b/src/backend/langflow/graph/vertex/utils.py index 0964e7b0a..e67104365 100644 --- a/src/backend/langflow/graph/vertex/utils.py +++ b/src/backend/langflow/graph/vertex/utils.py @@ -11,7 +11,7 @@ def is_basic_type(obj): async def invoke_lc_runnable( - langchain_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs + built_object: Runnable, inputs: dict, has_external_output: bool, session_id: Optional[str] = None, **kwargs ) -> Union[str, BaseMessage]: # Setup callbacks for asynchronous execution from langflow.processing.base import setup_callbacks @@ -19,15 +19,15 @@ async def invoke_lc_runnable( callbacks = setup_callbacks(sync=False, trace_id=session_id, **kwargs) try: - if has_external_output and hasattr(langchain_object, "astream"): + if has_external_output and hasattr(built_object, "astream"): # Asynchronous stream handling if supported and required output = "" - async for chunk in langchain_object.astream(inputs, {"callbacks": callbacks}): + async for chunk in built_object.astream(inputs, {"callbacks": callbacks}): output += chunk return output else: # Direct asynchronous invocation - return await langchain_object.ainvoke(inputs, {"callbacks": callbacks}) + return await built_object.ainvoke(inputs, {"callbacks": callbacks}) except Exception as async_exc: logger.debug(f"Async error, falling back to sync: {str(async_exc)}") @@ -35,15 +35,15 @@ async def invoke_lc_runnable( sync_callbacks = setup_callbacks(sync=True, trace_id=session_id, **kwargs) try: # Synchronous fallback if asynchronous execution fails - if has_external_output and hasattr(langchain_object, "stream"): + if has_external_output and hasattr(built_object, "stream"): # Synchronous stream handling if supported and required output = "" - for chunk in langchain_object.stream(inputs, {"callbacks": sync_callbacks}): + for chunk in built_object.stream(inputs, {"callbacks": sync_callbacks}): output += chunk return output else: # Direct synchronous invocation - return langchain_object.invoke(inputs, {"callbacks": sync_callbacks}) + return built_object.invoke(inputs, {"callbacks": sync_callbacks}) except Exception as sync_exc: logger.error(f"Sync error after async failure: {str(sync_exc)}") # Handle or re-raise exception as appropriate for your application