101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
import json
|
|
import os
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from langflow.logging.logger import SizedLogBuffer
|
|
|
|
|
|
@pytest.fixture
|
|
def sized_log_buffer():
|
|
return SizedLogBuffer()
|
|
|
|
|
|
def test_init_default():
|
|
buffer = SizedLogBuffer()
|
|
assert buffer.max == 0
|
|
assert buffer._max_readers == 20
|
|
|
|
|
|
def test_init_with_env_variable():
|
|
with patch.dict(os.environ, {"LANGFLOW_LOG_RETRIEVER_BUFFER_SIZE": "100"}):
|
|
buffer = SizedLogBuffer()
|
|
assert buffer.max == 100
|
|
|
|
|
|
def test_write(sized_log_buffer):
|
|
message = json.dumps({"text": "Test log", "record": {"time": {"timestamp": 1625097600.1244334}}})
|
|
sized_log_buffer.max = 1 # Set max size to 1 for testing
|
|
sized_log_buffer.write(message)
|
|
assert len(sized_log_buffer.buffer) == 1
|
|
assert sized_log_buffer.buffer[0][0] == 1625097600124
|
|
assert sized_log_buffer.buffer[0][1] == "Test log"
|
|
|
|
|
|
def test_write_overflow(sized_log_buffer):
|
|
sized_log_buffer.max = 2
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)]
|
|
for message in messages:
|
|
sized_log_buffer.write(message)
|
|
|
|
assert len(sized_log_buffer.buffer) == 2
|
|
assert sized_log_buffer.buffer[0][0] == 1625097601000
|
|
assert sized_log_buffer.buffer[1][0] == 1625097602000
|
|
|
|
|
|
def test_len(sized_log_buffer):
|
|
sized_log_buffer.max = 3
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(3)]
|
|
for message in messages:
|
|
sized_log_buffer.write(message)
|
|
|
|
assert len(sized_log_buffer) == 3
|
|
|
|
|
|
def test_get_after_timestamp(sized_log_buffer):
|
|
sized_log_buffer.max = 5
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
|
|
for message in messages:
|
|
sized_log_buffer.write(message)
|
|
|
|
result = sized_log_buffer.get_after_timestamp(1625097602000, lines=2)
|
|
assert len(result) == 2
|
|
assert 1625097603000 in result
|
|
assert 1625097602000 in result
|
|
|
|
|
|
def test_get_before_timestamp(sized_log_buffer):
|
|
sized_log_buffer.max = 5
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
|
|
for message in messages:
|
|
sized_log_buffer.write(message)
|
|
|
|
result = sized_log_buffer.get_before_timestamp(1625097603000, lines=2)
|
|
assert len(result) == 2
|
|
assert 1625097601000 in result
|
|
assert 1625097602000 in result
|
|
|
|
|
|
def test_get_last_n(sized_log_buffer):
|
|
sized_log_buffer.max = 5
|
|
messages = [json.dumps({"text": f"Log {i}", "record": {"time": {"timestamp": 1625097600 + i}}}) for i in range(5)]
|
|
for message in messages:
|
|
sized_log_buffer.write(message)
|
|
|
|
result = sized_log_buffer.get_last_n(3)
|
|
assert len(result) == 3
|
|
assert 1625097602000 in result
|
|
assert 1625097603000 in result
|
|
assert 1625097604000 in result
|
|
|
|
|
|
def test_enabled(sized_log_buffer):
|
|
assert not sized_log_buffer.enabled()
|
|
sized_log_buffer.max = 1
|
|
assert sized_log_buffer.enabled()
|
|
|
|
|
|
def test_max_size(sized_log_buffer):
|
|
assert sized_log_buffer.max_size() == 0
|
|
sized_log_buffer.max = 100
|
|
assert sized_log_buffer.max_size() == 100
|