🐛 fix(base.py): add support for langfuse callback to flush if present

 feat(base.py): add support for langfuse callback to flush if present when getting result and steps
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-22 09:47:22 -03:00
commit c8d9c41f9a

View file

@ -1,4 +1,4 @@
from typing import Union
from typing import List, Union, TYPE_CHECKING
from langflow.api.v1.callback import (
AsyncStreamingLLMCallbackHandler,
StreamingLLMCallbackHandler,
@ -6,6 +6,10 @@ from langflow.api.v1.callback import (
from langflow.processing.process import fix_memory_inputs, format_actions
from langflow.utils.logger import logger
from langchain.agents.agent import AgentExecutor
from langchain.callbacks.base import BaseCallbackHandler
if TYPE_CHECKING:
from langfuse.callback import CallbackHandler # type: ignore
def setup_callbacks(sync, **kwargs):
@ -47,6 +51,18 @@ def get_langfuse_callback():
return None
def flush_langfuse_callback_if_present(
callbacks: List[Union[BaseCallbackHandler, "CallbackHandler"]]
):
"""
If langfuse callback is present, run callback.langfuse.flush()
"""
for callback in callbacks:
if hasattr(callback, "langfuse"):
callback.langfuse.flush()
break
async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs):
"""Get result and thought from extracted json"""
@ -66,13 +82,16 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa
logger.error(f"Error fixing memory inputs: {exc}")
try:
async_callbacks = setup_callbacks(sync=False, **kwargs)
output = await langchain_object.acall(inputs, callbacks=async_callbacks)
callbacks = setup_callbacks(sync=False, **kwargs)
output = await langchain_object.acall(inputs, callbacks=callbacks)
except Exception as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
sync_callbacks = setup_callbacks(sync=True, **kwargs)
output = langchain_object(inputs, callbacks=sync_callbacks)
callbacks = setup_callbacks(sync=True, **kwargs)
output = langchain_object(inputs, callbacks=callbacks)
# if langfuse callback is present, run callback.langfuse.flush()
flush_langfuse_callback_if_present(callbacks)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []