ref: MCP Tests (#9349)
* update MCP Tests * [autofix.ci] apply automated fixes * Update util.py * [autofix.ci] apply automated fixes * Refactor MCP session manager for better configurability and cleanup (#9176) * Add log rotation and header validation features Introduces support for log rotation via the LANGFLOW_LOG_ROTATION environment variable and CLI/config options, with documentation updates. Adds header validation and sanitization for MCP connections, ensuring RFC 7230 compliance and security. Frontend and backend now support passing custom headers for MCP servers. Includes extensive new and updated unit tests for header handling, MCP utilities, and log rotation. * Add unit tests for MCP utilities and update disconnect logic Added comprehensive unit tests for MCP utility functions, session management, header validation, and client classes in test_mcp_util.py. Updated MCPStdioClient and MCPSseClient disconnect methods for clearer session cleanup logic. Refactored test_mcp_component.py to remove redundant and duplicated tests, consolidating coverage in the new test suite. * [autofix.ci] apply automated fixes * Update test_mcp_memory_leak.py * Update util.py * Improve session and process cleanup in MCP Added explicit session closing and a short delay to allow subprocess transports to clean up, reducing warnings and potential memory leaks. Test code now waits longer after cleanup and increases process termination timeout to ensure all child processes are properly terminated. * Improve session and process cleanup logic Enhanced MCPSessionManager to handle both async and sync session closing methods, using inspection to determine awaitability. Updated memory leak test to more robustly wait for and clean up child processes, logging process states and handling termination more gracefully. --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
parent
b093c1fadb
commit
1b3493aafd
2 changed files with 60 additions and 8 deletions
|
|
@ -1,5 +1,6 @@
|
|||
import asyncio
|
||||
import contextlib
|
||||
import inspect
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
|
|
@ -804,6 +805,38 @@ class MCPSessionManager:
|
|||
|
||||
session_info = sessions[session_id]
|
||||
try:
|
||||
# First try to properly close the session if it exists
|
||||
if "session" in session_info:
|
||||
session = session_info["session"]
|
||||
|
||||
# Try async close first (aclose method)
|
||||
if hasattr(session, "aclose"):
|
||||
try:
|
||||
await session.aclose()
|
||||
logger.debug("Successfully closed session %s using aclose()", session_id)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.debug("Error closing session %s with aclose(): %s", session_id, e)
|
||||
|
||||
# If no aclose, try regular close method
|
||||
elif hasattr(session, "close"):
|
||||
try:
|
||||
# Check if close() is awaitable using inspection
|
||||
if inspect.iscoroutinefunction(session.close):
|
||||
# It's an async method
|
||||
await session.close()
|
||||
logger.debug("Successfully closed session %s using async close()", session_id)
|
||||
else:
|
||||
# Try calling it and check if result is awaitable
|
||||
close_result = session.close()
|
||||
if inspect.isawaitable(close_result):
|
||||
await close_result
|
||||
logger.debug("Successfully closed session %s using awaitable close()", session_id)
|
||||
else:
|
||||
# It's a synchronous close
|
||||
logger.debug("Successfully closed session %s using sync close()", session_id)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.debug("Error closing session %s with close(): %s", session_id, e)
|
||||
|
||||
# Cancel the background task which will properly close the session
|
||||
if "task" in session_info:
|
||||
task = session_info["task"]
|
||||
|
|
@ -854,6 +887,10 @@ class MCPSessionManager:
|
|||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await task
|
||||
|
||||
# Give a bit more time for subprocess transports to clean up
|
||||
# This helps prevent the BaseSubprocessTransport.__del__ warnings
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
async def _cleanup_session(self, context_id: str):
|
||||
"""Backward-compat cleanup by context_id.
|
||||
|
||||
|
|
|
|||
|
|
@ -44,15 +44,30 @@ def process_tracker():
|
|||
|
||||
yield process, initial_count
|
||||
|
||||
# Cleanup any remaining child processes
|
||||
# Give some time for cleanup to complete before checking for leftover processes
|
||||
# Collect child processes that we expect to wait for
|
||||
try:
|
||||
for child in process.children(recursive=True):
|
||||
try:
|
||||
child.terminate()
|
||||
child.wait(timeout=3)
|
||||
except (psutil.NoSuchProcess, psutil.TimeoutExpired):
|
||||
with contextlib.suppress(psutil.NoSuchProcess):
|
||||
child.kill()
|
||||
child_processes = process.children(recursive=True)
|
||||
if child_processes:
|
||||
# Wait up to 1 second for processes to exit naturally
|
||||
gone, alive = psutil.wait_procs(child_processes, timeout=1)
|
||||
|
||||
# Log processes that exited naturally
|
||||
if gone:
|
||||
logger.debug("Processes that exited naturally: %s", [p.pid for p in gone])
|
||||
|
||||
# Handle processes that are still alive after waiting
|
||||
if alive:
|
||||
logger.debug("Processes still alive after 1s wait: %s", [p.pid for p in alive])
|
||||
# Try to terminate remaining processes
|
||||
for child in alive:
|
||||
try:
|
||||
child.terminate()
|
||||
child.wait(timeout=5) # Increased timeout for cleanup
|
||||
except (psutil.NoSuchProcess, psutil.TimeoutExpired):
|
||||
with contextlib.suppress(psutil.NoSuchProcess):
|
||||
child.kill()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Error cleaning up child processes: %s", e)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue