5-level fidelity manager (L0-Full to L4-Evicted) with helper LLM (Haiku 4.5) for intelligent summarization during degradation. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
513 lines
18 KiB
Python
513 lines
18 KiB
Python
"""Integration tests for FidelityManager integration in the gateway.

Tests the Phase 2.4 fidelity pipeline: object registration, pressure
calculation, degradation, and content replacement in ephemeral payloads.
Does NOT test HelperLLM integration (requires mocking).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import copy
|
|
from pathlib import Path
|
|
from tempfile import TemporaryDirectory
|
|
|
|
import pytest
|
|
|
|
from mnemosyne.fidelity import FidelityLevel, FidelityManager, PressureZone, make_object
|
|
from mnemosyne.gateway import (
|
|
Session,
|
|
_apply_fidelity,
|
|
_auto_stub,
|
|
_block_text,
|
|
_content_key,
|
|
)
|
|
|
|
|
|
# ── Fixtures ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture
def tmp_log_dir():
    """Yield a freshly created temporary directory as a ``Path``.

    The directory lives only for the duration of the ``with`` block, so it
    is cleaned up once the consuming test finishes.
    """
    with TemporaryDirectory() as raw_dir:
        log_dir = Path(raw_dir)
        yield log_dir
|
|
|
|
|
|
@pytest.fixture
def session(tmp_log_dir):
    """Return a ``Session`` with id ``"test01"`` built on the temporary log directory."""
    fresh_session = Session("test01", tmp_log_dir)
    return fresh_session
|
|
|
|
|
|
def _make_large_text(size: int = 600) -> str:
|
|
"""Generate a text string of approximately `size` bytes."""
|
|
return "x" * size
|
|
|
|
|
|
def _tool_result_block(tool_use_id: str, content: str) -> dict:
|
|
return {
|
|
"type": "tool_result",
|
|
"tool_use_id": tool_use_id,
|
|
"content": content,
|
|
}
|
|
|
|
|
|
def _text_block(text: str) -> dict:
|
|
return {"type": "text", "text": text}
|
|
|
|
|
|
def _msg(role: str, blocks: list[dict]) -> dict:
|
|
return {"role": role, "content": blocks}
|
|
|
|
|
|
# ── Test: FidelityManager created per session ────────────────────────────
|
|
|
|
|
|
class TestSessionFidelityManager:
    """Checks the fidelity-related state a ``Session`` exposes right after construction."""

    def test_session_has_fidelity_manager(self, session):
        """The session carries a ``FidelityManager`` under ``fidelity_manager``."""
        assert hasattr(session, "fidelity_manager")
        manager = session.fidelity_manager
        assert isinstance(manager, FidelityManager)

    def test_fidelity_manager_default_window(self, session):
        """The manager's ``window_size`` is expected to be 200,000."""
        expected_window = 200_000
        assert session.fidelity_manager.window_size == expected_window

    def test_fidelity_manager_per_session(self, tmp_log_dir):
        """Two sessions must not share one manager instance."""
        first = Session("sess_a", tmp_log_dir)
        second = Session("sess_b", tmp_log_dir)
        assert first.fidelity_manager is not second.fidelity_manager

    def test_session_has_content_map(self, session):
        """A new session exposes an empty dict as ``_fidelity_content_map``."""
        assert hasattr(session, "_fidelity_content_map")
        content_map = session._fidelity_content_map
        assert isinstance(content_map, dict)
        assert len(content_map) == 0
|
|
|
|
|
|
# ── Test: Content key derivation ─────────────────────────────────────────
|
|
|
|
|
|
class TestContentKey:
    """Exercises ``_content_key`` on tool results and on text blocks of differing size."""

    def test_tool_result_key(self):
        """A tool_result block is expected to be keyed as ``tool:<tool_use_id>``."""
        tool_block = _tool_result_block("toolu_abc123", "some content")
        derived = _content_key(tool_block, {})
        assert derived == "tool:toolu_abc123"

    def test_large_text_key(self):
        """A 600-character text block yields a key that starts with ``text:``."""
        big_block = _text_block(_make_large_text(600))
        derived = _content_key(big_block, {})
        assert derived is not None
        assert derived.startswith("text:")

    def test_small_text_returns_none(self):
        """A short text block is expected to produce no key."""
        tiny_block = _text_block("short")
        derived = _content_key(tiny_block, {})
        assert derived is None

    def test_stable_key_for_same_content(self):
        """Two distinct blocks holding equal text map to equal keys."""
        shared_text = _make_large_text(600)
        first_block = _text_block(shared_text)
        second_block = _text_block(shared_text)
        first_key = _content_key(first_block, {})
        second_key = _content_key(second_block, {})
        assert first_key == second_key

    def test_different_key_for_different_content(self):
        """Blocks of equal length but different text map to different keys."""
        block_of_a = _text_block("a" * 600)
        block_of_b = _text_block("b" * 600)
        key_a = _content_key(block_of_a, {})
        key_b = _content_key(block_of_b, {})
        assert key_a != key_b
|
|
|
|
|
|
# ── Test: Block text extraction ──────────────────────────────────────────
|
|
|
|
|
|
class TestBlockText:
    """Exercises ``_block_text`` on text blocks and on both tool_result content shapes."""

    def test_text_block(self):
        """A text block yields its ``text`` value."""
        extracted = _block_text(_text_block("hello"))
        assert extracted == "hello"

    def test_tool_result_string_content(self):
        """A tool_result with string content yields that string."""
        tool_block = _tool_result_block("id1", "result text")
        extracted = _block_text(tool_block)
        assert extracted == "result text"

    def test_tool_result_list_content(self):
        """A tool_result with a list of text parts is expected to be newline-joined."""
        parts = [
            {"type": "text", "text": "line 1"},
            {"type": "text", "text": "line 2"},
        ]
        tool_block = {"type": "tool_result", "tool_use_id": "id2", "content": parts}
        extracted = _block_text(tool_block)
        assert extracted == "line 1\nline 2"
|
|
|
|
|
|
# ── Test: Auto stub generation ───────────────────────────────────────────
|
|
|
|
|
|
class TestAutoStub:
    """Exercises ``_auto_stub`` on short, multi-line and over-long input."""

    def test_short_content(self):
        """Short content is expected to be embedded verbatim in the stub."""
        result = _auto_stub("Hello world")
        assert result == "[evicted content: Hello world]"

    def test_multiline_uses_first_line(self):
        """Only the first line of multi-line content appears in the stub."""
        source_text = "First line\nSecond line\nThird line"
        result = _auto_stub(source_text)
        assert "First line" in result
        assert "Second line" not in result

    def test_long_first_line_truncated(self):
        """A 200-character line gives a stub shorter than 200 chars containing ``...``."""
        overlong = "x" * 200
        result = _auto_stub(overlong)
        assert len(result) < 200
        assert "..." in result
|
|
|
|
|
|
# ── Test: Object registration via _apply_fidelity ───────────────────────
|
|
|
|
|
|
class TestApplyFidelityRegistration:
    """Checks which payload blocks ``_apply_fidelity`` records in the session's content map."""

    def test_registers_tool_result(self, session):
        """A large tool_result is registered under its ``tool:<id>`` key."""
        session.token_state["turn"] = 1
        tool_block = _tool_result_block("tool_1", _make_large_text(600))
        payload = {"messages": [_msg("user", [tool_block])]}

        _apply_fidelity(payload, session)

        manager = session.fidelity_manager
        assert manager.total_tokens() > 0
        assert len(session._fidelity_content_map) == 1
        assert "tool:tool_1" in session._fidelity_content_map

    def test_registers_large_text_block(self, session):
        """An 800-character assistant text block is registered."""
        session.token_state["turn"] = 1
        big_text = _make_large_text(800)
        payload = {"messages": [_msg("assistant", [_text_block(big_text)])]}

        _apply_fidelity(payload, session)

        manager = session.fidelity_manager
        assert manager.total_tokens() > 0
        assert len(session._fidelity_content_map) == 1

    def test_ignores_small_text_block(self, session):
        """A short text block leaves the content map empty."""
        session.token_state["turn"] = 1
        payload = {"messages": [_msg("user", [_text_block("small content")])]}

        _apply_fidelity(payload, session)

        assert len(session._fidelity_content_map) == 0

    def test_multiple_blocks_registered(self, session):
        """Three large blocks in one message produce three map entries."""
        session.token_state["turn"] = 1
        blocks = [
            _tool_result_block("tool_a", _make_large_text(600)),
            _tool_result_block("tool_b", _make_large_text(700)),
            _text_block(_make_large_text(800)),
        ]
        payload = {"messages": [_msg("user", blocks)]}

        _apply_fidelity(payload, session)

        assert len(session._fidelity_content_map) == 3

    def test_idempotent_on_second_call(self, session):
        """Seeing the same tool result on a later turn adds no new map entry."""
        session.token_state["turn"] = 1
        first_payload = {
            "messages": [
                _msg("user", [_tool_result_block("tool_1", _make_large_text(600))]),
            ]
        }
        _apply_fidelity(first_payload, session)
        entries_after_first = len(session._fidelity_content_map)

        # Same tool id and content again, rebuilt from scratch, on the next turn.
        second_payload = {
            "messages": [
                _msg("user", [_tool_result_block("tool_1", _make_large_text(600))]),
            ]
        }
        session.token_state["turn"] = 2
        _apply_fidelity(second_payload, session)

        assert len(session._fidelity_content_map) == entries_after_first

    def test_new_objects_start_at_l0(self, session):
        """A freshly registered object reports ``FidelityLevel.L0``."""
        session.token_state["turn"] = 1
        tool_block = _tool_result_block("tool_1", _make_large_text(600))
        payload = {"messages": [_msg("user", [tool_block])]}

        _apply_fidelity(payload, session)

        registered_id = session._fidelity_content_map["tool:tool_1"]
        registered = session.fidelity_manager.get_object(registered_id)
        assert registered is not None
        assert registered.current_fidelity == FidelityLevel.L0
|
|
|
|
|
|
# ── Test: Pressure calculation with real payloads ────────────────────────
|
|
|
|
|
|
class TestPressureCalculation:
    """Checks the pressure zone reported by the session's fidelity manager."""

    def test_normal_pressure_small_payload(self, session):
        """One 600-character tool result keeps the manager in the NORMAL zone."""
        session.token_state["turn"] = 1
        tool_block = _tool_result_block("t1", _make_large_text(600))
        payload = {"messages": [_msg("user", [tool_block])]}

        _apply_fidelity(payload, session)

        # 600 bytes ≈ 150 tokens, window=200k → well under 50%
        zone = session.fidelity_manager.current_pressure()
        assert zone == PressureZone.NORMAL

    def test_high_pressure_large_payload(self, session):
        """Five 100,000-character objects push pressure to CAUTION or above."""
        manager = session.fidelity_manager

        # Per the original author's arithmetic: 200k window, caution at
        # 50% = 100k tokens; each object is ~25k tokens (100k chars / 4).
        for i in range(5):
            heavy_object = make_object(
                object_type="tool_result",
                content_full="x" * 100_000,
                created_at_turn=1,
                stub=f"[stub {i}]",
            )
            manager.register_object(heavy_object)

        # 5 * 25k = 125k tokens > 100k caution threshold
        assert manager.current_pressure() >= PressureZone.CAUTION

    def test_pressure_zones_ordered(self, session):
        """Thresholds increase strictly: caution < warning < critical < emergency."""
        manager = session.fidelity_manager
        assert (
            manager.threshold_caution
            < manager.threshold_warning
            < manager.threshold_critical
            < manager.threshold_emergency
        )
|
|
|
|
|
|
# ── Test: Degradation replaces content in ephemeral payload ──────────────
|
|
|
|
|
|
class TestDegradationReplacement:
    """Checks that ``_apply_fidelity`` swaps payload content for a stub once an object is degraded."""

    def test_degraded_tool_result_replaced_with_stub(self, session):
        """A tool result whose object was forced to L3 comes back as a stub."""
        session.token_state["turn"] = 1
        full_text = _make_large_text(600)
        tool_id = "tool_degrade"

        # First pass registers the object.
        registering_payload = {
            "messages": [_msg("user", [_tool_result_block(tool_id, full_text)])]
        }
        _apply_fidelity(registering_payload, session)

        # Force the tracked object down to L3 (stub) by hand.
        tracked_id = session._fidelity_content_map[f"tool:{tool_id}"]
        tracked = session.fidelity_manager.get_object(tracked_id)
        tracked.current_fidelity = FidelityLevel.L3

        # Second pass over identical content should now substitute the stub.
        session.token_state["turn"] = 2
        replay_payload = {
            "messages": [_msg("user", [_tool_result_block(tool_id, full_text)])]
        }
        _apply_fidelity(replay_payload, session)

        first_block = replay_payload["messages"][0]["content"][0]
        substituted = first_block["content"]
        assert substituted != full_text
        assert "[evicted content:" in substituted

    def test_degraded_text_block_replaced_with_stub(self, session):
        """A large text block whose object was forced to L3 comes back as a stub."""
        session.token_state["turn"] = 1
        full_text = "Important data: " + _make_large_text(600)

        registering_payload = {
            "messages": [_msg("assistant", [_text_block(full_text)])]
        }
        _apply_fidelity(registering_payload, session)

        # Exactly one object is tracked; fetch it without knowing its key.
        assert len(session._fidelity_content_map) == 1
        tracked_id = next(iter(session._fidelity_content_map.values()))
        tracked = session.fidelity_manager.get_object(tracked_id)
        tracked.current_fidelity = FidelityLevel.L3

        session.token_state["turn"] = 2
        replay_payload = {
            "messages": [_msg("assistant", [_text_block(full_text)])]
        }
        _apply_fidelity(replay_payload, session)

        first_block = replay_payload["messages"][0]["content"][0]
        substituted = first_block["text"]
        assert substituted != full_text
        assert "[evicted content:" in substituted

    def test_l0_content_not_replaced(self, session):
        """Content of an object that was never degraded passes through unchanged."""
        session.token_state["turn"] = 1
        full_text = _make_large_text(600)
        tool_id = "tool_keep"

        payload = {
            "messages": [_msg("user", [_tool_result_block(tool_id, full_text)])]
        }
        _apply_fidelity(payload, session)

        # The object is still at L0, so the payload must be untouched.
        first_block = payload["messages"][0]["content"][0]
        assert first_block["content"] == full_text

    def test_l4_evicted_also_replaced(self, session):
        """An object forced to L4 (evicted) is replaced by a stub as well."""
        session.token_state["turn"] = 1
        full_text = _make_large_text(600)
        tool_id = "tool_evict"

        registering_payload = {
            "messages": [_msg("user", [_tool_result_block(tool_id, full_text)])]
        }
        _apply_fidelity(registering_payload, session)

        # Force the tracked object down to L4 (evicted) by hand.
        tracked_id = session._fidelity_content_map[f"tool:{tool_id}"]
        tracked = session.fidelity_manager.get_object(tracked_id)
        tracked.current_fidelity = FidelityLevel.L4

        # L4 is >= L3, so replacement is still expected.
        session.token_state["turn"] = 2
        replay_payload = {
            "messages": [_msg("user", [_tool_result_block(tool_id, full_text)])]
        }
        _apply_fidelity(replay_payload, session)

        first_block = replay_payload["messages"][0]["content"][0]
        assert "[evicted content:" in first_block["content"]
