mnemosyne/tests/test_fidelity.py
Joey Yakimowich-Payne d26c56c2f0 feat: add multi-fidelity compression engine
5-level fidelity manager (L0-Full to L4-Evicted) with helper LLM
(Haiku 4.5) for intelligent summarization during degradation.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-13 11:40:56 -06:00

861 lines
30 KiB
Python

"""Tests for the multi-fidelity state machine."""
from __future__ import annotations
from mnemosyne.fidelity import (
VALID_OBJECT_TYPES,
FidelityLevel,
FidelityManager,
PressureZone,
SemanticObject,
_estimate_tokens,
make_object,
)
# ── Helpers ──────────────────────────────────────────────────
def _make_obj(
    content: str = "x" * 400,
    *,
    object_type: str = "file_context",
    turn: int = 0,
    summary_detailed: str | None = None,
    summary_compact: str | None = None,
    stub: str | None = None,
) -> SemanticObject:
    """Create a SemanticObject with sensible defaults for testing.

    Args:
        content: Full (L0) content of the object.
        object_type: Object type forwarded to ``make_object``.
        turn: Turn at which the object is considered created.
        summary_detailed: Detailed summary; ``None`` selects a 160-char default.
        summary_compact: Compact summary; ``None`` selects a 40-char default.
        stub: Stub text; ``None`` selects ``"<object_type>: test object"``.

    Returns:
        The object built by ``make_object``.
    """
    # Compare against None rather than truthiness so that a caller can pass an
    # explicit empty string without having it replaced by the default.
    if summary_detailed is None:
        summary_detailed = "summary " * 20  # 160 chars
    if summary_compact is None:
        summary_compact = "compact " * 5  # 40 chars
    if stub is None:
        # Keep the default stub label in sync with the requested object type.
        stub = f"{object_type}: test object"
    return make_object(
        object_type=object_type,
        content_full=content,
        created_at_turn=turn,
        summary_detailed=summary_detailed,
        summary_compact=summary_compact,
        stub=stub,
    )
def _fill_manager(
    manager: FidelityManager,
    count: int,
    *,
    content_size: int = 400,
    turn: int = 0,
) -> list[str]:
    """Populate `manager` with `count` new objects and return their IDs.

    Every object carries `content_size` filler characters; the n-th object
    (0-based) is created at turn ``turn + n``. IDs are returned in
    registration order.
    """
    filler = "x" * content_size
    return [
        manager.register_object(_make_obj(filler, turn=turn + offset))
        for offset in range(count)
    ]
# ── FidelityLevel enum ───────────────────────────────────────
class TestFidelityLevel:
    """Checks on the FidelityLevel enum: size, ordering, values and names."""

    def test_five_levels(self) -> None:
        """The enum exposes exactly five members."""
        member_count = len(FidelityLevel)
        assert member_count == 5

    def test_ordering(self) -> None:
        """Members compare in strictly ascending order from L0 to L4."""
        assert (
            FidelityLevel.L0
            < FidelityLevel.L1
            < FidelityLevel.L2
            < FidelityLevel.L3
            < FidelityLevel.L4
        )

    def test_values(self) -> None:
        """The first and last members equal the integers 0 and 4."""
        lowest, highest = FidelityLevel.L0, FidelityLevel.L4
        assert lowest == 0
        assert highest == 4

    def test_names(self) -> None:
        """Member names match their attribute names."""
        lowest, highest = FidelityLevel.L0, FidelityLevel.L4
        assert lowest.name == "L0"
        assert highest.name == "L4"
# ── PressureZone enum ────────────────────────────────────────
class TestPressureZone:
    """Checks on the PressureZone enum: size, ordering and names."""

    def test_five_zones(self) -> None:
        """The enum exposes exactly five members."""
        zone_count = len(PressureZone)
        assert zone_count == 5

    def test_ordering(self) -> None:
        """Zones compare in ascending severity, NORMAL through EMERGENCY."""
        assert (
            PressureZone.NORMAL
            < PressureZone.CAUTION
            < PressureZone.WARNING
            < PressureZone.CRITICAL
            < PressureZone.EMERGENCY
        )

    def test_names(self) -> None:
        """The set of member names is exactly the five expected zone names."""
        actual_names = {zone.name for zone in PressureZone}
        assert actual_names == {
            "NORMAL",
            "CAUTION",
            "WARNING",
            "CRITICAL",
            "EMERGENCY",
        }
# ── SemanticObject ────────────────────────────────────────────
class TestSemanticObject:
    """Field defaults, token accounting and metadata of SemanticObject."""

    def test_required_fields(self) -> None:
        """A freshly built object has an id and the expected default state."""
        sample = _make_obj()
        assert sample.id
        assert sample.object_type == "file_context"
        assert sample.content_full == "x" * 400
        assert sample.current_fidelity == FidelityLevel.L0
        assert sample.pinned is False
        assert sample.pin_until_turn is None
        assert sample.fault_count == 0

    def test_token_estimation(self) -> None:
        """L0 token count is derived from content length (1000 chars -> 250)."""
        sample = _make_obj("a" * 1000)
        assert sample.token_count_l0 == 250  # 1000 / 4

    def test_tokens_at_each_level(self) -> None:
        """tokens_at() reports the per-level token count for every level."""
        sample = _make_obj(
            "a" * 400,
            summary_detailed="b" * 120,
            summary_compact="c" * 20,
            stub="d" * 100,
        )
        expectations = [
            (FidelityLevel.L0, 100),  # 400/4
            (FidelityLevel.L1, 30),  # 120/4
            (FidelityLevel.L2, 5),  # 20/4
            (FidelityLevel.L3, 25),  # 100/4
            (FidelityLevel.L4, 0),  # evicted
        ]
        for level, expected_tokens in expectations:
            assert sample.tokens_at(level) == expected_tokens

    def test_current_tokens_tracks_fidelity(self) -> None:
        """current_tokens follows whatever current_fidelity is set to."""
        sample = _make_obj("a" * 400, summary_detailed="b" * 120)
        assert sample.current_tokens == 100  # L0: 400/4
        sample.current_fidelity = FidelityLevel.L1
        assert sample.current_tokens == 30  # L1: 120/4

    def test_losses_default_empty(self) -> None:
        """Loss lists are empty when none are supplied."""
        sample = _make_obj()
        assert sample.losses_l1 == []
        assert sample.losses_l2 == []

    def test_losses_populated(self) -> None:
        """Loss lists passed to make_object are stored on the object."""
        decision = make_object(
            object_type="design_decision",
            content_full="decision content",
            losses_l1=["exact error codes"],
            losses_l2=["exact error codes", "function signatures"],
        )
        assert "exact error codes" in decision.losses_l1
        assert len(decision.losses_l2) == 2

    def test_valid_object_types(self) -> None:
        """VALID_OBJECT_TYPES contains exactly the eight known type names."""
        assert VALID_OBJECT_TYPES == {
            "conversation_phase",
            "design_decision",
            "debugging_session",
            "file_context",
            "tool_result",
            "plan",
            "error_context",
            "external_reference",
        }

    def test_queryability_fields(self) -> None:
        """can_answer / fault_when / key_entities round-trip unchanged."""
        answers = ["what functions are defined"]
        fault_triggers = ["need exact line numbers"]
        entities = ["auth.py", "middleware"]
        sample = make_object(
            object_type="file_context",
            content_full="content",
            can_answer=["what functions are defined"],
            fault_when=["need exact line numbers"],
            key_entities=["auth.py", "middleware"],
        )
        assert sample.can_answer == answers
        assert sample.fault_when == fault_triggers
        assert sample.key_entities == entities
# ── Token estimation ──────────────────────────────────────────
class TestTokenEstimation:
    """Expected results of the _estimate_tokens() helper.

    The assertions encode a ``len(text) // 4`` estimate with a floor of one
    token for any non-None string, and zero tokens for ``None``.
    """

    def test_basic(self) -> None:
        """100 characters are estimated as 25 tokens."""
        assert _estimate_tokens("a" * 100) == 25

    def test_none(self) -> None:
        """None is estimated as zero tokens."""
        assert _estimate_tokens(None) == 0

    def test_empty_returns_minimum(self) -> None:
        """An empty string still counts as the one-token minimum.

        This replaces two previously identical tests (``test_empty`` and
        ``test_empty_returns_minimum``) that asserted the same thing.
        """
        # len("") = 0, 0 // 4 = 0, max(1, 0) = 1
        assert _estimate_tokens("") == 1

    def test_short(self) -> None:
        """A string shorter than four characters rounds up to one token."""
        assert _estimate_tokens("hi") == 1  # 2//4=0, max(1,0)=1

    def test_exact_multiple(self) -> None:
        """A length that is an exact multiple of four divides evenly."""
        assert _estimate_tokens("a" * 400) == 100
# ── make_object factory ───────────────────────────────────────
class TestMakeObject:
    """Behavior of the make_object() factory."""

    def test_generates_id(self) -> None:
        """Generated ids are 16 alphanumeric characters."""
        generated_id = make_object(object_type="plan", content_full="plan content").id
        assert len(generated_id) == 16
        assert generated_id.isalnum()

    def test_unique_ids(self) -> None:
        """One hundred consecutive objects all receive distinct ids."""
        generated = [
            make_object(object_type="plan", content_full="x").id for _ in range(100)
        ]
        assert len(set(generated)) == 100

    def test_auto_token_estimates(self) -> None:
        """Per-level token counts are filled in from the supplied texts."""
        built = make_object(
            object_type="file_context",
            content_full="a" * 800,
            summary_detailed="b" * 240,
            summary_compact="c" * 40,
            stub="d" * 100,
        )
        assert built.token_count_l0 == 200
        assert built.token_count_l1 == 60
        assert built.token_count_l2 == 10
        assert built.token_count_l3 == 25  # 100/4

    def test_default_stub_tokens(self) -> None:
        """Without a stub, the L3 token count is expected to be 25."""
        built = make_object(object_type="plan", content_full="content")
        assert built.token_count_l3 == 25  # default when no stub provided

    def test_starts_at_l0(self) -> None:
        """New objects begin at full fidelity."""
        built = make_object(object_type="plan", content_full="content")
        assert built.current_fidelity == FidelityLevel.L0
# ── FidelityManager basics ────────────────────────────────────
class TestFidelityManagerBasics:
    """Construction, registration, lookup and token totals of FidelityManager."""

    def test_default_window_size(self) -> None:
        """The window defaults to 200,000 tokens."""
        manager = FidelityManager()
        assert manager.window_size == 200_000

    def test_custom_window_size(self) -> None:
        """An explicit window_size is stored as given."""
        manager = FidelityManager(window_size=100_000)
        assert manager.window_size == 100_000

    def test_register_and_get(self) -> None:
        """get_object returns the very instance that was registered."""
        manager = FidelityManager()
        registered = _make_obj()
        object_id = manager.register_object(registered)
        assert manager.get_object(object_id) is registered

    def test_get_nonexistent(self) -> None:
        """Looking up an unknown id yields None."""
        manager = FidelityManager()
        assert manager.get_object("nonexistent") is None

    def test_register_returns_id(self) -> None:
        """register_object returns the object's own id."""
        manager = FidelityManager()
        registered = _make_obj()
        assert manager.register_object(registered) == registered.id

    def test_total_tokens_empty(self) -> None:
        """An empty manager holds zero tokens."""
        manager = FidelityManager()
        assert manager.total_tokens() == 0

    def test_total_tokens_sums_current_fidelity(self) -> None:
        """total_tokens adds up the tokens of all registered objects."""
        manager = FidelityManager()
        small = _make_obj("a" * 400)  # 100 tokens at L0
        large = _make_obj("b" * 800)  # 200 tokens at L0
        manager.register_object(small)
        manager.register_object(large)
        assert manager.total_tokens() == 300

    def test_total_tokens_respects_fidelity_change(self) -> None:
        """total_tokens reflects an object's current fidelity level."""
        manager = FidelityManager()
        tracked = _make_obj("a" * 400, summary_detailed="b" * 120)
        manager.register_object(tracked)
        assert manager.total_tokens() == 100  # L0: 400/4
        tracked.current_fidelity = FidelityLevel.L1
        assert manager.total_tokens() == 30  # L1: 120/4
# ── Pressure zones ────────────────────────────────────────────
class TestPressureZones:
    """Mapping from window utilisation to the reported PressureZone."""

    @staticmethod
    def _zone_for(content_size: int) -> PressureZone:
        """Return the zone for one object of `content_size` chars in a 1000-token window."""
        manager = FidelityManager(window_size=1000)
        _fill_manager(manager, 1, content_size=content_size)
        return manager.current_pressure()

    def test_normal_zone(self) -> None:
        # 400 chars = 100 tokens, 10% of 1000 → NORMAL
        assert self._zone_for(400) == PressureZone.NORMAL

    def test_caution_zone(self) -> None:
        # 2400 chars = 600 tokens, 60% of 1000 → CAUTION
        assert self._zone_for(2400) == PressureZone.CAUTION

    def test_warning_zone(self) -> None:
        # 3200 chars = 800 tokens, 80% of 1000 → WARNING
        assert self._zone_for(3200) == PressureZone.WARNING

    def test_critical_zone(self) -> None:
        # 3600 chars = 900 tokens, 90% of 1000 → CRITICAL
        assert self._zone_for(3600) == PressureZone.CRITICAL

    def test_emergency_zone(self) -> None:
        # 3840 chars = 960 tokens, 96% of 1000 → EMERGENCY
        assert self._zone_for(3840) == PressureZone.EMERGENCY

    def test_exact_boundary_50pct(self) -> None:
        # 2000 chars = 500 tokens, exactly 50% → CAUTION (>= threshold)
        assert self._zone_for(2000) == PressureZone.CAUTION

    def test_just_below_50pct(self) -> None:
        # 1996 chars = 499 tokens, 49.9% → NORMAL
        assert self._zone_for(1996) == PressureZone.NORMAL

    def test_zero_window_is_emergency(self) -> None:
        """A manager with a zero-sized window reports EMERGENCY even when empty."""
        manager = FidelityManager(window_size=0)
        assert manager.current_pressure() == PressureZone.EMERGENCY
# ── Degradation ───────────────────────────────────────────────
class TestDegradation:
    """Behavior of FidelityManager.degrade() in each pressure zone.

    Transitions returned by degrade() are unpacked throughout as
    ``(object_id, old_level, new_level)`` triples.
    """

    def test_no_degradation_in_normal(self) -> None:
        """degrade() is a no-op while pressure is NORMAL."""
        fm = FidelityManager(window_size=10_000)
        _fill_manager(fm, 1, content_size=400)  # 100 tokens, 1% → NORMAL
        transitions = fm.degrade(current_turn=1)
        assert transitions == []

    def test_caution_degrades_l0_to_l1(self) -> None:
        """In CAUTION, every reported transition is L0 → L1."""
        fm = FidelityManager(window_size=1000)
        # 2400 chars = 600 tokens → CAUTION
        _fill_manager(fm, 3, content_size=800)
        assert fm.current_pressure() == PressureZone.CAUTION
        transitions = fm.degrade(current_turn=5)
        # Should have degraded some L0 objects to L1
        assert len(transitions) > 0
        for _oid, old, new in transitions:
            assert old == FidelityLevel.L0
            assert new == FidelityLevel.L1

    def test_caution_degrades_oldest_first(self) -> None:
        """The least recently accessed object is the first one degraded."""
        fm = FidelityManager(window_size=1000)
        # Create objects at different turns
        # 1200 chars each = 300 tokens each, 600 total = 60% → CAUTION
        obj_old = _make_obj("a" * 1200, turn=0)
        obj_new = _make_obj("b" * 1200, turn=5)
        id_old = fm.register_object(obj_old)
        id_new = fm.register_object(obj_new)
        # Mark new one as recently accessed
        fm.mark_accessed(id_new, current_turn=10)
        assert fm.current_pressure() == PressureZone.CAUTION
        transitions = fm.degrade(current_turn=10)
        # Require at least one transition: guarding the ordering check with
        # `if transitions:` would let the test pass when nothing is degraded.
        assert transitions
        # Oldest should be degraded first
        assert transitions[0][0] == id_old

    def test_warning_degrades_multiple_levels(self) -> None:
        """In WARNING, at least one L0 → L1 transition is reported."""
        fm = FidelityManager(window_size=1000)
        # 3200 chars = 800 tokens → WARNING
        _fill_manager(fm, 4, content_size=800)
        assert fm.current_pressure() == PressureZone.WARNING
        transitions = fm.degrade(current_turn=10)
        assert len(transitions) > 0
        # Should see L0→L1 transitions at minimum
        levels_seen = {(old, new) for _, old, new in transitions}
        assert (FidelityLevel.L0, FidelityLevel.L1) in levels_seen

    def test_critical_degrades_aggressively(self) -> None:
        """In CRITICAL, some object is pushed to L3 or L4."""
        fm = FidelityManager(window_size=1000)
        # 3600 chars = 900 tokens → CRITICAL
        _fill_manager(fm, 3, content_size=1200)
        assert fm.current_pressure() == PressureZone.CRITICAL
        transitions = fm.degrade(current_turn=10)
        assert len(transitions) > 0
        # Should see objects pushed to L3 or L4
        final_levels = {new for _, _, new in transitions}
        assert FidelityLevel.L3 in final_levels or FidelityLevel.L4 in final_levels

    def test_emergency_evicts_all_unpinned(self) -> None:
        """In EMERGENCY, every unpinned object ends at L4."""
        fm = FidelityManager(window_size=1000)
        # 3840 chars = 960 tokens → EMERGENCY
        ids = _fill_manager(fm, 4, content_size=960)
        assert fm.current_pressure() == PressureZone.EMERGENCY
        fm.degrade(current_turn=10)
        # All objects should end at L4
        for obj_id in ids:
            obj = fm.get_object(obj_id)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4

    def test_emergency_respects_pins(self) -> None:
        """In EMERGENCY, a pinned object is spared while the rest reach L4."""
        fm = FidelityManager(window_size=1000)
        # 3840 chars = 960 tokens → EMERGENCY
        ids = _fill_manager(fm, 4, content_size=960)
        # Confirm the precondition so the test really exercises EMERGENCY.
        assert fm.current_pressure() == PressureZone.EMERGENCY
        # Pin one object
        fm.record_fault(ids[0], current_turn=5, pin_duration=20)
        fm.degrade(current_turn=10)
        # Pinned object should NOT be at L4
        pinned_obj = fm.get_object(ids[0])
        assert pinned_obj is not None
        assert pinned_obj.current_fidelity < FidelityLevel.L4
        # Others should be at L4
        for oid in ids[1:]:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4

    def test_degrade_returns_transitions(self) -> None:
        """Transitions are (str, FidelityLevel, FidelityLevel) with new > old."""
        fm = FidelityManager(window_size=1000)
        # 2400 chars = 600 tokens → CAUTION
        _fill_manager(fm, 3, content_size=800)
        transitions = fm.degrade(current_turn=5)
        # Without this, an empty result would skip every check below.
        assert transitions
        for oid, old, new in transitions:
            assert isinstance(oid, str)
            assert isinstance(old, FidelityLevel)
            assert isinstance(new, FidelityLevel)
            assert new > old  # Degradation means higher numeric level

    def test_degrade_stops_when_pressure_relieved(self) -> None:
        """Degrading lowers token usage and never leaves pressure above CAUTION."""
        fm = FidelityManager(window_size=1000)
        # Create objects with large L0 but small L1
        for i in range(3):
            obj = _make_obj(
                "a" * 800,  # 200 tokens at L0
                summary_detailed="b" * 40,  # 10 tokens at L1
                turn=i,
            )
            fm.register_object(obj)
        # 600 tokens → CAUTION
        assert fm.current_pressure() == PressureZone.CAUTION
        tokens_before = fm.total_tokens()
        fm.degrade(current_turn=5)
        # The zone check alone would also hold if degrade() did nothing, so
        # additionally require that token usage actually went down.
        assert fm.total_tokens() < tokens_before
        # After degrading enough objects, pressure should drop
        # Not all objects need to be degraded
        assert fm.current_pressure() <= PressureZone.CAUTION
# ── Upgrade ───────────────────────────────────────────────────
class TestUpgrade:
    """Behavior of FidelityManager.upgrade(): accepted and rejected requests."""

    def test_upgrade_l1_to_l0(self) -> None:
        """An L1 object can be restored to L0."""
        manager = FidelityManager()
        target = _make_obj("content" * 50)
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L1
        assert manager.upgrade(object_id, FidelityLevel.L0, current_turn=5) is True
        assert target.current_fidelity == FidelityLevel.L0

    def test_upgrade_updates_last_accessed(self) -> None:
        """A successful upgrade stamps last_accessed_turn with the current turn."""
        manager = FidelityManager()
        target = _make_obj("content" * 50)
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L2
        manager.upgrade(object_id, FidelityLevel.L0, current_turn=42)
        assert target.last_accessed_turn == 42

    def test_upgrade_rejects_same_level(self) -> None:
        """Requesting the level the object already has returns False."""
        manager = FidelityManager()
        object_id = manager.register_object(_make_obj())
        outcome = manager.upgrade(object_id, FidelityLevel.L0, current_turn=5)
        assert outcome is False  # Already at L0

    def test_upgrade_rejects_downgrade(self) -> None:
        """Requesting a numerically higher level returns False."""
        manager = FidelityManager()
        object_id = manager.register_object(_make_obj())
        outcome = manager.upgrade(object_id, FidelityLevel.L1, current_turn=5)
        assert outcome is False  # L1 > L0, not an upgrade

    def test_upgrade_nonexistent_object(self) -> None:
        """Upgrading an unknown id returns False."""
        manager = FidelityManager()
        outcome = manager.upgrade("nonexistent", FidelityLevel.L0, current_turn=5)
        assert outcome is False

    def test_upgrade_l3_to_l1(self) -> None:
        """An L3 object with a detailed summary can be raised to L1."""
        manager = FidelityManager()
        target = _make_obj(summary_detailed="detailed summary content")
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L3
        assert manager.upgrade(object_id, FidelityLevel.L1, current_turn=10) is True
        assert target.current_fidelity == FidelityLevel.L1

    def test_upgrade_requires_content_at_target(self) -> None:
        """Upgrading to L0 succeeds when only content_full is present."""
        manager = FidelityManager()
        target = make_object(
            object_type="file_context",
            content_full="full content",
            # No summary_detailed provided
        )
        object_id = manager.register_object(target)
        target.current_fidelity = FidelityLevel.L3
        # Can upgrade to L0 (has content_full)
        assert manager.upgrade(object_id, FidelityLevel.L0, current_turn=5) is True

    def test_upgrade_rejects_missing_l1_content(self) -> None:
        """Upgrading to L1 returns False when summary_detailed is None."""
        manager = FidelityManager()
        target = SemanticObject(
            id="test123",
            object_type="file_context",
            content_full="full",
            summary_detailed=None,  # No L1 content
            token_count_l0=1,
        )
        manager.register_object(target)
        target.current_fidelity = FidelityLevel.L3
        outcome = manager.upgrade("test123", FidelityLevel.L1, current_turn=5)
        assert outcome is False
# ── Fault-driven pinning ──────────────────────────────────────
class TestFaultPinning:
    """Behavior of record_fault(): pinning, counters, expiry and degradation."""

    def test_record_fault_pins_object(self) -> None:
        """A fault pins the object until current_turn + pin_duration."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)
        manager.record_fault(object_id, current_turn=10, pin_duration=5)
        assert faulted.pinned is True
        assert faulted.pin_until_turn == 15
        assert faulted.fault_count == 1

    def test_record_fault_updates_access(self) -> None:
        """A fault stamps last_accessed_turn with the current turn."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)
        manager.record_fault(object_id, current_turn=10)
        assert faulted.last_accessed_turn == 10

    def test_multiple_faults_increment_count(self) -> None:
        """Each recorded fault bumps fault_count by one."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)
        for fault_turn in (10, 12):
            manager.record_fault(object_id, current_turn=fault_turn)
        assert faulted.fault_count == 2

    def test_pin_prevents_degradation(self) -> None:
        """A pinned object never appears among degrade() transitions."""
        manager = FidelityManager(window_size=1000)
        object_ids = _fill_manager(manager, 3, content_size=800)
        # Pin the oldest object
        pinned_id = object_ids[0]
        manager.record_fault(pinned_id, current_turn=5, pin_duration=20)
        transitions = manager.degrade(current_turn=10)
        # Pinned object should not appear in transitions
        degraded_ids = {entry_id for entry_id, _, _ in transitions}
        assert pinned_id not in degraded_ids

    def test_pin_expires(self) -> None:
        """After the pin window has passed, degrade() leaves the object unpinned."""
        manager = FidelityManager(window_size=1000)
        object_ids = _fill_manager(manager, 3, content_size=800)
        manager.record_fault(object_ids[0], current_turn=5, pin_duration=3)
        # Pin expires at turn 8
        manager.degrade(current_turn=9)
        # Now the object can be degraded
        formerly_pinned = manager.get_object(object_ids[0])
        assert formerly_pinned is not None
        assert formerly_pinned.pinned is False

    def test_default_pin_duration(self) -> None:
        """Without pin_duration, the pin is expected to last five turns."""
        manager = FidelityManager()
        faulted = _make_obj()
        object_id = manager.register_object(faulted)
        manager.record_fault(object_id, current_turn=10)
        assert faulted.pin_until_turn == 15  # default duration = 5

    def test_fault_nonexistent_object(self) -> None:
        """Recording a fault for an unknown id must not raise."""
        manager = FidelityManager()
        # Should not raise
        manager.record_fault("nonexistent", current_turn=10)
# ── mark_accessed ─────────────────────────────────────────────
class TestMarkAccessed:
    """Behavior of FidelityManager.mark_accessed()."""

    def test_updates_last_accessed(self) -> None:
        """mark_accessed stamps last_accessed_turn with the given turn."""
        manager = FidelityManager()
        touched = _make_obj(turn=0)
        object_id = manager.register_object(touched)
        manager.mark_accessed(object_id, current_turn=42)
        assert touched.last_accessed_turn == 42

    def test_nonexistent_object(self) -> None:
        """Marking an unknown id must not raise."""
        manager = FidelityManager()
        # Should not raise
        manager.mark_accessed("nonexistent", current_turn=10)
# ── eviction_candidates ──────────────────────────────────────
class TestEvictionCandidates:
    """Contents and ordering of FidelityManager.eviction_candidates()."""

    def test_empty_manager(self) -> None:
        """An empty manager has no candidates."""
        manager = FidelityManager()
        assert manager.eviction_candidates(current_turn=0) == []

    def test_excludes_l4_objects(self) -> None:
        """Objects already at L4 are not offered as candidates."""
        manager = FidelityManager()
        evicted = _make_obj()
        manager.register_object(evicted)
        evicted.current_fidelity = FidelityLevel.L4
        ranked = manager.eviction_candidates(current_turn=0)
        assert len(ranked) == 0

    def test_unpinned_before_pinned(self) -> None:
        """Unpinned objects are ranked ahead of pinned ones."""
        manager = FidelityManager()
        pinned_id = manager.register_object(_make_obj(turn=0))
        free_id = manager.register_object(_make_obj(turn=1))
        manager.record_fault(pinned_id, current_turn=5, pin_duration=20)
        ranked = manager.eviction_candidates(current_turn=6)
        assert len(ranked) == 2
        assert ranked[0].id == free_id  # Unpinned first

    def test_older_access_more_evictable(self) -> None:
        """The object accessed longer ago is ranked first."""
        manager = FidelityManager()
        stale_id = manager.register_object(_make_obj(turn=0))
        fresh_id = manager.register_object(_make_obj(turn=0))
        manager.mark_accessed(fresh_id, current_turn=10)
        ranked = manager.eviction_candidates(current_turn=10)
        assert ranked[0].id == stale_id

    def test_lower_fidelity_more_evictable(self) -> None:
        """An object already at L3 is ranked ahead of one still at L0."""
        manager = FidelityManager()
        full_obj = _make_obj(turn=0)
        stub_obj = _make_obj(turn=0)
        manager.register_object(full_obj)
        stub_id = manager.register_object(stub_obj)
        stub_obj.current_fidelity = FidelityLevel.L3
        ranked = manager.eviction_candidates(current_turn=0)
        # L3 (closer to eviction) should come first
        assert ranked[0].id == stub_id

    def test_expired_pins_are_evictable(self) -> None:
        """Once its pin window has passed, an object is returned unpinned."""
        manager = FidelityManager()
        object_id = manager.register_object(_make_obj(turn=0))
        manager.record_fault(object_id, current_turn=5, pin_duration=3)
        # Pin expires at turn 8
        ranked = manager.eviction_candidates(current_turn=9)
        assert len(ranked) == 1
        assert ranked[0].pinned is False
# ── objects_at_fidelity ───────────────────────────────────────
class TestObjectsAtFidelity:
    """Per-level object counts reported by objects_at_fidelity()."""

    def test_all_at_l0(self) -> None:
        """Freshly registered objects are all reported at L0 and none at L1."""
        manager = FidelityManager()
        _fill_manager(manager, 3)
        at_l0 = manager.objects_at_fidelity(FidelityLevel.L0)
        assert len(at_l0) == 3
        at_l1 = manager.objects_at_fidelity(FidelityLevel.L1)
        assert len(at_l1) == 0

    def test_mixed_levels(self) -> None:
        """After moving two of three objects, each level holds exactly one."""
        manager = FidelityManager()
        object_ids = _fill_manager(manager, 3)
        first = manager.get_object(object_ids[0])
        second = manager.get_object(object_ids[1])
        assert first is not None
        assert second is not None
        first.current_fidelity = FidelityLevel.L1
        second.current_fidelity = FidelityLevel.L3
        for level in (FidelityLevel.L0, FidelityLevel.L1, FidelityLevel.L3):
            assert len(manager.objects_at_fidelity(level)) == 1
# ── summary ───────────────────────────────────────────────────
class TestSummary:
    """Keys and values of the mapping returned by FidelityManager.summary()."""

    def test_summary_structure(self) -> None:
        """The summary contains every expected top-level key."""
        manager = FidelityManager(window_size=10_000)
        _fill_manager(manager, 2, content_size=400)
        report = manager.summary()
        required_keys = (
            "total_objects",
            "total_tokens",
            "window_size",
            "pressure_zone",
            "objects_by_level",
            "pinned_count",
            "total_faults",
        )
        for key in required_keys:
            assert key in report

    def test_summary_values(self) -> None:
        """The summary reflects three L0 objects, one of them faulted once."""
        manager = FidelityManager(window_size=10_000)
        object_ids = _fill_manager(manager, 3, content_size=400)
        manager.record_fault(object_ids[0], current_turn=5)
        report = manager.summary()
        assert report["total_objects"] == 3
        assert report["total_tokens"] == 300  # 3 * 100
        assert report["window_size"] == 10_000
        assert report["pressure_zone"] == "NORMAL"
        assert report["objects_by_level"]["L0"] == 3
        assert report["pinned_count"] == 1
        assert report["total_faults"] == 1
# ── Integration: full lifecycle ───────────────────────────────
class TestIntegration:
    """End-to-end scenarios combining registration, degradation, pins and upgrades."""

    def test_register_degrade_upgrade_cycle(self) -> None:
        """Full lifecycle: register → pressure → degrade → access → upgrade."""
        fm = FidelityManager(window_size=500)
        # Register objects that fill the whole window (EMERGENCY)
        ids = []
        for i in range(5):
            obj = _make_obj(
                "x" * 400,  # 100 tokens each
                summary_detailed="y" * 120,  # 30 tokens
                summary_compact="z" * 20,  # 5 tokens
                stub="stub text",
                turn=i,
            )
            ids.append(fm.register_object(obj))
        # 500 tokens in 500 window → 100% → EMERGENCY
        assert fm.current_pressure() == PressureZone.EMERGENCY
        # Degrade
        transitions = fm.degrade(current_turn=10)
        assert len(transitions) > 0
        # All should be evicted (emergency)
        for oid in ids:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4
        # Upgrade one back to L0
        result = fm.upgrade(ids[0], FidelityLevel.L0, current_turn=11)
        assert result is True
        upgraded = fm.get_object(ids[0])
        assert upgraded is not None
        assert upgraded.current_fidelity == FidelityLevel.L0

    def test_fault_pin_degrade_cycle(self) -> None:
        """Fault → pin → degrade respects pin → pin expires → degrade works."""
        fm = FidelityManager(window_size=1000)
        ids = _fill_manager(fm, 4, content_size=800)
        # Record fault on first object
        fm.record_fault(ids[0], current_turn=5, pin_duration=3)
        # Degrade at turn 6 — pinned object survives
        transitions = fm.degrade(current_turn=6)
        degraded_ids = {oid for oid, _, _ in transitions}
        assert ids[0] not in degraded_ids
        # At turn 9, pin expired — now it can be degraded
        obj = fm.get_object(ids[0])
        assert obj is not None
        # Reset to L0 for clean test
        obj.current_fidelity = FidelityLevel.L0
        # Re-fill to get pressure back up
        _fill_manager(fm, 2, content_size=800, turn=9)
        fm.degrade(current_turn=9)
        # Now the previously-pinned object should be degradable
        obj_after = fm.get_object(ids[0])
        assert obj_after is not None
        assert obj_after.pinned is False

    def test_token_accounting_through_degradation(self) -> None:
        """Token count decreases as objects are degraded."""
        fm = FidelityManager(window_size=1000)
        for i in range(3):
            obj = _make_obj(
                "a" * 800,  # 200 tokens at L0
                summary_detailed="b" * 120,  # 30 tokens at L1
                turn=i,
            )
            fm.register_object(obj)
        initial_tokens = fm.total_tokens()
        assert initial_tokens == 600  # 3 * 200
        fm.degrade(current_turn=10)
        # Tokens should have decreased
        assert fm.total_tokens() < initial_tokens

    def test_multiple_degrade_passes(self) -> None:
        """An emergency pass evicts everything; a second pass changes nothing."""
        fm = FidelityManager(window_size=200)
        # Track ids so the test can go through the public get_object() API
        # instead of reaching into the manager's private storage.
        ids = []
        for i in range(3):
            obj = _make_obj(
                "a" * 400,  # 100 tokens at L0
                summary_detailed="b" * 120,  # 30 tokens at L1
                summary_compact="c" * 20,  # 5 tokens at L2
                stub="stub",
                turn=i,
            )
            ids.append(fm.register_object(obj))
        # 300 tokens in 200 window → EMERGENCY
        fm.degrade(current_turn=10)
        # After emergency, all should be L4
        for oid in ids:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4
        # Second pass: evicted objects hold 0 tokens, so pressure is NORMAL
        # and there is nothing left to degrade.
        assert fm.degrade(current_turn=11) == []
        for oid in ids:
            obj = fm.get_object(oid)
            assert obj is not None
            assert obj.current_fidelity == FidelityLevel.L4