fix: Make logger file sink non-blocking when used with asyncio (#4301)
The Loguru FileSink is blocking so it is bad to use it in the event loop. Use instead an AsyncSink wrapping the FileSink to delegate the log writing to the default thread pool. NB: when the AsyncFileSink is used, logs produced outside of the event loop are not written to the file.
This commit is contained in:
parent
4aa7aed0a9
commit
d53107ce11
3 changed files with 31 additions and 6 deletions
|
|
@ -3,6 +3,7 @@ import json
|
|||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from langflow.graph import Graph
|
||||
from langflow.graph.schema import RunOutputs
|
||||
|
|
@ -43,7 +44,7 @@ def load_flow_from_json(
|
|||
"""
|
||||
# If input is a file path, load JSON from the file
|
||||
log_file_path = Path(log_file) if log_file else None
|
||||
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs)
|
||||
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True)
|
||||
|
||||
# override env variables with .env file
|
||||
if env_file:
|
||||
|
|
@ -119,7 +120,7 @@ async def arun_flow_from_json(
|
|||
cache=cache,
|
||||
disable_logs=disable_logs,
|
||||
)
|
||||
return await run_graph(
|
||||
result = await run_graph(
|
||||
graph=graph,
|
||||
session_id=session_id,
|
||||
input_value=input_value,
|
||||
|
|
@ -128,6 +129,8 @@ async def arun_flow_from_json(
|
|||
output_component=output_component,
|
||||
fallback_to_env_vars=fallback_to_env_vars,
|
||||
)
|
||||
await logger.complete()
|
||||
return result
|
||||
|
||||
|
||||
def run_flow_from_json(
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
|
@ -8,7 +9,10 @@ from threading import Lock, Semaphore
|
|||
from typing import TypedDict
|
||||
|
||||
import orjson
|
||||
from loguru import logger
|
||||
from loguru import _defaults, logger
|
||||
from loguru._error_interceptor import ErrorInterceptor
|
||||
from loguru._file_sink import FileSink
|
||||
from loguru._simple_sinks import AsyncSink
|
||||
from platformdirs import user_cache_dir
|
||||
from rich.logging import RichHandler
|
||||
from typing_extensions import NotRequired
|
||||
|
|
@ -136,12 +140,30 @@ class LogConfig(TypedDict):
|
|||
log_env: NotRequired[str]
|
||||
|
||||
|
||||
class AsyncFileSink(AsyncSink):
|
||||
def __init__(self, file):
|
||||
self._sink = FileSink(
|
||||
path=file,
|
||||
rotation="10 MB", # Log rotation based on file size
|
||||
)
|
||||
super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1))
|
||||
|
||||
async def complete(self):
|
||||
await asyncio.to_thread(self._sink.stop)
|
||||
for task in self._tasks:
|
||||
await self._complete_task(task)
|
||||
|
||||
async def write_async(self, message):
|
||||
await asyncio.to_thread(self._sink.write, message)
|
||||
|
||||
|
||||
def configure(
|
||||
*,
|
||||
log_level: str | None = None,
|
||||
log_file: Path | None = None,
|
||||
disable: bool | None = False,
|
||||
log_env: str | None = None,
|
||||
async_file: bool = False,
|
||||
) -> None:
|
||||
if disable and log_level is None and log_file is None:
|
||||
logger.disable("langflow")
|
||||
|
|
@ -187,14 +209,12 @@ def configure(
|
|||
log_file = cache_dir / "langflow.log"
|
||||
logger.debug(f"Log file: {log_file}")
|
||||
try:
|
||||
log_file = Path(log_file)
|
||||
log_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
logger.add(
|
||||
sink=str(log_file),
|
||||
sink=AsyncFileSink(log_file) if async_file else log_file,
|
||||
level=log_level.upper(),
|
||||
format=log_format,
|
||||
rotation="10 MB", # Log rotation based on file size
|
||||
serialize=True,
|
||||
)
|
||||
except Exception: # noqa: BLE001
|
||||
|
|
|
|||
|
|
@ -94,6 +94,7 @@ def get_lifespan(*, fix_migration=False, version=None):
|
|||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_app: FastAPI):
|
||||
configure(async_file=True)
|
||||
# Startup message
|
||||
if version:
|
||||
rprint(f"[bold green]Starting Langflow v{version}...[/bold green]")
|
||||
|
|
@ -113,6 +114,7 @@ def get_lifespan(*, fix_migration=False, version=None):
|
|||
# Shutdown message
|
||||
rprint("[bold red]Shutting down Langflow...[/bold red]")
|
||||
await teardown_services()
|
||||
await logger.complete()
|
||||
|
||||
return lifespan
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue