diff --git a/src/backend/langflow/processing/base.py b/src/backend/langflow/processing/base.py index 13ff6a385..bcfcf13e8 100644 --- a/src/backend/langflow/processing/base.py +++ b/src/backend/langflow/processing/base.py @@ -8,6 +8,34 @@ from langflow.utils.logger import logger from langchain.agents.agent import AgentExecutor +def setup_callbacks(sync, **kwargs): + """Setup callbacks for langchain object""" + callbacks = [] + if sync: + callbacks.append(StreamingLLMCallbackHandler(**kwargs)) + else: + callbacks.append(AsyncStreamingLLMCallbackHandler(**kwargs)) + + if langfuse_callback := get_langfuse_callback(): + callbacks.append(langfuse_callback) + return callbacks + + +def get_langfuse_callback(): + from langflow.settings import settings + + if settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY: + try: + from langfuse.callback import CallbackHandler + + return CallbackHandler( + settings.LANGFUSE_PUBLIC_KEY, settings.LANGFUSE_SECRET_KEY + ) + except Exception as exc: + logger.error(f"Error initializing langfuse callback: {exc}") + return None + + async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs): """Get result and thought from extracted json""" @@ -27,12 +55,12 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa logger.error(f"Error fixing memory inputs: {exc}") try: - async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)] + async_callbacks = setup_callbacks(sync=False, **kwargs) output = await langchain_object.acall(inputs, callbacks=async_callbacks) except Exception as exc: # make the error message more informative logger.debug(f"Error: {str(exc)}") - sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)] + sync_callbacks = setup_callbacks(sync=True, **kwargs) output = langchain_object(inputs, callbacks=sync_callbacks) intermediate_steps = ( diff --git a/src/backend/langflow/settings.py b/src/backend/langflow/settings.py index 6a10f0506..1c350f11e 100644 --- a/src/backend/langflow/settings.py +++ b/src/backend/langflow/settings.py @@ -35,6 +35,9 @@ class Settings(BaseSettings): REMOVE_API_KEYS: bool = False COMPONENTS_PATH: List[str] = [] + LANGFUSE_SECRET_KEY: Optional[str] = None + LANGFUSE_PUBLIC_KEY: Optional[str] = None + @validator("DATABASE_URL", pre=True) def set_database_url(cls, value): if not value: