Object-addressed memory: segment messages into semantic objects, embed with sentence-transformers, store in pgvector-backed store, and reassemble context via goal-aware retrieval. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
996 lines
35 KiB
Python
996 lines
35 KiB
Python
"""Tests for the semantic object backing store."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from mnemosyne.object_store import (
|
|
DummyEmbedder,
|
|
InMemoryBackend,
|
|
ObjectStore,
|
|
ObjectStoreBackend,
|
|
StoredObject,
|
|
_cosine_similarity,
|
|
_estimate_tokens,
|
|
)
|
|
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────
|
|
|
|
|
|
def _make_stored_object(
    session_id: str = "sess-1",
    content: str = "x" * 400,
    *,
    object_type: str = "file_context",
    source_tool: str | None = "Read",
    source_key: str | None = None,
    stub: str | None = None,
    embedding: list[float] | None = None,
    object_id: str | None = None,
) -> StoredObject:
    """Create a StoredObject with sensible defaults for testing.

    Args:
        session_id: Session the object belongs to.
        content: Text stored as ``content_full``; also used for ``tokens_l0``.
        object_type: Value for ``object_type``; also used in the default stub.
        source_tool: Value for ``source_tool``.
        source_key: Value for ``source_key``.
        stub: Stub line; a falsy value falls back to
            ``"<object_type>: test object"``.
        embedding: Embedding vector; a falsy value becomes an empty list.
        object_id: Explicit id; a falsy value gets a freshly generated one.

    Returns:
        A StoredObject with no summaries and fixed 2025-01-01 timestamps.
    """
    # Local import keeps the helper self-contained without touching the
    # module's import block.
    import uuid

    # Resolve the stub once so the stored text and its token estimate agree.
    stub_text = stub or f"{object_type}: test object"

    # The previous fallback derived the id from id(content), which is a memory
    # address: objects built from the same string object (e.g. the shared
    # default) received the same id and would overwrite each other in a
    # backend. A random uuid fragment avoids that.
    resolved_id = object_id or f"obj-{uuid.uuid4().hex[:12]}"

    return StoredObject(
        id=resolved_id,
        session_id=session_id,
        object_type=object_type,
        source_tool=source_tool,
        source_key=source_key,
        content_full=content,
        summary_detailed=None,
        summary_compact=None,
        stub=stub_text,
        tokens_l0=_estimate_tokens(content),
        tokens_l3=_estimate_tokens(stub_text),
        embedding=embedding or [],
        created_at="2025-01-01T00:00:00+00:00",
        last_accessed="2025-01-01T00:00:00+00:00",
    )
|
|
|
|
|
|
# ── StoredObject dataclass ───────────────────────────────────
|
|
|
|
|
|
class TestStoredObject:
    """Checks StoredObject defaults and its per-fidelity accessors."""

    def test_required_fields(self):
        """A helper-built object carries the requested fields and zeroed counters."""
        stored = _make_stored_object()

        assert stored.id
        assert stored.session_id == "sess-1"
        assert stored.object_type == "file_context"
        assert stored.content_full == "x" * 400
        assert stored.current_fidelity == 0
        assert stored.pinned is False
        assert stored.fault_count == 0
        assert stored.micro_fault_count == 0

    def test_default_lists_are_empty(self):
        """Every list-valued field starts out as an empty list."""
        stored = _make_stored_object()
        list_fields = (
            "losses_l1",
            "losses_l2",
            "can_answer_l1",
            "can_answer_l2",
            "fault_when",
            "key_entities",
            "tags",
        )
        for field_name in list_fields:
            assert getattr(stored, field_name) == []

    def test_content_at_levels(self):
        """content_at returns the text for levels 0-3 and None for level 4."""
        stored = _make_stored_object(content="full content")
        stored.summary_detailed = "detailed"
        stored.summary_compact = "compact"
        stored.stub = "stub line"

        expected_by_level = {
            0: "full content",
            1: "detailed",
            2: "compact",
            3: "stub line",
        }
        for level, text in expected_by_level.items():
            assert stored.content_at(level) == text
        assert stored.content_at(4) is None

    def test_tokens_at_levels(self):
        """tokens_at returns the per-level count for 0-3 and 0 for level 4."""
        stored = _make_stored_object(content="a" * 400)
        stored.tokens_l0 = 100
        stored.tokens_l1 = 30
        stored.tokens_l2 = 5
        stored.tokens_l3 = 3

        # Index in the tuple is the fidelity level being queried.
        for level, expected in enumerate((100, 30, 5, 3, 0)):
            assert stored.tokens_at(level) == expected

    def test_tokens_at_none_levels(self):
        """Levels whose token count is None are reported as 0."""
        stored = _make_stored_object()
        stored.tokens_l1 = None
        stored.tokens_l2 = None

        for level in (1, 2):
            assert stored.tokens_at(level) == 0

    def test_current_tokens_tracks_fidelity(self):
        """current_tokens follows current_fidelity."""
        stored = _make_stored_object(content="a" * 400)
        stored.tokens_l0 = 100
        stored.tokens_l1 = 30

        # Fidelity 0 reads the L0 count.
        assert stored.current_tokens == 100

        # Switching to fidelity 1 reads the L1 count.
        stored.current_fidelity = 1
        assert stored.current_tokens == 30

    def test_embedding_default_empty(self):
        """Without an explicit embedding the helper stores an empty list."""
        assert _make_stored_object().embedding == []
|
|
|
|
|
|
# ── DummyEmbedder ────────────────────────────────────────────
|
|
|
|
|
|
class TestDummyEmbedder:
    """Checks the vectors produced by DummyEmbedder."""

    def test_embed_returns_384_dim(self):
        """A single embedding has 384 components."""
        vector = DummyEmbedder().embed("hello world")
        assert len(vector) == 384

    def test_embed_deterministic(self):
        """Embedding the same text twice yields equal vectors."""
        embedder = DummyEmbedder()
        first = embedder.embed("test input")
        second = embedder.embed("test input")
        assert first == second

    def test_embed_different_inputs_differ(self):
        """Two different texts yield different vectors."""
        embedder = DummyEmbedder()
        vec_a = embedder.embed("input A")
        vec_b = embedder.embed("input B")
        assert vec_a != vec_b

    def test_embed_is_normalized(self):
        """The embedding's Euclidean norm is 1 within 1e-6."""
        import numpy as np

        embedder = DummyEmbedder()
        vector = embedder.embed("normalize me")
        length = float(np.linalg.norm(vector))
        assert abs(length - 1.0) < 1e-6

    def test_embed_batch(self):
        """embed_batch returns one 384-dim vector per input text."""
        embedder = DummyEmbedder()
        vectors = embedder.embed_batch(["alpha", "beta", "gamma"])

        assert len(vectors) == 3
        assert all(len(vector) == 384 for vector in vectors)

    def test_embed_batch_matches_single(self):
        """Batch output equals embedding each text individually."""
        embedder = DummyEmbedder()
        inputs = ["one", "two"]

        batched = embedder.embed_batch(inputs)
        one_by_one = [embedder.embed(text) for text in inputs]

        assert batched == one_by_one
|
|
|
|
|
|
# ── Cosine similarity ────────────────────────────────────────
|
|
|
|
|
|
class TestCosineSimilarity:
    """Checks _cosine_similarity on simple hand-built vectors."""

    def test_identical_vectors(self):
        """A vector compared with itself scores 1."""
        unit = [1.0, 0.0, 0.0]
        score = _cosine_similarity(unit, unit)
        assert abs(score - 1.0) < 1e-9

    def test_orthogonal_vectors(self):
        """Perpendicular vectors score 0."""
        x_axis = [1.0, 0.0, 0.0]
        y_axis = [0.0, 1.0, 0.0]
        score = _cosine_similarity(x_axis, y_axis)
        assert abs(score) < 1e-9

    def test_opposite_vectors(self):
        """Vectors pointing in opposite directions score -1."""
        forward = [1.0, 0.0]
        backward = [-1.0, 0.0]
        score = _cosine_similarity(forward, backward)
        assert abs(score - (-1.0)) < 1e-9

    def test_empty_vectors(self):
        """An empty operand on either side gives exactly 0.0."""
        assert _cosine_similarity([], []) == 0.0
        assert _cosine_similarity([1.0], []) == 0.0

    def test_zero_vector(self):
        """An all-zero operand gives exactly 0.0."""
        zeros = [0.0, 0.0]
        assert _cosine_similarity(zeros, [1.0, 0.0]) == 0.0
|
|
|
|
|
|
# ── InMemoryBackend CRUD ─────────────────────────────────────
|
|
|
|
|
|
class TestInMemoryBackendCRUD:
    """Checks store / get / list / delete on InMemoryBackend."""

    async def test_store_and_get(self):
        """get returns the very object that was stored."""
        backend = InMemoryBackend()
        stored = _make_stored_object(object_id="obj-001")
        await backend.store(stored)

        fetched = await backend.get("obj-001")
        assert fetched is stored

    async def test_get_nonexistent(self):
        """get on an unknown id returns None."""
        backend = InMemoryBackend()
        missing = await backend.get("nonexistent")
        assert missing is None

    async def test_store_overwrites(self):
        """Storing a second object under the same id replaces the first."""
        backend = InMemoryBackend()
        original = _make_stored_object(object_id="obj-001", content="original")
        replacement = _make_stored_object(object_id="obj-001", content="updated")

        for candidate in (original, replacement):
            await backend.store(candidate)

        fetched = await backend.get("obj-001")
        assert fetched is not None
        assert fetched.content_full == "updated"

    async def test_get_by_session(self):
        """get_by_session returns only the objects of the requested session."""
        backend = InMemoryBackend()
        fixtures = [
            _make_stored_object(session_id="s1", object_id="o1"),
            _make_stored_object(session_id="s1", object_id="o2"),
            _make_stored_object(session_id="s2", object_id="o3"),
        ]
        for fixture in fixtures:
            await backend.store(fixture)

        in_s1 = await backend.get_by_session("s1")
        assert len(in_s1) == 2
        assert {item.id for item in in_s1} == {"o1", "o2"}

    async def test_get_by_session_filters_fidelity(self):
        """fidelity_max limits which objects get_by_session returns."""
        backend = InMemoryBackend()
        live = _make_stored_object(session_id="s1", object_id="o1")
        evicted = _make_stored_object(session_id="s1", object_id="o2")
        evicted.current_fidelity = 4  # evicted

        await backend.store(live)
        await backend.store(evicted)

        # fidelity_max=4 keeps the evicted object in the result.
        everything = await backend.get_by_session("s1", fidelity_max=4)
        assert len(everything) == 2

        # fidelity_max=3 leaves only the object still at fidelity 0.
        still_active = await backend.get_by_session("s1", fidelity_max=3)
        assert len(still_active) == 1
        assert still_active[0].id == "o1"

    async def test_get_by_session_empty(self):
        """An unknown session yields an empty list."""
        backend = InMemoryBackend()
        found = await backend.get_by_session("nonexistent")
        assert found == []

    async def test_delete_session(self):
        """delete_session removes that session's objects and reports the count."""
        backend = InMemoryBackend()
        fixtures = [
            _make_stored_object(session_id="s1", object_id="o1"),
            _make_stored_object(session_id="s1", object_id="o2"),
            _make_stored_object(session_id="s2", object_id="o3"),
        ]
        for fixture in fixtures:
            await backend.store(fixture)

        removed = await backend.delete_session("s1")
        assert removed == 2

        assert await backend.get("o1") is None
        assert await backend.get("o2") is None
        assert await backend.get("o3") is not None

    async def test_delete_session_nonexistent(self):
        """Deleting an unknown session reports zero removed objects."""
        backend = InMemoryBackend()
        removed = await backend.delete_session("nonexistent")
        assert removed == 0
|
|
|
|
|
|
# ── InMemoryBackend fidelity updates ─────────────────────────
|
|
|
|
|
|
class TestInMemoryBackendFidelity:
    """Checks InMemoryBackend.update_fidelity."""

    async def test_update_fidelity_basic(self):
        """update_fidelity changes current_fidelity on the stored object."""
        backend = InMemoryBackend()
        await backend.store(_make_stored_object(object_id="o1"))

        await backend.update_fidelity("o1", 2)

        fetched = await backend.get("o1")
        assert fetched is not None
        assert fetched.current_fidelity == 2

    async def test_update_fidelity_with_summary_l1(self):
        """At level 1 the summary, losses and a positive token count are recorded."""
        backend = InMemoryBackend()
        await backend.store(_make_stored_object(object_id="o1"))

        await backend.update_fidelity(
            "o1",
            1,
            summary="detailed summary",
            losses=["exact line numbers"],
        )

        fetched = await backend.get("o1")
        assert fetched is not None
        assert fetched.current_fidelity == 1
        assert fetched.summary_detailed == "detailed summary"
        assert fetched.losses_l1 == ["exact line numbers"]
        assert fetched.tokens_l1 is not None
        assert fetched.tokens_l1 > 0

    async def test_update_fidelity_with_summary_l2(self):
        """At level 2 the summary and losses land in the compact fields."""
        backend = InMemoryBackend()
        await backend.store(_make_stored_object(object_id="o1"))

        await backend.update_fidelity(
            "o1",
            2,
            summary="compact summary",
            losses=["function bodies"],
        )

        fetched = await backend.get("o1")
        assert fetched is not None
        assert fetched.summary_compact == "compact summary"
        assert fetched.losses_l2 == ["function bodies"]

    async def test_update_fidelity_nonexistent(self):
        """Updating an unknown id must complete without raising."""
        backend = InMemoryBackend()
        await backend.update_fidelity("nonexistent", 1)
|
|
|
|
|
|
# ── InMemoryBackend source_key dedup ─────────────────────────
|
|
|
|
|
|
class TestInMemoryBackendSourceKey:
    """Checks InMemoryBackend.get_by_source_key."""

    async def test_get_by_source_key(self):
        """An object is found through its session and source key."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(
                session_id="s1",
                object_id="o1",
                source_key="src/auth.py",
            )
        )

        match = await backend.get_by_source_key("s1", "src/auth.py")
        assert match is not None
        assert match.id == "o1"

    async def test_get_by_source_key_not_found(self):
        """An unknown source key returns None."""
        backend = InMemoryBackend()
        match = await backend.get_by_source_key("s1", "nonexistent.py")
        assert match is None

    async def test_get_by_source_key_session_isolation(self):
        """A source key stored for one session is not visible from another."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(
                session_id="s1",
                object_id="o1",
                source_key="src/auth.py",
            )
        )

        # Same key, different session: nothing may be returned.
        match = await backend.get_by_source_key("s2", "src/auth.py")
        assert match is None

    async def test_get_by_source_key_returns_latest(self):
        """With two objects on one key, the newer one (later created_at, stored last) wins."""
        backend = InMemoryBackend()

        older = _make_stored_object(
            session_id="s1", object_id="o1", source_key="src/auth.py"
        )
        older.created_at = "2025-01-01T00:00:00+00:00"

        newer = _make_stored_object(
            session_id="s1", object_id="o2", source_key="src/auth.py"
        )
        newer.created_at = "2025-01-02T00:00:00+00:00"

        await backend.store(older)
        await backend.store(newer)

        match = await backend.get_by_source_key("s1", "src/auth.py")
        assert match is not None
        assert match.id == "o2"
|
|
|
|
|
|
# ── InMemoryBackend embedding search ─────────────────────────
|
|
|
|
|
|
class TestInMemoryBackendEmbeddingSearch:
    """Checks InMemoryBackend.search_by_embedding."""

    async def test_search_by_embedding_basic(self):
        """Querying with the stored text's own embedding finds it with score > 0.99."""
        embedder = DummyEmbedder()
        backend = InMemoryBackend()

        await backend.store(
            _make_stored_object(
                session_id="s1",
                object_id="o1",
                content="authentication middleware",
                embedding=embedder.embed("authentication middleware"),
            )
        )

        query = embedder.embed("authentication middleware")
        hits = await backend.search_by_embedding("s1", query, limit=5)

        assert len(hits) == 1
        top_object, top_score = hits[0][0], hits[0][1]
        assert top_object.id == "o1"
        # Same text gives the same embedding, so similarity is close to 1.0.
        assert top_score > 0.99

    async def test_search_by_embedding_ranking(self):
        """The object whose text equals the query ranks first with a strictly higher score."""
        embedder = DummyEmbedder()
        backend = InMemoryBackend()

        # Three objects with different content, ids o0..o2.
        texts = ["auth login", "database schema", "test runner"]
        for index, text in enumerate(texts):
            await backend.store(
                _make_stored_object(
                    session_id="s1",
                    object_id=f"o{index}",
                    content=text,
                    embedding=embedder.embed(text),
                )
            )

        query = embedder.embed("auth login")
        hits = await backend.search_by_embedding("s1", query, limit=3)

        assert len(hits) == 3
        # The exact match leads the ranking.
        assert hits[0][0].id == "o0"
        assert hits[0][1] > hits[1][1]

    async def test_search_by_embedding_session_isolation(self):
        """Only objects of the queried session are returned."""
        embedder = DummyEmbedder()
        backend = InMemoryBackend()

        in_s1 = _make_stored_object(
            session_id="s1",
            object_id="o1",
            content="hello",
            embedding=embedder.embed("hello"),
        )
        in_s2 = _make_stored_object(
            session_id="s2",
            object_id="o2",
            content="hello",
            embedding=embedder.embed("hello"),
        )
        await backend.store(in_s1)
        await backend.store(in_s2)

        hits = await backend.search_by_embedding(
            "s1", embedder.embed("hello"), limit=10
        )
        assert len(hits) == 1
        assert hits[0][0].id == "o1"

    async def test_search_by_embedding_respects_limit(self):
        """With ten stored objects and limit=3, three results come back."""
        embedder = DummyEmbedder()
        backend = InMemoryBackend()

        for index in range(10):
            text = f"content {index}"
            await backend.store(
                _make_stored_object(
                    session_id="s1",
                    object_id=f"o{index}",
                    content=text,
                    embedding=embedder.embed(text),
                )
            )

        hits = await backend.search_by_embedding(
            "s1", embedder.embed("content 0"), limit=3
        )
        assert len(hits) == 3

    async def test_search_by_embedding_skips_no_embedding(self):
        """An object stored with an empty embedding never appears in results."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(session_id="s1", object_id="o1", embedding=[])
        )

        hits = await backend.search_by_embedding("s1", [0.1] * 384, limit=5)
        assert len(hits) == 0
|
|
|
|
|
|
# ── InMemoryBackend text search ──────────────────────────────
|
|
|
|
|
|
class TestInMemoryBackendTextSearch:
    """Checks InMemoryBackend.search_by_text."""

    async def test_search_by_text_content(self):
        """A word that occurs in the content finds the object."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(
                session_id="s1",
                object_id="o1",
                content="authentication middleware for JWT tokens",
            )
        )

        hits = await backend.search_by_text("s1", "JWT")
        assert len(hits) == 1
        assert hits[0].id == "o1"

    async def test_search_by_text_case_insensitive(self):
        """A lower-case query matches capitalised content."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(
                session_id="s1", object_id="o1", content="Authentication"
            )
        )

        hits = await backend.search_by_text("s1", "authentication")
        assert len(hits) == 1

    async def test_search_by_text_in_stub(self):
        """A query that appears only in the stub still finds the object."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(
                session_id="s1",
                object_id="o1",
                content="some content",
                stub="file_context: auth middleware",
            )
        )

        hits = await backend.search_by_text("s1", "auth middleware")
        assert len(hits) == 1

    async def test_search_by_text_in_key_entities(self):
        """A query that appears only in key_entities still finds the object."""
        backend = InMemoryBackend()
        candidate = _make_stored_object(
            session_id="s1", object_id="o1", content="code"
        )
        candidate.key_entities = ["AuthService", "JWTValidator"]
        await backend.store(candidate)

        hits = await backend.search_by_text("s1", "JWTValidator")
        assert len(hits) == 1

    async def test_search_by_text_no_match(self):
        """A query found nowhere in the object returns no results."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(session_id="s1", object_id="o1", content="hello")
        )

        hits = await backend.search_by_text("s1", "nonexistent")
        assert len(hits) == 0

    async def test_search_by_text_session_isolation(self):
        """Content stored under one session is not found from another."""
        backend = InMemoryBackend()
        await backend.store(
            _make_stored_object(
                session_id="s1", object_id="o1", content="shared keyword"
            )
        )

        hits = await backend.search_by_text("s2", "shared keyword")
        assert len(hits) == 0

    async def test_search_by_text_respects_limit(self):
        """With ten matching objects and limit=3, three results come back."""
        backend = InMemoryBackend()
        for index in range(10):
            await backend.store(
                _make_stored_object(
                    session_id="s1",
                    object_id=f"o{index}",
                    content=f"common keyword item {index}",
                )
            )

        hits = await backend.search_by_text("s1", "common keyword", limit=3)
        assert len(hits) == 3
|
|
|
|
|
|
# ── Session isolation ────────────────────────────────────────
|
|
|
|
|
|
class TestSessionIsolation:
    """Checks that InMemoryBackend keeps sessions separate."""

    async def test_objects_isolated_by_session(self):
        """Each session lists only its own object."""
        backend = InMemoryBackend()
        first = _make_stored_object(session_id="s1", object_id="o1")
        second = _make_stored_object(session_id="s2", object_id="o2")

        await backend.store(first)
        await backend.store(second)

        from_s1 = await backend.get_by_session("s1")
        from_s2 = await backend.get_by_session("s2")

        assert len(from_s1) == 1
        assert from_s1[0].id == "o1"
        assert len(from_s2) == 1
        assert from_s2[0].id == "o2"

    async def test_delete_session_does_not_affect_other(self):
        """Deleting one session leaves the other session's object in place."""
        backend = InMemoryBackend()
        first = _make_stored_object(session_id="s1", object_id="o1")
        second = _make_stored_object(session_id="s2", object_id="o2")

        await backend.store(first)
        await backend.store(second)

        await backend.delete_session("s1")

        assert await backend.get("o1") is None
        assert await backend.get("o2") is not None
        remaining = await backend.get_by_session("s2")
        assert len(remaining) == 1

    async def test_source_key_scoped_to_session(self):
        """The same source key in two sessions resolves per session."""
        backend = InMemoryBackend()
        first = _make_stored_object(
            session_id="s1", object_id="o1", source_key="file.py"
        )
        second = _make_stored_object(
            session_id="s2", object_id="o2", source_key="file.py"
        )

        await backend.store(first)
        await backend.store(second)

        match = await backend.get_by_source_key("s1", "file.py")
        assert match is not None
        assert match.id == "o1"
|
|
|
|
|
|
# ── ObjectStore facade ───────────────────────────────────────
|
|
|
|
|
|
class TestObjectStoreFacade:
    """Checks ObjectStore's store / get / list / update entry points."""

    async def test_store_object_creates_with_defaults(self):
        """store_object fills in id, timestamps, token count and embedding."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="def authenticate(token): ...",
            object_type="file_context",
            source_tool="Read",
        )

        # Generated identity.
        assert created.id
        assert len(created.id) == 16

        # Values passed straight through.
        assert created.session_id == "s1"
        assert created.object_type == "file_context"
        assert created.content_full == "def authenticate(token): ..."
        assert created.source_tool == "Read"

        # Derived / defaulted values.
        assert created.current_fidelity == 0
        assert created.created_at
        assert created.last_accessed
        assert created.tokens_l0 > 0
        assert len(created.embedding) == 384

    async def test_store_object_auto_generates_stub(self):
        """Without an explicit stub, the stub contains the object type prefix."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="some content here",
            object_type="tool_result",
        )
        assert "tool_result:" in created.stub

    async def test_store_object_custom_stub(self):
        """An explicit stub is stored unchanged."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
            stub="Read src/auth.py (150 lines)",
        )
        assert created.stub == "Read src/auth.py (150 lines)"

    async def test_store_object_with_tags_and_entities(self):
        """tags and key_entities are stored as given."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
            tags=["auth", "middleware"],
            key_entities=["AuthService"],
        )
        assert created.tags == ["auth", "middleware"]
        assert created.key_entities == ["AuthService"]

    async def test_store_object_with_turn(self):
        """turn sets both source_turn_start and source_turn_end."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
            turn=7,
        )
        assert created.source_turn_start == 7
        assert created.source_turn_end == 7

    async def test_store_object_no_embedder(self):
        """With embedder=None the stored object has an empty embedding."""
        facade = ObjectStore(InMemoryBackend(), embedder=None)
        created = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )
        assert created.embedding == []

    async def test_get_retrieves_stored(self):
        """get returns the same object that store_object returned."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )
        fetched = await facade.get(created.id)
        assert fetched is created

    async def test_get_nonexistent(self):
        """get on an unknown id returns None."""
        facade = ObjectStore(InMemoryBackend())
        fetched = await facade.get("nonexistent")
        assert fetched is None

    async def test_get_session_objects_excludes_evicted(self):
        """By default an object moved to fidelity 4 is left out of the listing."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        kept = await facade.store_object(
            session_id="s1",
            content="active",
            object_type="file_context",
        )
        dropped = await facade.store_object(
            session_id="s1",
            content="evicted",
            object_type="file_context",
        )
        await facade.update_fidelity(dropped.id, 4)

        listing = await facade.get_session_objects("s1")
        assert len(listing) == 1
        assert listing[0].id == kept.id

    async def test_get_session_objects_includes_evicted(self):
        """include_evicted=True also lists the object moved to fidelity 4."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        await facade.store_object(
            session_id="s1",
            content="active",
            object_type="file_context",
        )
        dropped = await facade.store_object(
            session_id="s1",
            content="evicted",
            object_type="file_context",
        )
        await facade.update_fidelity(dropped.id, 4)

        listing = await facade.get_session_objects("s1", include_evicted=True)
        assert len(listing) == 2

    async def test_update_fidelity(self):
        """update_fidelity through the facade records level, summary and losses."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        created = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )
        await facade.update_fidelity(
            created.id, 1, summary="summary", losses=["details"]
        )

        fetched = await facade.get(created.id)
        assert fetched is not None
        assert fetched.current_fidelity == 1
        assert fetched.summary_detailed == "summary"
        assert fetched.losses_l1 == ["details"]
|
|
|
|
|
|
# ── ObjectStore semantic search ──────────────────────────────
|
|
|
|
|
|
class TestObjectStoreSemanticSearch:
    """Checks ObjectStore.semantic_search."""

    async def test_semantic_search_finds_exact_match(self):
        """Querying with an object's exact content ranks that object first."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        for text in (
            "authentication middleware for JWT",
            "database migration script",
        ):
            await facade.store_object(
                session_id="s1",
                content=text,
                object_type="file_context",
            )

        hits = await facade.semantic_search(
            "s1", "authentication middleware for JWT"
        )
        assert len(hits) >= 1
        # The exact content match leads the ranking.
        assert hits[0][0].content_full == "authentication middleware for JWT"

    async def test_semantic_search_hybrid_boost(self):
        """Text match should boost ranking via hybrid scoring."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        for text in (
            "the quick brown fox jumps over the lazy dog",
            "unrelated content about databases",
        ):
            await facade.store_object(
                session_id="s1",
                content=text,
                object_type="file_context",
            )

        hits = await facade.semantic_search("s1", "quick brown fox")
        assert len(hits) >= 1
        # The object containing the query text is expected on top.
        assert "fox" in hits[0][0].content_full

    async def test_semantic_search_session_isolation(self):
        """Every result belongs to the session that was searched."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        await facade.store_object(
            session_id="s1",
            content="session one content",
            object_type="file_context",
        )
        await facade.store_object(
            session_id="s2",
            content="session two content",
            object_type="file_context",
        )

        hits = await facade.semantic_search("s1", "session one content")
        assert all(hit[0].session_id == "s1" for hit in hits)

    async def test_semantic_search_respects_limit(self):
        """No more than `limit` results are returned."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        for index in range(10):
            await facade.store_object(
                session_id="s1",
                content=f"content item {index}",
                object_type="file_context",
            )

        hits = await facade.semantic_search("s1", "content item", limit=3)
        assert len(hits) <= 3

    async def test_semantic_search_no_embedder(self):
        """Without embedder, search falls back to text-only."""
        facade = ObjectStore(InMemoryBackend(), embedder=None)
        await facade.store_object(
            session_id="s1",
            content="findable keyword here",
            object_type="file_context",
        )

        hits = await facade.semantic_search("s1", "findable keyword")
        assert len(hits) == 1
        # 0.3 is the score of a text-only hit.
        assert hits[0][1] == 0.3

    async def test_semantic_search_empty_session(self):
        """Searching a session with no objects returns an empty list."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        hits = await facade.semantic_search("empty", "anything")
        assert hits == []
