fix: improve process cleanup in MCP tests (#9354)

This commit is contained in:
Ítalo Johnny 2025-08-11 19:02:47 -03:00 committed by GitHub
commit 515786c888
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -9,6 +9,7 @@ import contextlib
import os
import platform
import shutil
import time
import psutil
import pytest
@ -16,6 +17,24 @@ from langflow.base.mcp.util import MCPSessionManager
from loguru import logger
from mcp import StdioServerParameters
pytestmark = pytest.mark.timeout(300, method="thread")
async def wait_tools(session, t=20):
return await asyncio.wait_for(session.list_tools(), timeout=t)
async def wait_no_children(proc, max_wait=10, target=None):
deadline = time.monotonic() + max_wait
while time.monotonic() < deadline:
children = proc.children(recursive=True)
if target is not None and len(children) <= target:
return True
if not children:
return True
await asyncio.sleep(0.2)
return False
@pytest.fixture
def mcp_server_params():
@ -47,26 +66,32 @@ def process_tracker():
# Give some time for cleanup to complete before checking for leftover processes
# Collect child processes that we expect to wait for
try:
child_processes = process.children(recursive=True)
if child_processes:
# Wait up to 1 second for processes to exit naturally
gone, alive = psutil.wait_procs(child_processes, timeout=1)
children = process.children(recursive=True)
if not children:
return
# Log processes that exited naturally
if gone:
logger.debug("Processes that exited naturally: %s", [p.pid for p in gone])
gone, alive = psutil.wait_procs(children, timeout=2)
if gone:
logger.debug("Processes exited naturally: %s", [p.pid for p in gone])
# Handle processes that are still alive after waiting
if alive:
logger.debug("Processes still alive after 1s wait: %s", [p.pid for p in alive])
# Try to terminate remaining processes
for child in alive:
try:
child.terminate()
child.wait(timeout=5) # Increased timeout for cleanup
except (psutil.NoSuchProcess, psutil.TimeoutExpired):
with contextlib.suppress(psutil.NoSuchProcess):
child.kill()
if alive:
logger.debug("Processes still alive after 2s: %s", [p.pid for p in alive])
for p in alive:
with contextlib.suppress(psutil.NoSuchProcess):
p.terminate()
gone2, alive2 = psutil.wait_procs(alive, timeout=5)
if gone2:
logger.debug("Processes terminated gracefully: %s", [p.pid for p in gone2])
for p in alive2:
with contextlib.suppress(psutil.NoSuchProcess):
p.kill()
_ = psutil.wait_procs(alive2, timeout=2)
leftover = process.children(recursive=True)
assert not leftover, f"Leftover child processes: {[p.pid for p in leftover]}"
except Exception as e:
logger.exception("Error cleaning up child processes: %s", e)
@ -89,7 +114,7 @@ async def test_session_reuse_prevents_subprocess_leak(mcp_server_params, process
sessions.append(session)
# Verify session is working
tools_response = await session.list_tools()
tools_response = await wait_tools(session)
assert len(tools_response.tools) > 0
# Check subprocess count after creating sessions
@ -102,11 +127,12 @@ async def test_session_reuse_prevents_subprocess_leak(mcp_server_params, process
# Verify all sessions are functional
for session in sessions:
tools_response = await session.list_tools()
tools_response = await wait_tools(session)
assert len(tools_response.tools) > 0
finally:
await session_manager.cleanup_all()
await wait_no_children(process, max_wait=10, target=initial_count)
await asyncio.sleep(2) # Allow cleanup to complete
@ -121,7 +147,7 @@ async def test_session_cleanup_removes_subprocesses(mcp_server_params, process_t
try:
# Create a session
session = await session_manager.get_session("cleanup_test", mcp_server_params, "stdio")
tools_response = await session.list_tools()
tools_response = await wait_tools(session)
assert len(tools_response.tools) > 0
# Verify subprocess was created
@ -131,6 +157,7 @@ async def test_session_cleanup_removes_subprocesses(mcp_server_params, process_t
finally:
# Clean up session
await session_manager.cleanup_all()
await wait_no_children(process, max_wait=10, target=initial_count)
await asyncio.sleep(2) # Allow cleanup to complete
# Verify subprocess was cleaned up
@ -152,7 +179,7 @@ async def test_session_health_check_and_recovery(mcp_server_params, process_trac
try:
# Create a session
session1 = await session_manager.get_session("health_test", mcp_server_params, "stdio")
tools_response = await session1.list_tools()
tools_response = await wait_tools(session1)
assert len(tools_response.tools) > 0
# Simulate session becoming unhealthy by accessing internal state
@ -185,11 +212,12 @@ async def test_session_health_check_and_recovery(mcp_server_params, process_trac
# Try to get a session again - should create a new healthy one
session2 = await session_manager.get_session("health_test_2", mcp_server_params, "stdio")
tools_response = await session2.list_tools()
tools_response = await wait_tools(session2)
assert len(tools_response.tools) > 0
finally:
await session_manager.cleanup_all()
await wait_no_children(process, max_wait=10, target=initial_count)
await asyncio.sleep(2)
@ -235,6 +263,7 @@ async def test_multiple_servers_isolation(process_tracker):
finally:
await session_manager.cleanup_all()
await wait_no_children(process, max_wait=10, target=initial_count)
await asyncio.sleep(2)
@ -331,8 +360,9 @@ async def test_session_manager_connectivity_validation():
@pytest.mark.asyncio
async def test_session_manager_cleanup_all():
async def test_session_manager_cleanup_all(process_tracker):
"""Test that cleanup_all properly cleans up all sessions."""
process, initial_count = process_tracker
session_manager = MCPSessionManager()
# Mock some sessions using the correct structure
@ -366,6 +396,7 @@ async def test_session_manager_cleanup_all():
# Cleanup all
await session_manager.cleanup_all()
await wait_no_children(process, max_wait=10, target=initial_count)
# Verify cleanup
if hasattr(session_manager, "sessions_by_server"):