fix: make MCP display loading states when loading tools, surface errors on MCP, sanitize MCP names (#8792)

* Catch timeout errors on check server

* Make errors propagate from the MCP clients

* Apply timeout error handling and made Server change only trigger a loader on the Tools dropdown

* Add placeholder to ToolsInput on errors

* Updated useEffect to run when nothing is selected

* Added timeout handling to mcp component

* Added placeholder to tools component

* removed unused props

* Added timeout handling on loading of tools on config page

* Fixed key pair input not working

* Set key pair values as empty list

* Surface final error from mcp

* Removed ID from tool mode turning on

* Turn exception on to more places

* Fixed cache on mcp component and make tool mode data not reset

* Added loading placeholder only if there are no data

* Refresh data if placeholder is Loading on tool mode

* Show modal if no tools are available

* Add useEffect to run handleOnNewValue if placeholder is Loading actions...

* Removed checks from toolsTable to run handleOnNewValue

* Sanitized MCP name

* Updated message

* Fixed actions not loading in mcp component

* [autofix.ci] apply automated fixes

* reuse mcp servers

* mypy fixes

* fix: update tool reference in MCPToolsComponent to use field_value

* Added last_updated to backend

* get latest version of node and compare last_updated before returning post template value

* assign last updated and only set node class if newTemplate exists

* Adds type

* Removed timeout from backend to frontend

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: phact <estevezsebastian@gmail.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Lucas Oliveira 2025-07-02 12:31:48 -03:00 committed by GitHub
commit 6460e23bfb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 798 additions and 316 deletions

View file

@ -21,7 +21,7 @@ from langflow.api.utils import CurrentActiveMCPUser
from langflow.api.v1.endpoints import simple_run_flow
from langflow.api.v1.schemas import SimplifiedAPIRequest
from langflow.base.mcp.constants import MAX_MCP_TOOL_NAME_LENGTH
from langflow.base.mcp.util import get_flow_snake_case
from langflow.base.mcp.util import get_flow_snake_case, sanitize_mcp_name
from langflow.helpers.flow import json_schema_from_flow
from langflow.schema.message import Message
from langflow.services.database.models.flow.model import Flow
@ -186,7 +186,7 @@ async def handle_list_tools():
if flow.user_id is None:
continue
base_name = "_".join(flow.name.lower().split())
base_name = sanitize_mcp_name(flow.name)
name = base_name[:MAX_MCP_TOOL_NAME_LENGTH]
if name in existing_names:
i = 1

View file

@ -32,7 +32,7 @@ from langflow.api.v1.mcp import (
)
from langflow.api.v1.schemas import MCPInstallRequest, MCPSettings, SimplifiedAPIRequest
from langflow.base.mcp.constants import MAX_MCP_SERVER_NAME_LENGTH, MAX_MCP_TOOL_NAME_LENGTH
from langflow.base.mcp.util import get_flow_snake_case, get_unique_name
from langflow.base.mcp.util import get_flow_snake_case, get_unique_name, sanitize_mcp_name
from langflow.helpers.flow import json_schema_from_flow
from langflow.schema.message import Message
from langflow.services.database.models import Flow, Folder
@ -96,10 +96,10 @@ async def list_project_tools(
continue
# Format the flow name according to MCP conventions (snake_case)
flow_name = "_".join(flow.name.lower().split())
flow_name = sanitize_mcp_name(flow.name)
# Use action_name and action_description if available, otherwise use defaults
name = flow.action_name or flow_name
name = sanitize_mcp_name(flow.action_name) if flow.action_name else flow_name
description = flow.action_description or (
flow.description if flow.description else f"Tool generated from flow: {flow_name}"
)
@ -395,14 +395,14 @@ async def install_mcp_config(
# Create the MCP configuration
mcp_config = {
"mcpServers": {
f"lf-{name.lower().replace(' ', '_')[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}": {
f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}": {
"command": command,
"args": args,
}
}
}
server_name = f"lf-{name.lower().replace(' ', '_')[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}"
server_name = f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}"
logger.debug("Installing MCP config for project: %s (server name: %s)", project.name, server_name)
# Determine the config file path based on the client and OS
@ -518,7 +518,7 @@ async def check_installed_mcp_servers(
# Project server name pattern (must match the logic in install function)
name = project.name
name = NEW_FOLDER_NAME if name == DEFAULT_FOLDER_NAME else name
project_server_name = f"lf-{name.lower().replace(' ', '_')[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}"
project_server_name = f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}"
logger.debug(
"Checking for installed MCP servers for project: %s (server name: %s)", project.name, project_server_name
@ -670,7 +670,9 @@ class ProjectMCPServer:
continue
# Use action_name if available, otherwise construct from flow name
base_name = flow.action_name or "_".join(flow.name.lower().split())
base_name = (
sanitize_mcp_name(flow.action_name) if flow.action_name else sanitize_mcp_name(flow.name)
)
name = get_unique_name(base_name, MAX_MCP_TOOL_NAME_LENGTH, existing_names)
# Use action_description if available, otherwise use defaults

View file

@ -128,10 +128,14 @@ async def get_servers(
# Configuration validation errors, invalid URLs, etc.
logger.error(f"Configuration error for server {server_name}: {e}")
server_info["error"] = f"Configuration error: {e}"
except (ConnectionError, TimeoutError) as e:
except ConnectionError as e:
# Network connection and timeout issues
logger.error(f"Connection error for server {server_name}: {e}")
server_info["error"] = f"Connection failed: {e}"
except (TimeoutError, asyncio.TimeoutError) as e:
# Timeout errors
logger.error(f"Timeout error for server {server_name}: {e}")
server_info["error"] = "Timeout when checking server tools"
except OSError as e:
# System-level errors (process execution, file access)
logger.error(f"System error for server {server_name}: {e}")
@ -149,24 +153,21 @@ async def get_servers(
if hasattr(e, "exceptions") and e.exceptions:
# Extract the first underlying exception for a more meaningful error message
underlying_error = e.exceptions[0]
logger.exception(f"Error checking server {server_name}: {underlying_error}")
if hasattr(underlying_error, "exceptions"):
logger.error(
f"Error checking server {server_name}: {underlying_error}, {underlying_error.exceptions}"
)
underlying_error = underlying_error.exceptions[0]
else:
logger.exception(f"Error checking server {server_name}: {underlying_error}")
server_info["error"] = f"Error loading server: {underlying_error}"
else:
logger.exception(f"Error checking server {server_name}: {e}")
server_info["error"] = f"Error loading server: {e}"
return server_info
async def check_server_with_timeout(server_name: str) -> dict:
try:
return await asyncio.wait_for(
check_server(server_name), timeout=get_settings_service().settings.mcp_server_timeout
)
except asyncio.TimeoutError:
logger.error(f"Timeout checking server {server_name}")
return {"name": server_name, "mode": None, "toolsCount": None, "error": "Server check timed out."}
# Run all server checks concurrently
tasks = [check_server_with_timeout(server) for server in server_list["mcpServers"]]
tasks = [check_server(server) for server in server_list["mcpServers"]]
return await asyncio.gather(*tasks, return_exceptions=True)

View file

@ -1,7 +1,9 @@
import asyncio
import os
import platform
import re
import shutil
import unicodedata
from collections.abc import Awaitable, Callable
from typing import Any
from urllib.parse import urlparse
@ -22,6 +24,77 @@ HTTP_ERROR_STATUS_CODE = httpx_codes.BAD_REQUEST # HTTP status code for client
NULLABLE_TYPE_LENGTH = 2 # Number of types in a nullable union (the type itself + null)
def sanitize_mcp_name(name: str, max_length: int = 46) -> str:
"""Sanitize a name for MCP usage by removing emojis, diacritics, and special characters.
Args:
name: The original name to sanitize
max_length: Maximum length for the sanitized name
Returns:
A sanitized name containing only letters, numbers, hyphens, and underscores
"""
if not name or not name.strip():
return ""
# Remove emojis using regex pattern
emoji_pattern = re.compile(
"["
"\U0001f600-\U0001f64f" # emoticons
"\U0001f300-\U0001f5ff" # symbols & pictographs
"\U0001f680-\U0001f6ff" # transport & map symbols
"\U0001f1e0-\U0001f1ff" # flags (iOS)
"\U00002500-\U00002bef" # chinese char
"\U00002702-\U000027b0"
"\U00002702-\U000027b0"
"\U000024c2-\U0001f251"
"\U0001f926-\U0001f937"
"\U00010000-\U0010ffff"
"\u2640-\u2642"
"\u2600-\u2b55"
"\u200d"
"\u23cf"
"\u23e9"
"\u231a"
"\ufe0f" # dingbats
"\u3030"
"]+",
flags=re.UNICODE,
)
# Remove emojis
name = emoji_pattern.sub("", name)
# Normalize unicode characters to remove diacritics
name = unicodedata.normalize("NFD", name)
name = "".join(char for char in name if unicodedata.category(char) != "Mn")
# Replace spaces and special characters with underscores
name = re.sub(r"[^\w\s-]", "", name) # Keep only word chars, spaces, and hyphens
name = re.sub(r"[-\s]+", "_", name) # Replace spaces and hyphens with underscores
name = re.sub(r"_+", "_", name) # Collapse multiple underscores
# Remove leading/trailing underscores
name = name.strip("_")
# Ensure it starts with a letter or underscore (not a number)
if name and name[0].isdigit():
name = f"_{name}"
# Convert to lowercase
name = name.lower()
# Truncate to max length
if len(name) > max_length:
name = name[:max_length].rstrip("_")
# If empty after sanitization, provide a default
if not name:
name = "unnamed"
return name
def create_tool_coroutine(tool_name: str, arg_schema: type[BaseModel], client) -> Callable[..., Awaitable]:
async def tool_coroutine(*args, **kwargs):
# Get field names from the model (preserving order)
@ -101,7 +174,11 @@ async def get_flow_snake_case(flow_name: str, user_id: str, session, is_action:
flows = (await session.exec(stmt)).all()
for flow in flows:
this_flow_name = flow.action_name if is_action and flow.action_name else "_".join(flow.name.lower().split())
if is_action and flow.action_name:
this_flow_name = sanitize_mcp_name(flow.action_name)
else:
this_flow_name = sanitize_mcp_name(flow.name)
if this_flow_name == flow_name:
return flow
return None
@ -295,16 +372,179 @@ async def _validate_connection_params(mode: str, command: str | None = None, url
raise ValueError(msg)
class MCPStdioClient:
class MCPSessionManager:
"""Manages persistent MCP sessions with proper context manager lifecycle."""
def __init__(self):
self.sessions = {} # context_id -> session_info
self._background_tasks = set() # Keep references to background tasks
async def get_session(self, context_id: str, connection_params, transport_type: str):
"""Get or create a persistent session."""
if context_id in self.sessions:
session_info = self.sessions[context_id]
# Check if session and background task are still alive
try:
session = session_info["session"]
task = session_info["task"]
if (
not task.done()
and hasattr(session, "_closed")
and not session._closed
and hasattr(session, "_write_stream")
and hasattr(session._write_stream, "_closed")
and not session._write_stream._closed
):
return session
except Exception: # noqa: BLE001
msg = f"Session for context_id {context_id} is dead"
logger.info(msg)
# Session is dead, clean it up
await self._cleanup_session(context_id)
# Create new session
if transport_type == "stdio":
return await self._create_stdio_session(context_id, connection_params)
if transport_type == "sse":
return await self._create_sse_session(context_id, connection_params)
msg = f"Unknown transport type: {transport_type}"
raise ValueError(msg)
async def _create_stdio_session(self, context_id: str, connection_params):
"""Create a new stdio session as a background task to avoid context issues."""
import asyncio
from mcp.client.stdio import stdio_client
# Create a future to get the session
session_future: asyncio.Future[ClientSession] = asyncio.Future()
async def session_task():
"""Background task that keeps the session alive."""
try:
async with stdio_client(connection_params) as (read, write):
session = ClientSession(read, write)
async with session:
await session.initialize()
# Signal that session is ready
session_future.set_result(session)
# Keep the session alive until cancelled
import anyio
event = anyio.Event()
try:
await event.wait()
except asyncio.CancelledError:
# Session is being shut down
msg = "Message is shutting down"
logger.info(msg)
except Exception as e: # noqa: BLE001
if not session_future.done():
session_future.set_exception(e)
# Start the background task
task = asyncio.create_task(session_task())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
# Wait for session to be ready
session = await session_future
# Store session info
self.sessions[context_id] = {"session": session, "task": task, "type": "stdio"}
return session
async def _create_sse_session(self, context_id: str, connection_params):
"""Create a new SSE session as a background task to avoid context issues."""
import asyncio
from mcp.client.sse import sse_client
# Create a future to get the session
session_future: asyncio.Future[ClientSession] = asyncio.Future()
async def session_task():
"""Background task that keeps the session alive."""
try:
async with sse_client(
connection_params["url"],
connection_params["headers"],
connection_params["timeout_seconds"],
connection_params["sse_read_timeout_seconds"],
) as (read, write):
session = ClientSession(read, write)
async with session:
await session.initialize()
# Signal that session is ready
session_future.set_result(session)
# Keep the session alive until cancelled
import anyio
event = anyio.Event()
try:
await event.wait()
except asyncio.CancelledError:
# Session is being shut down
msg = "Message is shutting down"
logger.info(msg)
except Exception as e: # noqa: BLE001
if not session_future.done():
session_future.set_exception(e)
# Start the background task
task = asyncio.create_task(session_task())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
# Wait for session to be ready
session = await session_future
# Store session info
self.sessions[context_id] = {"session": session, "task": task, "type": "sse"}
return session
async def _cleanup_session(self, context_id: str):
"""Clean up a session by cancelling its background task."""
if context_id not in self.sessions:
return
session_info = self.sessions[context_id]
try:
# Cancel the background task which will properly close the session
if "task" in session_info:
task = session_info["task"]
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
logger.info(f"Issue cancelling task for context_id {context_id}")
except Exception as e: # noqa: BLE001
logger.info(f"issue cleaning up mcp session: {e}")
finally:
del self.sessions[context_id]
async def cleanup_all(self):
"""Clean up all sessions."""
for context_id in list(self.sessions.keys()):
await self._cleanup_session(context_id)
class MCPStdioClient:
def __init__(self, component_cache=None):
self.session: ClientSession | None = None
self._connection_params = None
self._connected = False
self._session_context: str | None = None
self._component_cache = component_cache
async def _connect_to_server(self, command_str: str, env: dict[str, str] | None = None) -> list[StructuredTool]:
"""Connect to MCP server using stdio transport (SDK style)."""
from mcp import StdioServerParameters
from mcp.client.stdio import stdio_client
command = command_str.split(" ")
env_data: dict[str, str] = {"DEBUG": "true", "PATH": os.environ["PATH"], **(env or {})}
@ -328,39 +568,101 @@ class MCPStdioClient:
# Store connection parameters for later use in run_tool
self._connection_params = server_params
try:
async with stdio_client(server_params) as (read, write), ClientSession(read, write) as session:
await session.initialize()
response = await session.list_tools()
self._connected = True
return response.tools
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
logger.error(f"Failed to connect to MCP stdio server: {e}")
self._connection_params = None
self._connected = False
return []
# If no session context is set, create a default one
if not self._session_context:
# Generate a fallback context based on connection parameters
import uuid
param_hash = uuid.uuid4().hex[:8]
self._session_context = f"default_{param_hash}"
# Get or create a persistent session
session = await self._get_or_create_session()
response = await session.list_tools()
self._connected = True
return response.tools
async def connect_to_server(self, command_str: str, env: dict[str, str] | None = None) -> list[StructuredTool]:
"""Connect to MCP server using stdio transport (SDK style)."""
return await asyncio.wait_for(
self._connect_to_server(command_str, env), timeout=get_settings_service().settings.mcp_server_timeout
)
def set_session_context(self, context_id: str):
"""Set the session context (e.g., flow_id + user_id + session_id)."""
self._session_context = context_id
def _get_session_manager(self) -> MCPSessionManager:
"""Get or create session manager from component cache."""
if not self._component_cache:
# Fallback to instance-level session manager if no cache
if not hasattr(self, "_session_manager"):
self._session_manager = MCPSessionManager()
return self._session_manager
from langflow.services.cache.utils import CacheMiss
session_manager = self._component_cache.get("mcp_session_manager")
if isinstance(session_manager, CacheMiss):
session_manager = MCPSessionManager()
self._component_cache.set("mcp_session_manager", session_manager)
return session_manager
async def _get_or_create_session(self) -> ClientSession:
"""Get or create a persistent session for the current context."""
if not self._session_context or not self._connection_params:
msg = "Session context and connection params must be set"
raise ValueError(msg)
# Use cached session manager to get/create persistent session
session_manager = self._get_session_manager()
return await session_manager.get_session(self._session_context, self._connection_params, "stdio")
async def _create_fresh_session(self) -> ClientSession:
"""Create a fresh session using the async context manager pattern."""
from mcp.client.stdio import stdio_client
# This creates a temporary session that will be closed when the context exits
# We need to use this within an async context
stdio_ctx = stdio_client(self._connection_params)
read, write = await stdio_ctx.__aenter__()
session = ClientSession(read, write)
await session.__aenter__()
await session.initialize()
# Store the context managers so they can be cleaned up later
session._stdio_ctx = stdio_ctx
session._stdio_read = read
session._stdio_write = write
return session
async def _cleanup_session(self, session: ClientSession):
"""Clean up a session and its associated resources."""
try:
return await asyncio.wait_for(
self._connect_to_server(command_str, env), timeout=get_settings_service().settings.mcp_server_timeout
)
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
logger.error(f"Failed to connect to MCP stdio server: {e}")
self._connection_params = None
self._connected = False
msg = f"Failed to connect to MCP stdio server: {e}"
raise ValueError(msg) from e
await session.__aexit__(None, None, None)
except Exception as e: # noqa: BLE001
logger.info(f"issue cleaning up mcp session: {e}")
try:
if hasattr(session, "_stdio_ctx"):
await session._stdio_ctx.__aexit__(None, None, None)
except Exception as e: # noqa: BLE001
logger.info(f"issue cleaning up mcp session: {e}")
async def disconnect(self):
"""Properly close the connection and clean up resources."""
# Clean up session using session manager
if self._session_context:
session_manager = self._get_session_manager()
await session_manager._cleanup_session(self._session_context)
self.session = None
self._connection_params = None
self._connected = False
self._session_context = None
async def run_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""Run a tool with the given arguments.
"""Run a tool with the given arguments using context-specific session.
Args:
tool_name: Name of the tool to run
@ -376,16 +678,26 @@ class MCPStdioClient:
msg = "Session not initialized or disconnected. Call connect_to_server first."
raise ValueError(msg)
try:
from mcp.client.stdio import stdio_client
# If no session context is set, create a default one
if not self._session_context:
# Generate a fallback context based on connection parameters
import uuid
param_hash = uuid.uuid4().hex[:8]
self._session_context = f"default_{param_hash}"
try:
# Get or create persistent session
session = await self._get_or_create_session()
return await session.call_tool(tool_name, arguments=arguments)
async with stdio_client(self._connection_params) as (read, write), ClientSession(read, write) as session:
await session.initialize()
return await session.call_tool(tool_name, arguments=arguments)
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
msg = f"Failed to run tool '{tool_name}': {e}"
logger.error(msg)
# Mark as disconnected on error
# Clean up failed session from cache
if self._session_context and self._component_cache:
cache_key = f"mcp_session_stdio_{self._session_context}"
self._component_cache.delete(cache_key)
self._connected = False
raise ValueError(msg) from e
@ -397,10 +709,28 @@ class MCPStdioClient:
class MCPSseClient:
def __init__(self):
def __init__(self, component_cache=None):
self.session: ClientSession | None = None
self._connection_params = None
self._connected = False
self._session_context: str | None = None
self._component_cache = component_cache
def _get_session_manager(self) -> MCPSessionManager:
"""Get or create session manager from component cache."""
if not self._component_cache:
# Fallback to instance-level session manager if no cache
if not hasattr(self, "_session_manager"):
self._session_manager = MCPSessionManager()
return self._session_manager
from langflow.services.cache.utils import CacheMiss
session_manager = self._component_cache.get("mcp_session_manager")
if isinstance(session_manager, CacheMiss):
session_manager = MCPSessionManager()
self._component_cache.set("mcp_session_manager", session_manager)
return session_manager
async def validate_url(self, url: str | None) -> tuple[bool, str]:
"""Validate the SSE URL before attempting connection."""
@ -447,8 +777,6 @@ class MCPSseClient:
sse_read_timeout_seconds: int = 30,
) -> list[StructuredTool]:
"""Connect to MCP server using SSE transport (SDK style)."""
from mcp.client.sse import sse_client
if headers is None:
headers = {}
if url is None:
@ -469,42 +797,54 @@ class MCPSseClient:
"sse_read_timeout_seconds": sse_read_timeout_seconds,
}
try:
async with (
sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) as (read, write),
ClientSession(read, write) as session,
):
await session.initialize()
response = await session.list_tools()
self._connected = True
return response.tools
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
logger.error(f"Failed to connect to MCP SSE server: {e}")
self._connection_params = None
self._connected = False
msg = f"Failed to connect to MCP SSE server: {e}"
raise ValueError(msg) from e
# If no session context is set, create a default one
if not self._session_context:
# Generate a fallback context based on connection parameters
import uuid
param_hash = uuid.uuid4().hex[:8]
self._session_context = f"default_sse_{param_hash}"
# Get or create a persistent session
session = await self._get_or_create_session()
response = await session.list_tools()
self._connected = True
return response.tools
async def connect_to_server(self, url: str, headers: dict[str, str] | None = None) -> list[StructuredTool]:
"""Connect to MCP server using SSE transport (SDK style)."""
try:
return await asyncio.wait_for(
self._connect_to_server(url, headers), timeout=get_settings_service().settings.mcp_server_timeout
)
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
logger.error(f"Failed to connect to MCP SSE server: {e}")
self._connection_params = None
self._connected = False
return []
return await asyncio.wait_for(
self._connect_to_server(url, headers), timeout=get_settings_service().settings.mcp_server_timeout
)
def set_session_context(self, context_id: str):
"""Set the session context (e.g., flow_id + user_id + session_id)."""
self._session_context = context_id
async def _get_or_create_session(self) -> ClientSession:
"""Get or create a persistent session for the current context."""
if not self._session_context or not self._connection_params:
msg = "Session context and params must be set"
raise ValueError(msg)
# Use cached session manager to get/create persistent session
session_manager = self._get_session_manager()
return await session_manager.get_session(self._session_context, self._connection_params, "sse")
async def disconnect(self):
"""Properly close the connection and clean up resources."""
# Clean up session using session manager
if self._session_context:
session_manager = self._get_session_manager()
await session_manager._cleanup_session(self._session_context)
self.session = None
self._connection_params = None
self._connected = False
self._session_context = None
async def run_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""Run a tool with the given arguments.
"""Run a tool with the given arguments using context-specific session.
Args:
tool_name: Name of the tool to run
@ -520,22 +860,26 @@ class MCPSseClient:
msg = "Session not initialized or disconnected. Call connect_to_server first."
raise ValueError(msg)
try:
from mcp.client.sse import sse_client
# If no session context is set, create a default one
if not self._session_context:
# Generate a fallback context based on connection parameters
import uuid
param_hash = uuid.uuid4().hex[:8]
self._session_context = f"default_sse_{param_hash}"
try:
# Get or create persistent session
session = await self._get_or_create_session()
return await session.call_tool(tool_name, arguments=arguments)
params = self._connection_params
async with (
sse_client(
params["url"], params["headers"], params["timeout_seconds"], params["sse_read_timeout_seconds"]
) as (read, write),
ClientSession(read, write) as session,
):
await session.initialize()
return await session.call_tool(tool_name, arguments=arguments)
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
msg = f"Failed to run tool '{tool_name}': {e}"
logger.error(msg)
# Mark as disconnected on error
# Clean up failed session from cache
if self._session_context and self._component_cache:
cache_key = f"mcp_session_sse_{self._session_context}"
self._component_cache.delete(cache_key)
self._connected = False
raise ValueError(msg) from e
@ -562,78 +906,66 @@ async def update_tools(
if mcp_sse_client is None:
mcp_sse_client = MCPSseClient()
# Fetch server config from backend
mode = "Stdio" if "command" in server_config else "SSE" if "url" in server_config else ""
command = server_config.get("command", "")
url = server_config.get("url", "")
tools = []
headers = _process_headers(server_config.get("headers", {}))
try:
# Fetch server config from backend
mode = "Stdio" if "command" in server_config else "SSE" if "url" in server_config else ""
command = server_config.get("command", "")
url = server_config.get("url", "")
tools = []
headers = _process_headers(server_config.get("headers", {}))
await _validate_connection_params(mode, command, url)
except ValueError as e:
logger.error(f"Invalid MCP server configuration for '{server_name}': {e}")
raise
try:
await _validate_connection_params(mode, command, url)
except ValueError as e:
logger.error(f"Invalid MCP server configuration for '{server_name}': {e}")
return "", [], {}
# Determine connection type and parameters
client: MCPStdioClient | MCPSseClient | None = None
if mode == "Stdio":
# Stdio connection
args = server_config.get("args", [])
env = server_config.get("env", {})
full_command = " ".join([command, *args])
tools = await mcp_stdio_client.connect_to_server(full_command, env)
client = mcp_stdio_client
elif mode == "SSE":
# SSE connection
tools = await mcp_sse_client.connect_to_server(url, headers=headers)
client = mcp_sse_client
else:
logger.error(f"Invalid MCP server mode for '{server_name}': {mode}")
return "", [], {}
# Determine connection type and parameters
client: MCPStdioClient | MCPSseClient | None = None
if not tools or not client or not client._connected:
logger.warning(f"No tools available from MCP server '{server_name}' or connection failed")
return "", [], {}
tool_list = []
tool_cache: dict[str, StructuredTool] = {}
for tool in tools:
if not tool or not hasattr(tool, "name"):
continue
try:
if mode == "Stdio":
# Stdio connection
args = server_config.get("args", [])
env = server_config.get("env", {})
full_command = " ".join([command, *args])
tools = await mcp_stdio_client.connect_to_server(full_command, env)
client = mcp_stdio_client
elif mode == "SSE":
# SSE connection
tools = await mcp_sse_client.connect_to_server(url, headers=headers)
client = mcp_sse_client
else:
logger.error(f"Invalid MCP server mode for '{server_name}': {mode}")
return "", [], {}
args_schema = create_input_schema_from_json_schema(tool.inputSchema)
if not args_schema:
logger.warning(f"Could not create schema for tool '{tool.name}' from server '{server_name}'")
continue
tool_obj = StructuredTool(
name=tool.name,
description=tool.description or "",
args_schema=args_schema,
func=create_tool_func(tool.name, args_schema, client),
coroutine=create_tool_coroutine(tool.name, args_schema, client),
tags=[tool.name],
metadata={"server_name": server_name},
)
tool_list.append(tool_obj)
tool_cache[tool.name] = tool_obj
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
logger.error(f"Failed to connect to MCP server '{server_name}': {e}")
# return "", [], {}
msg = f"Failed to connect to MCP server '{server_name}': {e}"
logger.error(msg)
logger.error(f"Failed to create tool '{tool.name}' from server '{server_name}': {e}")
msg = f"Failed to create tool '{tool.name}' from server '{server_name}': {e}"
raise ValueError(msg) from e
if not tools or not client or not client._connected:
logger.warning(f"No tools available from MCP server '{server_name}' or connection failed")
return "", [], {}
tool_list = []
tool_cache: dict[str, StructuredTool] = {}
for tool in tools:
if not tool or not hasattr(tool, "name"):
continue
try:
args_schema = create_input_schema_from_json_schema(tool.inputSchema)
if not args_schema:
logger.warning(f"Could not create schema for tool '{tool.name}' from server '{server_name}'")
continue
tool_obj = StructuredTool(
name=tool.name,
description=tool.description or "",
args_schema=args_schema,
func=create_tool_func(tool.name, args_schema, client),
coroutine=create_tool_coroutine(tool.name, args_schema, client),
tags=[tool.name],
metadata={"server_name": server_name},
)
tool_list.append(tool_obj)
tool_cache[tool.name] = tool_obj
except (ConnectionError, TimeoutError, OSError, ValueError) as e:
logger.error(f"Failed to create tool '{tool.name}' from server '{server_name}': {e}")
msg = f"Failed to create tool '{tool.name}' from server '{server_name}': {e}"
raise ValueError(msg) from e
logger.info(f"Successfully loaded {len(tool_list)} tools from MCP server '{server_name}'")
except (ConnectionError, TimeoutError, OSError, ValueError, AttributeError, AssertionError) as e:
logger.error(f"Unexpected error while updating tools for MCP server '{server_name}': {e}")
return "", [], {}
else:
return mode, tool_list, tool_cache
logger.info(f"Successfully loaded {len(tool_list)} tools from MCP server '{server_name}'")
return mode, tool_list, tool_cache

View file

@ -1,6 +1,10 @@
import asyncio
import re
import uuid
from typing import Any
from langchain_core.tools import StructuredTool
from langflow.api.v2.mcp import get_server
from langflow.base.mcp.util import (
MCPSseClient,
@ -10,13 +14,12 @@ from langflow.base.mcp.util import (
)
from langflow.custom.custom_component.component_with_cache import ComponentWithCache
from langflow.inputs.inputs import InputTypes
from langflow.io import DropdownInput, McpInput, MessageTextInput, Output # Import McpInput from langflow.io
from langflow.io import DropdownInput, McpInput, MessageTextInput, Output
from langflow.io.schema import flatten_schema, schema_to_langflow_inputs
from langflow.logging import logger
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.services.auth.utils import create_user_longterm_token
from langflow.services.cache.utils import CacheMiss
# Import get_server from the backend API
from langflow.services.database.models.user.crud import get_user_by_id
@ -63,11 +66,23 @@ def maybe_unflatten_dict(flat: dict[str, Any]) -> dict[str, Any]:
class MCPToolsComponent(ComponentWithCache):
schema_inputs: list = []
stdio_client: MCPStdioClient = MCPStdioClient()
sse_client: MCPSseClient = MCPSseClient()
tools: list = []
tools: list[StructuredTool] = []
_not_load_actions: bool = False
_tool_cache: dict = {}
_last_selected_server: str | None = None # Cache for the last selected server
def __init__(self, **data) -> None:
super().__init__(**data)
# Initialize cache keys to avoid CacheMiss when accessing them
if "servers" not in self._shared_component_cache:
self._shared_component_cache["servers"] = {}
if "last_selected_server" not in self._shared_component_cache:
self._shared_component_cache["last_selected_server"] = ""
# Initialize clients with access to the component cache
self.stdio_client: MCPStdioClient = MCPStdioClient(component_cache=self._shared_component_cache)
self.sse_client: MCPSseClient = MCPSseClient(component_cache=self._shared_component_cache)
default_keys: list[str] = [
"code",
"_type",
@ -154,8 +169,8 @@ class MCPToolsComponent(ComponentWithCache):
return [], {"name": server_name, "config": server_config_from_value}
# Use shared cache if available
cached = self._shared_component_cache.get(server_name)
if not isinstance(cached, CacheMiss):
cached = self._shared_component_cache["servers"].get(server_name)
if cached is not None:
self.tools = cached["tools"]
self.tool_names = cached["tool_names"]
self._tool_cache = cached["tool_cache"]
@ -195,16 +210,18 @@ class MCPToolsComponent(ComponentWithCache):
self._tool_cache = tool_cache
self.tools = tool_list
# Cache the result using shared cache
self._shared_component_cache.set(
server_name,
{
"tools": tool_list,
"tool_names": self.tool_names,
"tool_cache": tool_cache,
"config": server_config,
},
)
self._shared_component_cache["servers"][server_name] = {
"tools": tool_list,
"tool_names": self.tool_names,
"tool_cache": tool_cache,
"config": server_config,
}
return tool_list, {"name": server_name, "config": server_config}
except (TimeoutError, asyncio.TimeoutError) as e:
msg = f"Timeout updating tool list: {e!s}"
logger.exception(msg)
raise TimeoutError(msg) from e
except Exception as e:
msg = f"Error updating tool list: {e!s}"
logger.exception(msg)
@ -219,22 +236,35 @@ class MCPToolsComponent(ComponentWithCache):
try:
self.tools, build_config["mcp_server"]["value"] = await self.update_tool_list()
build_config["tool"]["options"] = [tool.name for tool in self.tools]
build_config["tool"]["placeholder"] = "Select a tool"
except (TimeoutError, asyncio.TimeoutError) as e:
msg = f"Timeout updating tool list: {e!s}"
logger.exception(msg)
if not build_config["tools_metadata"]["show"]:
build_config["tool"]["show"] = True
build_config["tool"]["options"] = []
build_config["tool"]["value"] = ""
build_config["tool"]["placeholder"] = "Timeout on MCP server"
else:
build_config["tool"]["show"] = False
except ValueError:
build_config["tool"]["options"] = []
build_config["tool"]["value"] = ""
build_config["tool"]["placeholder"] = "Error on MCP Server"
return build_config
build_config["tool"]["placeholder"] = ""
if not build_config["tools_metadata"]["show"]:
build_config["tool"]["show"] = True
build_config["tool"]["options"] = []
build_config["tool"]["value"] = ""
build_config["tool"]["placeholder"] = "Error on MCP Server"
else:
build_config["tool"]["show"] = False
if field_value == "":
return build_config
tool_obj = None
for tool in self.tools:
if tool.name == self.tool:
if tool.name == field_value:
tool_obj = tool
break
if tool_obj is None:
msg = f"Tool {self.tool} not found in available tools: {self.tools}"
msg = f"Tool {field_value} not found in available tools: {self.tools}"
logger.warning(msg)
return build_config
await self._update_tool_config(build_config, field_value)
@ -254,71 +284,63 @@ class MCPToolsComponent(ComponentWithCache):
return build_config
current_server_name = field_value.get("name") if isinstance(field_value, dict) else field_value
_last_selected_server = self._shared_component_cache.get("last_selected_server") or ""
# To avoid unnecessary updates, only proceed if the server has actually changed
if self._last_selected_server == current_server_name:
if (_last_selected_server in (current_server_name, "")) and build_config["tool"]["show"]:
return build_config
# Determine if "Tool Mode" is active by checking if the tool dropdown is hidden.
# The check for _last_selected_server handles the initial state where the dropdown is
# hidden by default but we are not yet in "Tool Mode".
is_in_tool_mode = build_config["tools_metadata"]["value"]
self._last_selected_server = current_server_name
self.tools = [] # Clear previous tools
is_in_tool_mode = build_config["tools_metadata"]["show"]
self._shared_component_cache.set("last_selected_server", current_server_name)
# Check if tools are already cached for this server before clearing
cached_tools = None
if current_server_name:
cached = self._shared_component_cache["servers"].get(current_server_name)
if cached is not None:
cached_tools = cached["tools"]
self.tools = cached_tools
self.tool_names = cached["tool_names"]
self._tool_cache = cached["tool_cache"]
# Only clear tools if we don't have cached tools for the current server
if not cached_tools:
self.tools = [] # Clear previous tools only if no cache
self.remove_non_default_keys(build_config) # Clear previous tool inputs
# Only show the tool dropdown if not in tool_mode
if not is_in_tool_mode:
build_config["tool"]["show"] = True
build_config["tool"]["placeholder"] = "Loading tools..."
build_config["tool"]["options"] = []
build_config["tool"]["value"] = ""
if cached_tools:
# Use cached tools to populate options immediately
build_config["tool"]["options"] = [tool.name for tool in cached_tools]
build_config["tool"]["placeholder"] = "Select a tool"
else:
# Show loading state only when we need to fetch tools
build_config["tool"]["placeholder"] = "Loading tools..."
build_config["tool"]["options"] = []
build_config["tool"]["value"] = uuid.uuid4()
else:
# Keep the tool dropdown hidden if in tool_mode
self._not_load_actions = True
build_config["tool"]["show"] = False
try:
# Fetch tools for the newly selected server
tools, server_info = await self.update_tool_list(field_value)
build_config["mcp_server"]["value"] = server_info
if tools:
tool_names = [tool.name for tool in tools]
if not is_in_tool_mode:
build_config["tool"]["options"] = tool_names
build_config["tool"]["placeholder"] = "Select a tool"
elif not is_in_tool_mode:
build_config["tool"]["placeholder"] = "No tools found"
except (ValueError, AttributeError, KeyError, TypeError, ConnectionError, TimeoutError) as e:
logger.error(f"Failed to fetch tools for server '{current_server_name}': {e}")
if not is_in_tool_mode:
build_config["tool"]["placeholder"] = "Error fetching tools"
build_config["tool"]["options"] = []
build_config["tool"]["value"] = ""
finally:
# Ensure we don't show inputs for a tool that might no longer be valid
await self._update_tool_config(build_config, "")
elif field_name == "tool_mode":
try:
self.tools, build_config["mcp_server"]["value"] = await self.update_tool_list()
except ValueError:
if not build_config["tools_metadata"]["show"]:
build_config["tool"]["show"] = True
build_config["tool"]["options"] = []
build_config["tool"]["value"] = ""
build_config["tool"]["placeholder"] = "Error on MCP Server"
else:
build_config["tool"]["show"] = False
build_config["tool"]["placeholder"] = ""
build_config["tool"]["show"] = not field_value
for key, value in list(build_config.items()):
if key not in self.default_keys and isinstance(value, dict) and "show" in value:
build_config[key]["show"] = not field_value
if not field_value:
build_config["tool"]["options"] = [tool.name for tool in self.tools]
await self._update_tool_config(build_config, build_config["tool"]["value"])
self.remove_non_default_keys(build_config)
self.tool = build_config["tool"]["value"]
if field_value:
self._not_load_actions = True
else:
build_config["tool"]["value"] = uuid.uuid4()
build_config["tool"]["options"] = []
build_config["tool"]["show"] = True
build_config["tool"]["placeholder"] = "Loading tools..."
elif field_name == "tools_metadata":
self._not_load_actions = False
except Exception as e:
msg = f"Error in update_build_config: {e!s}"
@ -434,6 +456,12 @@ class MCPToolsComponent(ComponentWithCache):
try:
self.tools, _ = await self.update_tool_list()
if self.tool != "":
# Set session context for persistent MCP sessions using Langflow session ID
session_context = self._get_session_context()
if session_context:
self.stdio_client.set_session_context(session_context)
self.sse_client.set_session_context(session_context)
exec_tool = self._tool_cache[self.tool]
tool_args = self.get_inputs_for_all_tools(self.tools)[self.tool]
kwargs = {}
@ -460,8 +488,25 @@ class MCPToolsComponent(ComponentWithCache):
logger.exception(msg)
raise ValueError(msg) from e
def _get_session_context(self) -> str | None:
"""Get the Langflow session ID for MCP session caching."""
# Try to get session ID from the component's execution context
if hasattr(self, "graph") and hasattr(self.graph, "session_id"):
session_id = self.graph.session_id
# Include server name to ensure different servers get different sessions
server_name = ""
mcp_server = getattr(self, "mcp_server", None)
if isinstance(mcp_server, dict):
server_name = mcp_server.get("name", "")
elif mcp_server:
server_name = str(mcp_server)
return f"{session_id}_{server_name}" if session_id else None
return None
async def _get_tools(self):
"""Get cached tools or update if necessary."""
mcp_server = getattr(self, "mcp_server", None)
tools, _ = await self.update_tool_list(mcp_server)
return tools
if not self._not_load_actions:
tools, _ = await self.update_tool_list(mcp_server)
return tools
return []

View file

@ -1294,7 +1294,20 @@ class Component(CustomComponent):
}
async def _build_tools_metadata_input(self):
tools = await self._get_tools()
try:
from langflow.io import ToolsInput
except ImportError as e:
msg = "Failed to import ToolsInput from langflow.io"
raise ImportError(msg) from e
placeholder = None
tools = []
try:
tools = await self._get_tools()
placeholder = "Loading actions..." if len(tools) == 0 else ""
except (TimeoutError, asyncio.TimeoutError):
placeholder = "Timeout loading actions"
except (ConnectionError, OSError, ValueError):
placeholder = "Error loading actions"
# Always use the latest tool data
tool_data = [self._build_tool_data(tool) for tool in tools]
# print(tool_data)
@ -1322,14 +1335,9 @@ class Component(CustomComponent):
item["status"] = any(enabled_name in [item["name"], *item["tags"]] for enabled_name in enabled)
self.tools_metadata = tool_data
try:
from langflow.io import ToolsInput
except ImportError as e:
msg = "Failed to import ToolsInput from langflow.io"
raise ImportError(msg) from e
return ToolsInput(
name=TOOLS_METADATA_INPUT_NAME,
placeholder=placeholder,
display_name="Actions",
info=TOOLS_METADATA_INFO,
value=tool_data,

View file

@ -67,6 +67,7 @@ class CustomComponentFrontendNode(FrontendNode):
)
description: str | None = None
base_classes: list[str] = []
last_updated: str | None = None
class ComponentFrontendNode(FrontendNode):

View file

@ -77,7 +77,7 @@ dependencies = [
"validators>=0.34.0",
"networkx>=3.4.2",
"json-repair>=0.30.3",
"mcp~=1.9.4",
"mcp~=1.10.1",
"aiosqlite>=0.20.0",
"greenlet>=3.1.1",
"jsonquerylang>=1.1.1",