|
|
|
|
|
|
# ── ObjectStore deduplication ────────────────────────────────
|
|
|
|
|
|
class TestObjectStoreDedup:
    """Checks ObjectStore.find_duplicate."""

    async def test_find_duplicate_exists(self):
        """An object stored with a source key is found by that key."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        await facade.store_object(
            session_id="s1",
            content="file content",
            object_type="file_context",
            source_key="src/auth.py",
        )

        existing = await facade.find_duplicate("s1", "src/auth.py")
        assert existing is not None
        assert existing.source_key == "src/auth.py"

    async def test_find_duplicate_not_found(self):
        """An unknown source key yields None."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        existing = await facade.find_duplicate("s1", "nonexistent.py")
        assert existing is None

    async def test_find_duplicate_session_scoped(self):
        """A key stored under one session is not a duplicate in another."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
            source_key="src/auth.py",
        )

        existing = await facade.find_duplicate("s2", "src/auth.py")
        assert existing is None
|
|
|
|
|
|
# ── ObjectStore access/fault tracking ────────────────────────
|
|
|
|
|
|
class TestObjectStoreTracking:
    """Checks ObjectStore.record_access and ObjectStore.record_fault."""

    async def test_record_access(self):
        """One recorded access sets access_count to 1 and never rewinds last_accessed."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        tracked = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )
        accessed_before = tracked.last_accessed

        await facade.record_access(tracked.id)

        assert tracked.access_count == 1
        assert tracked.last_accessed >= accessed_before

    async def test_record_access_increments(self):
        """Three recorded accesses give access_count == 3."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        tracked = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )

        for _ in range(3):
            await facade.record_access(tracked.id)

        assert tracked.access_count == 3

    async def test_record_access_nonexistent(self):
        """Recording access on an unknown id must complete without raising."""
        facade = ObjectStore(InMemoryBackend())
        await facade.record_access("nonexistent")

    async def test_record_fault(self):
        """A plain fault bumps fault_count only."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        tracked = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )

        await facade.record_fault(tracked.id)

        assert tracked.fault_count == 1
        assert tracked.micro_fault_count == 0

    async def test_record_micro_fault(self):
        """A fault recorded with is_micro=True bumps micro_fault_count only."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        tracked = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )

        await facade.record_fault(tracked.id, is_micro=True)

        assert tracked.fault_count == 0
        assert tracked.micro_fault_count == 1

    async def test_record_fault_nonexistent(self):
        """Recording a fault on an unknown id must complete without raising."""
        facade = ObjectStore(InMemoryBackend())
        await facade.record_fault("nonexistent")

    async def test_mixed_faults(self):
        """Interleaved plain and micro faults are counted separately."""
        facade = ObjectStore(InMemoryBackend(), DummyEmbedder())
        tracked = await facade.store_object(
            session_id="s1",
            content="content",
            object_type="file_context",
        )
        target = tracked.id

        # Two plain faults interleaved with three micro faults.
        await facade.record_fault(target)
        await facade.record_fault(target, is_micro=True)
        await facade.record_fault(target)
        await facade.record_fault(target, is_micro=True)
        await facade.record_fault(target, is_micro=True)

        assert tracked.fault_count == 2
        assert tracked.micro_fault_count == 3
|
|
|
|
|
|
# ── Integration: full lifecycle ──────────────────────────────
|
|
|
|
|
|
class TestIntegration:
    """End-to-end scenarios that combine several ObjectStore operations."""

    async def test_store_search_degrade_cycle(self):
        """Full lifecycle: store → search → degrade → search again."""
        store = ObjectStore(InMemoryBackend(), DummyEmbedder())

        # Store several objects. Only the first is referenced later, so the
        # second is stored without binding its return value (the original
        # kept an unused local here).
        obj1 = await store.store_object(
            session_id="s1",
            content="authentication middleware handles JWT validation",
            object_type="file_context",
            source_tool="Read",
            source_key="src/auth/middleware.ts",
        )
        await store.store_object(
            session_id="s1",
            content="database migration adds users table with email column",
            object_type="file_context",
            source_tool="Read",
            source_key="migrations/001_users.sql",
        )

        # Search returns at least one result before any degradation.
        results = await store.semantic_search("s1", "JWT validation")
        assert len(results) >= 1

        # Degrade the first object to fidelity 1 with a summary.
        await store.update_fidelity(
            obj1.id,
            1,
            summary="Auth middleware: validates JWT tokens",
            losses=["exact error handling code"],
        )
        degraded = await store.get(obj1.id)
        assert degraded is not None
        assert degraded.current_fidelity == 1
        assert degraded.summary_detailed is not None

        # Search still returns at least one result after degradation.
        results2 = await store.semantic_search("s1", "JWT validation")
        assert len(results2) >= 1

    async def test_dedup_workflow(self):
        """Dedup: check for existing → store if new."""
        store = ObjectStore(InMemoryBackend(), DummyEmbedder())

        # First read: nothing is stored under this key yet.
        dup = await store.find_duplicate("s1", "src/auth.py")
        assert dup is None

        obj = await store.store_object(
            session_id="s1",
            content="original content",
            object_type="file_context",
            source_key="src/auth.py",
        )

        # Second read: the object just stored is reported as the duplicate.
        dup = await store.find_duplicate("s1", "src/auth.py")
        assert dup is not None
        assert dup.id == obj.id

    async def test_multi_session_lifecycle(self):
        """Multiple sessions operate independently."""
        backend = InMemoryBackend()
        store = ObjectStore(backend, DummyEmbedder())

        await store.store_object(
            session_id="s1",
            content="session 1 auth code",
            object_type="file_context",
        )
        await store.store_object(
            session_id="s2",
            content="session 2 db code",
            object_type="file_context",
        )

        s1_objs = await store.get_session_objects("s1")
        s2_objs = await store.get_session_objects("s2")
        assert len(s1_objs) == 1
        assert len(s2_objs) == 1

        # Delete session 1 directly on the backend.
        count = await backend.delete_session("s1")
        assert count == 1

        # Session 2 is unaffected.
        s2_objs = await store.get_session_objects("s2")
        assert len(s2_objs) == 1

        # Session 1 is now empty.
        s1_objs = await store.get_session_objects("s1")
        assert len(s1_objs) == 0
|