feat: log retriever endpoint (#2601)

* log retriever endpoint

* disabled by default

* realtime log stream via http/2 SSE

* read and write lock

* unit test
This commit is contained in:
ming 2024-07-11 05:19:21 -04:00 committed by GitHub
commit 81849d5f8b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 291 additions and 2 deletions

View file

@@ -1,4 +1,5 @@
from langflow.api.router import router
from langflow.api.health_check_router import health_check_router
from langflow.api.log_router import log_router
__all__ = ["router", "health_check_router"]
__all__ = ["router", "health_check_router", "log_router"]

View file

@@ -0,0 +1,86 @@
import asyncio
import json
from fastapi import APIRouter, Query, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from http import HTTPStatus
from langflow.utils.logger import log_buffer
log_router = APIRouter(tags=["Log"])
async def event_generator(request: Request):
    """Yield new log lines as they arrive, plus periodic SSE keepalives.

    Starts after the newest entry currently in the buffer and polls the
    shared ``log_buffer`` once per second until the client disconnects.
    """
    global log_buffer
    # Start after the newest buffered entry. An empty buffer previously
    # crashed here: popitem() on an empty dict raises KeyError. 0.0 means
    # "stream everything that arrives".
    last_line = log_buffer.get_last_n(1)
    latest_timestamp = last_line.popitem()[0] if last_line else 0.0
    while True:
        if await request.is_disconnected():
            break

        new_logs = log_buffer.get_after_timestamp(timestamp=latest_timestamp, lines=100)
        if new_logs:
            for ts, msg in new_logs.items():
                if ts > latest_timestamp:
                    # NOTE(review): payload lacks the "data: " SSE field prefix
                    # and blank-line terminator — confirm the client parses
                    # this framing before tightening it.
                    yield f"{json.dumps({ts:msg})}\n"
                    # Advance the cursor per emitted line. The old code used a
                    # temp initialized to 0.0, so a poll with no strictly-newer
                    # entries reset the cursor and re-streamed the whole buffer.
                    latest_timestamp = max(latest_timestamp, ts)
        else:
            yield ": keepalive\n\n"

        await asyncio.sleep(1)
@log_router.get("/logs-stream")
async def stream_logs(
    request: Request,
):
    """
    HTTP/2 Server-Sent-Events (SSE) endpoint for streaming logs.

    Establishes a long-lived connection over which log messages are pushed
    in real time; the client should send the header "Accept: text/event-stream".

    Raises 501 when log retrieval is disabled (buffer size 0).
    """
    global log_buffer
    if not log_buffer.enabled():
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    return StreamingResponse(event_generator(request), media_type="text/event-stream")
@log_router.get("/logs")
async def logs(
    lines_before: int = Query(1, ge=1, description="The number of logs before the timestamp or the last log"),
    # was Query(0, ge=1): the default 0 violated its own constraint, so an
    # explicit ?lines_after=0 was rejected with 422 — relaxed to ge=0
    lines_after: int = Query(0, ge=0, description="The number of logs after the timestamp"),
    timestamp: float = Query(0, description="The timestamp to start streaming logs from"),
):
    """Page through buffered log lines.

    Returns a JSON object mapping epoch timestamps to log lines:
    - ``lines_after`` > 0 (requires ``timestamp``): lines at/after ``timestamp``
    - ``timestamp`` == 0: the last ``lines_before`` lines
    - otherwise: ``lines_before`` lines before ``timestamp``

    Raises 501 when log retrieval is disabled and 400 when ``lines_after``
    is requested without a timestamp.
    """
    global log_buffer
    if not log_buffer.enabled():
        raise HTTPException(
            status_code=HTTPStatus.NOT_IMPLEMENTED,
            detail="Log retrieval is disabled",
        )

    if lines_after > 0 and timestamp == 0:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST,
            detail="Timestamp is required when requesting logs after the timestamp",
        )

    if lines_after > 0 and timestamp > 0:
        content = log_buffer.get_after_timestamp(timestamp=timestamp, lines=lines_after)
        return JSONResponse(content=content)

    # local renamed from `logs` to stop shadowing this endpoint function
    content: dict = {}
    if timestamp == 0:
        if lines_before > 0:
            content = log_buffer.get_last_n(lines_before)
    else:
        if lines_before > 0:
            content = log_buffer.get_before_timestamp(timestamp=timestamp, lines=lines_before)
    return JSONResponse(content=content)

View file

@@ -18,7 +18,7 @@ from rich import print as rprint
from starlette.middleware.base import BaseHTTPMiddleware
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from langflow.api import router, health_check_router
from langflow.api import router, health_check_router, log_router
from langflow.initial_setup.setup import (
create_or_update_starter_projects,
initialize_super_user_if_needed,
@@ -158,6 +158,7 @@ def create_app():
app.include_router(router)
app.include_router(health_check_router)
app.include_router(log_router)
@app.exception_handler(Exception)
async def exception_handler(request: Request, exc: Exception):

View file

@@ -1,7 +1,11 @@
import json
import logging
import os
import sys
from pathlib import Path
from collections import OrderedDict
from itertools import islice
from threading import Lock, Semaphore
from typing import Optional
import orjson
@@ -12,6 +16,98 @@ from rich.logging import RichHandler
VALID_LOG_LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
class SizedLogBuffer:
    """Bounded, timestamp-keyed in-memory store of recent log lines.

    Backs the log-retrieval API: loguru writes serialized records in via
    ``write`` and readers page through them by timestamp. A write lock plus
    a reader semaphore (at most ``max_readers`` concurrent readers) keeps
    mutation and reads from interleaving.
    """

    def __init__(
        self,
        max_readers: int = 20,  # max number of concurrent readers for the buffer
    ):
        """
        a buffer for storing log messages for the log retrieval API
        the buffer can be overwritten by an env variable LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE
        because the logger is initialized before the settings_service are loaded
        """
        self.max: int = 0  # capacity; 0 means the retriever is disabled
        env_buffer_size = os.getenv("LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE", "0")
        if env_buffer_size.isdigit():
            self.max = int(env_buffer_size)

        # insertion order == chronological order (loguru timestamps)
        self.buffer: OrderedDict[float, str] = OrderedDict()

        self._max_readers = max_readers
        self._wlock = Lock()
        self._rsemaphore = Semaphore(max_readers)

    def write(self, message: str):
        """Append one loguru-serialized record, evicting the oldest entry when full."""
        record = json.loads(message)
        log_entry = record["text"]
        epoch = record["record"]["time"]["timestamp"]
        with self._wlock:
            # Drain every reader permit: blocks until in-flight readers finish
            # and keeps new readers out while we mutate. (Replaces the old
            # busy-wait on the private Semaphore._value attribute, which
            # burned CPU and relied on a CPython implementation detail.)
            for _ in range(self._max_readers):
                self._rsemaphore.acquire()
            try:
                # `self.buffer and` guards popitem() against an empty buffer
                # when max == 0 (disabled)
                if self.buffer and len(self.buffer) >= self.max:
                    # remove the oldest log entry if the buffer is full
                    self.buffer.popitem(last=False)
                self.buffer[epoch] = log_entry
            finally:
                for _ in range(self._max_readers):
                    self._rsemaphore.release()

    def __len__(self):
        """Number of buffered log lines."""
        return len(self.buffer)

    def get_after_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
        """Return up to ``lines`` entries with ts >= ``timestamp``, oldest first."""
        rc: dict[float, str] = {}
        self._rsemaphore.acquire()
        # try/finally so an exception mid-read can no longer leak the permit
        # and eventually starve the writer of permits
        try:
            for ts, msg in self.buffer.items():
                if lines <= 0:
                    break
                if ts >= timestamp:
                    rc[ts] = msg
                    lines -= 1
        finally:
            self._rsemaphore.release()
        return rc

    def get_before_timestamp(self, timestamp: float, lines: int = 5) -> dict[float, str]:
        """Return up to ``lines`` entries with ts < ``timestamp``, newest first."""
        rc: dict[float, str] = {}
        self._rsemaphore.acquire()
        try:
            for ts, msg in reversed(self.buffer.items()):
                if lines <= 0:
                    break
                if ts < timestamp:
                    rc[ts] = msg
                    lines -= 1
        finally:
            self._rsemaphore.release()
        return rc

    def get_last_n(self, last_idx: int) -> dict[float, str]:
        """Return the ``last_idx`` most recent entries, newest first."""
        self._rsemaphore.acquire()
        try:
            return dict(islice(reversed(self.buffer.items()), last_idx))
        finally:
            self._rsemaphore.release()

    def enabled(self) -> bool:
        """True when a positive capacity has been configured."""
        return self.max > 0

    def max_size(self) -> int:
        """Configured capacity of the buffer."""
        return self.max


# log buffer for capturing log messages
log_buffer = SizedLogBuffer()
def serialize_log(record):
subset = {
"timestamp": record["time"].timestamp(),
@@ -85,6 +181,9 @@ def configure(
except Exception as exc:
logger.error(f"Error setting up log file: {exc}")
if log_buffer.enabled():
logger.add(sink=log_buffer.write, format="{time} {level} {message}", serialize=True)
logger.debug(f"Logger set up with log level: {log_level}")
setup_uvicorn_logger()

102
tests/unit/test_logger.py Normal file
View file

@@ -0,0 +1,102 @@
import pytest
import os
import json
from collections import OrderedDict
from unittest.mock import patch
from langflow.utils.logger import SizedLogBuffer
@pytest.fixture
def sized_log_buffer():
    """Provide a fresh SizedLogBuffer (max == 0 unless LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE is set)."""
    return SizedLogBuffer()
def test_init_default():
    """A freshly constructed buffer is empty with the default reader cap."""
    fresh = SizedLogBuffer()
    assert fresh.max == 0
    assert fresh._max_readers == 20
    assert isinstance(fresh.buffer, OrderedDict)
def test_init_with_env_variable():
    """LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE sets the capacity at construction time."""
    with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}):
        assert SizedLogBuffer().max == 100
def test_write(sized_log_buffer):
    """A single serialized record lands in the buffer under its timestamp."""
    sized_log_buffer.max = 1  # enable the buffer with room for one entry
    payload = json.dumps({"text": "Test log", "record": {"time": {"timestamp": 1625097600}}})
    sized_log_buffer.write(payload)
    assert len(sized_log_buffer.buffer) == 1
    assert sized_log_buffer.buffer.get(1625097600) == "Test log"
def test_write_overflow(sized_log_buffer):
    """Writing past capacity evicts the oldest entry first."""
    sized_log_buffer.max = 2
    for i in range(3):
        record = {"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}
        sized_log_buffer.write(json.dumps(record))
    # the oldest (…600) is gone; the two newest remain
    assert sorted(sized_log_buffer.buffer) == [1625097601, 1625097602]
def test_len(sized_log_buffer):
    """len() reflects the number of buffered entries."""
    sized_log_buffer.max = 3
    for i in range(3):
        record = {"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}
        sized_log_buffer.write(json.dumps(record))
    assert len(sized_log_buffer) == 3
def test_get_after_timestamp(sized_log_buffer):
    """Entries at or after the timestamp are returned, capped at `lines`."""
    sized_log_buffer.max = 5
    for i in range(5):
        record = {"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}
        sized_log_buffer.write(json.dumps(record))
    window = sized_log_buffer.get_after_timestamp(1625097602, lines=2)
    assert sorted(window) == [1625097602, 1625097603]
def test_get_before_timestamp(sized_log_buffer):
    """Entries strictly before the timestamp are returned, capped at `lines`."""
    sized_log_buffer.max = 5
    for i in range(5):
        record = {"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}
        sized_log_buffer.write(json.dumps(record))
    window = sized_log_buffer.get_before_timestamp(1625097603, lines=2)
    assert sorted(window) == [1625097601, 1625097602]
def test_get_last_n(sized_log_buffer):
    """The n most recent entries are returned."""
    sized_log_buffer.max = 5
    for i in range(5):
        record = {"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}
        sized_log_buffer.write(json.dumps(record))
    tail = sized_log_buffer.get_last_n(3)
    assert sorted(tail) == [1625097602, 1625097603, 1625097604]
def test_enabled(sized_log_buffer):
    """enabled() tracks whether the capacity is non-zero."""
    assert sized_log_buffer.enabled() is False
    sized_log_buffer.max = 1
    assert sized_log_buffer.enabled() is True
def test_max_size(sized_log_buffer):
    """max_size() mirrors the `max` attribute."""
    assert sized_log_buffer.max_size() == 0
    sized_log_buffer.max = 100
    assert sized_log_buffer.max_size() == 100