diff --git a/src/backend/langflow/processing/base.py b/src/backend/langflow/processing/base.py index 80ee3fef4..4e2b1d716 100644 --- a/src/backend/langflow/processing/base.py +++ b/src/backend/langflow/processing/base.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import List, Union, TYPE_CHECKING from langflow.api.v1.callback import ( AsyncStreamingLLMCallbackHandler, StreamingLLMCallbackHandler, @@ -6,6 +6,10 @@ from langflow.api.v1.callback import ( from langflow.processing.process import fix_memory_inputs, format_actions from langflow.utils.logger import logger from langchain.agents.agent import AgentExecutor +from langchain.callbacks.base import BaseCallbackHandler + +if TYPE_CHECKING: + from langfuse.callback import CallbackHandler # type: ignore def setup_callbacks(sync, **kwargs): @@ -47,6 +51,18 @@ def get_langfuse_callback(): return None +def flush_langfuse_callback_if_present( + callbacks: List[Union[BaseCallbackHandler, "CallbackHandler"]] +): + """ + If langfuse callback is present, run callback.langfuse.flush() + """ + for callback in callbacks: + if hasattr(callback, "langfuse"): + callback.langfuse.flush() + break + + async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs): """Get result and thought from extracted json""" @@ -66,13 +82,16 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa logger.error(f"Error fixing memory inputs: {exc}") try: - async_callbacks = setup_callbacks(sync=False, **kwargs) - output = await langchain_object.acall(inputs, callbacks=async_callbacks) + callbacks = setup_callbacks(sync=False, **kwargs) + output = await langchain_object.acall(inputs, callbacks=callbacks) except Exception as exc: # make the error message more informative logger.debug(f"Error: {str(exc)}") - sync_callbacks = setup_callbacks(sync=True, **kwargs) - output = langchain_object(inputs, callbacks=sync_callbacks) + callbacks = setup_callbacks(sync=True, **kwargs) + output = langchain_object(inputs, callbacks=callbacks) + + # if langfuse callback is present, run callback.langfuse.flush() + flush_langfuse_callback_if_present(callbacks) intermediate_steps = ( output.get("intermediate_steps", []) if isinstance(output, dict) else []