From 515786c88810932dcea0cb3ce58c8869075f30fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=8Dtalo=20Johnny?= Date: Mon, 11 Aug 2025 19:02:47 -0300 Subject: [PATCH] fix: improve process cleanup in MCP tests (#9354) --- .../components/mcp/test_mcp_memory_leak.py | 79 +++++++++++++------ 1 file changed, 55 insertions(+), 24 deletions(-) diff --git a/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py b/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py index 09366c863..05e81e5b4 100644 --- a/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py +++ b/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py @@ -9,6 +9,7 @@ import contextlib import os import platform import shutil +import time import psutil import pytest @@ -16,6 +17,24 @@ from langflow.base.mcp.util import MCPSessionManager from loguru import logger from mcp import StdioServerParameters +pytestmark = pytest.mark.timeout(300, method="thread") + + +async def wait_tools(session, t=20): + return await asyncio.wait_for(session.list_tools(), timeout=t) + + +async def wait_no_children(proc, max_wait=10, target=None): + deadline = time.monotonic() + max_wait + while time.monotonic() < deadline: + children = proc.children(recursive=True) + if target is not None and len(children) <= target: + return True + if not children: + return True + await asyncio.sleep(0.2) + return False + @pytest.fixture def mcp_server_params(): @@ -47,26 +66,32 @@ def process_tracker(): # Give some time for cleanup to complete before checking for leftover processes # Collect child processes that we expect to wait for try: - child_processes = process.children(recursive=True) - if child_processes: - # Wait up to 1 second for processes to exit naturally - gone, alive = psutil.wait_procs(child_processes, timeout=1) + children = process.children(recursive=True) + if not children: + return - # Log processes that exited naturally - if gone: - logger.debug("Processes that exited naturally: %s", [p.pid for p in gone]) + gone, alive = psutil.wait_procs(children, timeout=2) + if gone: + logger.debug("Processes exited naturally: %s", [p.pid for p in gone]) - # Handle processes that are still alive after waiting - if alive: - logger.debug("Processes still alive after 1s wait: %s", [p.pid for p in alive]) - # Try to terminate remaining processes - for child in alive: - try: - child.terminate() - child.wait(timeout=5) # Increased timeout for cleanup - except (psutil.NoSuchProcess, psutil.TimeoutExpired): - with contextlib.suppress(psutil.NoSuchProcess): - child.kill() + if alive: + logger.debug("Processes still alive after 2s: %s", [p.pid for p in alive]) + for p in alive: + with contextlib.suppress(psutil.NoSuchProcess): + p.terminate() + + gone2, alive2 = psutil.wait_procs(alive, timeout=5) + if gone2: + logger.debug("Processes terminated gracefully: %s", [p.pid for p in gone2]) + + for p in alive2: + with contextlib.suppress(psutil.NoSuchProcess): + p.kill() + + _ = psutil.wait_procs(alive2, timeout=2) + + leftover = process.children(recursive=True) + assert not leftover, f"Leftover child processes: {[p.pid for p in leftover]}" except Exception as e: logger.exception("Error cleaning up child processes: %s", e) @@ -89,7 +114,7 @@ async def test_session_reuse_prevents_subprocess_leak(mcp_server_params, process sessions.append(session) # Verify session is working - tools_response = await session.list_tools() + tools_response = await wait_tools(session) assert len(tools_response.tools) > 0 # Check subprocess count after creating sessions @@ -102,11 +127,12 @@ async def test_session_reuse_prevents_subprocess_leak(mcp_server_params, process # Verify all sessions are functional for session in sessions: - tools_response = await session.list_tools() + tools_response = await wait_tools(session) assert len(tools_response.tools) > 0 finally: await session_manager.cleanup_all() + await wait_no_children(process, max_wait=10, target=initial_count) await asyncio.sleep(2) # Allow cleanup to complete @@ -121,7 +147,7 @@ async def test_session_cleanup_removes_subprocesses(mcp_server_params, process_t try: # Create a session session = await session_manager.get_session("cleanup_test", mcp_server_params, "stdio") - tools_response = await session.list_tools() + tools_response = await wait_tools(session) assert len(tools_response.tools) > 0 # Verify subprocess was created @@ -131,6 +157,7 @@ async def test_session_cleanup_removes_subprocesses(mcp_server_params, process_t finally: # Clean up session await session_manager.cleanup_all() + await wait_no_children(process, max_wait=10, target=initial_count) await asyncio.sleep(2) # Allow cleanup to complete # Verify subprocess was cleaned up @@ -152,7 +179,7 @@ async def test_session_health_check_and_recovery(mcp_server_params, process_trac try: # Create a session session1 = await session_manager.get_session("health_test", mcp_server_params, "stdio") - tools_response = await session1.list_tools() + tools_response = await wait_tools(session1) assert len(tools_response.tools) > 0 # Simulate session becoming unhealthy by accessing internal state @@ -185,11 +212,12 @@ async def test_session_health_check_and_recovery(mcp_server_params, process_trac # Try to get a session again - should create a new healthy one session2 = await session_manager.get_session("health_test_2", mcp_server_params, "stdio") - tools_response = await session2.list_tools() + tools_response = await wait_tools(session2) assert len(tools_response.tools) > 0 finally: await session_manager.cleanup_all() + await wait_no_children(process, max_wait=10, target=initial_count) await asyncio.sleep(2) @@ -235,6 +263,7 @@ async def test_multiple_servers_isolation(process_tracker): finally: await session_manager.cleanup_all() + await wait_no_children(process, max_wait=10, target=initial_count) await asyncio.sleep(2) @@ -331,8 +360,9 @@ async def test_session_manager_connectivity_validation(): @pytest.mark.asyncio -async def test_session_manager_cleanup_all(): +async def test_session_manager_cleanup_all(process_tracker): """Test that cleanup_all properly cleans up all sessions.""" + process, initial_count = process_tracker session_manager = MCPSessionManager() # Mock some sessions using the correct structure @@ -366,6 +396,7 @@ async def test_session_manager_cleanup_all(): # Cleanup all await session_manager.cleanup_all() + await wait_no_children(process, max_wait=10, target=initial_count) # Verify cleanup if hasattr(session_manager, "sessions_by_server"):