diff --git a/src/backend/base/langflow/load/load.py b/src/backend/base/langflow/load/load.py index 1cbc7b4f3..13d0ab051 100644 --- a/src/backend/base/langflow/load/load.py +++ b/src/backend/base/langflow/load/load.py @@ -3,6 +3,7 @@ import json from pathlib import Path from dotenv import load_dotenv +from loguru import logger from langflow.graph import Graph from langflow.graph.schema import RunOutputs @@ -43,7 +44,7 @@ def load_flow_from_json( """ # If input is a file path, load JSON from the file log_file_path = Path(log_file) if log_file else None - configure(log_level=log_level, log_file=log_file_path, disable=disable_logs) + configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True) # override env variables with .env file if env_file: @@ -119,7 +120,7 @@ async def arun_flow_from_json( cache=cache, disable_logs=disable_logs, ) - return await run_graph( + result = await run_graph( graph=graph, session_id=session_id, input_value=input_value, @@ -128,6 +129,8 @@ async def arun_flow_from_json( output_component=output_component, fallback_to_env_vars=fallback_to_env_vars, ) + await logger.complete() + return result def run_flow_from_json( diff --git a/src/backend/base/langflow/logging/logger.py b/src/backend/base/langflow/logging/logger.py index 8fc7f0003..5ea90d392 100644 --- a/src/backend/base/langflow/logging/logger.py +++ b/src/backend/base/langflow/logging/logger.py @@ -1,3 +1,4 @@ +import asyncio import json import logging import os @@ -8,7 +9,10 @@ from threading import Lock, Semaphore from typing import TypedDict import orjson -from loguru import logger +from loguru import _defaults, logger +from loguru._error_interceptor import ErrorInterceptor +from loguru._file_sink import FileSink +from loguru._simple_sinks import AsyncSink from platformdirs import user_cache_dir from rich.logging import RichHandler from typing_extensions import NotRequired @@ -136,12 +140,30 @@ class LogConfig(TypedDict): log_env: NotRequired[str] +class AsyncFileSink(AsyncSink): + def __init__(self, file): + self._sink = FileSink( + path=file, + rotation="10 MB", # Log rotation based on file size + ) + super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1)) + + async def complete(self): + await asyncio.to_thread(self._sink.stop) + for task in self._tasks: + await self._complete_task(task) + + async def write_async(self, message): + await asyncio.to_thread(self._sink.write, message) + + def configure( *, log_level: str | None = None, log_file: Path | None = None, disable: bool | None = False, log_env: str | None = None, + async_file: bool = False, ) -> None: if disable and log_level is None and log_file is None: logger.disable("langflow") @@ -187,14 +209,12 @@ def configure( log_file = cache_dir / "langflow.log" logger.debug(f"Log file: {log_file}") try: - log_file = Path(log_file) log_file.parent.mkdir(parents=True, exist_ok=True) logger.add( - sink=str(log_file), + sink=AsyncFileSink(log_file) if async_file else log_file, level=log_level.upper(), format=log_format, - rotation="10 MB", # Log rotation based on file size serialize=True, ) except Exception: # noqa: BLE001 diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index d3f5ae981..17ea9face 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -94,6 +94,7 @@ def get_lifespan(*, fix_migration=False, version=None): @asynccontextmanager async def lifespan(_app: FastAPI): + configure(async_file=True) # Startup message if version: rprint(f"[bold green]Starting Langflow v{version}...[/bold green]") @@ -113,6 +114,7 @@ def get_lifespan(*, fix_migration=False, version=None): # Shutdown message rprint("[bold red]Shutting down Langflow...[/bold red]") await teardown_services() + await logger.complete() return lifespan