import asyncio
import json
from http import HTTPStatus

from fastapi import APIRouter, HTTPException, Query, Request
from fastapi.responses import JSONResponse, StreamingResponse

from langflow.utils.logger import log_buffer

log_router = APIRouter(tags=["Log"])


async def event_generator(request: Request):
    """Yield buffered log lines as they arrive, until the client disconnects.

    Each yielded chunk is a single-entry JSON object ``{timestamp: message}``
    terminated by a newline; an SSE comment line is sent as a keepalive when
    there is nothing new.
    """
    # Start streaming after the newest entry currently in the buffer.
    # An empty buffer previously crashed with KeyError on popitem(); start
    # from 0.0 instead so every future line is streamed.
    last_line = log_buffer.get_last_n(1)
    latest_timestamp = last_line.popitem()[0] if last_line else 0.0

    while not await request.is_disconnected():
        new_logs = log_buffer.get_after_timestamp(timestamp=latest_timestamp, lines=100)
        # get_after_timestamp is inclusive, so drop the already-sent cursor entry.
        fresh = {ts: msg for ts, msg in new_logs.items() if ts > latest_timestamp}
        if fresh:
            for ts, msg in fresh.items():
                # NOTE(review): these are bare JSON lines, not `data: ...\n\n`
                # SSE frames — kept as-is; confirm the frontend parses this
                # raw format before switching to strict SSE framing.
                yield f"{json.dumps({ts: msg})}\n"
            # Advance the cursor to the newest line actually sent. (Previously
            # the cursor was reset to 0.0 whenever no strictly-newer entry was
            # found, which re-sent the whole buffer on the next iteration.)
            latest_timestamp = max(fresh)
        else:
            yield ": keepalive\n\n"

        await asyncio.sleep(1)


@log_router.get("/logs-stream")
async def stream_logs(
    request: Request,
):
    """Long-lived streaming endpoint delivering log lines in real time.

    The client establishes a persistent connection and receives log messages
    as they are produced; it should send the header "Accept: text/event-stream".

    Raises:
        HTTPException(501): when the log buffer is disabled
            (LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE unset or 0).
    """
    if not log_buffer.enabled():
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    return StreamingResponse(event_generator(request), media_type="text/event-stream")


@log_router.get("/logs")
async def logs(
    lines_before: int = Query(1, ge=1, description="The number of logs before the timestamp or the last log"),
    # ge was 1 while the default was 0, so a client could never explicitly
    # pass the documented "no lines after" value 0; relaxed to ge=0
    # (backward compatible — strictly widens the accepted range).
    lines_after: int = Query(0, ge=0, description="The number of logs after the timestamp"),
    timestamp: float = Query(0, description="The timestamp to start streaming logs from"),
):
    """Fetch a window of buffered log lines around ``timestamp``.

    Behavior:
        - lines_after > 0: requires a timestamp; returns lines at/after it.
        - timestamp == 0: returns the last ``lines_before`` lines.
        - otherwise: returns ``lines_before`` lines before ``timestamp``.

    Raises:
        HTTPException(501): when log retrieval is disabled.
        HTTPException(400): when lines_after is requested without a timestamp.
    """
    if not log_buffer.enabled():
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    if lines_after > 0:
        if timestamp == 0:
            raise HTTPException(
                status_code=HTTPStatus.BAD_REQUEST,
                detail="Timestamp is required when requesting logs after the timestamp",
            )
        return JSONResponse(content=log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after))

    # lines_before is validated ge=1, so it is always positive here.
    if timestamp == 0:
        content = log_buffer.get_last_n(lines_before)
    else:
        content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)

    return JSONResponse(content=content)
class SizedLogBuffer:
    """In-memory, size-bounded buffer of log lines keyed by epoch timestamp.

    Backs the log-retrieval API. The capacity comes from the environment
    variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE because the logger is
    initialized before the settings service is loaded; 0 disables the buffer.

    Concurrency model: a counting semaphore with ``max_readers`` permits.
    Readers hold one permit while iterating; the single writer (the loguru
    sink) takes ``_wlock`` and then drains *all* permits, so it mutates the
    OrderedDict only when no reader is active. This replaces the previous
    busy-wait on the semaphore's private ``_value`` attribute, which burned
    CPU and was racy (a reader could re-acquire between the check and the
    mutation).
    """

    def __init__(
        self,
        max_readers: int = 20,  # max number of concurrent readers for the buffer
    ):
        # Capacity; 0 means "disabled" and write() becomes a no-op.
        self.max: int = 0
        env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
        if env_buffer_size.isdigit():
            self.max = int(env_buffer_size)

        # Insertion-ordered mapping: epoch timestamp -> formatted log line.
        # NOTE(review): two records with an identical timestamp overwrite each
        # other — presumed acceptable at loguru's float-second resolution.
        self.buffer: OrderedDict[float, str] = OrderedDict()

        self._max_readers = max_readers
        self._wlock = Lock()  # serializes writers
        self._rsemaphore = Semaphore(max_readers)  # counts active readers

    def write(self, message: str):
        """loguru sink: parse a serialized record and append it, evicting the oldest.

        ``message`` is the JSON produced by ``logger.add(..., serialize=True)``.
        """
        record = json.loads(message)
        log_entry = record["text"]
        epoch = record["record"]["time"]["timestamp"]

        if self.max <= 0:
            # Disabled buffer: ignore writes instead of evicting from an
            # empty dict (popitem on an empty OrderedDict raises KeyError).
            return

        with self._wlock:
            # Drain every reader permit so no reader can iterate while we
            # mutate; released unconditionally below.
            for _ in range(self._max_readers):
                self._rsemaphore.acquire()
            try:
                # `while` (not `if`) so the buffer shrinks correctly even if
                # `max` was lowered at runtime.
                while self.buffer and len(self.buffer) >= self.max:
                    self.buffer.popitem(last=False)
                self.buffer[epoch] = log_entry
            finally:
                for _ in range(self._max_readers):
                    self._rsemaphore.release()

    def __len__(self):
        return len(self.buffer)

    def get_after_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
        """Return up to ``lines`` entries with timestamp >= ``timestamp`` (oldest first)."""
        rc: dict[float, str] = {}

        self._rsemaphore.acquire()
        try:
            for ts, msg in self.buffer.items():
                if lines <= 0:
                    break
                if ts >= timestamp:
                    rc[ts] = msg
                    lines -= 1
        finally:
            # try/finally so an exception during iteration cannot leak the
            # permit and eventually deadlock the writer.
            self._rsemaphore.release()

        return rc

    def get_before_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
        """Return up to ``lines`` entries strictly before ``timestamp`` (newest first)."""
        rc: dict[float, str] = {}

        self._rsemaphore.acquire()
        try:
            for ts, msg in reversed(self.buffer.items()):
                if lines <= 0:
                    break
                if ts < timestamp:
                    rc[ts] = msg
                    lines -= 1
        finally:
            self._rsemaphore.release()

        return rc

    def get_last_n(self, last_idx: int) -> dict[float, str]:
        """Return the newest ``last_idx`` entries."""
        self._rsemaphore.acquire()
        try:
            return dict(islice(reversed(self.buffer.items()), last_idx))
        finally:
            self._rsemaphore.release()

    def enabled(self) -> bool:
        """True when a positive capacity was configured via the environment."""
        return self.max > 0

    def max_size(self) -> int:
        """Configured capacity (0 when disabled)."""
        return self.max


# log buffer for capturing log messages
log_buffer = SizedLogBuffer()
def test_init_default():
    buffer = SizedLogBuffer()
    assert buffer.max == 0
    assert buffer._max_readers == 20
    assert isinstance(buffer.buffer, OrderedDict)


def test_init_with_env_variable():
    with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}):
        assert SizedLogBuffer().max == 100


# All fixture timestamps are offsets from this epoch second.
BASE_TS = 1625097600


def _record(text, ts):
    """Serialize a minimal loguru-style record that SizedLogBuffer.write parses."""
    return json.dumps({"text": text, "record": {"time": {"timestamp": ts}}})


def _fill(buf, count):
    """Write `count` sequential "Log i" entries starting at BASE_TS."""
    for i in range(count):
        buf.write(_record(f"Log {i}", BASE_TS + i))


def test_write(sized_log_buffer):
    sized_log_buffer.max = 1  # smallest possible capacity
    sized_log_buffer.write(_record("Test log", BASE_TS))
    assert len(sized_log_buffer.buffer) == 1
    assert BASE_TS in sized_log_buffer.buffer
    assert sized_log_buffer.buffer[BASE_TS] == "Test log"


def test_write_overflow(sized_log_buffer):
    sized_log_buffer.max = 2
    _fill(sized_log_buffer, 3)
    # The oldest entry is evicted; the two newest survive.
    assert len(sized_log_buffer.buffer) == 2
    assert BASE_TS + 1 in sized_log_buffer.buffer
    assert BASE_TS + 2 in sized_log_buffer.buffer


def test_len(sized_log_buffer):
    sized_log_buffer.max = 3
    _fill(sized_log_buffer, 3)
    assert len(sized_log_buffer) == 3


def test_get_after_timestamp(sized_log_buffer):
    sized_log_buffer.max = 5
    _fill(sized_log_buffer, 5)
    result = sized_log_buffer.get_after_timestamp(BASE_TS + 2, lines=2)
    # Inclusive lower bound, capped at two lines.
    assert sorted(result) == [BASE_TS + 2, BASE_TS + 3]


def test_get_before_timestamp(sized_log_buffer):
    sized_log_buffer.max = 5
    _fill(sized_log_buffer, 5)
    result = sized_log_buffer.get_before_timestamp(BASE_TS + 3, lines=2)
    # Exclusive upper bound, newest-first selection of two lines.
    assert sorted(result) == [BASE_TS + 1, BASE_TS + 2]


def test_get_last_n(sized_log_buffer):
    sized_log_buffer.max = 5
    _fill(sized_log_buffer, 5)
    result = sized_log_buffer.get_last_n(3)
    assert sorted(result) == [BASE_TS + 2, BASE_TS + 3, BASE_TS + 4]


def test_enabled(sized_log_buffer):
    assert not sized_log_buffer.enabled()
    sized_log_buffer.max = 1
    assert sized_log_buffer.enabled()


def test_max_size(sized_log_buffer):
    assert sized_log_buffer.max_size() == 0
    sized_log_buffer.max = 100
    assert sized_log_buffer.max_size() == 100