diff --git a/src/backend/base/langflow/base/mcp/util.py b/src/backend/base/langflow/base/mcp/util.py index a2adb95c6..05c356c60 100644 --- a/src/backend/base/langflow/base/mcp/util.py +++ b/src/backend/base/langflow/base/mcp/util.py @@ -387,17 +387,26 @@ class MCPSessionManager: try: session = session_info["session"] task = session_info["task"] - if ( - not task.done() - and hasattr(session, "_closed") - and not session._closed - and hasattr(session, "_write_stream") - and hasattr(session._write_stream, "_closed") - and not session._write_stream._closed - ): + + # Break down the health check to understand why cleanup is triggered + task_not_done = not task.done() + + logger.debug(f"Session health check for context_id {context_id}:") + logger.debug(f" - task_not_done: {task_not_done}") + + # For MCP ClientSession, we rely on the background task being alive + session_is_healthy = task_not_done + + logger.debug(f" - session_is_healthy: {session_is_healthy}") + + if session_is_healthy: + logger.debug(f"Session for context_id {context_id} is healthy, reusing") return session - except Exception: # noqa: BLE001 - msg = f"Session for context_id {context_id} is dead" + msg = f"Session for context_id {context_id} failed health check: background task is done" + logger.info(msg) + + except Exception as e: # noqa: BLE001 + msg = f"Session for context_id {context_id} is dead due to exception: {e}" logger.info(msg) # Session is dead, clean it up await self._cleanup_session(context_id) @@ -618,49 +627,6 @@ class MCPStdioClient: session_manager = self._get_session_manager() return await session_manager.get_session(self._session_context, self._connection_params, "stdio") - async def _create_fresh_session(self) -> ClientSession: - """Create a fresh session using the async context manager pattern.""" - from mcp.client.stdio import stdio_client - - # This creates a temporary session that will be closed when the context exits - # We need to use this within an async context - stdio_ctx = stdio_client(self._connection_params) - read, write = await stdio_ctx.__aenter__() - session = ClientSession(read, write) - await session.__aenter__() - await session.initialize() - - # Store the context managers so they can be cleaned up later - session._stdio_ctx = stdio_ctx - session._stdio_read = read - session._stdio_write = write - - return session - - async def _cleanup_session(self, session: ClientSession): - """Clean up a session and its associated resources.""" - try: - await session.__aexit__(None, None, None) - except Exception as e: # noqa: BLE001 - logger.info(f"issue cleaning up mcp session: {e}") - try: - if hasattr(session, "_stdio_ctx"): - await session._stdio_ctx.__aexit__(None, None, None) - except Exception as e: # noqa: BLE001 - logger.info(f"issue cleaning up mcp session: {e}") - - async def disconnect(self): - """Properly close the connection and clean up resources.""" - # Clean up session using session manager - if self._session_context: - session_manager = self._get_session_manager() - await session_manager._cleanup_session(self._session_context) - - self.session = None - self._connection_params = None - self._connected = False - self._session_context = None - async def run_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: """Run a tool with the given arguments using context-specific session.