From 6fe486b1647987c6ec0df52f8b44c0ba5a86a526 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 15 Aug 2023 12:46:54 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20chore(base.py):=20refactor=20set?= =?UTF-8?q?up=5Fcallbacks=20function=20to=20improve=20readability=20and=20?= =?UTF-8?q?maintainability=20=F0=9F=94=A7=20chore(base.py):=20refactor=20g?= =?UTF-8?q?et=5Flangfuse=5Fcallback=20function=20to=20improve=20readabilit?= =?UTF-8?q?y=20and=20maintainability=20=F0=9F=94=A7=20chore(base.py):=20re?= =?UTF-8?q?factor=20get=5Fresult=5Fand=5Fsteps=20function=20to=20improve?= =?UTF-8?q?=20readability=20and=20maintainability=20=F0=9F=94=A7=20chore(s?= =?UTF-8?q?ettings.py):=20add=20LANGFUSE=5FSECRET=5FKEY=20and=20LANGFUSE?= =?UTF-8?q?=5FPUBLIC=5FKEY=20optional=20settings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/processing/base.py | 32 +++++++++++++++++++++++-- src/backend/langflow/settings.py | 3 +++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/backend/langflow/processing/base.py b/src/backend/langflow/processing/base.py index 13ff6a385..bcfcf13e8 100644 --- a/src/backend/langflow/processing/base.py +++ b/src/backend/langflow/processing/base.py @@ -8,6 +8,34 @@ from langflow.utils.logger import logger from langchain.agents.agent import AgentExecutor +def setup_callbacks(sync, **kwargs): + """Setup callbacks for langchain object""" + callbacks = [] + if sync: + callbacks.append(StreamingLLMCallbackHandler(**kwargs)) + else: + callbacks.append(AsyncStreamingLLMCallbackHandler(**kwargs)) + + if langfuse_callback := get_langfuse_callback(): + callbacks.append(langfuse_callback) + return callbacks + + +def get_langfuse_callback(): + from langflow.settings import settings + + if settings.LANGFUSE_PUBLIC_KEY and settings.LANGFUSE_SECRET_KEY: + try: + from langfuse.callback import CallbackHandler + + return CallbackHandler( + settings.LANGFUSE_PUBLIC_KEY, settings.LANGFUSE_SECRET_KEY + ) + except Exception as exc: + logger.error(f"Error initializing langfuse callback: {exc}") + return None + + async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwargs): """Get result and thought from extracted json""" @@ -27,12 +55,12 @@ async def get_result_and_steps(langchain_object, inputs: Union[dict, str], **kwa logger.error(f"Error fixing memory inputs: {exc}") try: - async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)] + async_callbacks = setup_callbacks(sync=False, **kwargs) output = await langchain_object.acall(inputs, callbacks=async_callbacks) except Exception as exc: # make the error message more informative logger.debug(f"Error: {str(exc)}") - sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)] + sync_callbacks = setup_callbacks(sync=True, **kwargs) output = langchain_object(inputs, callbacks=sync_callbacks) intermediate_steps = ( diff --git a/src/backend/langflow/settings.py b/src/backend/langflow/settings.py index 6a10f0506..1c350f11e 100644 --- a/src/backend/langflow/settings.py +++ b/src/backend/langflow/settings.py @@ -35,6 +35,9 @@ class Settings(BaseSettings): REMOVE_API_KEYS: bool = False COMPONENTS_PATH: List[str] = [] + LANGFUSE_SECRET_KEY: Optional[str] = None + LANGFUSE_PUBLIC_KEY: Optional[str] = None + @validator("DATABASE_URL", pre=True) def set_database_url(cls, value): if not value: