Add version check and new version notice (#1616)
* Add version check and new version notice * Add function to extract letter from pre-release version * Update version banner and add package update notice * Change log level from error to debug * Refactor logging configuration and add InterceptHandler * update banner * Update launch.json path in .vscode directory
This commit is contained in:
parent
58d9086ce7
commit
5ed2c59c2a
5 changed files with 303 additions and 67 deletions
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
|
|
@ -32,7 +32,7 @@
|
|||
"args": [
|
||||
"run",
|
||||
"--path",
|
||||
"${workspaceFolder}/src/backend/langflow/frontend"
|
||||
"${workspaceFolder}/src/backend/base/langflow/frontend"
|
||||
],
|
||||
"jinja": true,
|
||||
"justMyCode": false,
|
||||
|
|
|
|||
|
|
@ -2,25 +2,25 @@ import platform
|
|||
import socket
|
||||
import sys
|
||||
import time
|
||||
import webbrowser
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import click
|
||||
import httpx
|
||||
import typer
|
||||
from dotenv import load_dotenv
|
||||
from multiprocess import Process, cpu_count # type: ignore
|
||||
from rich import box
|
||||
from rich import print as rprint
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
|
||||
from langflow.main import setup_app
|
||||
from langflow.services.database.utils import session_getter
|
||||
from langflow.services.deps import get_db_service, get_settings_service
|
||||
from langflow.services.utils import initialize_services, initialize_settings_service
|
||||
from langflow.utils.logger import configure, logger
|
||||
from multiprocess import Process, cpu_count # type: ignore
|
||||
from packaging import version as pkg_version
|
||||
from rich import box
|
||||
from rich import print as rprint
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
|
||||
console = Console()
|
||||
|
||||
|
|
@ -99,8 +99,12 @@ def update_settings(
|
|||
|
||||
@app.command()
|
||||
def run(
|
||||
host: str = typer.Option("127.0.0.1", help="Host to bind the server to.", envvar="LANGFLOW_HOST"),
|
||||
workers: int = typer.Option(1, help="Number of worker processes.", envvar="LANGFLOW_WORKERS"),
|
||||
host: str = typer.Option(
|
||||
"127.0.0.1", help="Host to bind the server to.", envvar="LANGFLOW_HOST"
|
||||
),
|
||||
workers: int = typer.Option(
|
||||
1, help="Number of worker processes.", envvar="LANGFLOW_WORKERS"
|
||||
),
|
||||
timeout: int = typer.Option(300, help="Worker timeout in seconds."),
|
||||
port: int = typer.Option(7860, help="Port to listen on.", envvar="LANGFLOW_PORT"),
|
||||
components_path: Optional[Path] = typer.Option(
|
||||
|
|
@ -108,11 +112,19 @@ def run(
|
|||
help="Path to the directory containing custom components.",
|
||||
envvar="LANGFLOW_COMPONENTS_PATH",
|
||||
),
|
||||
config: str = typer.Option(Path(__file__).parent / "config.yaml", help="Path to the configuration file."),
|
||||
config: str = typer.Option(
|
||||
Path(__file__).parent / "config.yaml", help="Path to the configuration file."
|
||||
),
|
||||
# .env file param
|
||||
env_file: Path = typer.Option(None, help="Path to the .env file containing environment variables."),
|
||||
log_level: str = typer.Option("critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
|
||||
log_file: Path = typer.Option("logs/langflow.log", help="Path to the log file.", envvar="LANGFLOW_LOG_FILE"),
|
||||
env_file: Path = typer.Option(
|
||||
None, help="Path to the .env file containing environment variables."
|
||||
),
|
||||
log_level: str = typer.Option(
|
||||
"critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"
|
||||
),
|
||||
log_file: Path = typer.Option(
|
||||
"logs/langflow.log", help="Path to the log file.", envvar="LANGFLOW_LOG_FILE"
|
||||
),
|
||||
cache: Optional[str] = typer.Option(
|
||||
envvar="LANGFLOW_LANGCHAIN_CACHE",
|
||||
help="Type of cache to use. (InMemoryCache, SQLiteCache)",
|
||||
|
|
@ -189,22 +201,30 @@ def run(
|
|||
else:
|
||||
# Run using gunicorn on Linux
|
||||
run_on_mac_or_linux(host, port, log_level, options, app, open_browser)
|
||||
if open_browser:
|
||||
click.launch(f"http://{host}:{port}")
|
||||
|
||||
|
||||
def run_on_mac_or_linux(host, port, log_level, options, app, open_browser=True):
|
||||
webapp_process = Process(target=run_langflow, args=(host, port, log_level, options, app))
|
||||
webapp_process.start()
|
||||
def wait_for_server_ready(host, port):
|
||||
"""
|
||||
Wait for the server to become ready by polling the health endpoint.
|
||||
"""
|
||||
status_code = 0
|
||||
while status_code != 200:
|
||||
try:
|
||||
status_code = httpx.get(f"http://{host}:{port}/health").status_code
|
||||
|
||||
except Exception:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def run_on_mac_or_linux(host, port, log_level, options, app):
|
||||
webapp_process = Process(
|
||||
target=run_langflow, args=(host, port, log_level, options, app)
|
||||
)
|
||||
webapp_process.start()
|
||||
wait_for_server_ready(host, port)
|
||||
|
||||
print_banner(host, port)
|
||||
if open_browser:
|
||||
webbrowser.open(f"http://{host}:{port}")
|
||||
|
||||
|
||||
def run_on_windows(host, port, log_level, options, app):
|
||||
|
|
@ -245,40 +265,165 @@ def get_free_port(port):
|
|||
return port
|
||||
|
||||
|
||||
def print_banner(host, port):
|
||||
def version_is_prerelease(version: str):
|
||||
"""
|
||||
Check if a version is a pre-release version.
|
||||
"""
|
||||
return "a" in version or "b" in version or "rc" in version
|
||||
|
||||
|
||||
def get_letter_from_version(version: str):
|
||||
"""
|
||||
Get the letter from a pre-release version.
|
||||
"""
|
||||
if "a" in version:
|
||||
return "a"
|
||||
if "b" in version:
|
||||
return "b"
|
||||
if "rc" in version:
|
||||
return "rc"
|
||||
return None
|
||||
|
||||
|
||||
def build_new_version_notice(current_version: str, package_name: str):
|
||||
"""
|
||||
Build a new version notice.
|
||||
"""
|
||||
# The idea here is that we want to show a notice to the user
|
||||
# when a new version of Langflow is available.
|
||||
# The key is that if the version the user has is a pre-release
|
||||
# e.g 0.0.0a1, then we find the latest version that is pre-release
|
||||
# otherwise we find the latest stable version.
|
||||
# we will show the notice either way, but only if the version
|
||||
# the user has is not the latest version.
|
||||
if version_is_prerelease(current_version):
|
||||
# curl -s "https://pypi.org/pypi/langflow/json" | jq -r '.releases | keys | .[]' | sort -V | tail -n 1
|
||||
# this command will give us the latest pre-release version
|
||||
package_info = httpx.get(f"https://pypi.org/pypi/{package_name}/json").json()
|
||||
# 4.0.0a1 or 4.0.0b1 or 4.0.0rc1
|
||||
# find which type of pre-release version we have
|
||||
# could be a1, b1, rc1
|
||||
# we want the a, b, or rc and the number
|
||||
suffix_letter = get_letter_from_version(current_version)
|
||||
number_version = current_version.split(suffix_letter)[0]
|
||||
latest_version = sorted(
|
||||
package_info["releases"].keys(),
|
||||
key=lambda x: x.split(suffix_letter)[-1] and number_version in x,
|
||||
)[-1]
|
||||
if version_is_prerelease(latest_version) and latest_version != current_version:
|
||||
return (
|
||||
True,
|
||||
f"A new pre-release version of {package_name} is available: {latest_version}",
|
||||
)
|
||||
else:
|
||||
latest_version = httpx.get(f"https://pypi.org/pypi/{package_name}/json").json()[
|
||||
"info"
|
||||
]["version"]
|
||||
if not version_is_prerelease(latest_version):
|
||||
return (
|
||||
False,
|
||||
f"A new version of {package_name} is available: {latest_version}",
|
||||
)
|
||||
return False, ""
|
||||
|
||||
|
||||
def is_prerelease(version: str) -> bool:
|
||||
return "a" in version or "b" in version or "rc" in version
|
||||
|
||||
|
||||
def fetch_latest_version(package_name: str, include_prerelease: bool) -> str:
|
||||
response = httpx.get(f"https://pypi.org/pypi/{package_name}/json")
|
||||
versions = response.json()["releases"].keys()
|
||||
valid_versions = [v for v in versions if include_prerelease or not is_prerelease(v)]
|
||||
if not valid_versions:
|
||||
return None # Handle case where no valid versions are found
|
||||
return max(valid_versions, key=lambda v: pkg_version.parse(v))
|
||||
|
||||
|
||||
def build_version_notice(current_version: str, package_name: str) -> str:
|
||||
latest_version = fetch_latest_version(package_name, is_prerelease(current_version))
|
||||
if latest_version and pkg_version.parse(current_version) < pkg_version.parse(
|
||||
latest_version
|
||||
):
|
||||
release_type = "pre-release" if is_prerelease(latest_version) else "version"
|
||||
return f"A new {release_type} of {package_name} is available: {latest_version}"
|
||||
return ""
|
||||
|
||||
|
||||
def generate_pip_command(package_names, is_pre_release):
|
||||
"""
|
||||
Generate the pip install command based on the packages and whether it's a pre-release.
|
||||
"""
|
||||
base_command = "pip install"
|
||||
if is_pre_release:
|
||||
return f"{base_command} {' '.join(package_names)} -U --pre"
|
||||
else:
|
||||
return f"{base_command} {' '.join(package_names)} -U"
|
||||
|
||||
|
||||
def stylize_text(text: str, to_style: str, is_prerelease: bool) -> str:
|
||||
color = "#42a7f5" if is_prerelease else "#6e42f5"
|
||||
# return "".join(f"[{color}]{char}[/]" for char in text)
|
||||
styled_text = f"[{color}]{to_style}[/]"
|
||||
return text.replace(to_style, styled_text)
|
||||
|
||||
|
||||
def print_banner(host: str, port: int):
|
||||
notices = []
|
||||
package_names = [] # Track package names for pip install instructions
|
||||
is_pre_release = False # Track if any package is a pre-release
|
||||
package_name = ""
|
||||
|
||||
try:
|
||||
from langflow.version import __version__ # type: ignore
|
||||
from langflow.version import __version__ as langflow_version
|
||||
|
||||
version = __version__
|
||||
word = "Langflow"
|
||||
is_pre_release |= is_prerelease(langflow_version) # Update pre-release status
|
||||
notice = build_version_notice(langflow_version, "langflow")
|
||||
notice = stylize_text(notice, "langflow", is_pre_release)
|
||||
if notice:
|
||||
notices.append(notice)
|
||||
package_names.append("langflow")
|
||||
package_name = "Langflow"
|
||||
except ImportError:
|
||||
from importlib import metadata
|
||||
langflow_version = None
|
||||
|
||||
version = metadata.version("langflow-base")
|
||||
word = "Langflow Base"
|
||||
# Attempt to handle langflow-base similarly
|
||||
if langflow_version is None: # This means langflow.version was not imported
|
||||
try:
|
||||
from importlib import metadata
|
||||
|
||||
colors = ["#6e42f5"]
|
||||
langflow_base_version = metadata.version("langflow-base")
|
||||
is_pre_release |= is_prerelease(
|
||||
langflow_base_version
|
||||
) # Update pre-release status
|
||||
notice = build_version_notice(langflow_base_version, "langflow-base")
|
||||
notice = stylize_text(notice, "langflow-base", is_pre_release)
|
||||
if notice:
|
||||
notices.append(notice)
|
||||
package_names.append("langflow-base")
|
||||
package_name = "Langflow Base"
|
||||
except ImportError as e:
|
||||
logger.exception(e)
|
||||
raise e
|
||||
|
||||
styled_word = ""
|
||||
# Generate pip command based on the collected data
|
||||
pip_command = generate_pip_command(package_names, is_pre_release)
|
||||
|
||||
for i, char in enumerate(word):
|
||||
color = colors[i % len(colors)]
|
||||
styled_word += f"[{color}]{char}[/]"
|
||||
# Add pip install command to notices if any package needs an update
|
||||
if notices:
|
||||
notices.append(f"Run '{pip_command}' to update.")
|
||||
|
||||
# Title with emojis and gradient text
|
||||
title = (
|
||||
f"[bold]Welcome to :chains: {styled_word} v{version}[/bold]\n"
|
||||
f"Access [link=http://{host}:{port}]http://{host}:{port}[/link]"
|
||||
)
|
||||
info_text = (
|
||||
"Collaborate, and contribute at our "
|
||||
"[bold][link=https://github.com/logspace-ai/langflow]GitHub Repo[/link][/bold] :rocket:"
|
||||
styled_notices = [f"[bold]{notice}[/bold]" for notice in notices if notice]
|
||||
styled_package_name = stylize_text(
|
||||
package_name, package_name, any("pre-release" in notice for notice in notices)
|
||||
)
|
||||
|
||||
# Create a panel with the title and the info text, and a border around it
|
||||
panel = Panel(f"{title}\n{info_text}", box=box.ROUNDED, border_style="blue", expand=False)
|
||||
title = f"[bold]Welcome to :chains: {styled_package_name}[/bold]\n"
|
||||
info_text = "Collaborate, and contribute at our [bold][link=https://github.com/logspace-ai/langflow]GitHub Repo[/link][/bold] :rocket:"
|
||||
access_link = f"Access [link=http://{host}:{port}]http://{host}:{port}[/link]"
|
||||
|
||||
# Print the banner with a separator line before and after
|
||||
panel_content = "\n\n".join([title, *styled_notices, info_text, access_link])
|
||||
panel = Panel(panel_content, box=box.ROUNDED, border_style="blue", expand=False)
|
||||
rprint(panel)
|
||||
|
||||
|
||||
|
|
@ -314,8 +459,12 @@ def run_langflow(host, port, log_level, options, app):
|
|||
@app.command()
|
||||
def superuser(
|
||||
username: str = typer.Option(..., prompt=True, help="Username for the superuser."),
|
||||
password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."),
|
||||
log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
|
||||
password: str = typer.Option(
|
||||
..., prompt=True, hide_input=True, help="Password for the superuser."
|
||||
),
|
||||
log_level: str = typer.Option(
|
||||
"error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"
|
||||
),
|
||||
):
|
||||
"""
|
||||
Create a superuser.
|
||||
|
|
|
|||
|
|
@ -1,24 +1,45 @@
|
|||
import os
|
||||
import logging
|
||||
|
||||
from gunicorn import glogging
|
||||
from gunicorn.app.base import BaseApplication # type: ignore
|
||||
from uvicorn.workers import UvicornWorker
|
||||
|
||||
from langflow.utils.logger import InterceptHandler # type: ignore
|
||||
|
||||
|
||||
class LangflowUvicornWorker(UvicornWorker):
|
||||
CONFIG_KWARGS = {"loop": "asyncio"}
|
||||
|
||||
|
||||
class Logger(glogging.Logger):
|
||||
"""Implements and overrides the gunicorn logging interface.
|
||||
|
||||
This class inherits from the standard gunicorn logger and overrides it by
|
||||
replacing the handlers with `InterceptHandler` in order to route the
|
||||
gunicorn logs to loguru.
|
||||
"""
|
||||
|
||||
def __init__(self, cfg):
|
||||
super().__init__(cfg)
|
||||
logging.getLogger("gunicorn.error").handlers = [InterceptHandler()]
|
||||
logging.getLogger("gunicorn.access").handlers = [InterceptHandler()]
|
||||
|
||||
|
||||
class LangflowApplication(BaseApplication):
|
||||
def __init__(self, app, options=None):
|
||||
self.options = options or {}
|
||||
|
||||
self.options["worker_class"] = "langflow.server.LangflowUvicornWorker"
|
||||
self.options["loglevel"] = os.getenv("LANGFLOW_LOG_LEVEL", "error").lower()
|
||||
self.options["logger_class"] = Logger
|
||||
self.application = app
|
||||
super().__init__()
|
||||
|
||||
def load_config(self):
|
||||
config = {key: value for key, value in self.options.items() if key in self.cfg.settings and value is not None}
|
||||
config = {
|
||||
key: value
|
||||
for key, value in self.options.items()
|
||||
if key in self.cfg.settings and value is not None
|
||||
}
|
||||
for key, value in config.items():
|
||||
self.cfg.set(key.lower(), value)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,22 +1,21 @@
|
|||
from datetime import datetime
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import command, util
|
||||
from alembic.config import Config
|
||||
from loguru import logger
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy.exc import OperationalError
|
||||
from sqlmodel import Session, SQLModel, create_engine, select, text
|
||||
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.database import models # noqa
|
||||
from langflow.services.database.models.user.crud import get_user_by_username
|
||||
from langflow.services.database.utils import Result, TableResults
|
||||
from langflow.services.deps import get_settings_service
|
||||
from langflow.services.utils import teardown_superuser
|
||||
from loguru import logger
|
||||
from sqlalchemy import inspect
|
||||
from sqlalchemy.exc import OperationalError
|
||||
from sqlmodel import Session, SQLModel, create_engine, select, text
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sqlalchemy.engine import Engine
|
||||
|
|
@ -37,7 +36,10 @@ class DatabaseService(Service):
|
|||
def _create_engine(self) -> "Engine":
|
||||
"""Create the engine for the database."""
|
||||
settings_service = get_settings_service()
|
||||
if settings_service.settings.DATABASE_URL and settings_service.settings.DATABASE_URL.startswith("sqlite"):
|
||||
if (
|
||||
settings_service.settings.DATABASE_URL
|
||||
and settings_service.settings.DATABASE_URL.startswith("sqlite")
|
||||
):
|
||||
connect_args = {"check_same_thread": False}
|
||||
else:
|
||||
connect_args = {}
|
||||
|
|
@ -49,7 +51,9 @@ class DatabaseService(Service):
|
|||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if exc_type is not None: # If an exception has been raised
|
||||
logger.error(f"Session rollback because of exception: {exc_type.__name__} {exc_value}")
|
||||
logger.error(
|
||||
f"Session rollback because of exception: {exc_type.__name__} {exc_value}"
|
||||
)
|
||||
self._session.rollback()
|
||||
else:
|
||||
self._session.commit()
|
||||
|
|
@ -66,7 +70,9 @@ class DatabaseService(Service):
|
|||
settings_service = get_settings_service()
|
||||
if settings_service.auth_settings.AUTO_LOGIN:
|
||||
with Session(self.engine) as session:
|
||||
flows = session.exec(select(models.Flow).where(models.Flow.user_id is None)).all()
|
||||
flows = session.exec(
|
||||
select(models.Flow).where(models.Flow.user_id is None)
|
||||
).all()
|
||||
if flows:
|
||||
logger.debug("Migrating flows to default superuser")
|
||||
username = settings_service.auth_settings.SUPERUSER
|
||||
|
|
@ -96,14 +102,16 @@ class DatabaseService(Service):
|
|||
expected_columns = list(model.model_fields.keys())
|
||||
|
||||
try:
|
||||
available_columns = [col["name"] for col in inspector.get_columns(table)]
|
||||
available_columns = [
|
||||
col["name"] for col in inspector.get_columns(table)
|
||||
]
|
||||
except sa.exc.NoSuchTableError:
|
||||
logger.error(f"Missing table: {table}")
|
||||
logger.debug(f"Missing table: {table}")
|
||||
return False
|
||||
|
||||
for column in expected_columns:
|
||||
if column not in available_columns:
|
||||
logger.error(f"Missing column: {column} in table {table}")
|
||||
logger.debug(f"Missing column: {column} in table {table}")
|
||||
return False
|
||||
|
||||
for table in legacy_tables:
|
||||
|
|
@ -160,7 +168,9 @@ class DatabaseService(Service):
|
|||
buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n")
|
||||
command.check(alembic_cfg)
|
||||
except Exception as exc:
|
||||
if isinstance(exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)):
|
||||
if isinstance(
|
||||
exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)
|
||||
):
|
||||
command.upgrade(alembic_cfg, "head")
|
||||
time.sleep(3)
|
||||
|
||||
|
|
@ -197,7 +207,10 @@ class DatabaseService(Service):
|
|||
# We will check that all models are in the database
|
||||
# and that the database is up to date with all columns
|
||||
sql_models = [models.Flow, models.User, models.ApiKey]
|
||||
return [TableResults(sql_model.__tablename__, self.check_table(sql_model)) for sql_model in sql_models]
|
||||
return [
|
||||
TableResults(sql_model.__tablename__, self.check_table(sql_model))
|
||||
for sql_model in sql_models
|
||||
]
|
||||
|
||||
def check_table(self, model):
|
||||
results = []
|
||||
|
|
@ -206,7 +219,9 @@ class DatabaseService(Service):
|
|||
expected_columns = list(model.__fields__.keys())
|
||||
available_columns = []
|
||||
try:
|
||||
available_columns = [col["name"] for col in inspector.get_columns(table_name)]
|
||||
available_columns = [
|
||||
col["name"] for col in inspector.get_columns(table_name)
|
||||
]
|
||||
results.append(Result(name=table_name, type="table", success=True))
|
||||
except sa.exc.NoSuchTableError:
|
||||
logger.error(f"Missing table: {table_name}")
|
||||
|
|
@ -237,7 +252,9 @@ class DatabaseService(Service):
|
|||
try:
|
||||
table.create(self.engine, checkfirst=True)
|
||||
except OperationalError as oe:
|
||||
logger.warning(f"Table {table} already exists, skipping. Exception: {oe}")
|
||||
logger.warning(
|
||||
f"Table {table} already exists, skipping. Exception: {oe}"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(f"Error creating table {table}: {exc}")
|
||||
raise RuntimeError(f"Error creating table {table}") from exc
|
||||
|
|
@ -249,7 +266,9 @@ class DatabaseService(Service):
|
|||
if table not in table_names:
|
||||
logger.error("Something went wrong creating the database and tables.")
|
||||
logger.error("Please check your database settings.")
|
||||
raise RuntimeError("Something went wrong creating the database and tables.")
|
||||
raise RuntimeError(
|
||||
"Something went wrong creating the database and tables."
|
||||
)
|
||||
|
||||
logger.debug("Database and tables created successfully")
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
|
@ -25,7 +26,10 @@ def patching(record):
|
|||
|
||||
|
||||
def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None):
|
||||
if os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS and log_level is None:
|
||||
if (
|
||||
os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS
|
||||
and log_level is None
|
||||
):
|
||||
log_level = os.getenv("LANGFLOW_LOG_LEVEL")
|
||||
if log_level is None:
|
||||
log_level = "ERROR"
|
||||
|
|
@ -67,3 +71,46 @@ def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None):
|
|||
logger.debug(f"Logger set up with log level: {log_level}")
|
||||
if log_file:
|
||||
logger.debug(f"Log file: {log_file}")
|
||||
|
||||
setup_uvicorn_logger()
|
||||
setup_gunicorn_logger()
|
||||
|
||||
|
||||
def setup_uvicorn_logger():
|
||||
loggers = (
|
||||
logging.getLogger(name)
|
||||
for name in logging.root.manager.loggerDict
|
||||
if name.startswith("uvicorn.")
|
||||
)
|
||||
for uvicorn_logger in loggers:
|
||||
uvicorn_logger.handlers = []
|
||||
logging.getLogger("uvicorn").handlers = [InterceptHandler()]
|
||||
|
||||
|
||||
def setup_gunicorn_logger():
|
||||
logging.getLogger("gunicorn.error").handlers = [InterceptHandler()]
|
||||
logging.getLogger("gunicorn.access").handlers = [InterceptHandler()]
|
||||
|
||||
|
||||
class InterceptHandler(logging.Handler):
|
||||
"""
|
||||
Default handler from examples in loguru documentaion.
|
||||
See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
|
||||
"""
|
||||
|
||||
def emit(self, record):
|
||||
# Get corresponding Loguru level if it exists
|
||||
try:
|
||||
level = logger.level(record.levelname).name
|
||||
except ValueError:
|
||||
level = record.levelno
|
||||
|
||||
# Find caller from where originated the logged message
|
||||
frame, depth = logging.currentframe(), 2
|
||||
while frame.f_code.co_filename == logging.__file__:
|
||||
frame = frame.f_back
|
||||
depth += 1
|
||||
|
||||
logger.opt(depth=depth, exception=record.exc_info).log(
|
||||
level, record.getMessage()
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue