# Ultraworked with Sisyphus (https://github.com/code-yeongyu/oh-my-opencode)
# Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
"""Integration tests for semantic object pipeline wiring in the gateway.
|
|
|
|
Tests the Session ↔ Segmenter ↔ ObjectStore ↔ ContextAssembler integration:
|
|
- Session.__init__ creates segmenter, object_store, _segmented_objects
|
|
- _preprocess segments messages and stores objects in ObjectStore
|
|
- memory_query phantom tool resolves through ContextAssembler
|
|
- Graceful degradation when embedder is unavailable (DummyEmbedder)
|
|
"""
|
|
|
|
from __future__ import annotations

# Standard library
import asyncio
import copy
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import AsyncMock, MagicMock, patch

# Third-party
import pytest

# Project under test
# NOTE(review): `copy` and `MicroFaultResult` are not referenced in this
# chunk — presumably used elsewhere in the file; confirm before removing.
from mnemosyne.context_assembler import ContextAssembler, MicroFaultResult
from mnemosyne.gateway import Session, _run_async
from mnemosyne.object_store import DummyEmbedder, InMemoryBackend, ObjectStore
from mnemosyne.phantom import PhantomCall, _handle_phantom_call, inject_phantom_results
from mnemosyne.segmenter import SegmentedObject, Segmenter
|
|
|
|
|
|
# ── Fixtures ─────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture
def tmp_log_dir():
    """Yield a throwaway directory for session logs, removed after the test."""
    with TemporaryDirectory() as tmp:
        yield Path(tmp)
|
|
|
|
|
|
@pytest.fixture
def session(tmp_log_dir):
    """A fresh Session wired to the temporary log directory."""
    return Session("integ01", tmp_log_dir)
|
|
|
|
|
|
def _user_msg(text: str) -> dict:
|
|
return {"role": "user", "content": text}
|
|
|
|
|
|
def _assistant_msg(text: str) -> dict:
|
|
return {"role": "assistant", "content": text}
|
|
|
|
|
|
def _tool_use_block(tool_id: str, name: str, input_data: dict) -> dict:
|
|
return {"type": "tool_use", "id": tool_id, "name": name, "input": input_data}
|
|
|
|
|
|
def _tool_result_block(tool_id: str, content: str) -> dict:
|
|
return {"type": "tool_result", "tool_use_id": tool_id, "content": content}
|
|
|
|
|
|
# ── Session initialization ───────────────────────────────────────────────
|
|
|
|
|
|
class TestSessionSemanticAttributes:
    """Session.__init__ must wire up segmenter, object_store, and related attrs."""

    def test_session_has_segmenter(self, session):
        assert hasattr(session, "segmenter")
        seg = session.segmenter
        assert isinstance(seg, Segmenter)

    def test_session_has_object_store(self, session):
        assert hasattr(session, "object_store")
        store = session.object_store
        assert isinstance(store, ObjectStore)

    def test_session_has_segmented_objects_list(self, session):
        assert hasattr(session, "_segmented_objects")
        objs = session._segmented_objects
        assert isinstance(objs, list)
        assert objs == []

    def test_session_has_context_assembler_slot(self, session):
        assert hasattr(session, "context_assembler")
        # Initially None — set by create_app after helper_llm is resolved
        assert session.context_assembler is None

    def test_object_store_uses_in_memory_backend(self, session):
        backend = session.object_store._backend
        assert isinstance(backend, InMemoryBackend)

    def test_object_store_has_embedder(self, session):
        """ObjectStore should have an embedder (DummyEmbedder fallback if no sentence-transformers)."""
        assert session.object_store._embedder is not None

    def test_segmenter_per_session(self, tmp_log_dir):
        first = Session("sess_a", tmp_log_dir)
        second = Session("sess_b", tmp_log_dir)
        assert first.segmenter is not second.segmenter

    def test_object_store_per_session(self, tmp_log_dir):
        first = Session("sess_a", tmp_log_dir)
        second = Session("sess_b", tmp_log_dir)
        assert first.object_store is not second.object_store
|
|
|
|
|
|
# ── Segmenter integration ───────────────────────────────────────────────
|
|
|
|
|
|
class TestSegmenterIntegration:
    """The per-session segmenter should turn raw messages into objects."""

    def test_segment_simple_conversation(self, session):
        conversation = [
            _user_msg("Please read the auth module"),
            _assistant_msg(
                "I'll read the auth module for you. The authentication system uses "
                "JWT tokens with RS256 signing. The main entry point is auth.py which "
                "handles token validation, refresh, and revocation."
            ),
        ]
        objects = session.segmenter.segment_incremental(
            conversation, session._segmented_objects, start_turn=0
        )
        assert len(objects) > 0
        for obj in objects:
            assert isinstance(obj, SegmentedObject)

    def test_segment_incremental_extends(self, session):
        opening = [
            _user_msg("What is the project structure?"),
            _assistant_msg("The project has src/, tests/, and docs/ directories."),
        ]
        first_pass = session.segmenter.segment_incremental(
            opening, session._segmented_objects, start_turn=0
        )
        session._segmented_objects = first_pass

        extended = opening + [
            _user_msg("Tell me about the src/ directory"),
            _assistant_msg("The src/ directory contains the main application code."),
        ]
        second_pass = session.segmenter.segment_incremental(
            extended, session._segmented_objects, start_turn=0
        )
        assert len(second_pass) >= len(first_pass)

    def test_segment_with_tool_results(self, session):
        conversation = [
            _user_msg("Read the config file"),
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": "I'll read the config file."},
                    _tool_use_block("tu_1", "Read", {"file_path": "/app/config.py"}),
                ],
            },
            {
                "role": "user",
                "content": [
                    _tool_result_block(
                        "tu_1",
                        "# config.py\nDATABASE_URL = 'postgres://localhost/mydb'\nDEBUG = True",
                    ),
                ],
            },
        ]
        objects = session.segmenter.segment_incremental(
            conversation, session._segmented_objects, start_turn=0
        )
        assert len(objects) > 0
        # At least one object should be file_context or tool_result
        seen_types = {obj.object_type for obj in objects}
        assert not seen_types.isdisjoint(
            {"file_context", "tool_result", "conversation_phase"}
        )
|
|
|
|
|
|
# ── ObjectStore integration ──────────────────────────────────────────────
|
|
|
|
|
|
class TestObjectStoreIntegration:
    """Segmented objects should round-trip through ObjectStore via _run_async."""

    def test_store_segmented_object(self, session):
        """Manually store a segmented object and verify it's in the store."""
        segmented = SegmentedObject(
            content="The auth module uses JWT tokens for authentication.",
            object_type="file_context",
            source_tool="Read",
            source_key="/app/auth.py",
            stub="[file_context: auth.py — JWT authentication]",
            turn_start=1,
            turn_end=1,
            token_estimate=12,
            key_entities=["auth.py", "JWT"],
            tags=["auth"],
        )

        record = _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content=segmented.content,
                object_type=segmented.object_type,
                source_tool=segmented.source_tool,
                source_key=segmented.source_key,
                stub=segmented.stub,
                tags=segmented.tags,
                key_entities=segmented.key_entities,
                turn=segmented.turn_start,
            )
        )

        assert record is not None
        assert record.session_id == session.id
        assert record.object_type == "file_context"
        assert record.content_full == segmented.content
        assert record.source_tool == "Read"
        assert record.source_key == "/app/auth.py"
        assert len(record.embedding) == 384  # DummyEmbedder produces 384-dim

    def test_store_multiple_objects_and_search(self, session):
        """Store multiple objects and verify semantic search works."""
        seed_data = [
            ("The database uses PostgreSQL with connection pooling.", "file_context"),
            ("Authentication is handled by JWT tokens with RS256.", "file_context"),
            ("The CI pipeline runs pytest and mypy on every PR.", "conversation_phase"),
        ]

        for text, kind in seed_data:
            _run_async(
                session.object_store.store_object(
                    session_id=session.id,
                    content=text,
                    object_type=kind,
                )
            )

        # Search for auth-related content
        hits = _run_async(
            session.object_store.semantic_search(session.id, "authentication JWT", limit=3)
        )
        assert len(hits) > 0
        # The auth-related object should be in results
        assert any("JWT" in obj.content_full for obj, _score in hits)

    def test_run_async_from_sync_context(self):
        """_run_async should work from a plain sync context."""

        async def forty_two():
            return 42

        assert _run_async(forty_two()) == 42
