From c8d9c41f9a32e73d381823e28e6630334e82dae5 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 22 Aug 2023 09:47:22 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(base.py):=20add=20support=20?= =?UTF-8?q?for=20langfuse=20callback=20to=20flush=20if=20present=20?= =?UTF-8?q?=E2=9C=A8=20feat(base.py):=20add=20support=20for=20langfuse=20c?= =?UTF-8?q?allback=20to=20flush=20if=20present=20when=20getting=20result?= =?UTF-8?q?=20and=20steps?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/processing/base.py | 29 ++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/backend/langflow/processing/base.py b/src/backend/langflow/processing/base.py index 80ee3fef4..4e2b1d716 100644 --- a/src/backend/langflow/processing/base.py +++ b/src/backend/langflow/processing/base.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import List, Union, TYPE_CHECKING from langflow.api.v1.callback import ( AsyncStreamingLLMCallbackHandler, StreamingLLMCallbackHandler, @@ -6,6 +6,10 @@ from langflow.api.v1.callback import ( from langflow.processing.process import fix_memory_inputs, format_actions from langflow.utils.logger import logger from langchain.agents.agent import AgentExecutor +from langchain.callbacks.base import BaseCallbackHandler + +if TYPE_CHECKING: + from langfuse.callback import CallbackHandler # type: ignore def setup_callbacks(sync, **kwargs): @@ -47,6 +51,18 @@ def get_langfuse_callback(): return None +def flush_langfuse_callback_if_present( + callbacks: List[Union[BaseCallbackHandler, "CallbackHandler"]] +): + """ + If langfuse callback is present, run callback.langfuse.flush() + """ + for callback in callbacks: + if hasattr(callback, "langfuse"): + callback.langfuse.flush() + break + + async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs): """Get result and thought from extracted json""" @@ -66,13 +82,16 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa logger.error(f"Error fixing memory inputs: {exc}") try: - async_callbacks = setup_callbacks(sync=False, **kwargs) - output = await langchain_object.acall(inputs, callbacks=async_callbacks) + callbacks = setup_callbacks(sync=False, **kwargs) + output = await langchain_object.acall(inputs, callbacks=callbacks) except Exception as exc: # make the error message more informative logger.debug(f"Error: {str(exc)}") - sync_callbacks = setup_callbacks(sync=True, **kwargs) - output = langchain_object(inputs, callbacks=sync_callbacks) + callbacks = setup_callbacks(sync=True, **kwargs) + output = langchain_object(inputs, callbacks=callbacks) + + # if langfuse callback is present, run callback.langfuse.flush() + flush_langfuse_callback_if_present(callbacks) intermediate_steps = ( output.get("intermediate_steps", []) if isinstance(output, dict) else []