From 85a1068c03df357a82e2d2512c59d3d6542493fe Mon Sep 17 00:00:00 2001 From: Lucas Oliveira <62335616+lucaseduoli@users.noreply.github.com> Date: Thu, 10 Jul 2025 08:24:10 -0300 Subject: [PATCH] fix: make mcp server component handle cache miss gracefully (#8966) Co-authored-by: Gabriel Luiz Freitas Almeida --- .../components/agents/mcp_component.py | 83 ++++++++++++++----- 1 file changed, 64 insertions(+), 19 deletions(-) diff --git a/src/backend/base/langflow/components/agents/mcp_component.py b/src/backend/base/langflow/components/agents/mcp_component.py index 8ee4dc8ae..056fff5e1 100644 --- a/src/backend/base/langflow/components/agents/mcp_component.py +++ b/src/backend/base/langflow/components/agents/mcp_component.py @@ -1,9 +1,9 @@ +from __future__ import annotations + import asyncio import re import uuid -from typing import Any - -from langchain_core.tools import StructuredTool +from typing import TYPE_CHECKING, Any from langflow.api.v2.mcp import get_server from langflow.base.mcp.util import ( @@ -13,18 +13,24 @@ from langflow.base.mcp.util import ( update_tools, ) from langflow.custom.custom_component.component_with_cache import ComponentWithCache -from langflow.inputs.inputs import InputTypes from langflow.io import DropdownInput, McpInput, MessageTextInput, Output from langflow.io.schema import flatten_schema, schema_to_langflow_inputs from langflow.logging import logger from langflow.schema.dataframe import DataFrame from langflow.schema.message import Message from langflow.services.auth.utils import create_user_longterm_token +from langflow.services.cache.utils import CacheMiss # Import get_server from the backend API from langflow.services.database.models.user.crud import get_user_by_id from langflow.services.deps import get_session, get_settings_service, get_storage_service +if TYPE_CHECKING: + from langchain_core.tools import StructuredTool + + from langflow.inputs.inputs import InputTypes + from langflow.services.shared_component_cache.service import SharedComponentCacheService + def maybe_unflatten_dict(flat: dict[str, Any]) -> dict[str, Any]: """If any key looks nested (contains a dot or "[index]"), rebuild the. @@ -64,6 +70,26 @@ def maybe_unflatten_dict(flat: dict[str, Any]) -> dict[str, Any]: return nested +def safe_cache_get(cache: SharedComponentCacheService, key, default=None): + """Safely get a value from cache, handling CacheMiss objects.""" + try: + value = cache.get(key) + if isinstance(value, CacheMiss): + return default + except (AttributeError, KeyError, TypeError): + return default + else: + return value + + +def safe_cache_set(cache: SharedComponentCacheService, key, value): + """Safely set a value in cache, handling potential errors.""" + try: + cache.set(key, value) + except (AttributeError, TypeError) as e: + logger.warning(f"Failed to set cache key '{key}': {e}") + + class MCPToolsComponent(ComponentWithCache): schema_inputs: list = [] tools: list[StructuredTool] = [] @@ -74,15 +100,24 @@ class MCPToolsComponent(ComponentWithCache): def __init__(self, **data) -> None: super().__init__(**data) # Initialize cache keys to avoid CacheMiss when accessing them - if "servers" not in self._shared_component_cache: - self._shared_component_cache["servers"] = {} - if "last_selected_server" not in self._shared_component_cache: - self._shared_component_cache["last_selected_server"] = "" + self._ensure_cache_structure() # Initialize clients with access to the component cache self.stdio_client: MCPStdioClient = MCPStdioClient(component_cache=self._shared_component_cache) self.sse_client: MCPSseClient = MCPSseClient(component_cache=self._shared_component_cache) + def _ensure_cache_structure(self): + """Ensure the cache has the required structure.""" + # Check if servers key exists and is not CacheMiss + servers_value = safe_cache_get(self._shared_component_cache, "servers") + if servers_value is None: + safe_cache_set(self._shared_component_cache, "servers", {}) + + # Check if last_selected_server key exists and is not CacheMiss + last_server_value = safe_cache_get(self._shared_component_cache, "last_selected_server") + if last_server_value is None: + safe_cache_set(self._shared_component_cache, "last_selected_server", "") + default_keys: list[str] = [ "code", "_type", @@ -170,7 +205,9 @@ class MCPToolsComponent(ComponentWithCache): return [], {"name": server_name, "config": server_config_from_value} # Use shared cache if available - cached = self._shared_component_cache["servers"].get(server_name) + servers_cache = safe_cache_get(self._shared_component_cache, "servers", {}) + cached = servers_cache.get(server_name) if isinstance(servers_cache, dict) else None + if cached is not None: self.tools = cached["tools"] self.tool_names = cached["tool_names"] @@ -211,13 +248,19 @@ class MCPToolsComponent(ComponentWithCache): self._tool_cache = tool_cache self.tools = tool_list # Cache the result using shared cache - - self._shared_component_cache["servers"][server_name] = { + cache_data = { "tools": tool_list, "tool_names": self.tool_names, "tool_cache": tool_cache, "config": server_config, } + + # Safely update the servers cache + current_servers_cache = safe_cache_get(self._shared_component_cache, "servers", {}) + if isinstance(current_servers_cache, dict): + current_servers_cache[server_name] = cache_data + safe_cache_set(self._shared_component_cache, "servers", current_servers_cache) + return tool_list, {"name": server_name, "config": server_config} except (TimeoutError, asyncio.TimeoutError) as e: msg = f"Timeout updating tool list: {e!s}" @@ -288,7 +331,7 @@ class MCPToolsComponent(ComponentWithCache): build_config["tool_placeholder"]["tool_mode"] = True current_server_name = field_value.get("name") if isinstance(field_value, dict) else field_value - _last_selected_server = self._shared_component_cache.get("last_selected_server") or "" + _last_selected_server = safe_cache_get(self._shared_component_cache, "last_selected_server", "") # To avoid unnecessary updates, only proceed if the server has actually changed if (_last_selected_server in (current_server_name, "")) and build_config["tool"]["show"]: @@ -296,17 +339,19 @@ class MCPToolsComponent(ComponentWithCache): # Determine if "Tool Mode" is active by checking if the tool dropdown is hidden. is_in_tool_mode = build_config["tools_metadata"]["show"] - self._shared_component_cache.set("last_selected_server", current_server_name) + safe_cache_set(self._shared_component_cache, "last_selected_server", current_server_name) # Check if tools are already cached for this server before clearing cached_tools = None if current_server_name: - cached = self._shared_component_cache["servers"].get(current_server_name) - if cached is not None: - cached_tools = cached["tools"] - self.tools = cached_tools - self.tool_names = cached["tool_names"] - self._tool_cache = cached["tool_cache"] + servers_cache = safe_cache_get(self._shared_component_cache, "servers", {}) + if isinstance(servers_cache, dict): + cached = servers_cache.get(current_server_name) + if cached is not None: + cached_tools = cached["tools"] + self.tools = cached_tools + self.tool_names = cached["tool_names"] + self._tool_cache = cached["tool_cache"] # Only clear tools if we don't have cached tools for the current server if not cached_tools: