fix: make mcp server component handle cache miss gracefully (#8966)
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
parent
8f315a29bc
commit
85a1068c03
1 changed files with 64 additions and 19 deletions
|
|
@ -1,9 +1,9 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import re
|
||||
import uuid
|
||||
from typing import Any
|
||||
|
||||
from langchain_core.tools import StructuredTool
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from langflow.api.v2.mcp import get_server
|
||||
from langflow.base.mcp.util import (
|
||||
|
|
@ -13,18 +13,24 @@ from langflow.base.mcp.util import (
|
|||
update_tools,
|
||||
)
|
||||
from langflow.custom.custom_component.component_with_cache import ComponentWithCache
|
||||
from langflow.inputs.inputs import InputTypes
|
||||
from langflow.io import DropdownInput, McpInput, MessageTextInput, Output
|
||||
from langflow.io.schema import flatten_schema, schema_to_langflow_inputs
|
||||
from langflow.logging import logger
|
||||
from langflow.schema.dataframe import DataFrame
|
||||
from langflow.schema.message import Message
|
||||
from langflow.services.auth.utils import create_user_longterm_token
|
||||
from langflow.services.cache.utils import CacheMiss
|
||||
|
||||
# Import get_server from the backend API
|
||||
from langflow.services.database.models.user.crud import get_user_by_id
|
||||
from langflow.services.deps import get_session, get_settings_service, get_storage_service
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langchain_core.tools import StructuredTool
|
||||
|
||||
from langflow.inputs.inputs import InputTypes
|
||||
from langflow.services.shared_component_cache.service import SharedComponentCacheService
|
||||
|
||||
|
||||
def maybe_unflatten_dict(flat: dict[str, Any]) -> dict[str, Any]:
|
||||
"""If any key looks nested (contains a dot or "[index]"), rebuild the.
|
||||
|
|
@ -64,6 +70,26 @@ def maybe_unflatten_dict(flat: dict[str, Any]) -> dict[str, Any]:
|
|||
return nested
|
||||
|
||||
|
||||
def safe_cache_get(cache: SharedComponentCacheService, key, default=None):
|
||||
"""Safely get a value from cache, handling CacheMiss objects."""
|
||||
try:
|
||||
value = cache.get(key)
|
||||
if isinstance(value, CacheMiss):
|
||||
return default
|
||||
except (AttributeError, KeyError, TypeError):
|
||||
return default
|
||||
else:
|
||||
return value
|
||||
|
||||
|
||||
def safe_cache_set(cache: SharedComponentCacheService, key, value):
|
||||
"""Safely set a value in cache, handling potential errors."""
|
||||
try:
|
||||
cache.set(key, value)
|
||||
except (AttributeError, TypeError) as e:
|
||||
logger.warning(f"Failed to set cache key '{key}': {e}")
|
||||
|
||||
|
||||
class MCPToolsComponent(ComponentWithCache):
|
||||
schema_inputs: list = []
|
||||
tools: list[StructuredTool] = []
|
||||
|
|
@ -74,15 +100,24 @@ class MCPToolsComponent(ComponentWithCache):
|
|||
def __init__(self, **data) -> None:
|
||||
super().__init__(**data)
|
||||
# Initialize cache keys to avoid CacheMiss when accessing them
|
||||
if "servers" not in self._shared_component_cache:
|
||||
self._shared_component_cache["servers"] = {}
|
||||
if "last_selected_server" not in self._shared_component_cache:
|
||||
self._shared_component_cache["last_selected_server"] = ""
|
||||
self._ensure_cache_structure()
|
||||
|
||||
# Initialize clients with access to the component cache
|
||||
self.stdio_client: MCPStdioClient = MCPStdioClient(component_cache=self._shared_component_cache)
|
||||
self.sse_client: MCPSseClient = MCPSseClient(component_cache=self._shared_component_cache)
|
||||
|
||||
def _ensure_cache_structure(self):
|
||||
"""Ensure the cache has the required structure."""
|
||||
# Check if servers key exists and is not CacheMiss
|
||||
servers_value = safe_cache_get(self._shared_component_cache, "servers")
|
||||
if servers_value is None:
|
||||
safe_cache_set(self._shared_component_cache, "servers", {})
|
||||
|
||||
# Check if last_selected_server key exists and is not CacheMiss
|
||||
last_server_value = safe_cache_get(self._shared_component_cache, "last_selected_server")
|
||||
if last_server_value is None:
|
||||
safe_cache_set(self._shared_component_cache, "last_selected_server", "")
|
||||
|
||||
default_keys: list[str] = [
|
||||
"code",
|
||||
"_type",
|
||||
|
|
@ -170,7 +205,9 @@ class MCPToolsComponent(ComponentWithCache):
|
|||
return [], {"name": server_name, "config": server_config_from_value}
|
||||
|
||||
# Use shared cache if available
|
||||
cached = self._shared_component_cache["servers"].get(server_name)
|
||||
servers_cache = safe_cache_get(self._shared_component_cache, "servers", {})
|
||||
cached = servers_cache.get(server_name) if isinstance(servers_cache, dict) else None
|
||||
|
||||
if cached is not None:
|
||||
self.tools = cached["tools"]
|
||||
self.tool_names = cached["tool_names"]
|
||||
|
|
@ -211,13 +248,19 @@ class MCPToolsComponent(ComponentWithCache):
|
|||
self._tool_cache = tool_cache
|
||||
self.tools = tool_list
|
||||
# Cache the result using shared cache
|
||||
|
||||
self._shared_component_cache["servers"][server_name] = {
|
||||
cache_data = {
|
||||
"tools": tool_list,
|
||||
"tool_names": self.tool_names,
|
||||
"tool_cache": tool_cache,
|
||||
"config": server_config,
|
||||
}
|
||||
|
||||
# Safely update the servers cache
|
||||
current_servers_cache = safe_cache_get(self._shared_component_cache, "servers", {})
|
||||
if isinstance(current_servers_cache, dict):
|
||||
current_servers_cache[server_name] = cache_data
|
||||
safe_cache_set(self._shared_component_cache, "servers", current_servers_cache)
|
||||
|
||||
return tool_list, {"name": server_name, "config": server_config}
|
||||
except (TimeoutError, asyncio.TimeoutError) as e:
|
||||
msg = f"Timeout updating tool list: {e!s}"
|
||||
|
|
@ -288,7 +331,7 @@ class MCPToolsComponent(ComponentWithCache):
|
|||
build_config["tool_placeholder"]["tool_mode"] = True
|
||||
|
||||
current_server_name = field_value.get("name") if isinstance(field_value, dict) else field_value
|
||||
_last_selected_server = self._shared_component_cache.get("last_selected_server") or ""
|
||||
_last_selected_server = safe_cache_get(self._shared_component_cache, "last_selected_server", "")
|
||||
|
||||
# To avoid unnecessary updates, only proceed if the server has actually changed
|
||||
if (_last_selected_server in (current_server_name, "")) and build_config["tool"]["show"]:
|
||||
|
|
@ -296,17 +339,19 @@ class MCPToolsComponent(ComponentWithCache):
|
|||
|
||||
# Determine if "Tool Mode" is active by checking if the tool dropdown is hidden.
|
||||
is_in_tool_mode = build_config["tools_metadata"]["show"]
|
||||
self._shared_component_cache.set("last_selected_server", current_server_name)
|
||||
safe_cache_set(self._shared_component_cache, "last_selected_server", current_server_name)
|
||||
|
||||
# Check if tools are already cached for this server before clearing
|
||||
cached_tools = None
|
||||
if current_server_name:
|
||||
cached = self._shared_component_cache["servers"].get(current_server_name)
|
||||
if cached is not None:
|
||||
cached_tools = cached["tools"]
|
||||
self.tools = cached_tools
|
||||
self.tool_names = cached["tool_names"]
|
||||
self._tool_cache = cached["tool_cache"]
|
||||
servers_cache = safe_cache_get(self._shared_component_cache, "servers", {})
|
||||
if isinstance(servers_cache, dict):
|
||||
cached = servers_cache.get(current_server_name)
|
||||
if cached is not None:
|
||||
cached_tools = cached["tools"]
|
||||
self.tools = cached_tools
|
||||
self.tool_names = cached["tool_names"]
|
||||
self._tool_cache = cached["tool_cache"]
|
||||
|
||||
# Only clear tools if we don't have cached tools for the current server
|
||||
if not cached_tools:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue