diff --git a/src/backend/base/langflow/base/mcp/util.py b/src/backend/base/langflow/base/mcp/util.py index 62d0ca2b9..07f493134 100644 --- a/src/backend/base/langflow/base/mcp/util.py +++ b/src/backend/base/langflow/base/mcp/util.py @@ -1,5 +1,6 @@ import asyncio import contextlib +import inspect import os import platform import re @@ -804,6 +805,38 @@ class MCPSessionManager: session_info = sessions[session_id] try: + # First try to properly close the session if it exists + if "session" in session_info: + session = session_info["session"] + + # Try async close first (aclose method) + if hasattr(session, "aclose"): + try: + await session.aclose() + logger.debug("Successfully closed session %s using aclose()", session_id) + except Exception as e: # noqa: BLE001 + logger.debug("Error closing session %s with aclose(): %s", session_id, e) + + # If no aclose, try regular close method + elif hasattr(session, "close"): + try: + # Check if close() is awaitable using inspection + if inspect.iscoroutinefunction(session.close): + # It's an async method + await session.close() + logger.debug("Successfully closed session %s using async close()", session_id) + else: + # Try calling it and check if result is awaitable + close_result = session.close() + if inspect.isawaitable(close_result): + await close_result + logger.debug("Successfully closed session %s using awaitable close()", session_id) + else: + # It's a synchronous close + logger.debug("Successfully closed session %s using sync close()", session_id) + except Exception as e: # noqa: BLE001 + logger.debug("Error closing session %s with close(): %s", session_id, e) + # Cancel the background task which will properly close the session if "task" in session_info: task = session_info["task"] @@ -854,6 +887,10 @@ class MCPSessionManager: with contextlib.suppress(asyncio.CancelledError): await task + # Give a bit more time for subprocess transports to clean up + # This helps prevent the BaseSubprocessTransport.__del__ warnings + await asyncio.sleep(0.5) + async def _cleanup_session(self, context_id: str): """Backward-compat cleanup by context_id. diff --git a/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py b/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py index fc92cfb2d..09366c863 100644 --- a/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py +++ b/src/backend/tests/integration/components/mcp/test_mcp_memory_leak.py @@ -44,15 +44,30 @@ def process_tracker(): yield process, initial_count - # Cleanup any remaining child processes + # Give some time for cleanup to complete before checking for leftover processes + # Collect child processes that we expect to wait for try: - for child in process.children(recursive=True): - try: - child.terminate() - child.wait(timeout=3) - except (psutil.NoSuchProcess, psutil.TimeoutExpired): - with contextlib.suppress(psutil.NoSuchProcess): - child.kill() + child_processes = process.children(recursive=True) + if child_processes: + # Wait up to 1 second for processes to exit naturally + gone, alive = psutil.wait_procs(child_processes, timeout=1) + + # Log processes that exited naturally + if gone: + logger.debug("Processes that exited naturally: %s", [p.pid for p in gone]) + + # Handle processes that are still alive after waiting + if alive: + logger.debug("Processes still alive after 1s wait: %s", [p.pid for p in alive]) + # Try to terminate remaining processes + for child in alive: + try: + child.terminate() + child.wait(timeout=5) # Increased timeout for cleanup + except (psutil.NoSuchProcess, psutil.TimeoutExpired): + with contextlib.suppress(psutil.NoSuchProcess): + child.kill() + except Exception as e: logger.exception("Error cleaning up child processes: %s", e)