refactor: remove AsyncFileSink logger, add rotation support, and update docs (#9105)

* refactor(logs): remove asyncfilesink logger, add rotation support, and update docs

* refactor(logs): remove asyncfilesink logger, add rotation support, and update docs

---------

Co-authored-by: Harry Lyu <harrylyu@umich.edu>
Co-authored-by: Mendon Kissling <59585235+mendonk@users.noreply.github.com>
This commit is contained in:
XiaochuanLyu 2025-07-26 01:50:45 +08:00 committed by GitHub
commit 324caf486c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 45 additions and 27 deletions

View file

@ -51,6 +51,10 @@ LANGFLOW_LOG_LEVEL=
# Example: LANGFLOW_LOG_FILE=logs/langflow.log
LANGFLOW_LOG_FILE=
# Time/Size for log to rotate
# Example: LANGFLOW_LOG_ROTATION=10 MB/1 day
LANGFLOW_LOG_ROTATION=
# Path to the frontend directory containing build files
# Example: LANGFLOW_FRONTEND_PATH=/path/to/frontend/build/files
LANGFLOW_FRONTEND_PATH=

View file

@ -40,6 +40,12 @@ For example, `LANGFLOW_LOG_FILE=path/to/logfile.log`.
* `LANGFLOW_LOG_ENV=container_csv`: Outputs CSV-formatted plain text to stdout.
* `LANGFLOW_LOG_ENV=default` or unset: Outputs prettified output with [RichHandler](https://rich.readthedocs.io/en/stable/reference/logging.html).
* `LANGFLOW_LOG_ROTATION` controls when the log file is rotated, either based on time or file size. By default, logs are rotated every 1 day.
* Time-based rotation: "1 day", "12 hours", "1 week"
* Size-based rotation: "10 MB", "1 GB"
* Disable rotation: "None" (log files will grow without limit)
A complete example `.env` file is available in the [Langflow repository](https://github.com/langflow-ai/langflow/blob/main/.env.example).
## Flow and component logs

View file

@ -188,6 +188,7 @@ def run(
show_default=False,
),
log_file: Path | None = typer.Option(None, help="Path to the log file.", show_default=False),
log_rotation: str | None = typer.Option(None, help="Log rotation(Time/Size).", show_default=False),
cache: str | None = typer.Option( # noqa: ARG001
None,
help="Type of cache to use. (InMemoryCache, SQLiteCache)",
@ -263,7 +264,7 @@ def run(
else:
os.environ["LANGFLOW_LOG_LEVEL"] = env_log_level.lower()
configure(log_level=log_level, log_file=log_file)
configure(log_level=log_level, log_file=log_file, log_rotation=log_rotation)
# Create progress indicator (show verbose timing if log level is DEBUG)
verbose = log_level == "debug"

View file

@ -21,6 +21,7 @@ async def aload_flow_from_json(
tweaks: dict | None = None,
log_level: str | None = None,
log_file: str | None = None,
log_rotation: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
@ -33,6 +34,7 @@ async def aload_flow_from_json(
tweaks (Optional[dict]): Optional tweaks to apply to the loaded flow graph.
log_level (Optional[str]): Optional log level to configure for the flow processing.
log_file (Optional[str]): Optional log file to configure for the flow processing.
log_rotation (Optional[str]): Optional log rotation(Time/Size) to configure for the flow processing.
env_file (Optional[str]): Optional .env file to override environment variables.
cache (Optional[str]): Optional cache path to update the flow settings.
disable_logs (Optional[bool], default=True): Optional flag to disable logs during flow processing.
@ -47,7 +49,9 @@ async def aload_flow_from_json(
"""
# If input is a file path, load JSON from the file
log_file_path = Path(log_file) if log_file else None
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True)
configure(
log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=True, log_rotation=log_rotation
)
# override env variables with .env file
if env_file and tweaks is not None:
@ -83,6 +87,7 @@ def load_flow_from_json(
tweaks: dict | None = None,
log_level: str | None = None,
log_file: str | None = None,
log_rotation: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
@ -95,6 +100,7 @@ def load_flow_from_json(
tweaks (Optional[dict]): Optional tweaks to apply to the loaded flow graph.
log_level (Optional[str]): Optional log level to configure for the flow processing.
log_file (Optional[str]): Optional log file to configure for the flow processing.
log_rotation (Optional[str]): Optional log rotation(Time/Size) to configure for the flow processing.
env_file (Optional[str]): Optional .env file to override environment variables.
cache (Optional[str]): Optional cache path to update the flow settings.
disable_logs (Optional[bool], default=True): Optional flag to disable logs during flow processing.
@ -113,6 +119,7 @@ def load_flow_from_json(
tweaks=tweaks,
log_level=log_level,
log_file=log_file,
log_rotation=log_rotation,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
@ -131,6 +138,7 @@ async def arun_flow_from_json(
output_component: str | None = None,
log_level: str | None = None,
log_file: str | None = None,
log_rotation: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
@ -148,6 +156,7 @@ async def arun_flow_from_json(
output_component (Optional[str], optional): The specific component to output. Defaults to None.
log_level (Optional[str], optional): The log level to use. Defaults to None.
log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
log_rotation (Optional[str], optional): The log rotation to use. Defaults to None.
env_file (Optional[str], optional): The environment file to load. Defaults to None.
cache (Optional[str], optional): The cache directory to use. Defaults to None.
disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
@ -165,6 +174,7 @@ async def arun_flow_from_json(
tweaks=tweaks,
log_level=log_level,
log_file=log_file,
log_rotation=log_rotation,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
@ -193,6 +203,7 @@ def run_flow_from_json(
output_component: str | None = None,
log_level: str | None = None,
log_file: str | None = None,
log_rotation: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
@ -214,6 +225,7 @@ def run_flow_from_json(
output_component (Optional[str], optional): The specific component to output. Defaults to None.
log_level (Optional[str], optional): The log level to use. Defaults to None.
log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
log_rotation (Optional[str], optional): The log rotation to use. Defaults to None.
env_file (Optional[str], optional): The environment file to load. Defaults to None.
cache (Optional[str], optional): The cache directory to use. Defaults to None.
disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
@ -234,6 +246,7 @@ def run_flow_from_json(
output_component=output_component,
log_level=log_level,
log_file=log_file,
log_rotation=log_rotation,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,

View file

@ -1,4 +1,3 @@
import asyncio
import json
import logging
import os
@ -9,10 +8,7 @@ from threading import Lock, Semaphore
from typing import TypedDict
import orjson
from loguru import _defaults, logger
from loguru._error_interceptor import ErrorInterceptor
from loguru._file_sink import FileSink
from loguru._simple_sinks import AsyncSink
from loguru import logger
from platformdirs import user_cache_dir
from rich.logging import RichHandler
from typing_extensions import NotRequired, override
@ -154,24 +150,6 @@ class LogConfig(TypedDict):
log_format: NotRequired[str]
class AsyncFileSink(AsyncSink):
def __init__(self, file):
self._sink = FileSink(
path=file,
rotation="10 MB", # Log rotation based on file size
delay=True,
)
super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1))
async def complete(self):
await asyncio.to_thread(self._sink.stop)
for task in self._tasks:
await self._complete_task(task)
async def write_async(self, message):
await asyncio.to_thread(self._sink.write, message)
def is_valid_log_format(format_string) -> bool:
"""Validates a logging format string by attempting to format it with a dummy LogRecord.
@ -204,6 +182,7 @@ def configure(
log_env: str | None = None,
log_format: str | None = None,
async_file: bool = False,
log_rotation: str | None = None,
) -> None:
if disable and log_level is None and log_file is None:
logger.disable("langflow")
@ -252,12 +231,20 @@ def configure(
logger.debug(f"Cache directory: {cache_dir}")
log_file = cache_dir / "langflow.log"
logger.debug(f"Log file: {log_file}")
if os.getenv("LANGFLOW_LOG_ROTATION") and log_rotation is None:
log_rotation = os.getenv("LANGFLOW_LOG_ROTATION")
elif log_rotation is None:
log_rotation = "1 day"
try:
logger.add(
sink=AsyncFileSink(log_file) if async_file else log_file,
sink=log_file,
level=log_level.upper(),
format=log_format,
serialize=True,
enqueue=async_file,
rotation=log_rotation,
)
except Exception: # noqa: BLE001
logger.exception("Error setting up log file")

View file

@ -46,12 +46,19 @@ class LangflowRunnerExperimental:
should_initialize_db: bool = True,
log_level: str | None = None,
log_file: str | None = None,
log_rotation: str | None = None,
disable_logs: bool = False,
async_log_file: bool = True,
):
self.should_initialize_db = should_initialize_db
log_file_path = Path(log_file) if log_file else None
configure(log_level=log_level, log_file=log_file_path, disable=disable_logs, async_file=async_log_file)
configure(
log_level=log_level,
log_file=log_file_path,
log_rotation=log_rotation,
disable=disable_logs,
async_file=async_log_file,
)
async def run(
self,