Admission control, entropy-based micro-faulting, phantom tool injection for backing store queries, and xMemory session hierarchy for long conversations (50+ turns). Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
893 lines · 31 KiB · Python
"""Tests for the hierarchical segmentation system (Strategy B).
|
|
|
|
Covers:
|
|
- Episode creation and clustering
|
|
- SemanticTheme creation
|
|
- Theme creation
|
|
- SessionHierarchy: add_object, rebuild, retrieve, maintenance
|
|
- Edge cases: empty, single object, identical embeddings, no embeddings
|
|
- Vector math utilities
|
|
- Incremental vs full rebuild consistency
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from mnemosyne.hierarchy import (
|
|
EPISODE_RETRIEVAL_THRESHOLD,
|
|
EPISODE_SIMILARITY_THRESHOLD,
|
|
Episode,
|
|
RECLUSTER_INTERVAL,
|
|
SEMANTIC_SIMILARITY_THRESHOLD,
|
|
SemanticTheme,
|
|
SessionHierarchy,
|
|
THEME_RETRIEVAL_THRESHOLD,
|
|
THEME_SIMILARITY_THRESHOLD,
|
|
Theme,
|
|
_agglomerative_cluster,
|
|
_centroid,
|
|
_cosine_similarity,
|
|
_cosine_similarity_matrix,
|
|
_generate_episode_summary,
|
|
_generate_theme_label,
|
|
_make_episode,
|
|
_make_semantic_theme,
|
|
_make_theme,
|
|
)
|
|
from mnemosyne.object_store import DummyEmbedder, StoredObject, _estimate_tokens
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _make_stored_object(
    content: str = "test content",
    *,
    session_id: str = "sess-1",
    object_type: str = "file_context",
    source_tool: str | None = "Read",
    source_key: str | None = None,
    stub: str | None = None,
    embedding: list[float] | None = None,
    object_id: str | None = None,
    turn: int | None = None,
) -> StoredObject:
    """Build a StoredObject populated with test-friendly defaults.

    Tests supply only the fields they care about; everything else falls
    back to deterministic filler values so fixtures stay reproducible.
    """
    # A single fixed timestamp keeps created/accessed fields deterministic.
    timestamp = "2025-01-01T00:00:00+00:00"
    return StoredObject(
        # Derive a stable id from the content when none is given.
        id=object_id or f"obj-{hash(content) % 100000:05d}",
        session_id=session_id,
        object_type=object_type,
        source_tool=source_tool,
        source_key=source_key,
        content_full=content,
        summary_detailed=None,
        summary_compact=None,
        stub=stub or f"{object_type}: {content[:40]}",
        tokens_l0=_estimate_tokens(content),
        tokens_l3=_estimate_tokens(stub or f"{object_type}: test"),
        embedding=embedding or [],
        created_at=timestamp,
        last_accessed=timestamp,
        source_turn_start=turn,
        source_turn_end=turn,
    )
|
|
|
|
|
|
def _make_unit_vector(dim: int = 384, seed: int = 42) -> list[float]:
|
|
"""Create a deterministic unit vector."""
|
|
rng = np.random.default_rng(seed)
|
|
vec = rng.standard_normal(dim)
|
|
vec = vec / np.linalg.norm(vec)
|
|
return vec.tolist()
|
|
|
|
|
|
def _make_similar_vector(base: list[float], noise: float = 0.05, seed: int = 99) -> list[float]:
|
|
"""Create a vector similar to `base` by adding small noise."""
|
|
rng = np.random.default_rng(seed)
|
|
arr = np.asarray(base, dtype=np.float64)
|
|
perturbation = rng.standard_normal(len(base)) * noise
|
|
result = arr + perturbation
|
|
result = result / np.linalg.norm(result)
|
|
return result.tolist()
|
|
|
|
|
|
def _make_orthogonal_vector(base: list[float], seed: int = 77) -> list[float]:
|
|
"""Create a vector roughly orthogonal to `base`."""
|
|
rng = np.random.default_rng(seed)
|
|
random_vec = rng.standard_normal(len(base))
|
|
arr = np.asarray(base, dtype=np.float64)
|
|
# Gram-Schmidt: subtract projection onto base
|
|
proj = np.dot(random_vec, arr) / np.dot(arr, arr) * arr
|
|
ortho = random_vec - proj
|
|
norm = np.linalg.norm(ortho)
|
|
if norm > 0:
|
|
ortho = ortho / norm
|
|
return ortho.tolist()
|
|
|
|
|
|
# ── Vector math tests ────────────────────────────────────────────────────
|
|
|
|
|
|
class TestCosineSimilarity:
    """Unit tests for `_cosine_similarity`.

    NOTE: renamed from the misspelled ``TestCosineSimlarity``; pytest still
    collects the class via its ``Test*`` prefix, and nothing references the
    old name.
    """

    def test_identical_vectors(self):
        vec = _make_unit_vector(seed=1)
        assert _cosine_similarity(vec, vec) == pytest.approx(1.0, abs=1e-6)

    def test_orthogonal_vectors(self):
        first = _make_unit_vector(seed=1)
        second = _make_orthogonal_vector(first, seed=2)
        # Orthogonal vectors should score near zero.
        assert abs(_cosine_similarity(first, second)) < 0.05

    def test_opposite_vectors(self):
        vec = _make_unit_vector(seed=1)
        flipped = [-component for component in vec]
        assert _cosine_similarity(vec, flipped) == pytest.approx(-1.0, abs=1e-6)

    def test_empty_vectors(self):
        # Any empty operand short-circuits to 0.0 rather than raising.
        assert _cosine_similarity([], []) == 0.0
        assert _cosine_similarity([1.0, 0.0], []) == 0.0
        assert _cosine_similarity([], [1.0, 0.0]) == 0.0

    def test_zero_vector(self):
        vec = _make_unit_vector(seed=1)
        # A zero-norm operand must not divide by zero.
        assert _cosine_similarity(vec, [0.0] * len(vec)) == 0.0

    def test_similar_vectors_high_similarity(self):
        vec = _make_unit_vector(seed=1)
        nearby = _make_similar_vector(vec, noise=0.01, seed=2)
        assert _cosine_similarity(vec, nearby) > 0.9
|
|
|
|
|
|
class TestCosineSimilarityMatrix:
    """Unit tests for `_cosine_similarity_matrix`.

    NOTE: renamed from the misspelled ``TestCosineSimlarityMatrix``; pytest
    still collects it via the ``Test*`` prefix.
    """

    def test_single_vector(self):
        mat = _cosine_similarity_matrix(np.array([_make_unit_vector(seed=1)]))
        assert mat.shape == (1, 1)
        assert mat[0, 0] == pytest.approx(1.0, abs=1e-6)

    def test_identity_diagonal(self):
        # Self-similarity on the diagonal must always be 1.
        rows = np.array([_make_unit_vector(seed=i) for i in range(5)])
        mat = _cosine_similarity_matrix(rows)
        for i in range(5):
            assert mat[i, i] == pytest.approx(1.0, abs=1e-6)

    def test_symmetry(self):
        rows = np.array([_make_unit_vector(seed=i) for i in range(3)])
        mat = _cosine_similarity_matrix(rows)
        for i in range(3):
            for j in range(3):
                assert mat[i, j] == pytest.approx(mat[j, i], abs=1e-10)

    def test_empty(self):
        # Zero rows in → zero-by-zero matrix out.
        assert _cosine_similarity_matrix(np.empty((0, 384))).shape == (0, 0)
|
|
|
|
|
|
class TestCentroid:
    """Tests for the `_centroid` vector-averaging helper."""

    def test_single_vector(self):
        vec = _make_unit_vector(seed=1)
        centroid = _centroid([vec])
        # The centroid of one unit vector points the same way as the input.
        assert _cosine_similarity(vec, centroid) == pytest.approx(1.0, abs=1e-6)

    def test_identical_vectors(self):
        vec = _make_unit_vector(seed=1)
        centroid = _centroid([vec, vec, vec])
        assert _cosine_similarity(vec, centroid) == pytest.approx(1.0, abs=1e-6)

    def test_empty(self):
        # No inputs → empty centroid.
        assert _centroid([]) == []

    def test_centroid_is_normalized(self):
        members = [_make_unit_vector(seed=i) for i in range(5)]
        length = float(np.linalg.norm(_centroid(members)))
        assert length == pytest.approx(1.0, abs=1e-6)
|
|
|
|
|
|
# ── Agglomerative clustering tests ──────────────────────────────────────
|
|
|
|
|
|
class TestAgglomerativeClustering:
    """Tests for `_agglomerative_cluster` merge behaviour at a threshold."""

    def test_empty(self):
        assert _agglomerative_cluster([], 0.5) == []

    def test_single_item(self):
        groups = _agglomerative_cluster([_make_unit_vector(seed=1)], 0.5)
        assert len(groups) == 1
        assert groups[0] == [0]

    def test_identical_items_merge(self):
        vec = _make_unit_vector(seed=1)
        groups = _agglomerative_cluster([vec, vec, vec], 0.5)
        assert len(groups) == 1
        assert sorted(groups[0]) == [0, 1, 2]

    def test_dissimilar_items_separate(self):
        anchor = _make_unit_vector(seed=1)
        unrelated = _make_orthogonal_vector(anchor, seed=2)
        assert len(_agglomerative_cluster([anchor, unrelated], 0.5)) == 2

    def test_similar_items_merge(self):
        anchor = _make_unit_vector(seed=1)
        nearby = _make_similar_vector(anchor, noise=0.02, seed=2)
        assert len(_agglomerative_cluster([anchor, nearby], 0.5)) == 1

    def test_mixed_clusters(self):
        """Two groups of similar vectors should form two clusters."""
        group_a = _make_unit_vector(seed=1)
        group_a_twin = _make_similar_vector(group_a, noise=0.02, seed=10)
        group_b = _make_orthogonal_vector(group_a, seed=2)
        group_b_twin = _make_similar_vector(group_b, noise=0.02, seed=20)
        groups = _agglomerative_cluster([group_a, group_a_twin, group_b, group_b_twin], 0.5)
        assert len(groups) == 2

    def test_threshold_boundary(self):
        """Items with high similarity should merge at a reasonable threshold."""
        vec = _make_unit_vector(seed=1)
        # Identical vectors sit at similarity ~1.0 (modulo floating point).
        assert len(_agglomerative_cluster([vec, vec], 0.99)) == 1
|
|
|
|
|
|
# ── Episode dataclass tests ─────────────────────────────────────────────
|
|
|
|
|
|
class TestEpisode:
    """Tests for the Episode dataclass and the `_make_episode` factory."""

    def test_creation(self):
        episode = Episode(id="ep-1")
        assert episode.id == "ep-1"
        assert episode.objects == []
        assert episode.embedding == []
        assert episode.summary == ""
        assert episode.turn_range == (0, 0)
        assert episode.object_types == set()

    def test_hash_and_equality(self):
        # Identity is keyed on `id` alone.
        first = Episode(id="ep-1")
        same_id = Episode(id="ep-1")
        other = Episode(id="ep-2")
        assert first == same_id
        assert first != other
        assert hash(first) == hash(same_id)

    def test_make_episode(self):
        embedder = DummyEmbedder()
        first = _make_stored_object(
            "content A", object_id="a", turn=5, embedding=embedder.embed("content A")
        )
        second = _make_stored_object(
            "content B", object_id="b", turn=8, embedding=embedder.embed("content B")
        )
        episode = _make_episode([first, second])
        assert len(episode.objects) == 2
        assert episode.turn_range == (5, 8)
        assert "file_context" in episode.object_types
        assert episode.embedding  # Should have a centroid
        assert episode.summary  # Should have auto-generated summary
|
|
|
|
|
|
class TestSemanticTheme:
    """Tests for the SemanticTheme dataclass defaults and identity."""

    def test_creation(self):
        theme = SemanticTheme(id="st-1")
        assert theme.id == "st-1"
        assert theme.episodes == []
        assert theme.embedding == []
        assert theme.label == ""

    def test_hash_and_equality(self):
        # Equality is keyed on `id` alone.
        first = SemanticTheme(id="st-1")
        same_id = SemanticTheme(id="st-1")
        other = SemanticTheme(id="st-2")
        assert first == same_id
        assert first != other
|
|
|
|
|
|
class TestTheme:
    """Tests for the top-level Theme dataclass."""

    def test_creation(self):
        theme = Theme(id="t-1")
        assert theme.id == "t-1"
        assert theme.semantic_themes == []

    def test_hash_and_equality(self):
        # Equality is keyed on `id` alone.
        first = Theme(id="t-1")
        same_id = Theme(id="t-1")
        other = Theme(id="t-2")
        assert first == same_id
        assert first != other
|
|
|
|
|
|
# ── Summary generation tests ────────────────────────────────────────────
|
|
|
|
|
|
class TestSummaryGeneration:
    """Tests for episode-summary and theme-label text generation."""

    def test_episode_summary_from_stubs(self):
        stored = _make_stored_object("hello world", stub="file_context: hello")
        assert "file_context: hello" in _generate_episode_summary([stored])

    def test_episode_summary_empty(self):
        assert "empty" in _generate_episode_summary([]).lower()

    def test_episode_summary_many_objects(self):
        stored = [
            _make_stored_object(f"content {i}", object_id=f"o{i}", stub=f"stub {i}")
            for i in range(10)
        ]
        # Past the cutoff, the summary notes how many stubs were elided.
        assert "+5 more" in _generate_episode_summary(stored)

    def test_theme_label_from_episodes(self):
        episode = Episode(id="ep-1", object_types={"file_context", "tool_result"})
        label = _generate_theme_label([episode])
        assert "file_context" in label or "tool_result" in label
        assert "1 episodes" in label

    def test_theme_label_empty(self):
        assert "empty" in _generate_theme_label([]).lower()
|
|
|
|
|
|
# ── SessionHierarchy tests ──────────────────────────────────────────────
|
|
|
|
|
|
class TestSessionHierarchyInit:
    """Construction defaults and the `enabled` flag."""

    def test_init(self):
        hierarchy = SessionHierarchy(DummyEmbedder())
        assert hierarchy.enabled is True
        assert hierarchy.object_count == 0
        assert hierarchy.episode_count == 0
        assert hierarchy.theme_count == 0

    def test_disable(self):
        hierarchy = SessionHierarchy(DummyEmbedder())
        hierarchy.enabled = False
        assert hierarchy.enabled is False

    def test_summary_empty(self):
        stats = SessionHierarchy(DummyEmbedder()).summary()
        assert stats["objects"] == 0
        assert stats["episodes"] == 0
        assert stats["themes"] == 0
|
|
|
|
|
|
class TestSessionHierarchyAddObject:
    """Incremental insertion via `add_object`."""

    def test_add_single_object(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        stored = _make_stored_object(
            "test content", object_id="o1", embedding=embedder.embed("test content")
        )
        episode = hierarchy.add_object(stored)
        assert episode is not None
        assert hierarchy.object_count == 1
        assert hierarchy.episode_count == 1

    def test_add_duplicate_ignored(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        stored = _make_stored_object("test", object_id="o1", embedding=embedder.embed("test"))
        hierarchy.add_object(stored)
        # Inserting the same id twice is a no-op.
        assert hierarchy.add_object(stored) is None
        assert hierarchy.object_count == 1

    def test_add_similar_objects_same_episode(self):
        """Objects with very similar embeddings should join the same episode."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)

        # Use the same text to get identical embeddings
        shared_embedding = embedder.embed("shared topic content")
        first = _make_stored_object(
            "shared topic content A", object_id="o1", embedding=shared_embedding
        )
        second = _make_stored_object(
            "shared topic content B", object_id="o2", embedding=shared_embedding
        )  # Same embedding

        episode_a = hierarchy.add_object(first)
        episode_b = hierarchy.add_object(second)
        assert episode_a is not None
        assert episode_b is not None
        assert episode_a.id == episode_b.id  # Same episode
        assert hierarchy.episode_count == 1

    def test_add_dissimilar_objects_different_episodes(self):
        """Objects with very different embeddings should go to different episodes."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)

        # DummyEmbedder produces different vectors for different text
        first = _make_stored_object(
            "python programming language",
            object_id="o1",
            embedding=embedder.embed("python programming language"),
        )
        second = _make_stored_object(
            "cooking recipes for dinner",
            object_id="o2",
            embedding=embedder.embed("cooking recipes for dinner"),
        )

        hierarchy.add_object(first)
        hierarchy.add_object(second)
        # DummyEmbedder is hash-based, so different texts → different vectors
        # They should be in different episodes (similarity < 0.6)
        assert hierarchy.episode_count == 2

    def test_add_object_without_embedding(self):
        hierarchy = SessionHierarchy(DummyEmbedder())
        stored = _make_stored_object("no embedding", object_id="o1", embedding=[])
        # Missing embeddings still get placed (in their own episode).
        assert hierarchy.add_object(stored) is not None
        assert hierarchy.episode_count == 1

    def test_add_object_disabled(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        hierarchy.enabled = False
        stored = _make_stored_object("test", object_id="o1", embedding=embedder.embed("test"))
        assert hierarchy.add_object(stored) is None
        assert hierarchy.object_count == 0

    def test_add_updates_episode_metadata(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        shared_embedding = embedder.embed("shared content")
        first = _make_stored_object(
            "shared content A",
            object_id="o1",
            object_type="file_context",
            embedding=shared_embedding,
            turn=5,
        )
        second = _make_stored_object(
            "shared content B",
            object_id="o2",
            object_type="tool_result",
            embedding=shared_embedding,
            turn=10,
        )
        hierarchy.add_object(first)
        episode = hierarchy.add_object(second)
        assert episode is not None
        # The episode accumulates types and widens its turn range.
        assert "file_context" in episode.object_types
        assert "tool_result" in episode.object_types
        assert episode.turn_range == (5, 10)
|
|
|
|
|
|
class TestSessionHierarchyRebuild:
    """Full reclustering via `rebuild`."""

    def test_rebuild_empty(self):
        hierarchy = SessionHierarchy(DummyEmbedder())
        hierarchy.rebuild([])
        assert hierarchy.object_count == 0
        assert hierarchy.episode_count == 0

    def test_rebuild_single_object(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        stored = _make_stored_object("test", object_id="o1", embedding=embedder.embed("test"))
        hierarchy.rebuild([stored])
        assert hierarchy.object_count == 1
        assert hierarchy.episode_count == 1
        assert len(hierarchy.get_themes()) >= 1
        assert len(hierarchy.get_top_themes()) >= 1

    def test_rebuild_clusters_similar_objects(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        shared_embedding = embedder.embed("shared topic")
        batch = [
            _make_stored_object(f"shared topic {i}", object_id=f"o{i}", embedding=shared_embedding)
            for i in range(5)
        ]
        hierarchy.rebuild(batch)
        assert hierarchy.object_count == 5
        # All identical embeddings → 1 episode
        assert hierarchy.episode_count == 1

    def test_rebuild_separates_dissimilar_objects(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        batch = [
            _make_stored_object(
                f"unique topic number {i} with distinct content",
                object_id=f"o{i}",
                embedding=embedder.embed(f"unique topic number {i} with distinct content"),
            )
            for i in range(10)
        ]
        hierarchy.rebuild(batch)
        assert hierarchy.object_count == 10
        # DummyEmbedder with different texts → different vectors → multiple episodes
        assert hierarchy.episode_count > 1

    def test_rebuild_handles_mixed_embeddings(self):
        """Objects with and without embeddings should both be included."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        embedded = _make_stored_object(
            "with embedding", object_id="o1", embedding=embedder.embed("with embedding")
        )
        unembedded = _make_stored_object("no embedding", object_id="o2", embedding=[])
        hierarchy.rebuild([embedded, unembedded])
        assert hierarchy.object_count == 2
        assert hierarchy.episode_count == 2  # Each in its own episode

    def test_rebuild_clears_previous_state(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        original = _make_stored_object("first", object_id="o1", embedding=embedder.embed("first"))
        hierarchy.rebuild([original])
        assert hierarchy.object_count == 1

        replacement = _make_stored_object(
            "second", object_id="o2", embedding=embedder.embed("second")
        )
        hierarchy.rebuild([replacement])
        assert hierarchy.object_count == 1  # Only the new object
        assert hierarchy.episode_count == 1

    def test_rebuild_creates_all_levels(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        batch = [
            _make_stored_object(
                f"content {i}", object_id=f"o{i}", embedding=embedder.embed(f"content {i}")
            )
            for i in range(20)
        ]
        hierarchy.rebuild(batch)
        # Objects, episodes, themes, and top-level themes must all exist.
        assert hierarchy.object_count == 20
        assert hierarchy.episode_count > 0
        assert len(hierarchy.get_themes()) > 0
        assert len(hierarchy.get_top_themes()) > 0
|
|
|
|
|
|
class TestSessionHierarchyRetrieve:
    """Hierarchical retrieval and its flat-search fallback."""

    def test_retrieve_empty_hierarchy(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        assert hierarchy.retrieve(embedder.embed("test query"), limit=5) == []

    def test_retrieve_returns_relevant_objects(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        query = embedder.embed("python programming")
        stored = _make_stored_object("python programming guide", object_id="o1", embedding=query)
        hierarchy.rebuild([stored])
        hits = hierarchy.retrieve(query, limit=5)
        assert len(hits) == 1
        assert hits[0].id == "o1"

    def test_retrieve_respects_limit(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        query = embedder.embed("shared")
        batch = [
            _make_stored_object(f"shared content {i}", object_id=f"o{i}", embedding=query)
            for i in range(20)
        ]
        hierarchy.rebuild(batch)
        assert len(hierarchy.retrieve(query, limit=5)) <= 5

    def test_retrieve_empty_query(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        stored = _make_stored_object("test", object_id="o1", embedding=embedder.embed("test"))
        hierarchy.rebuild([stored])
        # An empty query vector yields no results rather than raising.
        assert hierarchy.retrieve([], limit=5) == []

    def test_retrieve_disabled_falls_back_to_flat(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        query = embedder.embed("test")
        stored = _make_stored_object("test content", object_id="o1", embedding=query)
        hierarchy.rebuild([stored])
        hierarchy.enabled = False
        # Falls back to flat search
        assert len(hierarchy.retrieve(query, limit=5)) == 1

    def test_retrieve_sorted_by_similarity(self):
        """Results should be sorted by similarity to query (highest first)."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)

        query = embedder.embed("target query")
        # Create objects with varying similarity to query
        near_match = _make_stored_object("target query exact", object_id="close", embedding=query)
        far_match = _make_stored_object(
            "completely unrelated xyz",
            object_id="far",
            embedding=embedder.embed("completely unrelated xyz"),
        )

        hierarchy.rebuild([near_match, far_match])
        hits = hierarchy.retrieve(query, limit=10)
        assert len(hits) >= 1
        # The close object should be first
        assert hits[0].id == "close"

    def test_retrieve_no_duplicates(self):
        """Retrieve should not return duplicate objects."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        query = embedder.embed("shared")
        batch = [
            _make_stored_object(f"shared {i}", object_id=f"o{i}", embedding=query)
            for i in range(5)
        ]
        hierarchy.rebuild(batch)
        hit_ids = [hit.id for hit in hierarchy.retrieve(query, limit=10)]
        assert len(hit_ids) == len(set(hit_ids))

    def test_flat_search_fallback(self):
        """When hierarchy has no themes, should fall back to flat search."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        query = embedder.embed("test")
        stored = _make_stored_object("test", object_id="o1", embedding=query)
        # Add object but clear themes to force fallback
        hierarchy._all_objects = [stored]
        hierarchy._object_ids = {stored.id}
        hierarchy._themes = []
        assert len(hierarchy.retrieve(query, limit=5)) == 1
|
|
|
|
|
|
class TestSessionHierarchyMaintenance:
    """Periodic and goal-change triggered reclustering via `maintenance`."""

    @staticmethod
    def _hierarchy_with_one_object() -> SessionHierarchy:
        # Shared setup: a hierarchy seeded with a single embedded object.
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        hierarchy.add_object(
            _make_stored_object("test", object_id="o1", embedding=embedder.embed("test"))
        )
        return hierarchy

    def test_maintenance_at_interval(self):
        hierarchy = self._hierarchy_with_one_object()
        # Turn 20 should trigger rebuild
        assert hierarchy.maintenance(turn=RECLUSTER_INTERVAL) is True

    def test_maintenance_not_at_interval(self):
        hierarchy = self._hierarchy_with_one_object()
        assert hierarchy.maintenance(turn=15) is False

    def test_maintenance_on_goal_change(self):
        hierarchy = self._hierarchy_with_one_object()
        # First goal → always rebuild
        assert hierarchy.maintenance(turn=5, goal_hash="goal-1") is True
        # Same goal, not at interval
        assert hierarchy.maintenance(turn=6, goal_hash="goal-1") is False
        # Goal changed
        assert hierarchy.maintenance(turn=7, goal_hash="goal-2") is True

    def test_maintenance_disabled(self):
        hierarchy = SessionHierarchy(DummyEmbedder())
        hierarchy.enabled = False
        assert hierarchy.maintenance(turn=RECLUSTER_INTERVAL) is False

    def test_maintenance_no_double_rebuild(self):
        """Same turn should not trigger rebuild twice."""
        hierarchy = self._hierarchy_with_one_object()
        assert hierarchy.maintenance(turn=RECLUSTER_INTERVAL) is True
        assert hierarchy.maintenance(turn=RECLUSTER_INTERVAL) is False

    def test_maintenance_at_multiple_intervals(self):
        hierarchy = self._hierarchy_with_one_object()
        assert hierarchy.maintenance(turn=20) is True
        assert hierarchy.maintenance(turn=40) is True
        assert hierarchy.maintenance(turn=60) is True

    def test_maintenance_turn_zero_no_rebuild(self):
        hierarchy = SessionHierarchy(DummyEmbedder())
        assert hierarchy.maintenance(turn=0) is False
|
|
|
|
|
|
class TestSessionHierarchyGetters:
    """Accessors for the clustered levels."""

    @staticmethod
    def _single_object_hierarchy() -> SessionHierarchy:
        # Shared setup: one embedded object inserted incrementally.
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        hierarchy.add_object(
            _make_stored_object("test", object_id="o1", embedding=embedder.embed("test"))
        )
        return hierarchy

    def test_get_episodes(self):
        episodes = self._single_object_hierarchy().get_episodes()
        assert len(episodes) == 1
        assert episodes[0].objects[0].id == "o1"

    def test_get_themes(self):
        assert len(self._single_object_hierarchy().get_themes()) >= 1

    def test_get_top_themes(self):
        assert len(self._single_object_hierarchy().get_top_themes()) >= 1
|
|
|
|
|
|
class TestSessionHierarchySummary:
    """`summary()` statistics after a rebuild."""

    def test_summary_after_rebuild(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        batch = [
            _make_stored_object(
                f"content {i}", object_id=f"o{i}", embedding=embedder.embed(f"content {i}")
            )
            for i in range(10)
        ]
        hierarchy.rebuild(batch)
        stats = hierarchy.summary()
        assert stats["objects"] == 10
        assert stats["episodes"] > 0
        assert stats["themes"] > 0
        assert stats["enabled"] is True
        assert isinstance(stats["avg_episode_size"], float)
|
|
|
|
|
|
# ── Edge case tests ─────────────────────────────────────────────────────
|
|
|
|
|
|
class TestEdgeCases:
    """Degenerate inputs, scale, and equality against foreign types."""

    def test_all_identical_embeddings(self):
        """All objects with identical embeddings → single episode."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        shared_embedding = embedder.embed("identical")
        batch = [
            _make_stored_object(f"identical {i}", object_id=f"o{i}", embedding=shared_embedding)
            for i in range(10)
        ]
        hierarchy.rebuild(batch)
        assert hierarchy.episode_count == 1
        assert hierarchy.object_count == 10

    def test_all_objects_no_embeddings(self):
        """Objects without embeddings each get their own episode."""
        hierarchy = SessionHierarchy(DummyEmbedder())
        batch = [
            _make_stored_object(f"no emb {i}", object_id=f"o{i}", embedding=[])
            for i in range(5)
        ]
        hierarchy.rebuild(batch)
        assert hierarchy.episode_count == 5

    def test_large_number_of_objects(self):
        """Hierarchy should handle 100+ objects without error."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        batch = [
            _make_stored_object(
                f"object number {i} with unique content",
                object_id=f"o{i}",
                embedding=embedder.embed(f"object number {i} with unique content"),
            )
            for i in range(100)
        ]
        hierarchy.rebuild(batch)
        assert hierarchy.object_count == 100
        assert hierarchy.episode_count > 0
        assert len(hierarchy.get_top_themes()) > 0

    def test_retrieve_with_many_objects(self):
        """Retrieval should work efficiently with many objects."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        batch = [
            _make_stored_object(
                f"content {i}", object_id=f"o{i}", embedding=embedder.embed(f"content {i}")
            )
            for i in range(50)
        ]
        hierarchy.rebuild(batch)
        hits = hierarchy.retrieve(embedder.embed("content 0"), limit=5)
        assert len(hits) <= 5
        # The exact match should be in results
        assert "o0" in [hit.id for hit in hits]

    def test_incremental_then_rebuild_consistency(self):
        """Incremental adds followed by rebuild should produce valid hierarchy."""
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)

        accumulated = []
        for i in range(10):
            stored = _make_stored_object(
                f"content {i}", object_id=f"o{i}", embedding=embedder.embed(f"content {i}")
            )
            accumulated.append(stored)
            hierarchy.add_object(stored)

        # Now rebuild from scratch
        hierarchy.rebuild(accumulated)
        assert hierarchy.object_count == 10
        assert hierarchy.episode_count > 0

    def test_rebuild_with_single_object_no_crash(self):
        embedder = DummyEmbedder()
        hierarchy = SessionHierarchy(embedder)
        solo = _make_stored_object("solo", object_id="o1", embedding=embedder.embed("solo"))
        hierarchy.rebuild([solo])
        assert len(hierarchy.retrieve(embedder.embed("solo"), limit=5)) == 1

    def test_episode_not_equal_to_non_episode(self):
        assert Episode(id="ep-1") != "not an episode"

    def test_semantic_theme_not_equal_to_non_theme(self):
        assert SemanticTheme(id="st-1") != 42

    def test_theme_not_equal_to_non_theme(self):
        assert Theme(id="t-1") != []
|
|
|
|
|
|
# ── Constants tests ─────────────────────────────────────────────────────
|
|
|
|
|
|
class TestConstants:
    """Pin the tuning constants exported by mnemosyne.hierarchy.

    These act as a change detector: adjusting any threshold or interval in
    the implementation forces a deliberate update to this file.
    """

    def test_episode_threshold(self):
        assert EPISODE_SIMILARITY_THRESHOLD == 0.6

    def test_semantic_threshold(self):
        assert SEMANTIC_SIMILARITY_THRESHOLD == 0.4

    def test_theme_threshold(self):
        assert THEME_SIMILARITY_THRESHOLD == 0.25

    def test_retrieval_thresholds(self):
        # Retrieval cutoffs are looser than the clustering thresholds above.
        assert THEME_RETRIEVAL_THRESHOLD == 0.3
        assert EPISODE_RETRIEVAL_THRESHOLD == 0.4

    def test_recluster_interval(self):
        # Maintenance rebuilds every 20 turns (see TestSessionHierarchyMaintenance).
        assert RECLUSTER_INTERVAL == 20
|