From d53107ce11524f47033e3f8366407be7614eb0c5 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Mon, 28 Oct 2024 17:38:40 +0100 Subject: [PATCH] fix: Make logger file sink non-blocking when used with asyncio (#4301) The Loguru FileSink is blocking so it is bad to use it in the event loop. Use instead an AsyncSink wrapping the FileSink to delegate the log writing to the default thread pool. NB: when the AsyncFileSink is used, logs produced outside of the event loop are not written to the file. --- src/backend/base/langflow/load/load.py | 7 ++++-- src/backend/base/langflow/logging/logger.py | 28 ++++++++++++++++++--- src/backend/base/langflow/main.py | 2 ++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/backend/base/langflow/load/load.py b/src/backend/base/langflow/load/load.py index 1cbc7b4f3..13d0ab051 100644 --- a/src/backend/base/langflow/load/load.py +++ b/src/backend/base/langflow/load/load.py @@ -3,6 +3,7 @@ import json from pathlib import Path from dotenv import load_dotenv +from loguru import logger from langflow.graph import Graph from langflow.graph.schema import RunOutputs @@ -43,7 +44,7 @@ def load_flow_from_json( """ # If input is a file path, load JSON from the file log_file_path = Path(log_file) if log_file else None - configure(log_level=log_level, log_file=log_file_path, disable=disable_logs) + configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True) # override env variables with .env file if env_file: @@ -119,7 +120,7 @@ async def arun_flow_from_json( cache=cache, disable_logs=disable_logs, ) - return await run_graph( + result = await run_graph( graph=graph, session_id=session_id, input_value=input_value, @@ -128,6 +129,8 @@ async def arun_flow_from_json( output_component=output_component, fallback_to_env_vars=fallback_to_env_vars, ) + await logger.complete() + return result def run_flow_from_json( diff --git a/src/backend/base/langflow/logging/logger.py b/src/backend/base/langflow/logging/logger.py index 8fc7f0003..5ea90d392 100644 --- a/src/backend/base/langflow/logging/logger.py +++ b/src/backend/base/langflow/logging/logger.py @@ -1,3 +1,4 @@ +import asyncio import json import logging import os @@ -8,7 +9,10 @@ from threading import Lock, Semaphore from typing import TypedDict import orjson -from loguru import logger +from loguru import _defaults, logger +from loguru._error_interceptor import ErrorInterceptor +from loguru._file_sink import FileSink +from loguru._simple_sinks import AsyncSink from platformdirs import user_cache_dir from rich.logging import RichHandler from typing_extensions import NotRequired @@ -136,12 +140,30 @@ class LogConfig(TypedDict): log_env: NotRequired[str] +class AsyncFileSink(AsyncSink): + def __init__(self, file): + self._sink = FileSink( + path=file, + rotation="10 MB", # Log rotation based on file size + ) + super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1)) + + async def complete(self): + await asyncio.to_thread(self._sink.stop) + for task in self._tasks: + await self._complete_task(task) + + async def write_async(self, message): + await asyncio.to_thread(self._sink.write, message) + + def configure( *, log_level: str | None = None, log_file: Path | None = None, disable: bool | None = False, log_env: str | None = None, + async_file: bool = False, ) -> None: if disable and log_level is None and log_file is None: logger.disable("langflow") @@ -187,14 +209,12 @@ def configure( log_file = cache_dir / "langflow.log" logger.debug(f"Log file: {log_file}") try: - log_file = Path(log_file) log_file.parent.mkdir(parents=True, exist_ok=True) logger.add( - sink=str(log_file), + sink=AsyncFileSink(log_file) if async_file else log_file, level=log_level.upper(), format=log_format, - rotation="10 MB", # Log rotation based on file size serialize=True, ) except Exception: # noqa: BLE001 diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index d3f5ae981..17ea9face 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -94,6 +94,7 @@ def get_lifespan(*, fix_migration=False, version=None): @asynccontextmanager async def lifespan(_app: FastAPI): + configure(async_file=True) # Startup message if version: rprint(f"[bold green]Starting Langflow v{version}...[/bold green]") @@ -113,6 +114,7 @@ def get_lifespan(*, fix_migration=False, version=None): # Shutdown message rprint("[bold red]Shutting down Langflow...[/bold red]") await teardown_services() + await logger.complete() return lifespan