# Object-addressed memory: segment messages into semantic objects, embed with
# sentence-transformers, store in a pgvector-backed store, and reassemble
# context via goal-aware retrieval.
# Ultraworked with Sisyphus (https://github.com/code-yeongyu/oh-my-opencode)
# Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
"""Tests for the semantic object segmenter.
|
|
|
|
Covers: tool result segmentation, text classification, user message handling,
|
|
entity extraction, stub/tag generation, merging, incremental segmentation,
|
|
realistic multi-turn payloads, and edge cases.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from mnemosyne.segmenter import (
|
|
VALID_OBJECT_TYPES,
|
|
Segmenter,
|
|
SegmentedObject,
|
|
_estimate_tokens,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def seg() -> Segmenter:
    """Provide a segmenter configured with the default thresholds."""
    segmenter = Segmenter()
    return segmenter
|
|
|
|
|
|
@pytest.fixture
def small_seg() -> Segmenter:
    """Provide a segmenter whose low min_object_tokens makes merging easy to trigger."""
    return Segmenter(
        min_object_tokens=10,
        max_object_tokens=500,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _msg(role: str, content: str | list) -> dict:
|
|
"""Shorthand for creating a message dict."""
|
|
return {"role": role, "content": content}
|
|
|
|
|
|
def _tool_use_block(tool_id: str, name: str, input_: dict | None = None) -> dict:
|
|
return {
|
|
"type": "tool_use",
|
|
"id": tool_id,
|
|
"name": name,
|
|
"input": input_ or {},
|
|
}
|
|
|
|
|
|
def _tool_result_block(tool_use_id: str, content: str) -> dict:
|
|
return {
|
|
"type": "tool_result",
|
|
"tool_use_id": tool_use_id,
|
|
"content": content,
|
|
}
|
|
|
|
|
|
def _text_block(text: str) -> dict:
|
|
return {"type": "text", "text": text}
|
|
|
|
|
|
def _long_text(base: str = "x", tokens: int = 200) -> str:
|
|
"""Generate text of approximately `tokens` estimated tokens."""
|
|
return base * (tokens * 4)
|
|
|
|
|
|
# ===========================================================================
|
|
# 1. Tool result segmentation
|
|
# ===========================================================================
|
|
|
|
|
|
class TestToolResultSegmentation:
    """Test that tool results are classified by tool name."""

    def test_read_tool_produces_file_context(self, seg: Segmenter):
        """Read results become file_context objects keyed by the file path."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Let me read the file."),
                    _tool_use_block("t1", "Read", {"file_path": "src/main.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("def main():\n pass\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        file_objs = [o for o in objects if o.object_type == "file_context"]
        assert len(file_objs) >= 1
        assert file_objs[0].source_tool == "Read"
        assert file_objs[0].source_key == "src/main.py"

    def test_bash_tool_produces_tool_result(self, seg: Segmenter):
        """Bash output is classified as a generic tool_result."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Running tests."),
                    _tool_use_block("t1", "Bash", {"command": "pytest"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("All 42 tests passed.\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        tool_objs = [o for o in objects if o.object_type == "tool_result"]
        assert len(tool_objs) >= 1
        assert tool_objs[0].source_tool == "Bash"

    def test_grep_tool_produces_tool_result(self, seg: Segmenter):
        """Grep output is classified as a generic tool_result."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Searching for usage."),
                    _tool_use_block("t1", "Grep", {"pattern": "handleAuth"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("src/auth.ts:15: handleAuth()\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        tool_objs = [o for o in objects if o.object_type == "tool_result"]
        assert len(tool_objs) >= 1
        assert tool_objs[0].source_tool == "Grep"

    def test_write_tool_produces_tool_result(self, seg: Segmenter):
        """Write confirmations are classified as tool_result."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Writing the file."),
                    _tool_use_block("t1", "Write", {"file_path": "out.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("File written successfully.\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        tool_objs = [o for o in objects if o.object_type == "tool_result"]
        assert len(tool_objs) >= 1

    def test_unknown_tool_produces_tool_result(self, seg: Segmenter):
        """Tools the segmenter does not know fall back to tool_result."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Using custom tool."),
                    _tool_use_block("t1", "CustomTool", {}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("Custom output here.\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        tool_objs = [o for o in objects if o.object_type == "tool_result"]
        assert len(tool_objs) >= 1
        assert tool_objs[0].source_tool == "CustomTool"

    def test_web_fetch_produces_external_reference(self, seg: Segmenter):
        """WebFetch results become external_reference objects keyed by URL."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Fetching docs."),
                    _tool_use_block("t1", "WebFetch", {"url": "https://docs.example.com"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("# API Documentation\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        ext_objs = [o for o in objects if o.object_type == "external_reference"]
        assert len(ext_objs) >= 1
        assert ext_objs[0].source_key == "https://docs.example.com"

    def test_read_with_filePath_key(self, seg: Segmenter):
        """Test that filePath (camelCase) is also recognized."""
        messages = [
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Read", {"filePath": "/etc/config.toml"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("key = 'value'\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        file_objs = [o for o in objects if o.object_type == "file_context"]
        assert len(file_objs) >= 1
        assert file_objs[0].source_key == "/etc/config.toml"
|
|
|
|
|
|
# ===========================================================================
|
|
# 2. Text classification
|
|
# ===========================================================================
|
|
|
|
|
|
class TestTextClassification:
    """Test assistant text classification into object types."""

    def test_error_text_classified(self, seg: Segmenter):
        """A traceback with exception lines should classify as error_context."""
        error_text = _long_text(
            "Traceback (most recent call last):\n"
            ' File "src/main.py", line 42, in run\n'
            " raise ValueError('bad input')\n"
            "ValueError: bad input\n"
        )
        messages = [_msg("assistant", error_text)]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "error_context" for o in objects)

    def test_plan_text_classified(self, seg: Segmenter):
        """Numbered step lists should classify as plan."""
        plan_text = _long_text(
            "Here's the implementation plan:\n"
            "1. Create the database schema\n"
            "2. Implement the API endpoints\n"
            "3. Write integration tests\n"
            "4. Deploy to staging\n"
        )
        messages = [_msg("assistant", plan_text)]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "plan" for o in objects)

    def test_decision_text_classified(self, seg: Segmenter):
        """Text with decided/chose-because phrasing should be design_decision."""
        decision_text = _long_text(
            "I decided to use JWT tokens for authentication because "
            "they're stateless and work well with our microservice architecture. "
            "I chose JWT over session cookies because we need cross-domain support.\n"
        )
        messages = [_msg("assistant", decision_text)]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "design_decision" for o in objects)

    def test_debug_text_classified(self, seg: Segmenter):
        """Investigation/root-cause/fix narrative should be debugging_session."""
        debug_text = _long_text(
            "I'm investigating the race condition. After debugging, "
            "I found the root cause: the mutex wasn't being held during "
            "the token refresh. Fixed by adding a lock around the critical section.\n"
        )
        messages = [_msg("assistant", debug_text)]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "debugging_session" for o in objects)

    def test_default_conversation_phase(self, seg: Segmenter):
        """Plain chatty text with no strong signals defaults to conversation_phase."""
        text = _long_text(
            "Sure, I'll help you with that. Let me take a look at the code "
            "and see what we can improve here.\n"
        )
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "conversation_phase" for o in objects)

    def test_error_needs_multiple_signals(self, seg: Segmenter):
        """A single 'error' word shouldn't trigger error_context."""
        text = _long_text("There might be an error somewhere in the logic.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        # Should NOT be error_context with just one weak signal
        assert all(o.object_type != "error_context" for o in objects)
|
|
|
|
|
|
# ===========================================================================
|
|
# 3. User message handling
|
|
# ===========================================================================
|
|
|
|
|
|
class TestUserMessageHandling:
    """Test user message boundary and merge behavior."""

    def test_short_user_message_merges(self, seg: Segmenter):
        """Short messages like 'ok' should merge with next segment."""
        messages = [
            _msg("user", "ok"),
            _msg("assistant", _long_text("I'll proceed with the implementation.\n")),
        ]
        objects = seg.segment_messages(messages)
        # Expect the short ack to merge rather than stand alone: at most 2 objects.
        assert len(objects) <= 2

    def test_normal_user_message_creates_boundary(self, seg: Segmenter):
        """A substantive user message should produce at least one object."""
        long_user = _long_text(
            "Can you refactor the authentication module to use OAuth2 "
            "instead of the current basic auth approach?\n"
        )
        messages = [
            _msg("user", long_user),
            _msg("assistant", _long_text("I'll refactor the auth module.\n")),
        ]
        objects = seg.segment_messages(messages)
        assert len(objects) >= 1

    def test_short_messages_recognized(self, seg: Segmenter):
        """Various short messages should be recognized."""
        short_msgs = ["ok", "yes", "continue", "go ahead", "thanks", "lgtm", "done"]
        for short in short_msgs:
            messages = [
                _msg("user", short),
                _msg("assistant", _long_text("Continuing...\n")),
            ]
            objects = seg.segment_messages(messages)
            # Should merge — at most 2 objects (the ack must not stand alone).
            assert len(objects) <= 2, f"'{short}' was not merged"

    def test_user_message_with_tool_results(self, seg: Segmenter):
        """User messages can contain both text and tool_result blocks."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Let me read the file."),
                    _tool_use_block("t1", "Read", {"file_path": "src/app.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block(
                        "t1", _long_text("import flask\napp = flask.Flask(__name__)\n")
                    ),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "file_context" for o in objects)
|
|
|
|
|
|
# ===========================================================================
|
|
# 4. Entity extraction
|
|
# ===========================================================================
|
|
|
|
|
|
class TestEntityExtraction:
    """Test extraction of file paths, function names, and packages."""

    def test_file_paths_extracted(self, seg: Segmenter):
        """Slash-separated paths with extensions end up in key_entities."""
        text = _long_text(
            "I read src/auth/middleware.ts and tests/auth.test.ts. "
            "The main logic is in src/core/handler.py.\n"
        )
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        entities = objects[0].key_entities
        assert any("middleware.ts" in e for e in entities)
        assert any("handler.py" in e for e in entities)

    def test_function_names_extracted(self, seg: Segmenter):
        """Function definitions should yield their names as entities."""
        text = _long_text("def handleAuth(request):\n return authenticate(request.token)\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        entities = objects[0].key_entities
        assert any("handleAuth" in e for e in entities)

    def test_import_names_extracted(self, seg: Segmenter):
        """Python import/from statements should yield package entities."""
        text = _long_text("import flask\nfrom sqlalchemy import Column\nimport os\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        entities = objects[0].key_entities
        assert any("flask" in e for e in entities)
        assert any("sqlalchemy" in e for e in entities)

    def test_js_require_extracted(self, seg: Segmenter):
        """JavaScript require() calls should yield package entities."""
        text = _long_text(
            "const express = require('express');\nconst jwt = require('jsonwebtoken');\n"
        )
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        entities = objects[0].key_entities
        assert any("express" in e for e in entities)
        assert any("jsonwebtoken" in e for e in entities)

    def test_entity_cap_at_20(self, seg: Segmenter):
        """Entities should be capped at 20."""
        # Generate text with many unique file paths
        paths = [f"src/module{i}/file{i}.py" for i in range(30)]
        text = _long_text(" ".join(paths) + "\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        assert len(objects[0].key_entities) <= 20
|
|
|
|
|
|
# ===========================================================================
|
|
# 5. Stub generation
|
|
# ===========================================================================
|
|
|
|
|
|
class TestStubGeneration:
    """Test auto-generated stubs."""

    def test_stub_format(self, seg: Segmenter):
        """Stubs look like '[type: first line]' — bracketed, colon-separated."""
        text = _long_text("This is the first line of content.\nSecond line here.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        stub = objects[0].stub
        assert stub.startswith("[")
        assert stub.endswith("]")
        assert ":" in stub

    def test_stub_contains_type(self, seg: Segmenter):
        """The object's type name appears inside its stub."""
        text = _long_text("Some conversation content here.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        assert objects[0].object_type in objects[0].stub

    def test_stub_truncates_long_first_line(self, seg: Segmenter):
        """First lines longer than 100 chars are truncated with an ellipsis."""
        long_line = "A" * 200 + "\nSecond line."
        text = _long_text(long_line)
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        stub = objects[0].stub
        # Type prefix + ": " + content, content part should be <= 100 chars
        content_part = stub.split(": ", 1)[1].rstrip("]")
        assert len(content_part) <= 103  # 100 + "..."

    def test_stub_uses_first_line_only(self, seg: Segmenter):
        """Only the first content line feeds the stub; later lines are dropped."""
        text = _long_text("First line here.\nSecond line should not appear.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        assert "Second line" not in objects[0].stub
|
|
|
|
|
|
# ===========================================================================
|
|
# 6. Tag generation
|
|
# ===========================================================================
|
|
|
|
|
|
class TestTagGeneration:
    """Test auto-generated tags."""

    def test_tags_include_object_type(self, seg: Segmenter):
        """Every object is tagged with its own object_type."""
        text = _long_text("Some content.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        assert objects[0].object_type in objects[0].tags

    def test_tags_include_source_tool(self, seg: Segmenter):
        """Tool-derived objects carry the (lowercased) tool name as a tag."""
        messages = [
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Read", {"file_path": "x.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("content of x.py\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        file_objs = [o for o in objects if o.source_tool == "Read"]
        assert len(file_objs) >= 1
        assert "read" in file_objs[0].tags

    def test_tags_include_file_extensions(self, seg: Segmenter):
        """File extensions seen in the content become tags."""
        text = _long_text("Modified src/auth.ts and src/handler.py to fix the issue.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        tags = objects[0].tags
        assert ".ts" in tags
        assert ".py" in tags

    def test_tags_no_duplicate_extensions(self, seg: Segmenter):
        """An extension appearing many times is tagged only once."""
        text = _long_text("Read src/a.py and src/b.py and src/c.py.\n")
        messages = [_msg("assistant", text)]
        objects = seg.segment_messages(messages)
        py_count = sum(1 for t in objects[0].tags if t == ".py")
        assert py_count == 1
|
|
|
|
|
|
# ===========================================================================
|
|
# 7. Minimum object size merging
|
|
# ===========================================================================
|
|
|
|
|
|
class TestMinObjectSizeMerging:
    """Test that undersized objects get merged."""

    def test_small_segments_merge(self, seg: Segmenter):
        """Very small adjacent segments should merge."""
        messages = [
            _msg("assistant", "Hi."),
            _msg("user", "Hello."),
            _msg("assistant", "How can I help?"),
        ]
        objects = seg.segment_messages(messages)
        # These are all tiny — should merge into fewer objects
        assert len(objects) <= 2

    def test_large_segments_stay_separate(self, seg: Segmenter):
        """Segments above min_object_tokens stay separate."""
        messages = [
            _msg("assistant", _long_text("First large block of content.\n")),
            _msg("user", _long_text("Second large block of content.\n")),
        ]
        objects = seg.segment_messages(messages)
        assert len(objects) >= 1  # At least one object

    def test_incompatible_types_dont_merge(self, small_seg: Segmenter):
        """file_context and plan shouldn't merge even if small."""
        # NOTE(review): this payload actually produces two small file_context
        # objects (different source files), not a file_context + plan pair;
        # the assertion below only checks existence, so it holds either way.
        messages = [
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Read", {"file_path": "a.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", "x = 1"),
                ],
            ),
            _msg(
                "assistant",
                [
                    _tool_use_block("t2", "Read", {"file_path": "b.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t2", "y = 2"),
                ],
            ),
        ]
        objects = small_seg.segment_messages(messages)
        file_objs = [o for o in objects if o.object_type == "file_context"]
        # Even if small, file_context objects from different files should
        # remain separate (they have different source_keys)
        # But they might merge if compatible — the key thing is they exist
        assert len(file_objs) >= 1
|
|
|
|
|
|
# ===========================================================================
|
|
# 8. Incremental segmentation
|
|
# ===========================================================================
|
|
|
|
|
|
class TestIncrementalSegmentation:
    """Test segment_incremental for extending existing objects."""

    def test_incremental_appends_new(self, seg: Segmenter):
        """New tool-result messages append objects after the existing ones."""
        existing = [
            SegmentedObject(
                content=_long_text("Previous conversation.\n"),
                object_type="conversation_phase",
                turn_start=0,
                turn_end=1,
                token_estimate=200,
            )
        ]
        new_messages = [
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Read", {"file_path": "new.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("new file content\n")),
                ],
            ),
        ]
        result = seg.segment_incremental(new_messages, existing, start_turn=2)
        assert len(result) > len(existing)
        assert result[0] is existing[0]  # First object unchanged

    def test_incremental_merges_compatible(self, seg: Segmenter):
        """A trailing conversation_phase can absorb new conversational text."""
        existing = [
            SegmentedObject(
                content=_long_text("Starting the discussion.\n"),
                object_type="conversation_phase",
                turn_start=0,
                turn_end=0,
                token_estimate=200,
            )
        ]
        new_messages = [
            _msg("assistant", _long_text("Continuing the discussion.\n")),
        ]
        result = seg.segment_incremental(new_messages, existing, start_turn=1)
        # Should merge the conversation_phase objects
        assert len(result) >= 1

    def test_incremental_empty_new(self, seg: Segmenter):
        """No new messages leaves the existing objects untouched."""
        existing = [
            SegmentedObject(
                content="test",
                object_type="conversation_phase",
                turn_start=0,
                turn_end=0,
                token_estimate=1,
            )
        ]
        result = seg.segment_incremental([], existing)
        assert len(result) == 1

    def test_incremental_empty_existing(self, seg: Segmenter):
        """With no prior objects, incremental behaves like fresh segmentation."""
        new_messages = [
            _msg("assistant", _long_text("Hello world.\n")),
        ]
        result = seg.segment_incremental(new_messages, [])
        assert len(result) >= 1

    def test_incremental_no_merge_across_types(self, seg: Segmenter):
        """file_context shouldn't merge with conversation_phase."""
        existing = [
            SegmentedObject(
                content=_long_text("file content here\n"),
                object_type="file_context",
                source_tool="Read",
                turn_start=0,
                turn_end=0,
                token_estimate=200,
            )
        ]
        new_messages = [
            _msg("assistant", _long_text("Now let me explain what I found.\n")),
        ]
        result = seg.segment_incremental(new_messages, existing, start_turn=1)
        # Should NOT merge file_context with conversation_phase
        assert len(result) >= 2
|
|
|
|
|
|
# ===========================================================================
|
|
# 9. Realistic multi-turn conversation payloads
|
|
# ===========================================================================
|
|
|
|
|
|
class TestRealisticPayloads:
    """Test with realistic multi-turn conversation structures."""

    def test_typical_coding_session(self, seg: Segmenter):
        """Simulate: user asks → assistant reads file → assistant explains."""
        messages = [
            _msg(
                "user",
                _long_text(
                    "Can you look at the auth middleware and tell me "
                    "how it handles token refresh?\n"
                ),
            ),
            _msg(
                "assistant",
                [
                    _text_block("I'll read the auth middleware file."),
                    _tool_use_block("t1", "Read", {"file_path": "src/auth/middleware.ts"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block(
                        "t1",
                        _long_text(
                            "import jwt from 'jsonwebtoken';\n"
                            "export function handleAuth(req, res, next) {\n"
                            " const token = req.headers.authorization;\n"
                            " // ... token validation logic\n"
                            "}\n"
                        ),
                    ),
                ],
            ),
            _msg(
                "assistant",
                _long_text(
                    "The auth middleware in src/auth/middleware.ts handles "
                    "token refresh by checking the JWT expiry and issuing "
                    "a new token if within the refresh window.\n"
                ),
            ),
        ]
        objects = seg.segment_messages(messages)
        types = {o.object_type for o in objects}
        assert "file_context" in types
        assert len(objects) >= 2

    def test_debugging_flow(self, seg: Segmenter):
        """Simulate: error → investigation → fix."""
        messages = [
            _msg("user", _long_text("The tests are failing with this error.\n")),
            _msg(
                "assistant",
                _long_text(
                    "I'm investigating the test failure. Let me look at the "
                    "root cause. The error seems to be a TypeError in the "
                    "authentication module.\n"
                ),
            ),
            _msg(
                "assistant",
                [
                    _text_block("Let me run the failing test."),
                    _tool_use_block("t1", "Bash", {"command": "pytest tests/test_auth.py -v"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block(
                        "t1",
                        _long_text(
                            "FAILED tests/test_auth.py::test_refresh - TypeError: "
                            "'NoneType' object is not subscriptable\n"
                            "Traceback (most recent call last):\n"
                            ' File "tests/test_auth.py", line 42\n'
                            " token = response['access_token']\n"
                            "TypeError: 'NoneType' object is not subscriptable\n"
                        ),
                    ),
                ],
            ),
            _msg(
                "assistant",
                _long_text(
                    "Found the problem! The root cause is that the refresh "
                    "endpoint returns None when the token is expired. "
                    "Fixed by adding a null check before accessing the response.\n"
                ),
            ),
        ]
        objects = seg.segment_messages(messages)
        types = {o.object_type for o in objects}
        # Should have debugging and/or error objects
        assert types & {"debugging_session", "error_context", "tool_result"}

    def test_planning_session(self, seg: Segmenter):
        """Simulate: user asks for plan → assistant creates plan."""
        messages = [
            _msg(
                "user",
                _long_text(
                    "I need to add OAuth2 support. Can you create an implementation plan?\n"
                ),
            ),
            _msg(
                "assistant",
                _long_text(
                    "Here's the implementation plan for OAuth2:\n"
                    "1. Install the oauth2 library\n"
                    "2. Create the OAuth2 provider configuration\n"
                    "3. Implement the authorization flow\n"
                    "4. Add callback handling\n"
                    "5. Write integration tests\n"
                    "Step 1 involves adding the dependency to package.json.\n"
                ),
            ),
        ]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "plan" for o in objects)

    def test_multi_tool_turn(self, seg: Segmenter):
        """Assistant uses multiple tools in one turn."""
        messages = [
            _msg(
                "assistant",
                [
                    _text_block("Let me check both files."),
                    _tool_use_block("t1", "Read", {"file_path": "src/a.py"}),
                    _tool_use_block("t2", "Read", {"file_path": "src/b.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("# File A content\nclass A:\n pass\n")),
                    _tool_result_block("t2", _long_text("# File B content\nclass B:\n pass\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        file_objs = [o for o in objects if o.object_type == "file_context"]
        assert len(file_objs) >= 2

    def test_long_conversation_produces_multiple_objects(self, seg: Segmenter):
        """A long conversation should produce multiple objects."""
        messages = []
        for i in range(10):
            messages.append(_msg("user", _long_text(f"Question {i} about the codebase.\n")))
            messages.append(_msg("assistant", _long_text(f"Answer {i} with details.\n")))
        objects = seg.segment_messages(messages)
        assert len(objects) >= 3  # Should have multiple objects
|
|
|
|
|
|
# ===========================================================================
|
|
# 10. Empty and edge cases
|
|
# ===========================================================================
|
|
|
|
|
|
class TestEdgeCases:
    """Test empty inputs, malformed messages, and boundary conditions."""

    def test_empty_messages(self, seg: Segmenter):
        """An empty message list yields no objects."""
        assert seg.segment_messages([]) == []

    def test_empty_content_string(self, seg: Segmenter):
        """An empty string content yields no objects."""
        messages = [_msg("assistant", "")]
        assert seg.segment_messages(messages) == []

    def test_empty_content_list(self, seg: Segmenter):
        """An empty content-block list yields no objects."""
        messages = [_msg("assistant", [])]
        assert seg.segment_messages(messages) == []

    def test_whitespace_only_content(self, seg: Segmenter):
        """Whitespace-only content yields no objects."""
        messages = [_msg("assistant", " \n\t ")]
        assert seg.segment_messages(messages) == []

    def test_missing_role(self, seg: Segmenter):
        """Messages without role should be handled gracefully."""
        messages = [{"content": "no role here"}]
        # Should not crash
        result = seg.segment_messages(messages)
        assert isinstance(result, list)

    def test_non_dict_content_blocks(self, seg: Segmenter):
        """Content list with non-dict items should be handled."""
        messages = [_msg("assistant", ["not a dict", 42, None])]
        result = seg.segment_messages(messages)
        assert isinstance(result, list)

    def test_tool_result_with_list_content(self, seg: Segmenter):
        """tool_result content can be a list of text blocks."""
        messages = [
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Read", {"file_path": "x.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    {
                        "type": "tool_result",
                        "tool_use_id": "t1",
                        "content": [{"type": "text", "text": _long_text("file content\n")}],
                    },
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        file_objs = [o for o in objects if o.object_type == "file_context"]
        assert len(file_objs) >= 1

    def test_token_estimate_accuracy(self, seg: Segmenter):
        """Token estimate should be approximately len/4."""
        # NOTE(review): the `seg` fixture is unused here; kept for signature
        # consistency with the rest of the class.
        text = "a" * 400
        assert _estimate_tokens(text) == 100

    def test_token_estimate_minimum(self):
        """Token estimate should be at least 1."""
        assert _estimate_tokens("") == 1
        assert _estimate_tokens("a") == 1

    def test_all_object_types_valid(self, seg: Segmenter):
        """All produced object types should be in VALID_OBJECT_TYPES."""
        messages = [
            _msg("user", _long_text("Question.\n")),
            _msg(
                "assistant",
                _long_text(
                    'Traceback (most recent call last):\n File "x.py", line 1\nValueError: bad\n'
                ),
            ),
            _msg("assistant", _long_text("Here's the plan:\n1. Do this\n2. Do that\n")),
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Read", {"file_path": "f.py"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block("t1", _long_text("content\n")),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        for obj in objects:
            assert obj.object_type in VALID_OBJECT_TYPES, f"Invalid type: {obj.object_type}"

    def test_oversized_content_gets_split(self, seg: Segmenter):
        """Content exceeding max_object_tokens should be split."""
        # Uses a locally built segmenter (not the fixture) to control limits.
        small_seg = Segmenter(min_object_tokens=10, max_object_tokens=100)
        huge_text = "x" * 2000  # ~500 tokens, well over 100
        messages = [_msg("assistant", huge_text)]
        objects = small_seg.segment_messages(messages)
        assert len(objects) >= 2
        for obj in objects:
            assert obj.token_estimate <= 200  # Some slack for splitting

    def test_segmented_object_defaults(self):
        """SegmentedObject should have sensible defaults."""
        obj = SegmentedObject(content="test", object_type="conversation_phase")
        assert obj.source_tool is None
        assert obj.source_key is None
        assert obj.stub == ""
        assert obj.turn_start == 0
        assert obj.turn_end == 0
        assert obj.token_estimate == 0
        assert obj.key_entities == []
        assert obj.tags == []

    def test_tool_result_with_error_content(self, seg: Segmenter):
        """Tool result containing errors should be classified as error_context."""
        messages = [
            _msg(
                "assistant",
                [
                    _tool_use_block("t1", "Bash", {"command": "npm test"}),
                ],
            ),
            _msg(
                "user",
                [
                    _tool_result_block(
                        "t1",
                        _long_text(
                            "FAILED test_auth.py\n"
                            "TypeError: Cannot read property 'token' of undefined\n"
                            " at handleAuth (src/auth.js:42:15)\n"
                            " at processTicksAndRejections (internal/process/task_queues.js:95:5)\n"
                        ),
                    ),
                ],
            ),
        ]
        objects = seg.segment_messages(messages)
        assert any(o.object_type == "error_context" for o in objects)

    def test_system_role_ignored(self, seg: Segmenter):
        """System messages should be ignored (not user or assistant)."""
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            _msg("assistant", _long_text("Hello!\n")),
        ]
        objects = seg.segment_messages(messages)
        # Should only have objects from the assistant message
        assert all("helpful assistant" not in o.content for o in objects)
|