mnemosyne/tests/test_gateway_integration.py
Joey Yakimowich-Payne 9b25b33a50 test: add gateway integration tests
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-13 11:41:28 -06:00

643 lines
24 KiB
Python

"""Integration tests for semantic object pipeline wiring in the gateway.
Tests the Session ↔ Segmenter ↔ ObjectStore ↔ ContextAssembler integration:
- Session.__init__ creates segmenter, object_store, _segmented_objects
- _preprocess segments messages and stores objects in ObjectStore
- memory_query phantom tool resolves through ContextAssembler
- Graceful degradation when embedder is unavailable (DummyEmbedder)
"""
from __future__ import annotations
import asyncio
import copy
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mnemosyne.context_assembler import ContextAssembler, MicroFaultResult
from mnemosyne.gateway import Session, _run_async
from mnemosyne.object_store import DummyEmbedder, InMemoryBackend, ObjectStore
from mnemosyne.phantom import PhantomCall, _handle_phantom_call, inject_phantom_results
from mnemosyne.segmenter import SegmentedObject, Segmenter
# ── Fixtures ─────────────────────────────────────────────────────────────
@pytest.fixture
def tmp_log_dir():
    """Yield a temporary directory as a Path, removed when the test finishes."""
    tmpdir = TemporaryDirectory()
    try:
        yield Path(tmpdir.name)
    finally:
        tmpdir.cleanup()
@pytest.fixture
def session(tmp_log_dir):
    """Return a fresh Session whose logs land in the per-test temp dir."""
    return Session("integ01", tmp_log_dir)
def _user_msg(text: str) -> dict:
return {"role": "user", "content": text}
def _assistant_msg(text: str) -> dict:
return {"role": "assistant", "content": text}
def _tool_use_block(tool_id: str, name: str, input_data: dict) -> dict:
return {"type": "tool_use", "id": tool_id, "name": name, "input": input_data}
def _tool_result_block(tool_id: str, content: str) -> dict:
return {"type": "tool_result", "tool_use_id": tool_id, "content": content}
# ── Session initialization ───────────────────────────────────────────────
class TestSessionSemanticAttributes:
    """Session.__init__ must create segmenter, object_store, and related attrs."""

    def test_session_has_segmenter(self, session):
        """Each Session owns a Segmenter instance."""
        assert hasattr(session, "segmenter")
        assert isinstance(session.segmenter, Segmenter)

    def test_session_has_object_store(self, session):
        """Each Session owns an ObjectStore instance."""
        assert hasattr(session, "object_store")
        assert isinstance(session.object_store, ObjectStore)

    def test_session_has_segmented_objects_list(self, session):
        """_segmented_objects starts as an empty list (segmentation cache)."""
        assert hasattr(session, "_segmented_objects")
        assert isinstance(session._segmented_objects, list)
        assert len(session._segmented_objects) == 0

    def test_session_has_context_assembler_slot(self, session):
        """context_assembler slot exists but is unset at construction time."""
        assert hasattr(session, "context_assembler")
        # Initially None — set by create_app after helper_llm is resolved
        assert session.context_assembler is None

    def test_object_store_uses_in_memory_backend(self, session):
        """Default backend for a Session's store is the in-memory one."""
        assert isinstance(session.object_store._backend, InMemoryBackend)

    def test_object_store_has_embedder(self, session):
        """ObjectStore should have an embedder (DummyEmbedder fallback if no sentence-transformers)."""
        assert session.object_store._embedder is not None

    def test_segmenter_per_session(self, tmp_log_dir):
        """Segmenters are not shared between distinct sessions."""
        s1 = Session("sess_a", tmp_log_dir)
        s2 = Session("sess_b", tmp_log_dir)
        assert s1.segmenter is not s2.segmenter

    def test_object_store_per_session(self, tmp_log_dir):
        """Object stores are not shared between distinct sessions."""
        s1 = Session("sess_a", tmp_log_dir)
        s2 = Session("sess_b", tmp_log_dir)
        assert s1.object_store is not s2.object_store
# ── Segmenter integration ───────────────────────────────────────────────
class TestSegmenterIntegration:
    """Test that the segmenter correctly processes messages into objects."""

    def test_segment_simple_conversation(self, session):
        """A short user/assistant exchange yields at least one SegmentedObject."""
        messages = [
            _user_msg("Please read the auth module"),
            _assistant_msg(
                "I'll read the auth module for you. The authentication system uses "
                "JWT tokens with RS256 signing. The main entry point is auth.py which "
                "handles token validation, refresh, and revocation."
            ),
        ]
        result = session.segmenter.segment_incremental(
            messages, session._segmented_objects, start_turn=0
        )
        assert len(result) > 0
        assert all(isinstance(obj, SegmentedObject) for obj in result)

    def test_segment_incremental_extends(self, session):
        """Re-segmenting a grown transcript never shrinks the object list."""
        msgs1 = [
            _user_msg("What is the project structure?"),
            _assistant_msg("The project has src/, tests/, and docs/ directories."),
        ]
        result1 = session.segmenter.segment_incremental(
            msgs1, session._segmented_objects, start_turn=0
        )
        session._segmented_objects = result1
        msgs2 = msgs1 + [
            _user_msg("Tell me about the src/ directory"),
            _assistant_msg("The src/ directory contains the main application code."),
        ]
        result2 = session.segmenter.segment_incremental(
            msgs2, session._segmented_objects, start_turn=0
        )
        assert len(result2) >= len(result1)

    def test_segment_with_tool_results(self, session):
        """Tool-use/tool-result message shapes are segmented into typed objects."""
        messages = [
            _user_msg("Read the config file"),
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": "I'll read the config file."},
                    _tool_use_block("tu_1", "Read", {"file_path": "/app/config.py"}),
                ],
            },
            {
                "role": "user",
                "content": [
                    _tool_result_block(
                        "tu_1",
                        "# config.py\nDATABASE_URL = 'postgres://localhost/mydb'\nDEBUG = True",
                    ),
                ],
            },
        ]
        result = session.segmenter.segment_incremental(
            messages, session._segmented_objects, start_turn=0
        )
        assert len(result) > 0
        # At least one object should be file_context or tool_result
        types = {obj.object_type for obj in result}
        assert types & {"file_context", "tool_result", "conversation_phase"}
# ── ObjectStore integration ──────────────────────────────────────────────
class TestObjectStoreIntegration:
    """Test that segmented objects are stored in ObjectStore via _run_async."""

    def test_store_segmented_object(self, session):
        """Manually store a segmented object and verify it's in the store."""
        seg_obj = SegmentedObject(
            content="The auth module uses JWT tokens for authentication.",
            object_type="file_context",
            source_tool="Read",
            source_key="/app/auth.py",
            stub="[file_context: auth.py — JWT authentication]",
            turn_start=1,
            turn_end=1,
            token_estimate=12,
            key_entities=["auth.py", "JWT"],
            tags=["auth"],
        )
        # Mirror the SegmentedObject → store_object field mapping done in _preprocess.
        stored = _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content=seg_obj.content,
                object_type=seg_obj.object_type,
                source_tool=seg_obj.source_tool,
                source_key=seg_obj.source_key,
                stub=seg_obj.stub,
                tags=seg_obj.tags,
                key_entities=seg_obj.key_entities,
                turn=seg_obj.turn_start,
            )
        )
        assert stored is not None
        assert stored.session_id == session.id
        assert stored.object_type == "file_context"
        assert stored.content_full == seg_obj.content
        assert stored.source_tool == "Read"
        assert stored.source_key == "/app/auth.py"
        assert len(stored.embedding) == 384  # DummyEmbedder produces 384-dim

    def test_store_multiple_objects_and_search(self, session):
        """Store multiple objects and verify semantic search works."""
        contents = [
            ("The database uses PostgreSQL with connection pooling.", "file_context"),
            ("Authentication is handled by JWT tokens with RS256.", "file_context"),
            ("The CI pipeline runs pytest and mypy on every PR.", "conversation_phase"),
        ]
        for content, obj_type in contents:
            _run_async(
                session.object_store.store_object(
                    session_id=session.id,
                    content=content,
                    object_type=obj_type,
                )
            )
        # Search for auth-related content
        results = _run_async(
            session.object_store.semantic_search(session.id, "authentication JWT", limit=3)
        )
        assert len(results) > 0
        # The auth-related object should be in results
        found_auth = any("JWT" in obj.content_full for obj, _score in results)
        assert found_auth

    def test_run_async_from_sync_context(self):
        """_run_async should work from a plain sync context."""
        async def simple_coro():
            return 42
        result = _run_async(simple_coro())
        assert result == 42
# ── memory_query phantom tool resolution ─────────────────────────────────
class TestMemoryQueryResolution:
    """Test that memory_query resolves through ContextAssembler."""

    def test_memory_query_with_context_assembler(self, session):
        """memory_query should resolve through ContextAssembler when available."""
        # Store some content first
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="The server runs on port 8080 with TLS enabled.",
                object_type="file_context",
                source_tool="Read",
                source_key="/app/server.py",
                tags=["server", "config"],
                key_entities=["port 8080", "TLS"],
            )
        )
        # Create ContextAssembler without HelperLLM (fallback mode)
        assembler = ContextAssembler(session.object_store, helper_llm=None)
        call = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_001",
            input={"question": "What port does the server run on?"},
        )
        result = _handle_phantom_call(
            call,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )
        # Should NOT contain the pending placeholder
        assert "[memory_query:pending]" not in result
        # Should contain the resolved answer
        assert "[memory_query resolved:" in result

    def test_memory_query_with_scope(self, session):
        """memory_query with scope should narrow the search."""
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="Database config: host=localhost, port=5432, name=mydb",
                object_type="file_context",
                source_key="/app/db.py",
                tags=["database"],
            )
        )
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="Server config: host=0.0.0.0, port=8080",
                object_type="file_context",
                source_key="/app/server.py",
                tags=["server"],
            )
        )
        assembler = ContextAssembler(session.object_store, helper_llm=None)
        call = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_002",
            input={
                "question": "What is the database port?",
                "scope": "database config",
            },
        )
        result = _handle_phantom_call(
            call,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )
        assert "[memory_query:pending]" not in result
        assert "[memory_query resolved:" in result

    def test_memory_query_fallback_without_assembler(self):
        """memory_query without context_assembler should return pending placeholder."""
        call = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_003",
            input={"question": "What auth library is used?"},
        )
        # No context_assembler kwarg — handler must degrade to the pending marker.
        result = _handle_phantom_call(call, page_store=None)
        assert "[memory_query:pending]" in result
        assert "What auth library is used?" in result

    def test_memory_query_no_results(self, session):
        """memory_query with no matching content should still resolve gracefully."""
        assembler = ContextAssembler(session.object_store, helper_llm=None)
        call = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_004",
            input={"question": "What is the meaning of life?"},
        )
        result = _handle_phantom_call(
            call,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )
        # Should resolve (not pending) even with no results
        assert "[memory_query:pending]" not in result
        assert "[memory_query resolved:" in result

    def test_memory_query_with_mock_helper_llm(self, session):
        """memory_query with a mocked HelperLLM should return the LLM's answer."""
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="The application uses bcrypt for password hashing with cost factor 12.",
                object_type="file_context",
                source_key="/app/auth.py",
                key_entities=["bcrypt", "password hashing"],
            )
        )
        # Mock HelperLLM
        # answer_micro_fault is awaited by the assembler, hence AsyncMock.
        mock_helper = MagicMock()
        mock_helper.answer_micro_fault = AsyncMock(
            return_value="The application uses bcrypt with cost factor 12."
        )
        assembler = ContextAssembler(session.object_store, helper_llm=mock_helper)
        call = PhantomCall(
            name="memory_query",
            tool_use_id="toolu_mq_005",
            input={"question": "What password hashing is used?"},
        )
        result = _handle_phantom_call(
            call,
            page_store=None,
            context_assembler=assembler,
            session_id=session.id,
        )
        assert "[memory_query:pending]" not in result
        assert "bcrypt" in result
        assert "[memory_query resolved:" in result
        mock_helper.answer_micro_fault.assert_called_once()

    def test_inject_phantom_results_passes_assembler(self, session):
        """inject_phantom_results should pass context_assembler to handler."""
        _run_async(
            session.object_store.store_object(
                session_id=session.id,
                content="The API rate limit is 100 requests per minute.",
                object_type="conversation_phase",
                key_entities=["rate limit", "100 rpm"],
            )
        )
        assembler = ContextAssembler(session.object_store, helper_llm=None)
        phantom_calls = [
            PhantomCall(
                name="memory_query",
                tool_use_id="toolu_inject_001",
                input={"question": "What is the API rate limit?"},
            )
        ]
        messages = [
            _user_msg("Hello"),
            {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": "Let me check."},
                ],
            },
        ]
        result_messages = inject_phantom_results(
            messages,
            phantom_calls,
            page_store=None,
            observe_only=False,
            context_assembler=assembler,
            session_id=session.id,
        )
        # The result should have injected a tool_result
        # Find the user message with tool_result
        found_result = False
        for msg in result_messages:
            content = msg.get("content", [])
            if isinstance(content, list):
                for block in content:
                    if isinstance(block, dict) and block.get("type") == "tool_result":
                        result_text = block.get("content", "")
                        if "[memory_query resolved:" in result_text:
                            found_result = True
        assert found_result, "Expected resolved memory_query in injected results"
# ── DummyEmbedder fallback ───────────────────────────────────────────────
class TestDummyEmbedderFallback:
    """Test graceful degradation when real embedder is unavailable."""

    def test_dummy_embedder_produces_384_dim(self):
        """DummyEmbedder vectors match the real model's 384 dimensions."""
        embedder = DummyEmbedder()
        vec = embedder.embed("test text")
        assert len(vec) == 384

    def test_dummy_embedder_deterministic(self):
        """Identical text must embed to an identical vector."""
        embedder = DummyEmbedder()
        v1 = embedder.embed("same text")
        v2 = embedder.embed("same text")
        assert v1 == v2

    def test_dummy_embedder_different_for_different_text(self):
        """Different texts must not collide to the same vector."""
        embedder = DummyEmbedder()
        v1 = embedder.embed("text one")
        v2 = embedder.embed("text two")
        assert v1 != v2

    def test_object_store_works_with_dummy_embedder(self):
        """ObjectStore should function correctly with DummyEmbedder."""
        backend = InMemoryBackend()
        embedder = DummyEmbedder()
        store = ObjectStore(backend, embedder=embedder)
        stored = _run_async(
            store.store_object(
                session_id="test_session",
                content="Test content for embedding",
                object_type="conversation_phase",
            )
        )
        assert len(stored.embedding) == 384
        # Search should work
        results = _run_async(store.semantic_search("test_session", "test content", limit=5))
        assert len(results) == 1
        assert results[0][0].id == stored.id

    @patch("mnemosyne.embedder.try_get_embedder", return_value=None)
    def test_session_falls_back_to_dummy_embedder(self, mock_try, tmp_log_dir):
        """When try_get_embedder returns None, Session should use DummyEmbedder."""
        s = Session("fallback_test", tmp_log_dir)
        assert s.object_store._embedder is not None
        assert isinstance(s.object_store._embedder, DummyEmbedder)
# ── End-to-end segmentation + storage ────────────────────────────────────
class TestEndToEndSegmentationStorage:
    """Test the full pipeline: messages → segmenter → object_store."""

    def test_segment_and_store_conversation(self, session):
        """Simulate what _preprocess does: segment messages and store objects."""
        messages = [
            _user_msg("Can you read the database configuration?"),
            _assistant_msg(
                "I'll read the database configuration file. It contains "
                "the connection string, pool size, and timeout settings. "
                "The database is PostgreSQL running on port 5432 with "
                "a connection pool of 20 connections."
            ),
            _user_msg("What about the authentication setup?"),
            _assistant_msg(
                "The authentication system uses OAuth 2.0 with JWT tokens. "
                "Tokens are signed with RS256 and have a 1-hour expiry. "
                "Refresh tokens are stored in the database with a 30-day TTL."
            ),
        ]
        segmented = session.segmenter.segment_incremental(
            messages, session._segmented_objects, start_turn=0
        )
        # segment_incremental returns the full object list; the tail beyond the
        # previously cached objects is what is new this round.
        new_count = len(segmented) - len(session._segmented_objects)
        assert new_count > 0
        new_objects = segmented[len(session._segmented_objects) :]
        for seg_obj in new_objects:
            _run_async(
                session.object_store.store_object(
                    session_id=session.id,
                    content=seg_obj.content,
                    object_type=seg_obj.object_type,
                    source_tool=seg_obj.source_tool,
                    source_key=seg_obj.source_key,
                    stub=seg_obj.stub,
                    tags=seg_obj.tags,
                    key_entities=seg_obj.key_entities,
                    turn=seg_obj.turn_start,
                )
            )
        session._segmented_objects = segmented
        # Verify objects are in the store
        stored_objects = _run_async(session.object_store.get_session_objects(session.id))
        assert len(stored_objects) == new_count
        # Verify search works on stored objects
        results = _run_async(
            session.object_store.semantic_search(session.id, "database PostgreSQL", limit=5)
        )
        assert len(results) > 0

    def test_incremental_segmentation_stores_only_new(self, session):
        """Incremental segmentation should only store new objects."""
        msgs1 = [
            _user_msg("Hello"),
            _assistant_msg("Hi there! How can I help you today?"),
        ]
        seg1 = session.segmenter.segment_incremental(
            msgs1, session._segmented_objects, start_turn=0
        )
        for seg_obj in seg1:
            _run_async(
                session.object_store.store_object(
                    session_id=session.id,
                    content=seg_obj.content,
                    object_type=seg_obj.object_type,
                )
            )
        session._segmented_objects = seg1
        count1 = len(seg1)
        # Add more messages
        msgs2 = msgs1 + [
            _user_msg("Tell me about the project architecture"),
            _assistant_msg(
                "The project follows a layered architecture with "
                "gateway, service, and repository layers."
            ),
        ]
        seg2 = session.segmenter.segment_incremental(
            msgs2, session._segmented_objects, start_turn=0
        )
        new_count = len(seg2) - len(session._segmented_objects)
        # Only store new objects
        if new_count > 0:
            new_objects = seg2[len(session._segmented_objects) :]
            for seg_obj in new_objects:
                _run_async(
                    session.object_store.store_object(
                        session_id=session.id,
                        content=seg_obj.content,
                        object_type=seg_obj.object_type,
                    )
                )
        session._segmented_objects = seg2
        # Total stored should be count1 + new_count
        stored = _run_async(session.object_store.get_session_objects(session.id))
        assert len(stored) == count1 + new_count
# ── _run_async helper ────────────────────────────────────────────────────
class TestRunAsync:
    """Test the _run_async helper for sync/async bridging."""

    def test_run_async_simple(self):
        """A trivial coroutine's return value is passed straight through."""
        async def add(a, b):
            return a + b
        assert _run_async(add(2, 3)) == 5

    def test_run_async_with_await(self):
        """Coroutines that actually await still complete and return."""
        async def delayed():
            await asyncio.sleep(0.01)
            return "done"
        assert _run_async(delayed()) == "done"

    def test_run_async_exception_propagates(self):
        """Exceptions raised inside the coroutine surface to the sync caller."""
        async def failing():
            raise ValueError("test error")
        with pytest.raises(ValueError, match="test error"):
            _run_async(failing())