mnemosyne/tests/test_gateway_fidelity.py
Joey Yakimowich-Payne d26c56c2f0 feat: add multi-fidelity compression engine
5-level fidelity manager (L0-Full to L4-Evicted) with helper LLM
(Haiku 4.5) for intelligent summarization during degradation.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-13 11:40:56 -06:00

513 lines
18 KiB
Python

"""Integration tests for the FidelityManager wiring in the gateway.
Tests the Phase 2.4 fidelity pipeline: object registration, pressure
calculation, degradation, and content replacement in ephemeral payloads.
Does NOT test HelperLLM integration (requires mocking).
"""
from __future__ import annotations
import copy
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
from mnemosyne.fidelity import FidelityLevel, FidelityManager, PressureZone, make_object
from mnemosyne.gateway import (
Session,
_apply_fidelity,
_auto_stub,
_block_text,
_content_key,
)
# ── Fixtures ─────────────────────────────────────────────────────────────
@pytest.fixture
def tmp_log_dir():
    """Yield a temporary directory as a ``Path`` for the duration of a test.

    The directory is created by ``TemporaryDirectory`` and cleaned up when
    the context manager exits after the test finishes.
    """
    scratch = TemporaryDirectory()
    with scratch as dir_name:
        log_dir = Path(dir_name)
        yield log_dir
@pytest.fixture
def session(tmp_log_dir):
    """Return a new ``Session`` built from the id ``"test01"`` and ``tmp_log_dir``."""
    session_id = "test01"
    fresh_session = Session(session_id, tmp_log_dir)
    return fresh_session
def _make_large_text(size: int = 600) -> str:
"""Generate a text string of approximately `size` bytes."""
return "x" * size
def _tool_result_block(tool_use_id: str, content: str) -> dict:
return {
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": content,
}
def _text_block(text: str) -> dict:
return {"type": "text", "text": text}
def _msg(role: str, blocks: list[dict]) -> dict:
return {"role": role, "content": blocks}
# ── Test: FidelityManager created per session ────────────────────────────
class TestSessionFidelityManager:
    """Checks that each ``Session`` carries its own fidelity state."""

    def test_session_has_fidelity_manager(self, session):
        """The session exposes a ``fidelity_manager`` of type ``FidelityManager``."""
        assert hasattr(session, "fidelity_manager")
        manager = session.fidelity_manager
        assert isinstance(manager, FidelityManager)

    def test_fidelity_manager_default_window(self, session):
        """A fresh session's manager reports a window size of 200,000."""
        expected_window = 200_000
        assert session.fidelity_manager.window_size == expected_window

    def test_fidelity_manager_per_session(self, tmp_log_dir):
        """Two sessions must not share one manager instance."""
        first, second = (
            Session("sess_a", tmp_log_dir),
            Session("sess_b", tmp_log_dir),
        )
        assert first.fidelity_manager is not second.fidelity_manager

    def test_session_has_content_map(self, session):
        """A fresh session has an empty dict ``_fidelity_content_map``."""
        assert hasattr(session, "_fidelity_content_map")
        content_map = session._fidelity_content_map
        assert isinstance(content_map, dict)
        assert not content_map
# ── Test: Content key derivation ─────────────────────────────────────────
class TestContentKey:
    """Checks on the keys ``_content_key`` derives for content blocks."""

    def test_tool_result_key(self):
        """A tool_result block is keyed as ``tool:<tool_use_id>``."""
        tool_block = _tool_result_block("toolu_abc123", "some content")
        assert _content_key(tool_block, {}) == "tool:toolu_abc123"

    def test_large_text_key(self):
        """A 600-character text block gets a non-None ``text:``-prefixed key."""
        derived = _content_key(_text_block(_make_large_text(600)), {})
        assert derived is not None
        assert derived.startswith("text:")

    def test_small_text_returns_none(self):
        """A short text block yields no key."""
        derived = _content_key(_text_block("short"), {})
        assert derived is None

    def test_stable_key_for_same_content(self):
        """Two distinct blocks with equal text map to the same key."""
        body = _make_large_text(600)
        first_key = _content_key(_text_block(body), {})
        second_key = _content_key(_text_block(body), {})
        assert first_key == second_key

    def test_different_key_for_different_content(self):
        """Blocks with different text map to different keys."""
        key_a = _content_key(_text_block("a" * 600), {})
        key_b = _content_key(_text_block("b" * 600), {})
        assert key_a != key_b
# ── Test: Block text extraction ──────────────────────────────────────────
class TestBlockText:
    """Checks on the text ``_block_text`` extracts from content blocks."""

    def test_text_block(self):
        """A text block yields its ``text`` value."""
        extracted = _block_text(_text_block("hello"))
        assert extracted == "hello"

    def test_tool_result_string_content(self):
        """A tool_result with string content yields that string."""
        extracted = _block_text(_tool_result_block("id1", "result text"))
        assert extracted == "result text"

    def test_tool_result_list_content(self):
        """A tool_result with a list of text parts yields them newline-joined."""
        parts = [{"type": "text", "text": line} for line in ("line 1", "line 2")]
        nested_block = {
            "type": "tool_result",
            "tool_use_id": "id2",
            "content": parts,
        }
        assert _block_text(nested_block) == "line 1\nline 2"
# ── Test: Auto stub generation ───────────────────────────────────────────
class TestAutoStub:
    """Checks on the placeholder text produced by ``_auto_stub``."""

    def test_short_content(self):
        """Short single-line content is embedded verbatim in the stub."""
        assert _auto_stub("Hello world") == "[evicted content: Hello world]"

    def test_multiline_uses_first_line(self):
        """Only the first line of multi-line content appears in the stub."""
        lines = ["First line", "Second line", "Third line"]
        generated = _auto_stub("\n".join(lines))
        assert "First line" in generated
        assert "Second line" not in generated

    def test_long_first_line_truncated(self):
        """A 200-character line gives a shorter stub containing an ellipsis."""
        generated = _auto_stub("x" * 200)
        assert len(generated) < 200
        assert "..." in generated
