5-level fidelity manager (L0-Full to L4-Evicted) with helper LLM (Haiku 4.5) for intelligent summarization during degradation. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
861 lines
30 KiB
Python
861 lines
30 KiB
Python
"""Tests for the multi-fidelity state machine."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from mnemosyne.fidelity import (
|
|
VALID_OBJECT_TYPES,
|
|
FidelityLevel,
|
|
FidelityManager,
|
|
PressureZone,
|
|
SemanticObject,
|
|
_estimate_tokens,
|
|
make_object,
|
|
)
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────
|
|
|
|
|
|
def _make_obj(
    content: str = "x" * 400,
    *,
    object_type: str = "file_context",
    turn: int = 0,
    summary_detailed: str | None = None,
    summary_compact: str | None = None,
    stub: str | None = None,
) -> SemanticObject:
    """Build a ``SemanticObject`` for tests through ``make_object``.

    ``content`` becomes ``content_full`` and ``turn`` becomes
    ``created_at_turn``. Any of ``summary_detailed``, ``summary_compact`` or
    ``stub`` that is falsy (``None`` or an empty string) is replaced by a
    fixed placeholder so every fidelity level has some text.
    """
    # Placeholder sizes: "summary " * 20 is 160 chars, "compact " * 5 is 40.
    detailed_text = summary_detailed if summary_detailed else "summary " * 20
    compact_text = summary_compact if summary_compact else "compact " * 5
    stub_text = stub if stub else "file_context: test object"

    return make_object(
        object_type=object_type,
        content_full=content,
        created_at_turn=turn,
        summary_detailed=detailed_text,
        summary_compact=compact_text,
        stub=stub_text,
    )
|
|
|
|
|
|
def _fill_manager(
    manager: FidelityManager,
    count: int,
    *,
    content_size: int = 400,
    turn: int = 0,
) -> list[str]:
    """Register ``count`` test objects with ``manager`` and return their IDs.

    Each object holds ``content_size`` characters of ``"x"``; the i-th object
    is created at turn ``turn + i``. IDs are returned in registration order.
    """
    return [
        manager.register_object(_make_obj("x" * content_size, turn=turn + offset))
        for offset in range(count)
    ]
|
|
|
|
|
|
# ── FidelityLevel enum ───────────────────────────────────────
|
|
|
|
|
|
class TestFidelityLevel:
    """Shape of the ``FidelityLevel`` enum: size, ordering, values, names."""

    def test_five_levels(self):
        """The enum exposes exactly five members."""
        member_count = len(FidelityLevel)
        assert member_count == 5

    def test_ordering(self):
        """Levels compare in ascending order from L0 through L4."""
        assert (
            FidelityLevel.L0
            < FidelityLevel.L1
            < FidelityLevel.L2
            < FidelityLevel.L3
            < FidelityLevel.L4
        )

    def test_values(self):
        """The first and last levels compare equal to the integers 0 and 4."""
        lowest, highest = FidelityLevel.L0, FidelityLevel.L4
        assert lowest == 0
        assert highest == 4

    def test_names(self):
        """Member names match their attribute names."""
        lowest, highest = FidelityLevel.L0, FidelityLevel.L4
        assert lowest.name == "L0"
        assert highest.name == "L4"
|
|
|
|
|
|
# ── PressureZone enum ────────────────────────────────────────
|
|
|
|
|
|
class TestPressureZone:
    """Shape of the ``PressureZone`` enum: size, ordering, names."""

    def test_five_zones(self):
        """The enum exposes exactly five members."""
        zone_count = len(PressureZone)
        assert zone_count == 5

    def test_ordering(self):
        """Zones compare in ascending severity, NORMAL through EMERGENCY."""
        assert (
            PressureZone.NORMAL
            < PressureZone.CAUTION
            < PressureZone.WARNING
            < PressureZone.CRITICAL
            < PressureZone.EMERGENCY
        )

    def test_names(self):
        """The set of member names is exactly the five expected labels."""
        actual_names = {zone.name for zone in PressureZone}
        assert actual_names == {
            "NORMAL",
            "CAUTION",
            "WARNING",
            "CRITICAL",
            "EMERGENCY",
        }
|
|
|
|
|
|
# ── SemanticObject ────────────────────────────────────────────
|
|
|
|
|
|
class TestSemanticObject:
    """Field defaults, token accounting and metadata on ``SemanticObject``."""

    def test_required_fields(self):
        """A default test object carries its content and a neutral state."""
        fresh = _make_obj()
        assert fresh.id
        assert fresh.object_type == "file_context"
        assert fresh.content_full == "x" * 400
        assert fresh.current_fidelity == FidelityLevel.L0
        assert fresh.pinned is False
        assert fresh.pin_until_turn is None
        assert fresh.fault_count == 0

    def test_token_estimation(self):
        """1000 characters of content are counted as 250 L0 tokens."""
        big = _make_obj("a" * 1000)
        assert big.token_count_l0 == 250  # 1000 / 4

    def test_tokens_at_each_level(self):
        """``tokens_at`` reports the per-level token count, zero once evicted."""
        layered = _make_obj(
            "a" * 400,
            summary_detailed="b" * 120,
            summary_compact="c" * 20,
            stub="d" * 100,
        )
        expected_by_level = [
            (FidelityLevel.L0, 100),  # 400/4
            (FidelityLevel.L1, 30),  # 120/4
            (FidelityLevel.L2, 5),  # 20/4
            (FidelityLevel.L3, 25),  # 100/4
            (FidelityLevel.L4, 0),  # evicted
        ]
        for level, expected_tokens in expected_by_level:
            assert layered.tokens_at(level) == expected_tokens

    def test_current_tokens_tracks_fidelity(self):
        """``current_tokens`` follows changes to ``current_fidelity``."""
        tracked = _make_obj("a" * 400, summary_detailed="b" * 120)
        assert tracked.current_tokens == 100  # L0: 400/4

        tracked.current_fidelity = FidelityLevel.L1
        assert tracked.current_tokens == 30  # L1: 120/4

    def test_losses_default_empty(self):
        """Loss lists are empty when none are supplied."""
        plain = _make_obj()
        assert plain.losses_l1 == []
        assert plain.losses_l2 == []

    def test_losses_populated(self):
        """Loss lists passed to ``make_object`` are stored on the object."""
        l1_losses = ["exact error codes"]
        l2_losses = ["exact error codes", "function signatures"]
        decision = make_object(
            object_type="design_decision",
            content_full="decision content",
            losses_l1=l1_losses,
            losses_l2=l2_losses,
        )
        assert "exact error codes" in decision.losses_l1
        assert len(decision.losses_l2) == 2

    def test_valid_object_types(self):
        """``VALID_OBJECT_TYPES`` equals the eight known type names."""
        known_types = {
            "conversation_phase",
            "design_decision",
            "debugging_session",
            "file_context",
            "tool_result",
            "plan",
            "error_context",
            "external_reference",
        }
        assert VALID_OBJECT_TYPES == known_types

    def test_queryability_fields(self):
        """Queryability metadata is stored exactly as given."""
        described = make_object(
            object_type="file_context",
            content_full="content",
            can_answer=["what functions are defined"],
            fault_when=["need exact line numbers"],
            key_entities=["auth.py", "middleware"],
        )
        assert described.can_answer == ["what functions are defined"]
        assert described.fault_when == ["need exact line numbers"]
        assert described.key_entities == ["auth.py", "middleware"]
|
|
|
|
|
|
# ── Token estimation ──────────────────────────────────────────
|
|
|
|
|
|
class TestTokenEstimation:
    """Expected results of ``_estimate_tokens`` for representative inputs.

    The former ``test_empty_returns_minimum`` was an exact duplicate of
    ``test_empty`` (same input, same assertion) and has been merged into it.
    """

    def test_basic(self):
        """100 characters are estimated as 25 tokens."""
        assert _estimate_tokens("a" * 100) == 25

    def test_none(self):
        """``None`` is estimated as zero tokens."""
        assert _estimate_tokens(None) == 0

    def test_empty(self):
        """An empty string is estimated as the minimum of one token."""
        # len("") = 0, 0 // 4 = 0, max(1, 0) = 1
        assert _estimate_tokens("") == 1

    def test_short(self):
        """A string shorter than four characters still counts as one token."""
        assert _estimate_tokens("hi") == 1  # 2//4=0, max(1,0)=1

    def test_exact_multiple(self):
        """400 characters are estimated as exactly 100 tokens."""
        assert _estimate_tokens("a" * 400) == 100
|
|
|
|
|
|
# ── make_object factory ───────────────────────────────────────
|
|
|
|
|
|
class TestMakeObject:
    """Behaviour of the ``make_object`` factory."""

    def test_generates_id(self):
        """Generated IDs are 16 alphanumeric characters."""
        generated_id = make_object(object_type="plan", content_full="plan content").id
        assert len(generated_id) == 16
        assert generated_id.isalnum()

    def test_unique_ids(self):
        """One hundred consecutive objects all receive distinct IDs."""
        produced = [
            make_object(object_type="plan", content_full="x").id for _ in range(100)
        ]
        assert len(set(produced)) == 100

    def test_auto_token_estimates(self):
        """Token counts for every level are derived from the supplied texts."""
        built = make_object(
            object_type="file_context",
            content_full="a" * 800,
            summary_detailed="b" * 240,
            summary_compact="c" * 40,
            stub="d" * 100,
        )
        assert built.token_count_l0 == 200
        assert built.token_count_l1 == 60
        assert built.token_count_l2 == 10
        assert built.token_count_l3 == 25  # 100/4

    def test_default_stub_tokens(self):
        """Without a stub, the L3 token count is 25."""
        stubless = make_object(object_type="plan", content_full="content")
        assert stubless.token_count_l3 == 25  # default when no stub provided

    def test_starts_at_l0(self):
        """New objects begin at full fidelity."""
        newborn = make_object(object_type="plan", content_full="content")
        assert newborn.current_fidelity == FidelityLevel.L0
|
|
|
|
|
|
# ── FidelityManager basics ────────────────────────────────────
|
|
|
|
|
|
class TestFidelityManagerBasics:
    """Construction, registration, lookup and token totals of the manager."""

    def test_default_window_size(self):
        """A manager built without arguments has a 200,000-token window."""
        assert FidelityManager().window_size == 200_000

    def test_custom_window_size(self):
        """An explicit ``window_size`` is stored unchanged."""
        manager = FidelityManager(window_size=100_000)
        assert manager.window_size == 100_000

    def test_register_and_get(self):
        """``get_object`` returns the very instance that was registered."""
        manager = FidelityManager()
        registered = _make_obj()
        object_id = manager.register_object(registered)
        assert manager.get_object(object_id) is registered

    def test_get_nonexistent(self):
        """Looking up an unknown ID yields ``None``."""
        manager = FidelityManager()
        assert manager.get_object("nonexistent") is None

    def test_register_returns_id(self):
        """``register_object`` returns the object's own ID."""
        manager = FidelityManager()
        registered = _make_obj()
        returned_id = manager.register_object(registered)
        assert returned_id == registered.id

    def test_total_tokens_empty(self):
        """An empty manager reports zero tokens."""
        assert FidelityManager().total_tokens() == 0

    def test_total_tokens_sums_current_fidelity(self):
        """Totals add up the L0 token counts of all registered objects."""
        manager = FidelityManager()
        # 400 chars -> 100 tokens and 800 chars -> 200 tokens, both at L0.
        for payload in ("a" * 400, "b" * 800):
            manager.register_object(_make_obj(payload))
        assert manager.total_tokens() == 300

    def test_total_tokens_respects_fidelity_change(self):
        """Totals change when an object's fidelity level is changed."""
        manager = FidelityManager()
        tracked = _make_obj("a" * 400, summary_detailed="b" * 120)
        manager.register_object(tracked)
        assert manager.total_tokens() == 100  # L0: 400/4

        tracked.current_fidelity = FidelityLevel.L1
        assert manager.total_tokens() == 30  # L1: 120/4
|
|
|
|
|
|
# ── Pressure zones ────────────────────────────────────────────
|
|
|
|
|
|
class TestPressureZones:
    """Expected ``PressureZone`` for given fill levels of the window."""

    @staticmethod
    def _zone_for(content_size):
        """Return the pressure of a ``window_size=1000`` manager holding one
        object of ``content_size`` characters."""
        manager = FidelityManager(window_size=1000)
        _fill_manager(manager, 1, content_size=content_size)
        return manager.current_pressure()

    def test_normal_zone(self):
        """10% utilisation is NORMAL."""
        # 400 chars = 100 tokens, 10% of 1000
        assert self._zone_for(400) == PressureZone.NORMAL

    def test_caution_zone(self):
        """60% utilisation is CAUTION."""
        # 2400 chars = 600 tokens, 60% of 1000
        assert self._zone_for(2400) == PressureZone.CAUTION

    def test_warning_zone(self):
        """80% utilisation is WARNING."""
        # 3200 chars = 800 tokens, 80% of 1000
        assert self._zone_for(3200) == PressureZone.WARNING

    def test_critical_zone(self):
        """90% utilisation is CRITICAL."""
        # 3600 chars = 900 tokens, 90% of 1000
        assert self._zone_for(3600) == PressureZone.CRITICAL

    def test_emergency_zone(self):
        """96% utilisation is EMERGENCY."""
        # 3840 chars = 960 tokens, 96% of 1000
        assert self._zone_for(3840) == PressureZone.EMERGENCY

    def test_exact_boundary_50pct(self):
        """Exactly 50% utilisation already counts as CAUTION."""
        # 2000 chars = 500 tokens, exactly 50% (threshold is inclusive)
        assert self._zone_for(2000) == PressureZone.CAUTION

    def test_just_below_50pct(self):
        """49.9% utilisation is still NORMAL."""
        # 1996 chars = 499 tokens, 49.9%
        assert self._zone_for(1996) == PressureZone.NORMAL

    def test_zero_window_is_emergency(self):
        """A zero-sized window reports EMERGENCY even with no objects."""
        empty_window = FidelityManager(window_size=0)
        assert empty_window.current_pressure() == PressureZone.EMERGENCY
|
|
|
|
|
|
# ── Degradation ───────────────────────────────────────────────
|
|
|
|
|
|
class TestDegradation:
    """Behaviour of ``FidelityManager.degrade`` in each pressure zone.

    ``degrade`` returns a list of ``(object_id, old_level, new_level)``
    tuples; a higher numeric level means lower fidelity.
    """

    def test_no_degradation_in_normal(self):
        """In the NORMAL zone ``degrade`` reports no transitions."""
        fm = FidelityManager(window_size=10_000)
        _fill_manager(fm, 1, content_size=400)  # 100 tokens, 1% -> NORMAL
        transitions = fm.degrade(current_turn=1)
        assert transitions == []

    def test_caution_degrades_l0_to_l1(self):
        """In CAUTION every reported transition is L0 -> L1."""
        fm = FidelityManager(window_size=1000)
        # 3 * 800 chars = 2400 chars = 600 tokens -> CAUTION
        _fill_manager(fm, 3, content_size=800)
        assert fm.current_pressure() == PressureZone.CAUTION

        transitions = fm.degrade(current_turn=5)
        # Should have degraded some L0 objects to L1
        assert len(transitions) > 0
        for _oid, old, new in transitions:
            assert old == FidelityLevel.L0
            assert new == FidelityLevel.L1

    def test_caution_degrades_oldest_first(self):
        """The least recently accessed object is the first one degraded."""
        fm = FidelityManager(window_size=1000)
        # Two objects created at different turns:
        # 1200 chars each = 300 tokens each, 600 total = 60% -> CAUTION
        obj_old = _make_obj("a" * 1200, turn=0)
        obj_new = _make_obj("b" * 1200, turn=5)
        id_old = fm.register_object(obj_old)
        id_new = fm.register_object(obj_new)

        # Mark the newer one as recently accessed
        fm.mark_accessed(id_new, current_turn=10)

        assert fm.current_pressure() == PressureZone.CAUTION

        transitions = fm.degrade(current_turn=10)
        # Require at least one transition: the previous `if transitions:`
        # guard let this test pass without checking anything when degrade()
        # returned an empty list.
        assert len(transitions) > 0
        assert transitions[0][0] == id_old

    def test_warning_degrades_multiple_levels(self):
        """In WARNING at least one L0 -> L1 transition is reported."""
        fm = FidelityManager(window_size=1000)
        # 4 * 800 chars = 3200 chars = 800 tokens -> WARNING
        _fill_manager(fm, 4, content_size=800)
        assert fm.current_pressure() == PressureZone.WARNING

        transitions = fm.degrade(current_turn=10)
        assert len(transitions) > 0
        # Should see L0 -> L1 transitions at minimum
        levels_seen = {(old, new) for _, old, new in transitions}
        assert (FidelityLevel.L0, FidelityLevel.L1) in levels_seen

    def test_critical_degrades_aggressively(self):
        """In CRITICAL some object ends up at L3 or L4."""
        fm = FidelityManager(window_size=1000)
        # 3 * 1200 chars = 3600 chars = 900 tokens -> CRITICAL
        _fill_manager(fm, 3, content_size=1200)
        assert fm.current_pressure() == PressureZone.CRITICAL

        transitions = fm.degrade(current_turn=10)
        assert len(transitions) > 0
        # Should see objects pushed to L3 or L4
        final_levels = {new for _, _, new in transitions}
        assert FidelityLevel.L3 in final_levels or FidelityLevel.L4 in final_levels

    def test_emergency_evicts_all_unpinned(self):
        """In EMERGENCY every unpinned object ends at L4."""
        fm = FidelityManager(window_size=1000)
        # 4 * 960 chars = 3840 chars = 960 tokens -> EMERGENCY
        ids = _fill_manager(fm, 4, content_size=960)
        assert fm.current_pressure() == PressureZone.EMERGENCY

        fm.degrade(current_turn=10)
        # All objects should end at L4
        for obj_id in ids:
            obj = fm.get_object(obj_id)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4

    def test_emergency_respects_pins(self):
        """A pinned object stays above L4 while the others are evicted."""
        fm = FidelityManager(window_size=1000)
        ids = _fill_manager(fm, 4, content_size=960)

        # Pin one object (turn 5 + duration 20 covers the degrade at turn 10)
        fm.record_fault(ids[0], current_turn=5, pin_duration=20)

        fm.degrade(current_turn=10)
        # Pinned object should NOT be at L4
        pinned_obj = fm.get_object(ids[0])
        assert pinned_obj is not None
        assert pinned_obj.current_fidelity < FidelityLevel.L4

        # Others should be at L4
        for oid in ids[1:]:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4

    def test_degrade_returns_transitions(self):
        """Transitions are ``(str, FidelityLevel, FidelityLevel)`` with new > old."""
        fm = FidelityManager(window_size=1000)
        _fill_manager(fm, 3, content_size=800)
        transitions = fm.degrade(current_turn=5)
        # Same setup as test_caution_degrades_l0_to_l1, which requires a
        # non-empty result; without this check the loop below could run zero
        # times and verify nothing.
        assert len(transitions) > 0
        for oid, old, new in transitions:
            assert isinstance(oid, str)
            assert isinstance(old, FidelityLevel)
            assert isinstance(new, FidelityLevel)
            assert new > old  # Degradation means higher numeric level

    def test_degrade_stops_when_pressure_relieved(self):
        """Degrading from CAUTION lowers the token total and the pressure."""
        fm = FidelityManager(window_size=1000)
        # Create objects with large L0 but small L1
        for i in range(3):
            obj = _make_obj(
                "a" * 800,  # 200 tokens at L0
                summary_detailed="b" * 40,  # 10 tokens at L1
                turn=i,
            )
            fm.register_object(obj)

        # 600 tokens -> CAUTION
        tokens_before = fm.total_tokens()
        assert fm.current_pressure() == PressureZone.CAUTION

        fm.degrade(current_turn=5)
        # The old check (`<= CAUTION`) was already true before degrade() ran.
        # A single L0 -> L1 transition drops the total to at most 410 tokens
        # (41% of the window), so the zone must fall strictly below CAUTION.
        assert fm.total_tokens() < tokens_before
        assert fm.current_pressure() < PressureZone.CAUTION
|
|
|
|
|
|
# ── Upgrade ───────────────────────────────────────────────────
|
|
|
|
|
|
class TestUpgrade:
    """Behaviour of ``FidelityManager.upgrade`` (moving to a lower level number)."""

    def test_upgrade_l1_to_l0(self):
        """An object at L1 can be restored to L0."""
        manager = FidelityManager()
        target = _make_obj("content" * 50)
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L1

        succeeded = manager.upgrade(object_id, FidelityLevel.L0, current_turn=5)

        assert succeeded is True
        assert target.current_fidelity == FidelityLevel.L0

    def test_upgrade_updates_last_accessed(self):
        """Upgrading records the current turn as the last access."""
        manager = FidelityManager()
        target = _make_obj("content" * 50)
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L2

        manager.upgrade(object_id, FidelityLevel.L0, current_turn=42)

        assert target.last_accessed_turn == 42

    def test_upgrade_rejects_same_level(self):
        """Requesting the level the object is already at returns ``False``."""
        manager = FidelityManager()
        object_id = manager.register_object(_make_obj())

        # The object is already at L0.
        succeeded = manager.upgrade(object_id, FidelityLevel.L0, current_turn=5)

        assert succeeded is False

    def test_upgrade_rejects_downgrade(self):
        """Requesting a higher level number than the current one returns ``False``."""
        manager = FidelityManager()
        object_id = manager.register_object(_make_obj())

        # L1 > L0, so this is not an upgrade.
        succeeded = manager.upgrade(object_id, FidelityLevel.L1, current_turn=5)

        assert succeeded is False

    def test_upgrade_nonexistent_object(self):
        """Upgrading an unknown ID returns ``False``."""
        manager = FidelityManager()
        succeeded = manager.upgrade("nonexistent", FidelityLevel.L0, current_turn=5)
        assert succeeded is False

    def test_upgrade_l3_to_l1(self):
        """An object at L3 with a detailed summary can be raised to L1."""
        manager = FidelityManager()
        target = _make_obj(summary_detailed="detailed summary content")
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L3

        succeeded = manager.upgrade(object_id, FidelityLevel.L1, current_turn=10)

        assert succeeded is True
        assert target.current_fidelity == FidelityLevel.L1

    def test_upgrade_requires_content_at_target(self):
        """Upgrading to L0 succeeds when only ``content_full`` was supplied."""
        manager = FidelityManager()
        # No summary_detailed is passed to the factory here.
        target = make_object(
            object_type="file_context",
            content_full="full content",
        )
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L3

        # L0 is reachable because content_full exists.
        succeeded = manager.upgrade(object_id, FidelityLevel.L0, current_turn=5)

        assert succeeded is True

    def test_upgrade_rejects_missing_l1_content(self):
        """Upgrading to L1 returns ``False`` when ``summary_detailed`` is ``None``."""
        manager = FidelityManager()
        summaryless = SemanticObject(
            id="test123",
            object_type="file_context",
            content_full="full",
            summary_detailed=None,  # No L1 content
            token_count_l0=1,
        )
        manager.register_object(summaryless)
        summaryless.current_fidelity = FidelityLevel.L3

        succeeded = manager.upgrade("test123", FidelityLevel.L1, current_turn=5)

        assert succeeded is False
|
|
|
|
|
|
# ── Fault-driven pinning ──────────────────────────────────────
|
|
|
|
|
|
class TestFaultPinning:
    """Pinning behaviour triggered by ``FidelityManager.record_fault``."""

    def test_record_fault_pins_object(self):
        """A fault pins the object until ``current_turn + pin_duration``."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)

        manager.record_fault(object_id, current_turn=10, pin_duration=5)

        assert faulted.pinned is True
        assert faulted.pin_until_turn == 15
        assert faulted.fault_count == 1

    def test_record_fault_updates_access(self):
        """A fault records the current turn as the last access."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)

        manager.record_fault(object_id, current_turn=10)

        assert faulted.last_accessed_turn == 10

    def test_multiple_faults_increment_count(self):
        """Each recorded fault increments ``fault_count`` by one."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)

        for fault_turn in (10, 12):
            manager.record_fault(object_id, current_turn=fault_turn)

        assert faulted.fault_count == 2

    def test_pin_prevents_degradation(self):
        """A pinned object is absent from the transitions ``degrade`` returns."""
        manager = FidelityManager(window_size=1000)
        ids = _fill_manager(manager, 3, content_size=800)
        oldest_id = ids[0]

        # Pin the oldest object; the pin outlasts the degrade turn below.
        manager.record_fault(oldest_id, current_turn=5, pin_duration=20)

        transitions = manager.degrade(current_turn=10)

        touched_ids = {entry[0] for entry in transitions}
        assert oldest_id not in touched_ids

    def test_pin_expires(self):
        """After the pin window has passed, ``degrade`` leaves the object unpinned."""
        manager = FidelityManager(window_size=1000)
        ids = _fill_manager(manager, 3, content_size=800)

        # Pinned at turn 5 for 3 turns, so the pin ends at turn 8.
        manager.record_fault(ids[0], current_turn=5, pin_duration=3)

        manager.degrade(current_turn=9)

        formerly_pinned = manager.get_object(ids[0])
        assert formerly_pinned is not None
        assert formerly_pinned.pinned is False

    def test_default_pin_duration(self):
        """Without ``pin_duration`` the pin lasts five turns."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)

        manager.record_fault(object_id, current_turn=10)

        assert faulted.pin_until_turn == 15  # default duration = 5

    def test_fault_nonexistent_object(self):
        """Recording a fault for an unknown ID must not raise."""
        manager = FidelityManager()
        manager.record_fault("nonexistent", current_turn=10)
|
|
|
|
|
|
# ── mark_accessed ─────────────────────────────────────────────
|
|
|
|
|
|
class TestMarkAccessed:
    """Behaviour of ``FidelityManager.mark_accessed``."""

    def test_updates_last_accessed(self):
        """Marking an object stores the given turn as its last access."""
        manager = FidelityManager()
        touched = _make_obj(turn=0)
        object_id = manager.register_object(touched)

        manager.mark_accessed(object_id, current_turn=42)

        assert touched.last_accessed_turn == 42

    def test_nonexistent_object(self):
        """Marking an unknown ID must not raise."""
        manager = FidelityManager()
        manager.mark_accessed("nonexistent", current_turn=10)
|
|
|
|
|
|
# ── eviction_candidates ──────────────────────────────────────
|
|
|
|
|
|
class TestEvictionCandidates:
    """Content and ordering of ``FidelityManager.eviction_candidates``."""

    def test_empty_manager(self):
        """A manager with no objects has no candidates."""
        manager = FidelityManager()
        assert manager.eviction_candidates(current_turn=0) == []

    def test_excludes_l4_objects(self):
        """Objects already at L4 are not listed."""
        manager = FidelityManager()
        evicted = _make_obj()
        manager.register_object(evicted)
        evicted.current_fidelity = FidelityLevel.L4

        ranked = manager.eviction_candidates(current_turn=0)

        assert len(ranked) == 0

    def test_unpinned_before_pinned(self):
        """An unpinned object is ranked ahead of a pinned one."""
        manager = FidelityManager()
        id_pinned = manager.register_object(_make_obj(turn=0))
        id_free = manager.register_object(_make_obj(turn=1))

        manager.record_fault(id_pinned, current_turn=5, pin_duration=20)

        ranked = manager.eviction_candidates(current_turn=6)

        assert len(ranked) == 2
        assert ranked[0].id == id_free  # Unpinned first

    def test_older_access_more_evictable(self):
        """The object accessed longer ago is ranked first."""
        manager = FidelityManager()
        id_stale = manager.register_object(_make_obj(turn=0))
        id_recent = manager.register_object(_make_obj(turn=0))

        manager.mark_accessed(id_recent, current_turn=10)

        ranked = manager.eviction_candidates(current_turn=10)

        assert ranked[0].id == id_stale

    def test_lower_fidelity_more_evictable(self):
        """An object at L3 is ranked ahead of one still at L0."""
        manager = FidelityManager()
        at_l0 = _make_obj(turn=0)
        at_l3 = _make_obj(turn=0)
        manager.register_object(at_l0)
        id_l3 = manager.register_object(at_l3)
        at_l3.current_fidelity = FidelityLevel.L3

        ranked = manager.eviction_candidates(current_turn=0)

        # L3 is closer to eviction, so it should lead the list.
        assert ranked[0].id == id_l3

    def test_expired_pins_are_evictable(self):
        """Once the pin window has passed, the object is listed as unpinned."""
        manager = FidelityManager()
        object_id = manager.register_object(_make_obj(turn=0))

        # Pinned at turn 5 for 3 turns, so the pin ends at turn 8.
        manager.record_fault(object_id, current_turn=5, pin_duration=3)

        ranked = manager.eviction_candidates(current_turn=9)

        assert len(ranked) == 1
        assert ranked[0].pinned is False
|
|
|
|
|
|
# ── objects_at_fidelity ───────────────────────────────────────
|
|
|
|
|
|
class TestObjectsAtFidelity:
    """Per-level counts returned by ``FidelityManager.objects_at_fidelity``."""

    def test_all_at_l0(self):
        """Freshly registered objects are all reported at L0 and none at L1."""
        manager = FidelityManager()
        _fill_manager(manager, 3)

        at_l0 = manager.objects_at_fidelity(FidelityLevel.L0)
        assert len(at_l0) == 3
        at_l1 = manager.objects_at_fidelity(FidelityLevel.L1)
        assert len(at_l1) == 0

    def test_mixed_levels(self):
        """After moving two objects, each of L0, L1 and L3 holds exactly one."""
        manager = FidelityManager()
        ids = _fill_manager(manager, 3)

        first = manager.get_object(ids[0])
        second = manager.get_object(ids[1])
        assert first is not None
        assert second is not None
        first.current_fidelity = FidelityLevel.L1
        second.current_fidelity = FidelityLevel.L3

        for level in (FidelityLevel.L0, FidelityLevel.L1, FidelityLevel.L3):
            assert len(manager.objects_at_fidelity(level)) == 1
|
|
|
|
|
|
# ── summary ───────────────────────────────────────────────────
|
|
|
|
|
|
class TestSummary:
    """Keys and values of the mapping returned by ``FidelityManager.summary``."""

    def test_summary_structure(self):
        """The summary contains all seven expected keys."""
        manager = FidelityManager(window_size=10_000)
        _fill_manager(manager, 2, content_size=400)

        report = manager.summary()

        required_keys = (
            "total_objects",
            "total_tokens",
            "window_size",
            "pressure_zone",
            "objects_by_level",
            "pinned_count",
            "total_faults",
        )
        for key in required_keys:
            assert key in report

    def test_summary_values(self):
        """The summary reflects three L0 objects, one of them faulted once."""
        manager = FidelityManager(window_size=10_000)
        ids = _fill_manager(manager, 3, content_size=400)
        manager.record_fault(ids[0], current_turn=5)

        report = manager.summary()

        assert report["total_objects"] == 3
        assert report["total_tokens"] == 300  # 3 * 100
        assert report["window_size"] == 10_000
        assert report["pressure_zone"] == "NORMAL"
        assert report["objects_by_level"]["L0"] == 3
        assert report["pinned_count"] == 1
        assert report["total_faults"] == 1
|
|
|
|
|
|
# ── Integration: full lifecycle ───────────────────────────────
|
|
|
|
|
|
class TestIntegration:
    """End-to-end scenarios combining registration, faults, degrade and upgrade."""

    def test_register_degrade_upgrade_cycle(self):
        """Full lifecycle: register → pressure → degrade → access → upgrade."""
        fm = FidelityManager(window_size=500)

        # Register objects that fill the whole window (the old comment said
        # CAUTION, but 5 * 100 tokens in a 500-token window is EMERGENCY).
        ids = []
        for i in range(5):
            obj = _make_obj(
                "x" * 400,  # 100 tokens each
                summary_detailed="y" * 120,  # 30 tokens
                summary_compact="z" * 20,  # 5 tokens
                stub="stub text",
                turn=i,
            )
            ids.append(fm.register_object(obj))

        # 500 tokens in 500 window → 100% → EMERGENCY
        assert fm.current_pressure() == PressureZone.EMERGENCY

        # Degrade
        transitions = fm.degrade(current_turn=10)
        assert len(transitions) > 0

        # All should be evicted (emergency)
        for oid in ids:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4

        # Upgrade one back to L0
        result = fm.upgrade(ids[0], FidelityLevel.L0, current_turn=11)
        assert result is True
        upgraded = fm.get_object(ids[0])
        assert upgraded is not None
        assert upgraded.current_fidelity == FidelityLevel.L0

    def test_fault_pin_degrade_cycle(self):
        """Fault → pin → degrade respects pin → pin expires → degrade works."""
        fm = FidelityManager(window_size=1000)
        ids = _fill_manager(fm, 4, content_size=800)

        # Record fault on first object (pinned from turn 5 for 3 turns)
        fm.record_fault(ids[0], current_turn=5, pin_duration=3)

        # Degrade at turn 6 — pinned object survives
        transitions = fm.degrade(current_turn=6)
        degraded_ids = {oid for oid, _, _ in transitions}
        assert ids[0] not in degraded_ids

        # At turn 9, pin expired — now it can be degraded
        obj = fm.get_object(ids[0])
        assert obj is not None
        # Reset to L0 for clean test
        obj.current_fidelity = FidelityLevel.L0

        # Re-fill to get pressure back up
        _fill_manager(fm, 2, content_size=800, turn=9)

        fm.degrade(current_turn=9)
        # Now the previously-pinned object should be degradable
        obj_after = fm.get_object(ids[0])
        assert obj_after is not None
        assert obj_after.pinned is False

    def test_token_accounting_through_degradation(self):
        """Token count decreases as objects are degraded."""
        fm = FidelityManager(window_size=1000)

        for i in range(3):
            obj = _make_obj(
                "a" * 800,  # 200 tokens at L0
                summary_detailed="b" * 120,  # 30 tokens at L1
                turn=i,
            )
            fm.register_object(obj)

        initial_tokens = fm.total_tokens()
        assert initial_tokens == 600  # 3 * 200

        fm.degrade(current_turn=10)

        # Tokens should have decreased
        assert fm.total_tokens() < initial_tokens

    def test_multiple_degrade_passes(self):
        """An emergency pass evicts everything; a second pass changes nothing."""
        fm = FidelityManager(window_size=200)

        ids = []
        for i in range(3):
            obj = _make_obj(
                "a" * 400,  # 100 tokens at L0
                summary_detailed="b" * 120,  # 30 tokens at L1
                summary_compact="c" * 20,  # 5 tokens at L2
                stub="stub",
                turn=i,
            )
            ids.append(fm.register_object(obj))

        # 300 tokens in 200 window → EMERGENCY
        assert fm.current_pressure() == PressureZone.EMERGENCY
        fm.degrade(current_turn=10)

        # After emergency, all should be L4. Use the public lookup rather
        # than reaching into the manager's private `_objects` mapping.
        for oid in ids:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4

        # Second pass: every object is already at the last level, so there
        # is nothing left to degrade and no transition can be reported.
        assert fm.degrade(current_turn=11) == []
|