|
|
|
|
|
|
# ── Test: Degradation triggered by pressure ──────────────────────────────
|
|
|
|
|
|
class TestPressureDegradation:
    """Checks degradation and access tracking driven through ``_apply_fidelity``."""

    def test_degrade_under_pressure(self, session):
        """With a tiny window, either something is degraded or pressure is NORMAL."""
        manager = session.fidelity_manager
        # Shrink the window to 1000 tokens so the payload below overflows it.
        manager.window_size = 1000

        session.token_state["turn"] = 1
        # Per the original author's estimate: 2000 bytes ≈ 500 tokens per
        # object, so 3 objects = 1500 tokens > 1000.
        oversized_blocks = [
            _tool_result_block("t1", _make_large_text(2000)),
            _tool_result_block("t2", _make_large_text(2000)),
            _tool_result_block("t3", _make_large_text(2000)),
        ]
        payload = {"messages": [_msg("user", oversized_blocks)]}
        _apply_fidelity(payload, session)

        # Passes when at least one object left L0, or when the manager
        # reports NORMAL pressure afterwards.
        any_degraded = any(
            tracked.current_fidelity > FidelityLevel.L0
            for tracked in manager._objects.values()
        )
        assert any_degraded or manager.current_pressure() == PressureZone.NORMAL

    def test_mark_accessed_updates_turn(self, session):
        """Re-seeing an object on a later turn updates its ``last_accessed_turn``."""
        session.token_state["turn"] = 1
        first_payload = {
            "messages": [
                _msg("user", [_tool_result_block("t1", _make_large_text(600))]),
            ]
        }
        _apply_fidelity(first_payload, session)

        tracked_id = session._fidelity_content_map["tool:t1"]
        tracked = session.fidelity_manager.get_object(tracked_id)
        assert tracked.last_accessed_turn == 1

        # Present the same tool result again at turn 5.
        session.token_state["turn"] = 5
        later_payload = {
            "messages": [
                _msg("user", [_tool_result_block("t1", _make_large_text(600))]),
            ]
        }
        _apply_fidelity(later_payload, session)

        assert tracked.last_accessed_turn == 5
|
|
|
|
|
|
# ── Test: Mixed content payloads ─────────────────────────────────────────
|
|
|
|
|
|
class TestMixedPayloads:
    """Checks ``_apply_fidelity`` against payloads that mix block sizes and content shapes."""

    def test_mixed_small_and_large_blocks(self, session):
        """Among small text and one large tool result, only the tool result is tracked."""
        session.token_state["turn"] = 1
        blocks = [
            _text_block("small question"),
            _tool_result_block("t1", _make_large_text(600)),
            _text_block("another small bit"),
        ]
        payload = {"messages": [_msg("user", blocks)]}

        _apply_fidelity(payload, session)

        # The tool_result is the only entry expected in the map.
        content_map = session._fidelity_content_map
        assert len(content_map) == 1
        assert "tool:t1" in content_map

    def test_string_content_messages_ignored(self, session):
        """A message whose content is a plain string registers nothing."""
        session.token_state["turn"] = 1
        plain_message = {"role": "user", "content": "Just a plain string message"}
        payload = {"messages": [plain_message]}

        _apply_fidelity(payload, session)

        assert len(session._fidelity_content_map) == 0

    def test_multiple_messages_all_tracked(self, session):
        """Large blocks spread over three messages are all registered."""
        session.token_state["turn"] = 1
        conversation = [
            _msg("user", [_tool_result_block("t1", _make_large_text(600))]),
            _msg("assistant", [_text_block(_make_large_text(800))]),
            _msg("user", [_tool_result_block("t2", _make_large_text(700))]),
        ]
        payload = {"messages": conversation}

        _apply_fidelity(payload, session)

        # t1, the large text, and t2 make three tracked objects.
        assert len(session._fidelity_content_map) == 3
|