# ── Test: Object registration via _apply_fidelity ───────────────────────
class TestApplyFidelityRegistration:
    """Checks on which blocks ``_apply_fidelity`` records in the session."""

    def test_registers_tool_result(self, session):
        """A single tool_result block is recorded under ``tool:<id>``."""
        tool_block = _tool_result_block("tool_1", _make_large_text(600))
        payload = {"messages": [_msg("user", [tool_block])]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        assert session.fidelity_manager.total_tokens() > 0
        content_map = session._fidelity_content_map
        assert len(content_map) == 1
        assert "tool:tool_1" in content_map

    def test_registers_large_text_block(self, session):
        """An 800-character assistant text block is recorded."""
        big_block = _text_block(_make_large_text(800))
        payload = {"messages": [_msg("assistant", [big_block])]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        assert session.fidelity_manager.total_tokens() > 0
        assert len(session._fidelity_content_map) == 1

    def test_ignores_small_text_block(self, session):
        """A short text block leaves the content map empty."""
        payload = {"messages": [_msg("user", [_text_block("small content")])]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        assert len(session._fidelity_content_map) == 0

    def test_multiple_blocks_registered(self, session):
        """Two tool results plus one large text block give three entries."""
        blocks = [
            _tool_result_block("tool_a", _make_large_text(600)),
            _tool_result_block("tool_b", _make_large_text(700)),
            _text_block(_make_large_text(800)),
        ]
        payload = {"messages": [_msg("user", blocks)]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        assert len(session._fidelity_content_map) == 3

    def test_idempotent_on_second_call(self, session):
        """Re-applying an equal payload on a later turn adds no new entries."""

        def build_payload():
            # A fresh, equal payload each time so the two calls share no dicts.
            tool_block = _tool_result_block("tool_1", _make_large_text(600))
            return {"messages": [_msg("user", [tool_block])]}

        session.token_state["turn"] = 1
        _apply_fidelity(build_payload(), session)
        count_after_first = len(session._fidelity_content_map)

        # Same content again on the next turn must not re-register.
        session.token_state["turn"] = 2
        _apply_fidelity(build_payload(), session)

        assert len(session._fidelity_content_map) == count_after_first

    def test_new_objects_start_at_l0(self, session):
        """A newly registered object is at ``FidelityLevel.L0``."""
        tool_block = _tool_result_block("tool_1", _make_large_text(600))
        payload = {"messages": [_msg("user", [tool_block])]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        object_id = session._fidelity_content_map["tool:tool_1"]
        tracked = session.fidelity_manager.get_object(object_id)
        assert tracked is not None
        assert tracked.current_fidelity == FidelityLevel.L0
# ── Test: Pressure calculation with real payloads ────────────────────────
class TestPressureCalculation:
    """Checks on the pressure zone reported by the session's manager."""

    def test_normal_pressure_small_payload(self, session):
        """One 600-character tool result leaves pressure at ``NORMAL``."""
        tool_block = _tool_result_block("t1", _make_large_text(600))
        payload = {"messages": [_msg("user", [tool_block])]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        # Original author's estimate: 600 bytes ≈ 150 tokens against a 200k
        # window, i.e. well under the 50% caution mark.
        zone = session.fidelity_manager.current_pressure()
        assert zone == PressureZone.NORMAL

    def test_high_pressure_large_payload(self, session):
        """Five 100,000-character objects push pressure to at least ``CAUTION``."""
        manager = session.fidelity_manager
        # Original author's estimate: each object is ~25k tokens
        # (100k chars / 4), so 5 * 25k = 125k tokens exceeds the 100k caution
        # threshold (50% of the 200k window).
        stub_labels = [f"[stub {index}]" for index in range(5)]
        for label in stub_labels:
            manager.register_object(
                make_object(
                    object_type="tool_result",
                    content_full="x" * 100_000,
                    created_at_turn=1,
                    stub=label,
                )
            )

        assert manager.current_pressure() >= PressureZone.CAUTION

    def test_pressure_zones_ordered(self, session):
        """Thresholds increase strictly: caution, warning, critical, emergency."""
        manager = session.fidelity_manager
        ascending = [
            manager.threshold_caution,
            manager.threshold_warning,
            manager.threshold_critical,
            manager.threshold_emergency,
        ]
        # Compare each threshold with its successor.
        for lower, upper in zip(ascending, ascending[1:]):
            assert lower < upper
# ── Test: Degradation replaces content in ephemeral payload ──────────────
class TestDegradationReplacement:
    """Checks that degraded objects are swapped for stubs in later payloads."""

    def test_degraded_tool_result_replaced_with_stub(self, session):
        """A tool result forced to L3 comes back as an evicted-content stub."""
        tool_id = "tool_degrade"
        original_content = _make_large_text(600)

        def fresh_payload():
            tool_block = _tool_result_block(tool_id, original_content)
            return {"messages": [_msg("user", [tool_block])]}

        # Turn 1: register the object.
        session.token_state["turn"] = 1
        _apply_fidelity(fresh_payload(), session)

        # Force the tracked object to L3 (stub) by hand.
        object_id = session._fidelity_content_map[f"tool:{tool_id}"]
        tracked = session.fidelity_manager.get_object(object_id)
        tracked.current_fidelity = FidelityLevel.L3

        # Turn 2: the same content should now be replaced in the payload.
        session.token_state["turn"] = 2
        second_payload = fresh_payload()
        _apply_fidelity(second_payload, session)

        replaced_content = second_payload["messages"][0]["content"][0]["content"]
        assert replaced_content != original_content
        assert "[evicted content:" in replaced_content

    def test_degraded_text_block_replaced_with_stub(self, session):
        """A large text block forced to L3 comes back as an evicted-content stub."""
        original_text = "Important data: " + _make_large_text(600)

        def fresh_payload():
            return {"messages": [_msg("assistant", [_text_block(original_text)])]}

        session.token_state["turn"] = 1
        _apply_fidelity(fresh_payload(), session)

        # Exactly one object is tracked; take its id and degrade it.
        assert len(session._fidelity_content_map) == 1
        object_id = next(iter(session._fidelity_content_map.values()))
        tracked = session.fidelity_manager.get_object(object_id)
        tracked.current_fidelity = FidelityLevel.L3

        session.token_state["turn"] = 2
        second_payload = fresh_payload()
        _apply_fidelity(second_payload, session)

        replaced_text = second_payload["messages"][0]["content"][0]["text"]
        assert replaced_text != original_text
        assert "[evicted content:" in replaced_text

    def test_l0_content_not_replaced(self, session):
        """Content of an object that stays at L0 is left unchanged."""
        tool_id = "tool_keep"
        original_content = _make_large_text(600)
        tool_block = _tool_result_block(tool_id, original_content)
        payload = {"messages": [_msg("user", [tool_block])]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        result_content = payload["messages"][0]["content"][0]["content"]
        assert result_content == original_content

    def test_l4_evicted_also_replaced(self, session):
        """A tool result forced to L4 is also replaced by a stub."""
        tool_id = "tool_evict"
        original_content = _make_large_text(600)

        def fresh_payload():
            tool_block = _tool_result_block(tool_id, original_content)
            return {"messages": [_msg("user", [tool_block])]}

        session.token_state["turn"] = 1
        _apply_fidelity(fresh_payload(), session)

        # Force the tracked object to L4 (evicted).
        object_id = session._fidelity_content_map[f"tool:{tool_id}"]
        tracked = session.fidelity_manager.get_object(object_id)
        tracked.current_fidelity = FidelityLevel.L4

        # L4 is >= L3, so replacement is still expected.
        session.token_state["turn"] = 2
        second_payload = fresh_payload()
        _apply_fidelity(second_payload, session)

        replaced_content = second_payload["messages"][0]["content"][0]["content"]
        assert "[evicted content:" in replaced_content
# ── Test: Degradation triggered by pressure ──────────────────────────────
class TestPressureDegradation:
    """Checks on degradation and access tracking driven by ``_apply_fidelity``."""

    def test_degrade_under_pressure(self, session):
        """When pressure exceeds NORMAL, _apply_fidelity triggers degradation."""
        manager = session.fidelity_manager
        # Shrink the window to 1000 tokens so a small payload creates pressure.
        manager.window_size = 1000
        session.token_state["turn"] = 1

        # Original author's estimate: 2000 bytes ≈ 500 tokens per object, so
        # three objects ≈ 1500 tokens, more than the 1000-token window.
        tool_blocks = [
            _tool_result_block(tool_id, _make_large_text(2000))
            for tool_id in ("t1", "t2", "t3")
        ]
        payload = {"messages": [_msg("user", tool_blocks)]}

        _apply_fidelity(payload, session)

        # Passes if any object left L0, or if pressure is back to NORMAL.
        any_degraded = any(
            tracked.current_fidelity > FidelityLevel.L0
            for tracked in manager._objects.values()
        )
        assert any_degraded or manager.current_pressure() == PressureZone.NORMAL

    def test_mark_accessed_updates_turn(self, session):
        """Objects seen again get their last_accessed_turn updated."""

        def fresh_payload():
            tool_block = _tool_result_block("t1", _make_large_text(600))
            return {"messages": [_msg("user", [tool_block])]}

        session.token_state["turn"] = 1
        _apply_fidelity(fresh_payload(), session)

        object_id = session._fidelity_content_map["tool:t1"]
        tracked = session.fidelity_manager.get_object(object_id)
        assert tracked.last_accessed_turn == 1

        # Present the same content again at turn 5; the object fetched above
        # is checked directly rather than looked up a second time.
        session.token_state["turn"] = 5
        _apply_fidelity(fresh_payload(), session)
        assert tracked.last_accessed_turn == 5
# ── Test: Mixed content payloads ─────────────────────────────────────────
class TestMixedPayloads:
    """Checks on payloads that mix tracked and untracked content."""

    def test_mixed_small_and_large_blocks(self, session):
        """Only large blocks get tracked; small ones pass through."""
        blocks = [
            _text_block("small question"),
            _tool_result_block("t1", _make_large_text(600)),
            _text_block("another small bit"),
        ]
        payload = {"messages": [_msg("user", blocks)]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        # The tool_result is the only entry expected in the map.
        content_map = session._fidelity_content_map
        assert len(content_map) == 1
        assert "tool:t1" in content_map

    def test_string_content_messages_ignored(self, session):
        """Messages with string content (not list) are skipped."""
        plain_message = {"role": "user", "content": "Just a plain string message"}
        payload = {"messages": [plain_message]}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        assert len(session._fidelity_content_map) == 0

    def test_multiple_messages_all_tracked(self, session):
        """Objects across multiple messages are all registered."""
        conversation = [
            _msg("user", [_tool_result_block("t1", _make_large_text(600))]),
            _msg("assistant", [_text_block(_make_large_text(800))]),
            _msg("user", [_tool_result_block("t2", _make_large_text(700))]),
        ]
        payload = {"messages": conversation}
        session.token_state["turn"] = 1

        _apply_fidelity(payload, session)

        # t1, the large text block, and t2 make three tracked objects.
        assert len(session._fidelity_content_map) == 3