diff --git a/.vscode/launch.json b/.vscode/launch.json index 962599449..40a60f354 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -32,7 +32,7 @@ "args": [ "run", "--path", - "${workspaceFolder}/src/backend/langflow/frontend" + "${workspaceFolder}/src/backend/base/langflow/frontend" ], "jinja": true, "justMyCode": false, diff --git a/src/backend/base/langflow/__main__.py b/src/backend/base/langflow/__main__.py index 55f792c3c..bfb2a5311 100644 --- a/src/backend/base/langflow/__main__.py +++ b/src/backend/base/langflow/__main__.py @@ -2,25 +2,25 @@ import platform import socket import sys import time -import webbrowser from pathlib import Path from typing import Optional +import click import httpx import typer from dotenv import load_dotenv -from multiprocess import Process, cpu_count # type: ignore -from rich import box -from rich import print as rprint -from rich.console import Console -from rich.panel import Panel -from rich.table import Table - from langflow.main import setup_app from langflow.services.database.utils import session_getter from langflow.services.deps import get_db_service, get_settings_service from langflow.services.utils import initialize_services, initialize_settings_service from langflow.utils.logger import configure, logger +from multiprocess import Process, cpu_count # type: ignore +from packaging import version as pkg_version +from rich import box +from rich import print as rprint +from rich.console import Console +from rich.panel import Panel +from rich.table import Table console = Console() @@ -99,8 +99,12 @@ def update_settings( @app.command() def run( - host: str = typer.Option("127.0.0.1", help="Host to bind the server to.", envvar="LANGFLOW_HOST"), - workers: int = typer.Option(1, help="Number of worker processes.", envvar="LANGFLOW_WORKERS"), + host: str = typer.Option( + "127.0.0.1", help="Host to bind the server to.", envvar="LANGFLOW_HOST" + ), + workers: int = typer.Option( + 1, help="Number of worker processes.", envvar="LANGFLOW_WORKERS" + ), timeout: int = typer.Option(300, help="Worker timeout in seconds."), port: int = typer.Option(7860, help="Port to listen on.", envvar="LANGFLOW_PORT"), components_path: Optional[Path] = typer.Option( @@ -108,11 +112,19 @@ def run( help="Path to the directory containing custom components.", envvar="LANGFLOW_COMPONENTS_PATH", ), - config: str = typer.Option(Path(__file__).parent / "config.yaml", help="Path to the configuration file."), + config: str = typer.Option( + Path(__file__).parent / "config.yaml", help="Path to the configuration file." + ), # .env file param - env_file: Path = typer.Option(None, help="Path to the .env file containing environment variables."), - log_level: str = typer.Option("critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"), - log_file: Path = typer.Option("logs/langflow.log", help="Path to the log file.", envvar="LANGFLOW_LOG_FILE"), + env_file: Path = typer.Option( + None, help="Path to the .env file containing environment variables." + ), + log_level: str = typer.Option( + "critical", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL" + ), + log_file: Path = typer.Option( + "logs/langflow.log", help="Path to the log file.", envvar="LANGFLOW_LOG_FILE" + ), cache: Optional[str] = typer.Option( envvar="LANGFLOW_LANGCHAIN_CACHE", help="Type of cache to use. (InMemoryCache, SQLiteCache)", @@ -189,22 +201,30 @@ def run( else: # Run using gunicorn on Linux run_on_mac_or_linux(host, port, log_level, options, app, open_browser) + if open_browser: + click.launch(f"http://{host}:{port}") -def run_on_mac_or_linux(host, port, log_level, options, app, open_browser=True): - webapp_process = Process(target=run_langflow, args=(host, port, log_level, options, app)) - webapp_process.start() +def wait_for_server_ready(host, port): + """ + Wait for the server to become ready by polling the health endpoint. + """ status_code = 0 while status_code != 200: try: status_code = httpx.get(f"http://{host}:{port}/health").status_code - except Exception: time.sleep(1) + +def run_on_mac_or_linux(host, port, log_level, options, app): + webapp_process = Process( + target=run_langflow, args=(host, port, log_level, options, app) + ) + webapp_process.start() + wait_for_server_ready(host, port) + print_banner(host, port) - if open_browser: - webbrowser.open(f"http://{host}:{port}") def run_on_windows(host, port, log_level, options, app): @@ -245,40 +265,165 @@ def get_free_port(port): return port -def print_banner(host, port): +def version_is_prerelease(version: str): + """ + Check if a version is a pre-release version. + """ + return "a" in version or "b" in version or "rc" in version + + +def get_letter_from_version(version: str): + """ + Get the letter from a pre-release version. + """ + if "a" in version: + return "a" + if "b" in version: + return "b" + if "rc" in version: + return "rc" + return None + + +def build_new_version_notice(current_version: str, package_name: str): + """ + Build a new version notice. + """ + # The idea here is that we want to show a notice to the user + # when a new version of Langflow is available. + # The key is that if the version the user has is a pre-release + # e.g 0.0.0a1, then we find the latest version that is pre-release + # otherwise we find the latest stable version. + # we will show the notice either way, but only if the version + # the user has is not the latest version. + if version_is_prerelease(current_version): + # curl -s "https://pypi.org/pypi/langflow/json" | jq -r '.releases | keys | .[]' | sort -V | tail -n 1 + # this command will give us the latest pre-release version + package_info = httpx.get(f"https://pypi.org/pypi/{package_name}/json").json() + # 4.0.0a1 or 4.0.0b1 or 4.0.0rc1 + # find which type of pre-release version we have + # could be a1, b1, rc1 + # we want the a, b, or rc and the number + suffix_letter = get_letter_from_version(current_version) + number_version = current_version.split(suffix_letter)[0] + latest_version = sorted( + package_info["releases"].keys(), + key=lambda x: x.split(suffix_letter)[-1] and number_version in x, + )[-1] + if version_is_prerelease(latest_version) and latest_version != current_version: + return ( + True, + f"A new pre-release version of {package_name} is available: {latest_version}", + ) + else: + latest_version = httpx.get(f"https://pypi.org/pypi/{package_name}/json").json()[ + "info" + ]["version"] + if not version_is_prerelease(latest_version): + return ( + False, + f"A new version of {package_name} is available: {latest_version}", + ) + return False, "" + + +def is_prerelease(version: str) -> bool: + return "a" in version or "b" in version or "rc" in version + + +def fetch_latest_version(package_name: str, include_prerelease: bool) -> str: + response = httpx.get(f"https://pypi.org/pypi/{package_name}/json") + versions = response.json()["releases"].keys() + valid_versions = [v for v in versions if include_prerelease or not is_prerelease(v)] + if not valid_versions: + return None # Handle case where no valid versions are found + return max(valid_versions, key=lambda v: pkg_version.parse(v)) + + +def build_version_notice(current_version: str, package_name: str) -> str: + latest_version = fetch_latest_version(package_name, is_prerelease(current_version)) + if latest_version and pkg_version.parse(current_version) < pkg_version.parse( + latest_version + ): + release_type = "pre-release" if is_prerelease(latest_version) else "version" + return f"A new {release_type} of {package_name} is available: {latest_version}" + return "" + + +def generate_pip_command(package_names, is_pre_release): + """ + Generate the pip install command based on the packages and whether it's a pre-release. + """ + base_command = "pip install" + if is_pre_release: + return f"{base_command} {' '.join(package_names)} -U --pre" + else: + return f"{base_command} {' '.join(package_names)} -U" + + +def stylize_text(text: str, to_style: str, is_prerelease: bool) -> str: + color = "#42a7f5" if is_prerelease else "#6e42f5" + # return "".join(f"[{color}]{char}[/]" for char in text) + styled_text = f"[{color}]{to_style}[/]" + return text.replace(to_style, styled_text) + + +def print_banner(host: str, port: int): + notices = [] + package_names = [] # Track package names for pip install instructions + is_pre_release = False # Track if any package is a pre-release + package_name = "" + try: - from langflow.version import __version__ # type: ignore + from langflow.version import __version__ as langflow_version - version = __version__ - word = "Langflow" + is_pre_release |= is_prerelease(langflow_version) # Update pre-release status + notice = build_version_notice(langflow_version, "langflow") + notice = stylize_text(notice, "langflow", is_pre_release) + if notice: + notices.append(notice) + package_names.append("langflow") + package_name = "Langflow" except ImportError: - from importlib import metadata + langflow_version = None - version = metadata.version("langflow-base") - word = "Langflow Base" + # Attempt to handle langflow-base similarly + if langflow_version is None: # This means langflow.version was not imported + try: + from importlib import metadata - colors = ["#6e42f5"] + langflow_base_version = metadata.version("langflow-base") + is_pre_release |= is_prerelease( + langflow_base_version + ) # Update pre-release status + notice = build_version_notice(langflow_base_version, "langflow-base") + notice = stylize_text(notice, "langflow-base", is_pre_release) + if notice: + notices.append(notice) + package_names.append("langflow-base") + package_name = "Langflow Base" + except ImportError as e: + logger.exception(e) + raise e - styled_word = "" + # Generate pip command based on the collected data + pip_command = generate_pip_command(package_names, is_pre_release) - for i, char in enumerate(word): - color = colors[i % len(colors)] - styled_word += f"[{color}]{char}[/]" + # Add pip install command to notices if any package needs an update + if notices: + notices.append(f"Run '{pip_command}' to update.") - # Title with emojis and gradient text - title = ( - f"[bold]Welcome to :chains: {styled_word} v{version}[/bold]\n" - f"Access [link=http://{host}:{port}]http://{host}:{port}[/link]" - ) - info_text = ( - "Collaborate, and contribute at our " - "[bold][link=https://github.com/logspace-ai/langflow]GitHub Repo[/link][/bold] :rocket:" + styled_notices = [f"[bold]{notice}[/bold]" for notice in notices if notice] + styled_package_name = stylize_text( + package_name, package_name, any("pre-release" in notice for notice in notices) ) - # Create a panel with the title and the info text, and a border around it - panel = Panel(f"{title}\n{info_text}", box=box.ROUNDED, border_style="blue", expand=False) + title = f"[bold]Welcome to :chains: {styled_package_name}[/bold]\n" + info_text = "Collaborate, and contribute at our [bold][link=https://github.com/logspace-ai/langflow]GitHub Repo[/link][/bold] :rocket:" + access_link = f"Access [link=http://{host}:{port}]http://{host}:{port}[/link]" - # Print the banner with a separator line before and after + panel_content = "\n\n".join([title, *styled_notices, info_text, access_link]) + panel = Panel(panel_content, box=box.ROUNDED, border_style="blue", expand=False) rprint(panel) @@ -314,8 +459,12 @@ def run_langflow(host, port, log_level, options, app): @app.command() def superuser( username: str = typer.Option(..., prompt=True, help="Username for the superuser."), - password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."), - log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"), + password: str = typer.Option( + ..., prompt=True, hide_input=True, help="Password for the superuser." + ), + log_level: str = typer.Option( + "error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL" + ), ): """ Create a superuser. diff --git a/src/backend/base/langflow/server.py b/src/backend/base/langflow/server.py index b7e86934d..6a1509dc0 100644 --- a/src/backend/base/langflow/server.py +++ b/src/backend/base/langflow/server.py @@ -1,24 +1,45 @@ -import os +import logging +from gunicorn import glogging from gunicorn.app.base import BaseApplication # type: ignore from uvicorn.workers import UvicornWorker +from langflow.utils.logger import InterceptHandler # type: ignore + class LangflowUvicornWorker(UvicornWorker): CONFIG_KWARGS = {"loop": "asyncio"} +class Logger(glogging.Logger): + """Implements and overrides the gunicorn logging interface. + + This class inherits from the standard gunicorn logger and overrides it by + replacing the handlers with `InterceptHandler` in order to route the + gunicorn logs to loguru. + """ + + def __init__(self, cfg): + super().__init__(cfg) + logging.getLogger("gunicorn.error").handlers = [InterceptHandler()] + logging.getLogger("gunicorn.access").handlers = [InterceptHandler()] + + class LangflowApplication(BaseApplication): def __init__(self, app, options=None): self.options = options or {} self.options["worker_class"] = "langflow.server.LangflowUvicornWorker" - self.options["loglevel"] = os.getenv("LANGFLOW_LOG_LEVEL", "error").lower() + self.options["logger_class"] = Logger self.application = app super().__init__() def load_config(self): - config = {key: value for key, value in self.options.items() if key in self.cfg.settings and value is not None} + config = { + key: value + for key, value in self.options.items() + if key in self.cfg.settings and value is not None + } for key, value in config.items(): self.cfg.set(key.lower(), value) diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index f3d77f6d7..f06f6edc7 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -1,22 +1,21 @@ -from datetime import datetime import time +from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import command, util from alembic.config import Config -from loguru import logger -from sqlalchemy import inspect -from sqlalchemy.exc import OperationalError -from sqlmodel import Session, SQLModel, create_engine, select, text - from langflow.services.base import Service from langflow.services.database import models # noqa from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.database.utils import Result, TableResults from langflow.services.deps import get_settings_service from langflow.services.utils import teardown_superuser +from loguru import logger +from sqlalchemy import inspect +from sqlalchemy.exc import OperationalError +from sqlmodel import Session, SQLModel, create_engine, select, text if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -37,7 +36,10 @@ class DatabaseService(Service): def _create_engine(self) -> "Engine": """Create the engine for the database.""" settings_service = get_settings_service() - if settings_service.settings.DATABASE_URL and settings_service.settings.DATABASE_URL.startswith("sqlite"): + if ( + settings_service.settings.DATABASE_URL + and settings_service.settings.DATABASE_URL.startswith("sqlite") + ): connect_args = {"check_same_thread": False} else: connect_args = {} @@ -49,7 +51,9 @@ class DatabaseService(Service): def __exit__(self, exc_type, exc_value, traceback): if exc_type is not None: # If an exception has been raised - logger.error(f"Session rollback because of exception: {exc_type.__name__} {exc_value}") + logger.error( + f"Session rollback because of exception: {exc_type.__name__} {exc_value}" + ) self._session.rollback() else: self._session.commit() @@ -66,7 +70,9 @@ class DatabaseService(Service): settings_service = get_settings_service() if settings_service.auth_settings.AUTO_LOGIN: with Session(self.engine) as session: - flows = session.exec(select(models.Flow).where(models.Flow.user_id is None)).all() + flows = session.exec( + select(models.Flow).where(models.Flow.user_id is None) + ).all() if flows: logger.debug("Migrating flows to default superuser") username = settings_service.auth_settings.SUPERUSER @@ -96,14 +102,16 @@ class DatabaseService(Service): expected_columns = list(model.model_fields.keys()) try: - available_columns = [col["name"] for col in inspector.get_columns(table)] + available_columns = [ + col["name"] for col in inspector.get_columns(table) + ] except sa.exc.NoSuchTableError: - logger.error(f"Missing table: {table}") + logger.debug(f"Missing table: {table}") return False for column in expected_columns: if column not in available_columns: - logger.error(f"Missing column: {column} in table {table}") + logger.debug(f"Missing column: {column} in table {table}") return False for table in legacy_tables: @@ -160,7 +168,9 @@ class DatabaseService(Service): buffer.write(f"{datetime.now().isoformat()}: Checking migrations\n") command.check(alembic_cfg) except Exception as exc: - if isinstance(exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected)): + if isinstance( + exc, (util.exc.CommandError, util.exc.AutogenerateDiffsDetected) + ): command.upgrade(alembic_cfg, "head") time.sleep(3) @@ -197,7 +207,10 @@ class DatabaseService(Service): # We will check that all models are in the database # and that the database is up to date with all columns sql_models = [models.Flow, models.User, models.ApiKey] - return [TableResults(sql_model.__tablename__, self.check_table(sql_model)) for sql_model in sql_models] + return [ + TableResults(sql_model.__tablename__, self.check_table(sql_model)) + for sql_model in sql_models + ] def check_table(self, model): results = [] @@ -206,7 +219,9 @@ class DatabaseService(Service): expected_columns = list(model.__fields__.keys()) available_columns = [] try: - available_columns = [col["name"] for col in inspector.get_columns(table_name)] + available_columns = [ + col["name"] for col in inspector.get_columns(table_name) + ] results.append(Result(name=table_name, type="table", success=True)) except sa.exc.NoSuchTableError: logger.error(f"Missing table: {table_name}") @@ -237,7 +252,9 @@ class DatabaseService(Service): try: table.create(self.engine, checkfirst=True) except OperationalError as oe: - logger.warning(f"Table {table} already exists, skipping. Exception: {oe}") + logger.warning( + f"Table {table} already exists, skipping. Exception: {oe}" + ) except Exception as exc: logger.error(f"Error creating table {table}: {exc}") raise RuntimeError(f"Error creating table {table}") from exc @@ -249,7 +266,9 @@ class DatabaseService(Service): if table not in table_names: logger.error("Something went wrong creating the database and tables.") logger.error("Please check your database settings.") - raise RuntimeError("Something went wrong creating the database and tables.") + raise RuntimeError( + "Something went wrong creating the database and tables." + ) logger.debug("Database and tables created successfully") diff --git a/src/backend/base/langflow/utils/logger.py b/src/backend/base/langflow/utils/logger.py index d92bdb0e1..a656a8462 100644 --- a/src/backend/base/langflow/utils/logger.py +++ b/src/backend/base/langflow/utils/logger.py @@ -1,3 +1,4 @@ +import logging import os from pathlib import Path from typing import Optional @@ -25,7 +26,10 @@ def patching(record): def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None): - if os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS and log_level is None: + if ( + os.getenv("LANGFLOW_LOG_LEVEL", "").upper() in VALID_LOG_LEVELS + and log_level is None + ): log_level = os.getenv("LANGFLOW_LOG_LEVEL") if log_level is None: log_level = "ERROR" @@ -67,3 +71,46 @@ def configure(log_level: Optional[str] = None, log_file: Optional[Path] = None): logger.debug(f"Logger set up with log level: {log_level}") if log_file: logger.debug(f"Log file: {log_file}") + + setup_uvicorn_logger() + setup_gunicorn_logger() + + +def setup_uvicorn_logger(): + loggers = ( + logging.getLogger(name) + for name in logging.root.manager.loggerDict + if name.startswith("uvicorn.") + ) + for uvicorn_logger in loggers: + uvicorn_logger.handlers = [] + logging.getLogger("uvicorn").handlers = [InterceptHandler()] + + +def setup_gunicorn_logger(): + logging.getLogger("gunicorn.error").handlers = [InterceptHandler()] + logging.getLogger("gunicorn.access").handlers = [InterceptHandler()] + + +class InterceptHandler(logging.Handler): + """ + Default handler from examples in loguru documentaion. + See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging + """ + + def emit(self, record): + # Get corresponding Loguru level if it exists + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where originated the logged message + frame, depth = logging.currentframe(), 2 + while frame.f_code.co_filename == logging.__file__: + frame = frame.f_back + depth += 1 + + logger.opt(depth=depth, exception=record.exc_info).log( + level, record.getMessage() + )