|
|
|
|
|
|
# ── memory_query phantom tool resolution ─────────────────────────────────
|
|
|
|
|
|
class TestMemoryQueryResolution:
    """memory_query phantom calls should resolve through ContextAssembler."""

    def test_memory_query_with_context_assembler(self, session):
        """memory_query should resolve through ContextAssembler when available."""
        # Store some content first
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="The server runs on port 8080 with TLS enabled.",
                object_type="file_context",
                source_tool="Read",
                source_key="/app/server.py",
                tags=["server", "config"],
                key_entities=["port 8080", "TLS"],
            )
        )

        # Create ContextAssembler without HelperLLM (fallback mode)
        assembler = ContextAssembler(session.object_store, helper_llm=None)

        phantom = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_001",
            input={"question": "What port does the server run on?"},
        )

        reply = _handle_phantom_call(
            phantom,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )

        # Should NOT contain the pending placeholder
        assert "[memory_query:pending]" not in reply
        # Should contain the resolved answer
        assert "[memory_query resolved:" in reply

    def test_memory_query_with_scope(self, session):
        """memory_query with scope should narrow the search."""
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="Database config: host=localhost, port=5432, name=mydb",
                object_type="file_context",
                source_key="/app/db.py",
                tags=["database"],
            )
        )
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="Server config: host=0.0.0.0, port=8080",
                object_type="file_context",
                source_key="/app/server.py",
                tags=["server"],
            )
        )

        assembler = ContextAssembler(session.object_store, helper_llm=None)

        phantom = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_002",
            input={
                "question": "What is the database port?",
                "scope": "database config",
            },
        )

        reply = _handle_phantom_call(
            phantom,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )

        assert "[memory_query:pending]" not in reply
        assert "[memory_query resolved:" in reply

    def test_memory_query_fallback_without_assembler(self):
        """memory_query without context_assembler should return pending placeholder."""
        phantom = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_003",
            input={"question": "What auth library is used?"},
        )

        reply = _handle_phantom_call(phantom, page_store=None)
        assert "[memory_query:pending]" in reply
        assert "What auth library is used?" in reply

    def test_memory_query_no_results(self, session):
        """memory_query with no matching content should still resolve gracefully."""
        assembler = ContextAssembler(session.object_store, helper_llm=None)

        phantom = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_004",
            input={"question": "What is the meaning of life?"},
        )

        reply = _handle_phantom_call(
            phantom,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )

        # Should resolve (not pending) even with no results
        assert "[memory_query:pending]" not in reply
        assert "[memory_query resolved:" in reply

    def test_memory_query_with_mock_helper_llm(self, session):
        """memory_query with a mocked HelperLLM should return the LLM's answer."""
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="The application uses bcrypt for password hashing with cost factor 12.",
                object_type="file_context",
                source_key="/app/auth.py",
                key_entities=["bcrypt", "password hashing"],
            )
        )

        # Mock HelperLLM
        fake_helper = MagicMock()
        fake_helper.answer_micro_fault = AsyncMock(
            return_value="The application uses bcrypt with cost factor 12."
        )

        assembler = ContextAssembler(session.object_store, helper_llm=fake_helper)

        phantom = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_005",
            input={"question": "What password hashing is used?"},
        )

        reply = _handle_phantom_call(
            phantom,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )

        assert "[memory_query:pending]" not in reply
        assert "bcrypt" in reply
        assert "[memory_query resolved:" in reply
        fake_helper.answer_micro_fault.assert_called_once()

    def test_inject_phantom_results_passes_assembler(self, session):
        """inject_phantom_results should pass context_assembler to handler."""
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="The API rate limit is 100 requests per minute.",
                object_type="conversation_phase",
                key_entities=["rate limit", "100 rpm"],
            )
        )

        assembler = ContextAssembler(session.object_store, helper_llm=None)

        phantom_calls = [
            PhantomCall(
                name="memory_query",
                tool_use_id="toolu_inject_001",
                input={"question": "What is the API rate limit?"},
            )
        ]

        messages = [
            _user_msg("Hello"),
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": "Let me check."},
                ],
            },
        ]

        result_messages = inject_phantom_results(
            messages,
            phantom_calls,
            page_store=None,
            observe_only=False,
            context_assembler=assembler,
            session_id=session.id,
        )

        # The result should have injected a tool_result — scan every message's
        # content blocks for a resolved memory_query marker.
        found = False
        for msg in result_messages:
            blocks = msg.get("content", [])
            if not isinstance(blocks, list):
                continue
            for block in blocks:
                if not isinstance(block, dict) or block.get("type") != "tool_result":
                    continue
                if "[memory_query resolved:" in block.get("content", ""):
                    found = True
        assert found, "Expected resolved memory_query in injected results"
|
|
|
|
|
|
# ── DummyEmbedder fallback ───────────────────────────────────────────────
|
|
|
|
|
|
class TestDummyEmbedderFallback:
    """Graceful degradation path when the real embedder is unavailable."""

    def test_dummy_embedder_produces_384_dim(self):
        vec = DummyEmbedder().embed("test text")
        assert len(vec) == 384

    def test_dummy_embedder_deterministic(self):
        emb = DummyEmbedder()
        assert emb.embed("same text") == emb.embed("same text")

    def test_dummy_embedder_different_for_different_text(self):
        emb = DummyEmbedder()
        assert emb.embed("text one") != emb.embed("text two")

    def test_object_store_works_with_dummy_embedder(self):
        """ObjectStore should function correctly with DummyEmbedder."""
        store = ObjectStore(InMemoryBackend(), embedder=DummyEmbedder())

        record = _run_async(
            store.store_object(
                session_id="test_session",
                content="Test content for embedding",
                object_type="conversation_phase",
            )
        )

        assert len(record.embedding) == 384

        # Search should work
        hits = _run_async(store.semantic_search("test_session", "test content", limit=5))
        assert len(hits) == 1
        assert hits[0][0].id == record.id

    @patch("mnemosyne.embedder.try_get_embedder", return_value=None)
    def test_session_falls_back_to_dummy_embedder(self, mock_try, tmp_log_dir):
        """When try_get_embedder returns None, Session should use DummyEmbedder."""
        fallback = Session("fallback_test", tmp_log_dir)
        assert fallback.object_store._embedder is not None
        assert isinstance(fallback.object_store._embedder, DummyEmbedder)
|
|
|
|
|
|
# ── End-to-end segmentation + storage ────────────────────────────────────
|
|
|
|
|
|
class TestEndToEndSegmentationStorage:
    """Full pipeline: raw messages → segmenter → object_store."""

    def test_segment_and_store_conversation(self, session):
        """Simulate what _preprocess does: segment messages and store objects."""
        messages = [
            _user_msg("Can you read the database configuration?"),
            _assistant_msg(
                "I'll read the database configuration file. It contains "
                "the connection string, pool size, and timeout settings. "
                "The database is PostgreSQL running on port 5432 with "
                "a connection pool of 20 connections."
            ),
            _user_msg("What about the authentication setup?"),
            _assistant_msg(
                "The authentication system uses OAuth 2.0 with JWT tokens. "
                "Tokens are signed with RS256 and have a 1-hour expiry. "
                "Refresh tokens are stored in the database with a 30-day TTL."
            ),
        ]

        known = len(session._segmented_objects)
        segmented = session.segmenter.segment_incremental(
            messages, session._segmented_objects, start_turn=0
        )

        new_count = len(segmented) - known
        assert new_count > 0

        # Persist only the freshly segmented tail, mirroring _preprocess.
        for piece in segmented[known:]:
            _run_async(
                session.object_store.store_object(
                    session_id=session.id,
                    content=piece.content,
                    object_type=piece.object_type,
                    source_tool=piece.source_tool,
                    source_key=piece.source_key,
                    stub=piece.stub,
                    tags=piece.tags,
                    key_entities=piece.key_entities,
                    turn=piece.turn_start,
                )
            )
        session._segmented_objects = segmented

        # Verify objects are in the store
        stored_objects = _run_async(session.object_store.get_session_objects(session.id))
        assert len(stored_objects) == new_count

        # Verify search works on stored objects
        hits = _run_async(
            session.object_store.semantic_search(session.id, "database PostgreSQL", limit=5)
        )
        assert len(hits) > 0

    def test_incremental_segmentation_stores_only_new(self, session):
        """Incremental segmentation should only store new objects."""
        opening = [
            _user_msg("Hello"),
            _assistant_msg("Hi there! How can I help you today?"),
        ]

        first_pass = session.segmenter.segment_incremental(
            opening, session._segmented_objects, start_turn=0
        )
        for piece in first_pass:
            _run_async(
                session.object_store.store_object(
                    session_id=session.id,
                    content=piece.content,
                    object_type=piece.object_type,
                )
            )
        session._segmented_objects = first_pass
        initial_count = len(first_pass)

        # Add more messages
        extended = opening + [
            _user_msg("Tell me about the project architecture"),
            _assistant_msg(
                "The project follows a layered architecture with "
                "gateway, service, and repository layers."
            ),
        ]

        second_pass = session.segmenter.segment_incremental(
            extended, session._segmented_objects, start_turn=0
        )
        new_count = len(second_pass) - len(session._segmented_objects)

        # Only store new objects
        if new_count > 0:
            for piece in second_pass[len(session._segmented_objects):]:
                _run_async(
                    session.object_store.store_object(
                        session_id=session.id,
                        content=piece.content,
                        object_type=piece.object_type,
                    )
                )
        session._segmented_objects = second_pass

        # Total stored should be initial_count + new_count
        stored = _run_async(session.object_store.get_session_objects(session.id))
        assert len(stored) == initial_count + new_count
|
|
|
|
|
|
# ── _run_async helper ────────────────────────────────────────────────────
|
|
|
|
|
|
class TestRunAsync:
    """_run_async must bridge sync callers to coroutines correctly."""

    def test_run_async_simple(self):
        async def combine(x, y):
            return x + y

        assert _run_async(combine(2, 3)) == 5

    def test_run_async_with_await(self):
        async def eventually():
            await asyncio.sleep(0.01)
            return "done"

        assert _run_async(eventually()) == "done"

    def test_run_async_exception_propagates(self):
        async def boom():
            raise ValueError("test error")

        with pytest.raises(ValueError, match="test error"):
            _run_async(boom())
|