diff --git a/pyproject.toml b/pyproject.toml index be51ddeaf..17dac9154 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,7 +105,7 @@ dependencies = [ "arize-phoenix-otel>=0.6.1", "openinference-instrumentation-langchain>=0.1.29", "crewai==0.102.0", - "mcp>=1.9.4", + "mcp>=1.10.1", "uv>=0.5.7", "scipy>=1.14.1", "ag2>=0.1.0", diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 5c1eb6dfe..2ceeb579b 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -21,7 +21,7 @@ from langflow.api.utils import CurrentActiveMCPUser from langflow.api.v1.endpoints import simple_run_flow from langflow.api.v1.schemas import SimplifiedAPIRequest from langflow.base.mcp.constants import MAX_MCP_TOOL_NAME_LENGTH -from langflow.base.mcp.util import get_flow_snake_case +from langflow.base.mcp.util import get_flow_snake_case, sanitize_mcp_name from langflow.helpers.flow import json_schema_from_flow from langflow.schema.message import Message from langflow.services.database.models.flow.model import Flow @@ -186,7 +186,7 @@ async def handle_list_tools(): if flow.user_id is None: continue - base_name = "_".join(flow.name.lower().split()) + base_name = sanitize_mcp_name(flow.name) name = base_name[:MAX_MCP_TOOL_NAME_LENGTH] if name in existing_names: i = 1 diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index 388123a88..6c4361be6 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -32,7 +32,7 @@ from langflow.api.v1.mcp import ( ) from langflow.api.v1.schemas import MCPInstallRequest, MCPSettings, SimplifiedAPIRequest from langflow.base.mcp.constants import MAX_MCP_SERVER_NAME_LENGTH, MAX_MCP_TOOL_NAME_LENGTH -from langflow.base.mcp.util import get_flow_snake_case, get_unique_name +from langflow.base.mcp.util import get_flow_snake_case, get_unique_name, sanitize_mcp_name from langflow.helpers.flow import json_schema_from_flow from langflow.schema.message import Message from langflow.services.database.models import Flow, Folder @@ -96,10 +96,10 @@ async def list_project_tools( continue # Format the flow name according to MCP conventions (snake_case) - flow_name = "_".join(flow.name.lower().split()) + flow_name = sanitize_mcp_name(flow.name) # Use action_name and action_description if available, otherwise use defaults - name = flow.action_name or flow_name + name = sanitize_mcp_name(flow.action_name) if flow.action_name else flow_name description = flow.action_description or ( flow.description if flow.description else f"Tool generated from flow: {flow_name}" ) @@ -395,14 +395,14 @@ async def install_mcp_config( # Create the MCP configuration mcp_config = { "mcpServers": { - f"lf-{name.lower().replace(' ', '_')[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}": { + f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}": { "command": command, "args": args, } } } - server_name = f"lf-{name.lower().replace(' ', '_')[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}" + server_name = f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}" logger.debug("Installing MCP config for project: %s (server name: %s)", project.name, server_name) # Determine the config file path based on the client and OS @@ -518,7 +518,7 @@ async def check_installed_mcp_servers( # Project server name pattern (must match the logic in install function) name = project.name name = NEW_FOLDER_NAME if name == DEFAULT_FOLDER_NAME else name - project_server_name = f"lf-{name.lower().replace(' ', '_')[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}" + project_server_name = f"lf-{sanitize_mcp_name(name)[: (MAX_MCP_SERVER_NAME_LENGTH - 4)]}" logger.debug( "Checking for installed MCP servers for project: %s (server name: %s)", project.name, project_server_name @@ -670,7 +670,9 @@ class ProjectMCPServer: continue # Use action_name if available, otherwise construct from flow name - base_name = flow.action_name or "_".join(flow.name.lower().split()) + base_name = ( + sanitize_mcp_name(flow.action_name) if flow.action_name else sanitize_mcp_name(flow.name) + ) name = get_unique_name(base_name, MAX_MCP_TOOL_NAME_LENGTH, existing_names) # Use action_description if available, otherwise use defaults diff --git a/src/backend/base/langflow/api/v2/mcp.py b/src/backend/base/langflow/api/v2/mcp.py index 7ac5b0a77..301b017a0 100644 --- a/src/backend/base/langflow/api/v2/mcp.py +++ b/src/backend/base/langflow/api/v2/mcp.py @@ -128,10 +128,14 @@ async def get_servers( # Configuration validation errors, invalid URLs, etc. logger.error(f"Configuration error for server {server_name}: {e}") server_info["error"] = f"Configuration error: {e}" - except (ConnectionError, TimeoutError) as e: + except ConnectionError as e: # Network connection and timeout issues logger.error(f"Connection error for server {server_name}: {e}") server_info["error"] = f"Connection failed: {e}" + except (TimeoutError, asyncio.TimeoutError) as e: + # Timeout errors + logger.error(f"Timeout error for server {server_name}: {e}") + server_info["error"] = "Timeout when checking server tools" except OSError as e: # System-level errors (process execution, file access) logger.error(f"System error for server {server_name}: {e}") @@ -149,24 +153,21 @@ async def get_servers( if hasattr(e, "exceptions") and e.exceptions: # Extract the first underlying exception for a more meaningful error message underlying_error = e.exceptions[0] - logger.exception(f"Error checking server {server_name}: {underlying_error}") + if hasattr(underlying_error, "exceptions"): + logger.error( + f"Error checking server {server_name}: {underlying_error}, {underlying_error.exceptions}" + ) + underlying_error = underlying_error.exceptions[0] + else: + logger.exception(f"Error checking server {server_name}: {underlying_error}") server_info["error"] = f"Error loading server: {underlying_error}" else: logger.exception(f"Error checking server {server_name}: {e}") server_info["error"] = f"Error loading server: {e}" return server_info - async def check_server_with_timeout(server_name: str) -> dict: - try: - return await asyncio.wait_for( - check_server(server_name), timeout=get_settings_service().settings.mcp_server_timeout - ) - except asyncio.TimeoutError: - logger.error(f"Timeout checking server {server_name}") - return {"name": server_name, "mode": None, "toolsCount": None, "error": "Server check timed out."} - # Run all server checks concurrently - tasks = [check_server_with_timeout(server) for server in server_list["mcpServers"]] + tasks = [check_server(server) for server in server_list["mcpServers"]] return await asyncio.gather(*tasks, return_exceptions=True) diff --git a/src/backend/base/langflow/base/mcp/util.py b/src/backend/base/langflow/base/mcp/util.py index eb10a264f..a2adb95c6 100644 --- a/src/backend/base/langflow/base/mcp/util.py +++ b/src/backend/base/langflow/base/mcp/util.py @@ -1,7 +1,9 @@ import asyncio import os import platform +import re import shutil +import unicodedata from collections.abc import Awaitable, Callable from typing import Any from urllib.parse import urlparse @@ -22,6 +24,77 @@ HTTP_ERROR_STATUS_CODE = httpx_codes.BAD_REQUEST # HTTP status code for client NULLABLE_TYPE_LENGTH = 2 # Number of types in a nullable union (the type itself + null) +def sanitize_mcp_name(name: str, max_length: int = 46) -> str: + """Sanitize a name for MCP usage by removing emojis, diacritics, and special characters. + + Args: + name: The original name to sanitize + max_length: Maximum length for the sanitized name + + Returns: + A sanitized name containing only letters, numbers, hyphens, and underscores + """ + if not name or not name.strip(): + return "" + + # Remove emojis using regex pattern + emoji_pattern = re.compile( + "[" + "\U0001f600-\U0001f64f" # emoticons + "\U0001f300-\U0001f5ff" # symbols & pictographs + "\U0001f680-\U0001f6ff" # transport & map symbols + "\U0001f1e0-\U0001f1ff" # flags (iOS) + "\U00002500-\U00002bef" # chinese char + "\U00002702-\U000027b0" + "\U00002702-\U000027b0" + "\U000024c2-\U0001f251" + "\U0001f926-\U0001f937" + "\U00010000-\U0010ffff" + "\u2640-\u2642" + "\u2600-\u2b55" + "\u200d" + "\u23cf" + "\u23e9" + "\u231a" + "\ufe0f" # dingbats + "\u3030" + "]+", + flags=re.UNICODE, + ) + + # Remove emojis + name = emoji_pattern.sub("", name) + + # Normalize unicode characters to remove diacritics + name = unicodedata.normalize("NFD", name) + name = "".join(char for char in name if unicodedata.category(char) != "Mn") + + # Replace spaces and special characters with underscores + name = re.sub(r"[^\w\s-]", "", name) # Keep only word chars, spaces, and hyphens + name = re.sub(r"[-\s]+", "_", name) # Replace spaces and hyphens with underscores + name = re.sub(r"_+", "_", name) # Collapse multiple underscores + + # Remove leading/trailing underscores + name = name.strip("_") + + # Ensure it starts with a letter or underscore (not a number) + if name and name[0].isdigit(): + name = f"_{name}" + + # Convert to lowercase + name = name.lower() + + # Truncate to max length + if len(name) > max_length: + name = name[:max_length].rstrip("_") + + # If empty after sanitization, provide a default + if not name: + name = "unnamed" + + return name + + def create_tool_coroutine(tool_name: str, arg_schema: type[BaseModel], client) -> Callable[..., Awaitable]: async def tool_coroutine(*args, **kwargs): # Get field names from the model (preserving order) @@ -101,7 +174,11 @@ async def get_flow_snake_case(flow_name: str, user_id: str, session, is_action: flows = (await session.exec(stmt)).all() for flow in flows: - this_flow_name = flow.action_name if is_action and flow.action_name else "_".join(flow.name.lower().split()) + if is_action and flow.action_name: + this_flow_name = sanitize_mcp_name(flow.action_name) + else: + this_flow_name = sanitize_mcp_name(flow.name) + if this_flow_name == flow_name: return flow return None @@ -295,16 +372,179 @@ async def _validate_connection_params(mode: str, command: str | None = None, url raise ValueError(msg) -class MCPStdioClient: +class MCPSessionManager: + """Manages persistent MCP sessions with proper context manager lifecycle.""" + def __init__(self): + self.sessions = {} # context_id -> session_info + self._background_tasks = set() # Keep references to background tasks + + async def get_session(self, context_id: str, connection_params, transport_type: str): + """Get or create a persistent session.""" + if context_id in self.sessions: + session_info = self.sessions[context_id] + # Check if session and background task are still alive + try: + session = session_info["session"] + task = session_info["task"] + if ( + not task.done() + and hasattr(session, "_closed") + and not session._closed + and hasattr(session, "_write_stream") + and hasattr(session._write_stream, "_closed") + and not session._write_stream._closed + ): + return session + except Exception: # noqa: BLE001 + msg = f"Session for context_id {context_id} is dead" + logger.info(msg) + # Session is dead, clean it up + await self._cleanup_session(context_id) + + # Create new session + if transport_type == "stdio": + return await self._create_stdio_session(context_id, connection_params) + if transport_type == "sse": + return await self._create_sse_session(context_id, connection_params) + msg = f"Unknown transport type: {transport_type}" + raise ValueError(msg) + + async def _create_stdio_session(self, context_id: str, connection_params): + """Create a new stdio session as a background task to avoid context issues.""" + import asyncio + + from mcp.client.stdio import stdio_client + + # Create a future to get the session + session_future: asyncio.Future[ClientSession] = asyncio.Future() + + async def session_task(): + """Background task that keeps the session alive.""" + try: + async with stdio_client(connection_params) as (read, write): + session = ClientSession(read, write) + async with session: + await session.initialize() + # Signal that session is ready + session_future.set_result(session) + + # Keep the session alive until cancelled + import anyio + + event = anyio.Event() + try: + await event.wait() + except asyncio.CancelledError: + # Session is being shut down + msg = "Message is shutting down" + logger.info(msg) + except Exception as e: # noqa: BLE001 + if not session_future.done(): + session_future.set_exception(e) + + # Start the background task + task = asyncio.create_task(session_task()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + # Wait for session to be ready + session = await session_future + + # Store session info + self.sessions[context_id] = {"session": session, "task": task, "type": "stdio"} + + return session + + async def _create_sse_session(self, context_id: str, connection_params): + """Create a new SSE session as a background task to avoid context issues.""" + import asyncio + + from mcp.client.sse import sse_client + + # Create a future to get the session + session_future: asyncio.Future[ClientSession] = asyncio.Future() + + async def session_task(): + """Background task that keeps the session alive.""" + try: + async with sse_client( + connection_params["url"], + connection_params["headers"], + connection_params["timeout_seconds"], + connection_params["sse_read_timeout_seconds"], + ) as (read, write): + session = ClientSession(read, write) + async with session: + await session.initialize() + # Signal that session is ready + session_future.set_result(session) + + # Keep the session alive until cancelled + import anyio + + event = anyio.Event() + try: + await event.wait() + except asyncio.CancelledError: + # Session is being shut down + msg = "Message is shutting down" + logger.info(msg) + except Exception as e: # noqa: BLE001 + if not session_future.done(): + session_future.set_exception(e) + + # Start the background task + task = asyncio.create_task(session_task()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + # Wait for session to be ready + session = await session_future + + # Store session info + self.sessions[context_id] = {"session": session, "task": task, "type": "sse"} + + return session + + async def _cleanup_session(self, context_id: str): + """Clean up a session by cancelling its background task.""" + if context_id not in self.sessions: + return + + session_info = self.sessions[context_id] + try: + # Cancel the background task which will properly close the session + if "task" in session_info: + task = session_info["task"] + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + logger.info(f"Issue cancelling task for context_id {context_id}") + except Exception as e: # noqa: BLE001 + logger.info(f"issue cleaning up mcp session: {e}") + finally: + del self.sessions[context_id] + + async def cleanup_all(self): + """Clean up all sessions.""" + for context_id in list(self.sessions.keys()): + await self._cleanup_session(context_id) + + +class MCPStdioClient: + def __init__(self, component_cache=None): self.session: ClientSession | None = None self._connection_params = None self._connected = False + self._session_context: str | None = None + self._component_cache = component_cache async def _connect_to_server(self, command_str: str, env: dict[str, str] | None = None) -> list[StructuredTool]: """Connect to MCP server using stdio transport (SDK style).""" from mcp import StdioServerParameters - from mcp.client.stdio import stdio_client command = command_str.split(" ") env_data: dict[str, str] = {"DEBUG": "true", "PATH": os.environ["PATH"], **(env or {})} @@ -328,39 +568,101 @@ class MCPStdioClient: # Store connection parameters for later use in run_tool self._connection_params = server_params - try: - async with stdio_client(server_params) as (read, write), ClientSession(read, write) as session: - await session.initialize() - response = await session.list_tools() - self._connected = True - return response.tools - except (ConnectionError, TimeoutError, OSError, ValueError) as e: - logger.error(f"Failed to connect to MCP stdio server: {e}") - self._connection_params = None - self._connected = False - return [] + # If no session context is set, create a default one + if not self._session_context: + # Generate a fallback context based on connection parameters + import uuid + + param_hash = uuid.uuid4().hex[:8] + self._session_context = f"default_{param_hash}" + + # Get or create a persistent session + session = await self._get_or_create_session() + response = await session.list_tools() + self._connected = True + return response.tools async def connect_to_server(self, command_str: str, env: dict[str, str] | None = None) -> list[StructuredTool]: """Connect to MCP server using stdio transport (SDK style).""" + return await asyncio.wait_for( + self._connect_to_server(command_str, env), timeout=get_settings_service().settings.mcp_server_timeout + ) + + def set_session_context(self, context_id: str): + """Set the session context (e.g., flow_id + user_id + session_id).""" + self._session_context = context_id + + def _get_session_manager(self) -> MCPSessionManager: + """Get or create session manager from component cache.""" + if not self._component_cache: + # Fallback to instance-level session manager if no cache + if not hasattr(self, "_session_manager"): + self._session_manager = MCPSessionManager() + return self._session_manager + + from langflow.services.cache.utils import CacheMiss + + session_manager = self._component_cache.get("mcp_session_manager") + if isinstance(session_manager, CacheMiss): + session_manager = MCPSessionManager() + self._component_cache.set("mcp_session_manager", session_manager) + return session_manager + + async def _get_or_create_session(self) -> ClientSession: + """Get or create a persistent session for the current context.""" + if not self._session_context or not self._connection_params: + msg = "Session context and connection params must be set" + raise ValueError(msg) + + # Use cached session manager to get/create persistent session + session_manager = self._get_session_manager() + return await session_manager.get_session(self._session_context, self._connection_params, "stdio") + + async def _create_fresh_session(self) -> ClientSession: + """Create a fresh session using the async context manager pattern.""" + from mcp.client.stdio import stdio_client + + # This creates a temporary session that will be closed when the context exits + # We need to use this within an async context + stdio_ctx = stdio_client(self._connection_params) + read, write = await stdio_ctx.__aenter__() + session = ClientSession(read, write) + await session.__aenter__() + await session.initialize() + + # Store the context managers so they can be cleaned up later + session._stdio_ctx = stdio_ctx + session._stdio_read = read + session._stdio_write = write + + return session + + async def _cleanup_session(self, session: ClientSession): + """Clean up a session and its associated resources.""" try: - return await asyncio.wait_for( - self._connect_to_server(command_str, env), timeout=get_settings_service().settings.mcp_server_timeout - ) - except (ConnectionError, TimeoutError, OSError, ValueError) as e: - logger.error(f"Failed to connect to MCP stdio server: {e}") - self._connection_params = None - self._connected = False - msg = f"Failed to connect to MCP stdio server: {e}" - raise ValueError(msg) from e + await session.__aexit__(None, None, None) + except Exception as e: # noqa: BLE001 + logger.info(f"issue cleaning up mcp session: {e}") + try: + if hasattr(session, "_stdio_ctx"): + await session._stdio_ctx.__aexit__(None, None, None) + except Exception as e: # noqa: BLE001 + logger.info(f"issue cleaning up mcp session: {e}") async def disconnect(self): """Properly close the connection and clean up resources.""" + # Clean up session using session manager + if self._session_context: + session_manager = self._get_session_manager() + await session_manager._cleanup_session(self._session_context) + self.session = None self._connection_params = None self._connected = False + self._session_context = None async def run_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: - """Run a tool with the given arguments. + """Run a tool with the given arguments using context-specific session. Args: tool_name: Name of the tool to run @@ -376,16 +678,26 @@ class MCPStdioClient: msg = "Session not initialized or disconnected. Call connect_to_server first." raise ValueError(msg) - try: - from mcp.client.stdio import stdio_client + # If no session context is set, create a default one + if not self._session_context: + # Generate a fallback context based on connection parameters + import uuid + + param_hash = uuid.uuid4().hex[:8] + self._session_context = f"default_{param_hash}" + + try: + # Get or create persistent session + session = await self._get_or_create_session() + return await session.call_tool(tool_name, arguments=arguments) - async with stdio_client(self._connection_params) as (read, write), ClientSession(read, write) as session: - await session.initialize() - return await session.call_tool(tool_name, arguments=arguments) except (ConnectionError, TimeoutError, OSError, ValueError) as e: msg = f"Failed to run tool '{tool_name}': {e}" logger.error(msg) - # Mark as disconnected on error + # Clean up failed session from cache + if self._session_context and self._component_cache: + cache_key = f"mcp_session_stdio_{self._session_context}" + self._component_cache.delete(cache_key) self._connected = False raise ValueError(msg) from e @@ -397,10 +709,28 @@ class MCPStdioClient: class MCPSseClient: - def __init__(self): + def __init__(self, component_cache=None): self.session: ClientSession | None = None self._connection_params = None self._connected = False + self._session_context: str | None = None + self._component_cache = component_cache + + def _get_session_manager(self) -> MCPSessionManager: + """Get or create session manager from component cache.""" + if not self._component_cache: + # Fallback to instance-level session manager if no cache + if not hasattr(self, "_session_manager"): + self._session_manager = MCPSessionManager() + return self._session_manager + + from langflow.services.cache.utils import CacheMiss + + session_manager = self._component_cache.get("mcp_session_manager") + if isinstance(session_manager, CacheMiss): + session_manager = MCPSessionManager() + self._component_cache.set("mcp_session_manager", session_manager) + return session_manager async def validate_url(self, url: str | None) -> tuple[bool, str]: """Validate the SSE URL before attempting connection.""" @@ -447,8 +777,6 @@ class MCPSseClient: sse_read_timeout_seconds: int = 30, ) -> list[StructuredTool]: """Connect to MCP server using SSE transport (SDK style).""" - from mcp.client.sse import sse_client - if headers is None: headers = {} if url is None: @@ -469,42 +797,54 @@ class MCPSseClient: "sse_read_timeout_seconds": sse_read_timeout_seconds, } - try: - async with ( - sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) as (read, write), - ClientSession(read, write) as session, - ): - await session.initialize() - response = await session.list_tools() - self._connected = True - return response.tools - except (ConnectionError, TimeoutError, OSError, ValueError) as e: - logger.error(f"Failed to connect to MCP SSE server: {e}") - self._connection_params = None - self._connected = False - msg = f"Failed to connect to MCP SSE server: {e}" - raise ValueError(msg) from e + # If no session context is set, create a default one + if not self._session_context: + # Generate a fallback context based on connection parameters + import uuid + + param_hash = uuid.uuid4().hex[:8] + self._session_context = f"default_sse_{param_hash}" + + # Get or create a persistent session + session = await self._get_or_create_session() + response = await session.list_tools() + self._connected = True + return response.tools async def connect_to_server(self, url: str, headers: dict[str, str] | None = None) -> list[StructuredTool]: """Connect to MCP server using SSE transport (SDK style).""" - try: - return await asyncio.wait_for( - self._connect_to_server(url, headers), timeout=get_settings_service().settings.mcp_server_timeout - ) - except (ConnectionError, TimeoutError, OSError, ValueError) as e: - logger.error(f"Failed to connect to MCP SSE server: {e}") - self._connection_params = None - self._connected = False - return [] + return await asyncio.wait_for( + self._connect_to_server(url, headers), timeout=get_settings_service().settings.mcp_server_timeout + ) + + def set_session_context(self, context_id: str): + """Set the session context (e.g., flow_id + user_id + session_id).""" + self._session_context = context_id + + async def _get_or_create_session(self) -> ClientSession: + """Get or create a persistent session for the current context.""" + if not self._session_context or not self._connection_params: + msg = "Session context and params must be set" + raise ValueError(msg) + + # Use cached session manager to get/create persistent session + session_manager = self._get_session_manager() + return await session_manager.get_session(self._session_context, self._connection_params, "sse") async def disconnect(self): """Properly close the connection and clean up resources.""" + # Clean up session using session manager + if self._session_context: + session_manager = self._get_session_manager() + await session_manager._cleanup_session(self._session_context) + self.session = None self._connection_params = None self._connected = False + self._session_context = None async def run_tool(self, tool_name: str, arguments: dict[str, Any]) -> Any: - """Run a tool with the given arguments. + """Run a tool with the given arguments using context-specific session. Args: tool_name: Name of the tool to run @@ -520,22 +860,26 @@ class MCPSseClient: msg = "Session not initialized or disconnected. Call connect_to_server first." raise ValueError(msg) - try: - from mcp.client.sse import sse_client + # If no session context is set, create a default one + if not self._session_context: + # Generate a fallback context based on connection parameters + import uuid + + param_hash = uuid.uuid4().hex[:8] + self._session_context = f"default_sse_{param_hash}" + + try: + # Get or create persistent session + session = await self._get_or_create_session() + return await session.call_tool(tool_name, arguments=arguments) - params = self._connection_params - async with ( - sse_client( - params["url"], params["headers"], params["timeout_seconds"], params["sse_read_timeout_seconds"] - ) as (read, write), - ClientSession(read, write) as session, - ): - await session.initialize() - return await session.call_tool(tool_name, arguments=arguments) except (ConnectionError, TimeoutError, OSError, ValueError) as e: msg = f"Failed to run tool '{tool_name}': {e}" logger.error(msg) - # Mark as disconnected on error + # Clean up failed session from cache + if self._session_context and self._component_cache: + cache_key = f"mcp_session_sse_{self._session_context}" + self._component_cache.delete(cache_key) self._connected = False raise ValueError(msg) from e @@ -562,78 +906,66 @@ async def update_tools( if mcp_sse_client is None: mcp_sse_client = MCPSseClient() + # Fetch server config from backend + mode = "Stdio" if "command" in server_config else "SSE" if "url" in server_config else "" + command = server_config.get("command", "") + url = server_config.get("url", "") + tools = [] + headers = _process_headers(server_config.get("headers", {})) + try: - # Fetch server config from backend - mode = "Stdio" if "command" in server_config else "SSE" if "url" in server_config else "" - command = server_config.get("command", "") - url = server_config.get("url", "") - tools = [] - headers = _process_headers(server_config.get("headers", {})) + await _validate_connection_params(mode, command, url) + except ValueError as e: + logger.error(f"Invalid MCP server configuration for '{server_name}': {e}") + raise - try: - await _validate_connection_params(mode, command, url) - except ValueError as e: - logger.error(f"Invalid MCP server configuration for '{server_name}': {e}") - return "", [], {} + # Determine connection type and parameters + client: MCPStdioClient | MCPSseClient | None = None + if mode == "Stdio": + # Stdio connection + args = server_config.get("args", []) + env = server_config.get("env", {}) + full_command = " ".join([command, *args]) + tools = await mcp_stdio_client.connect_to_server(full_command, env) + client = mcp_stdio_client + elif mode == "SSE": + # SSE connection + tools = await mcp_sse_client.connect_to_server(url, headers=headers) + client = mcp_sse_client + else: + logger.error(f"Invalid MCP server mode for '{server_name}': {mode}") + return "", [], {} - # Determine connection type and parameters - client: MCPStdioClient | MCPSseClient | None = None + if not tools or not client or not client._connected: + logger.warning(f"No tools available from MCP server '{server_name}' or connection failed") + return "", [], {} + + tool_list = [] + tool_cache: dict[str, StructuredTool] = {} + for tool in tools: + if not tool or not hasattr(tool, "name"): + continue try: - if mode == "Stdio": - # Stdio connection - args = server_config.get("args", []) - env = server_config.get("env", {}) - full_command = " ".join([command, *args]) - tools = await mcp_stdio_client.connect_to_server(full_command, env) - client = mcp_stdio_client - elif mode == "SSE": - # SSE connection - tools = await mcp_sse_client.connect_to_server(url, headers=headers) - client = mcp_sse_client - else: - logger.error(f"Invalid MCP server mode for '{server_name}': {mode}") - return "", [], {} + args_schema = create_input_schema_from_json_schema(tool.inputSchema) + if not args_schema: + logger.warning(f"Could not create schema for tool '{tool.name}' from server '{server_name}'") + continue + + tool_obj = StructuredTool( + name=tool.name, + description=tool.description or "", + args_schema=args_schema, + func=create_tool_func(tool.name, args_schema, client), + coroutine=create_tool_coroutine(tool.name, args_schema, client), + tags=[tool.name], + metadata={"server_name": server_name}, + ) + tool_list.append(tool_obj) + tool_cache[tool.name] = tool_obj except (ConnectionError, TimeoutError, OSError, ValueError) as e: - logger.error(f"Failed to connect to MCP server '{server_name}': {e}") - # return "", [], {} - msg = f"Failed to connect to MCP server '{server_name}': {e}" - logger.error(msg) + logger.error(f"Failed to create tool '{tool.name}' from server '{server_name}': {e}") + msg = f"Failed to create tool '{tool.name}' from server '{server_name}': {e}" raise ValueError(msg) from e - if not tools or not client or not client._connected: - logger.warning(f"No tools available from MCP server '{server_name}' or connection failed") - return "", [], {} - - tool_list = [] - tool_cache: dict[str, StructuredTool] = {} - for tool in tools: - if not tool or not hasattr(tool, "name"): - continue - try: - args_schema = create_input_schema_from_json_schema(tool.inputSchema) - if not args_schema: - logger.warning(f"Could not create schema for tool '{tool.name}' from server '{server_name}'") - continue - - tool_obj = StructuredTool( - name=tool.name, - description=tool.description or "", - args_schema=args_schema, - func=create_tool_func(tool.name, args_schema, client), - coroutine=create_tool_coroutine(tool.name, args_schema, client), - tags=[tool.name], - metadata={"server_name": server_name}, - ) - tool_list.append(tool_obj) - tool_cache[tool.name] = tool_obj - except (ConnectionError, TimeoutError, OSError, ValueError) as e: - logger.error(f"Failed to create tool '{tool.name}' from server '{server_name}': {e}") - msg = f"Failed to create tool '{tool.name}' from server '{server_name}': {e}" - raise ValueError(msg) from e - - logger.info(f"Successfully loaded {len(tool_list)} tools from MCP server '{server_name}'") - except (ConnectionError, TimeoutError, OSError, ValueError, AttributeError, AssertionError) as e: - logger.error(f"Unexpected error while updating tools for MCP server '{server_name}': {e}") - return "", [], {} - else: - return mode, tool_list, tool_cache + logger.info(f"Successfully loaded {len(tool_list)} tools from MCP server '{server_name}'") + return mode, tool_list, tool_cache diff --git a/src/backend/base/langflow/components/agents/mcp_component.py b/src/backend/base/langflow/components/agents/mcp_component.py index 3abf4f83d..52df3368e 100644 --- a/src/backend/base/langflow/components/agents/mcp_component.py +++ b/src/backend/base/langflow/components/agents/mcp_component.py @@ -1,6 +1,10 @@ +import asyncio import re +import uuid from typing import Any +from langchain_core.tools import StructuredTool + from langflow.api.v2.mcp import get_server from langflow.base.mcp.util import ( MCPSseClient, @@ -10,13 +14,12 @@ from langflow.base.mcp.util import ( ) from langflow.custom.custom_component.component_with_cache import ComponentWithCache from langflow.inputs.inputs import InputTypes -from langflow.io import DropdownInput, McpInput, MessageTextInput, Output # Import McpInput from langflow.io +from langflow.io import DropdownInput, McpInput, MessageTextInput, Output from langflow.io.schema import flatten_schema, schema_to_langflow_inputs from langflow.logging import logger from langflow.schema.dataframe import DataFrame from langflow.schema.message import Message from langflow.services.auth.utils import create_user_longterm_token -from langflow.services.cache.utils import CacheMiss # Import get_server from the backend API from langflow.services.database.models.user.crud import get_user_by_id @@ -63,11 +66,23 @@ def maybe_unflatten_dict(flat: dict[str, Any]) -> dict[str, Any]: class MCPToolsComponent(ComponentWithCache): schema_inputs: list = [] - stdio_client: MCPStdioClient = MCPStdioClient() - sse_client: MCPSseClient = MCPSseClient() - tools: list = [] + tools: list[StructuredTool] = [] + _not_load_actions: bool = False _tool_cache: dict = {} _last_selected_server: str | None = None # Cache for the last selected server + + def __init__(self, **data) -> None: + super().__init__(**data) + # Initialize cache keys to avoid CacheMiss when accessing them + if "servers" not in self._shared_component_cache: + self._shared_component_cache["servers"] = {} + if "last_selected_server" not in self._shared_component_cache: + self._shared_component_cache["last_selected_server"] = "" + + # Initialize clients with access to the component cache + self.stdio_client: MCPStdioClient = MCPStdioClient(component_cache=self._shared_component_cache) + self.sse_client: MCPSseClient = MCPSseClient(component_cache=self._shared_component_cache) + default_keys: list[str] = [ "code", "_type", @@ -154,8 +169,8 @@ class MCPToolsComponent(ComponentWithCache): return [], {"name": server_name, "config": server_config_from_value} # Use shared cache if available - cached = self._shared_component_cache.get(server_name) - if not isinstance(cached, CacheMiss): + cached = self._shared_component_cache["servers"].get(server_name) + if cached is not None: self.tools = cached["tools"] self.tool_names = cached["tool_names"] self._tool_cache = cached["tool_cache"] @@ -195,16 +210,18 @@ class MCPToolsComponent(ComponentWithCache): self._tool_cache = tool_cache self.tools = tool_list # Cache the result using shared cache - self._shared_component_cache.set( - server_name, - { - "tools": tool_list, - "tool_names": self.tool_names, - "tool_cache": tool_cache, - "config": server_config, - }, - ) + + self._shared_component_cache["servers"][server_name] = { + "tools": tool_list, + "tool_names": self.tool_names, + "tool_cache": tool_cache, + "config": server_config, + } return tool_list, {"name": server_name, "config": server_config} + except (TimeoutError, asyncio.TimeoutError) as e: + msg = f"Timeout updating tool list: {e!s}" + logger.exception(msg) + raise TimeoutError(msg) from e except Exception as e: msg = f"Error updating tool list: {e!s}" logger.exception(msg) @@ -219,22 +236,35 @@ class MCPToolsComponent(ComponentWithCache): try: self.tools, build_config["mcp_server"]["value"] = await self.update_tool_list() build_config["tool"]["options"] = [tool.name for tool in self.tools] + build_config["tool"]["placeholder"] = "Select a tool" + except (TimeoutError, asyncio.TimeoutError) as e: + msg = f"Timeout updating tool list: {e!s}" + logger.exception(msg) + if not build_config["tools_metadata"]["show"]: + build_config["tool"]["show"] = True + build_config["tool"]["options"] = [] + build_config["tool"]["value"] = "" + build_config["tool"]["placeholder"] = "Timeout on MCP server" + else: + build_config["tool"]["show"] = False except ValueError: - build_config["tool"]["options"] = [] - build_config["tool"]["value"] = "" - build_config["tool"]["placeholder"] = "Error on MCP Server" - return build_config - build_config["tool"]["placeholder"] = "" + if not build_config["tools_metadata"]["show"]: + build_config["tool"]["show"] = True + build_config["tool"]["options"] = [] + build_config["tool"]["value"] = "" + build_config["tool"]["placeholder"] = "Error on MCP Server" + else: + build_config["tool"]["show"] = False if field_value == "": return build_config tool_obj = None for tool in self.tools: - if tool.name == self.tool: + if tool.name == field_value: tool_obj = tool break if tool_obj is None: - msg = f"Tool {self.tool} not found in available tools: {self.tools}" + msg = f"Tool {field_value} not found in available tools: {self.tools}" logger.warning(msg) return build_config await self._update_tool_config(build_config, field_value) @@ -254,71 +284,63 @@ class MCPToolsComponent(ComponentWithCache): return build_config current_server_name = field_value.get("name") if isinstance(field_value, dict) else field_value + _last_selected_server = self._shared_component_cache.get("last_selected_server") or "" # To avoid unnecessary updates, only proceed if the server has actually changed - if self._last_selected_server == current_server_name: + if (_last_selected_server in (current_server_name, "")) and build_config["tool"]["show"]: return build_config # Determine if "Tool Mode" is active by checking if the tool dropdown is hidden. - # The check for _last_selected_server handles the initial state where the dropdown is - # hidden by default but we are not yet in "Tool Mode". - is_in_tool_mode = build_config["tools_metadata"]["value"] - self._last_selected_server = current_server_name - self.tools = [] # Clear previous tools + is_in_tool_mode = build_config["tools_metadata"]["show"] + self._shared_component_cache.set("last_selected_server", current_server_name) + + # Check if tools are already cached for this server before clearing + cached_tools = None + if current_server_name: + cached = self._shared_component_cache["servers"].get(current_server_name) + if cached is not None: + cached_tools = cached["tools"] + self.tools = cached_tools + self.tool_names = cached["tool_names"] + self._tool_cache = cached["tool_cache"] + + # Only clear tools if we don't have cached tools for the current server + if not cached_tools: + self.tools = [] # Clear previous tools only if no cache + self.remove_non_default_keys(build_config) # Clear previous tool inputs # Only show the tool dropdown if not in tool_mode if not is_in_tool_mode: build_config["tool"]["show"] = True - build_config["tool"]["placeholder"] = "Loading tools..." - build_config["tool"]["options"] = [] - build_config["tool"]["value"] = "" + if cached_tools: + # Use cached tools to populate options immediately + build_config["tool"]["options"] = [tool.name for tool in cached_tools] + build_config["tool"]["placeholder"] = "Select a tool" + else: + # Show loading state only when we need to fetch tools + build_config["tool"]["placeholder"] = "Loading tools..." + build_config["tool"]["options"] = [] + build_config["tool"]["value"] = uuid.uuid4() else: # Keep the tool dropdown hidden if in tool_mode + self._not_load_actions = True build_config["tool"]["show"] = False - try: - # Fetch tools for the newly selected server - tools, server_info = await self.update_tool_list(field_value) - build_config["mcp_server"]["value"] = server_info - - if tools: - tool_names = [tool.name for tool in tools] - if not is_in_tool_mode: - build_config["tool"]["options"] = tool_names - build_config["tool"]["placeholder"] = "Select a tool" - elif not is_in_tool_mode: - build_config["tool"]["placeholder"] = "No tools found" - - except (ValueError, AttributeError, KeyError, TypeError, ConnectionError, TimeoutError) as e: - logger.error(f"Failed to fetch tools for server '{current_server_name}': {e}") - if not is_in_tool_mode: - build_config["tool"]["placeholder"] = "Error fetching tools" - build_config["tool"]["options"] = [] - build_config["tool"]["value"] = "" - finally: - # Ensure we don't show inputs for a tool that might no longer be valid - await self._update_tool_config(build_config, "") - elif field_name == "tool_mode": - try: - self.tools, build_config["mcp_server"]["value"] = await self.update_tool_list() - except ValueError: - if not build_config["tools_metadata"]["show"]: - build_config["tool"]["show"] = True - build_config["tool"]["options"] = [] - build_config["tool"]["value"] = "" - build_config["tool"]["placeholder"] = "Error on MCP Server" - else: - build_config["tool"]["show"] = False build_config["tool"]["placeholder"] = "" build_config["tool"]["show"] = not field_value - for key, value in list(build_config.items()): - if key not in self.default_keys and isinstance(value, dict) and "show" in value: - build_config[key]["show"] = not field_value - if not field_value: - build_config["tool"]["options"] = [tool.name for tool in self.tools] - await self._update_tool_config(build_config, build_config["tool"]["value"]) + self.remove_non_default_keys(build_config) + self.tool = build_config["tool"]["value"] + if field_value: + self._not_load_actions = True + else: + build_config["tool"]["value"] = uuid.uuid4() + build_config["tool"]["options"] = [] + build_config["tool"]["show"] = True + build_config["tool"]["placeholder"] = "Loading tools..." + elif field_name == "tools_metadata": + self._not_load_actions = False except Exception as e: msg = f"Error in update_build_config: {e!s}" @@ -434,6 +456,12 @@ class MCPToolsComponent(ComponentWithCache): try: self.tools, _ = await self.update_tool_list() if self.tool != "": + # Set session context for persistent MCP sessions using Langflow session ID + session_context = self._get_session_context() + if session_context: + self.stdio_client.set_session_context(session_context) + self.sse_client.set_session_context(session_context) + exec_tool = self._tool_cache[self.tool] tool_args = self.get_inputs_for_all_tools(self.tools)[self.tool] kwargs = {} @@ -460,8 +488,25 @@ class MCPToolsComponent(ComponentWithCache): logger.exception(msg) raise ValueError(msg) from e + def _get_session_context(self) -> str | None: + """Get the Langflow session ID for MCP session caching.""" + # Try to get session ID from the component's execution context + if hasattr(self, "graph") and hasattr(self.graph, "session_id"): + session_id = self.graph.session_id + # Include server name to ensure different servers get different sessions + server_name = "" + mcp_server = getattr(self, "mcp_server", None) + if isinstance(mcp_server, dict): + server_name = mcp_server.get("name", "") + elif mcp_server: + server_name = str(mcp_server) + return f"{session_id}_{server_name}" if session_id else None + return None + async def _get_tools(self): """Get cached tools or update if necessary.""" mcp_server = getattr(self, "mcp_server", None) - tools, _ = await self.update_tool_list(mcp_server) - return tools + if not self._not_load_actions: + tools, _ = await self.update_tool_list(mcp_server) + return tools + return [] diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index 14a657fe2..7ad5fa56c 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -1294,7 +1294,20 @@ class Component(CustomComponent): } async def _build_tools_metadata_input(self): - tools = await self._get_tools() + try: + from langflow.io import ToolsInput + except ImportError as e: + msg = "Failed to import ToolsInput from langflow.io" + raise ImportError(msg) from e + placeholder = None + tools = [] + try: + tools = await self._get_tools() + placeholder = "Loading actions..." if len(tools) == 0 else "" + except (TimeoutError, asyncio.TimeoutError): + placeholder = "Timeout loading actions" + except (ConnectionError, OSError, ValueError): + placeholder = "Error loading actions" # Always use the latest tool data tool_data = [self._build_tool_data(tool) for tool in tools] # print(tool_data) @@ -1322,14 +1335,9 @@ class Component(CustomComponent): item["status"] = any(enabled_name in [item["name"], *item["tags"]] for enabled_name in enabled) self.tools_metadata = tool_data - try: - from langflow.io import ToolsInput - except ImportError as e: - msg = "Failed to import ToolsInput from langflow.io" - raise ImportError(msg) from e - return ToolsInput( name=TOOLS_METADATA_INPUT_NAME, + placeholder=placeholder, display_name="Actions", info=TOOLS_METADATA_INFO, value=tool_data, diff --git a/src/backend/base/langflow/template/frontend_node/custom_components.py b/src/backend/base/langflow/template/frontend_node/custom_components.py index 0dd476788..e465e2175 100644 --- a/src/backend/base/langflow/template/frontend_node/custom_components.py +++ b/src/backend/base/langflow/template/frontend_node/custom_components.py @@ -67,6 +67,7 @@ class CustomComponentFrontendNode(FrontendNode): ) description: str | None = None base_classes: list[str] = [] + last_updated: str | None = None class ComponentFrontendNode(FrontendNode): diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index 9ecf7c056..0ffa40ab0 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -77,7 +77,7 @@ dependencies = [ "validators>=0.34.0", "networkx>=3.4.2", "json-repair>=0.30.3", - "mcp~=1.9.4", + "mcp~=1.10.1", "aiosqlite>=0.20.0", "greenlet>=3.1.1", "jsonquerylang>=1.1.1", diff --git a/src/frontend/src/CustomNodes/helpers/mutate-template.ts b/src/frontend/src/CustomNodes/helpers/mutate-template.ts index 9d5942e02..220dc97ff 100644 --- a/src/frontend/src/CustomNodes/helpers/mutate-template.ts +++ b/src/frontend/src/CustomNodes/helpers/mutate-template.ts @@ -59,14 +59,15 @@ export const mutateTemplate = async ( newTemplate.outputs ?? [], ); newNode.tool_mode = toolMode ?? node.tool_mode; - } - try { - setNodeClass(newNode); - } catch (e) { - if (e instanceof Error && e.message === "Node not found") { - console.log("Node not found"); - } else { - throw e; + newNode.last_updated = newTemplate.last_updated; + try { + setNodeClass(newNode); + } catch (e) { + if (e instanceof Error && e.message === "Node not found") { + console.log("Node not found"); + } else { + throw e; + } } } callback?.(); diff --git a/src/frontend/src/components/core/parameterRenderComponent/components/ToolsComponent/index.tsx b/src/frontend/src/components/core/parameterRenderComponent/components/ToolsComponent/index.tsx index 5c68b1a50..b22512d40 100644 --- a/src/frontend/src/components/core/parameterRenderComponent/components/ToolsComponent/index.tsx +++ b/src/frontend/src/components/core/parameterRenderComponent/components/ToolsComponent/index.tsx @@ -15,6 +15,7 @@ export default function ToolsComponent({ id = "", handleOnNewValue, isAction = false, + placeholder, button_description, title, icon, @@ -46,18 +47,17 @@ export default function ToolsComponent({ disabled && "cursor-not-allowed", )} > - {value && ( - - )} +
setIsModalOpen(true)} > - {value.length === 0 ? "No actions available" : "Select actions"} + {placeholder || + (value.length === 0 + ? "No actions available" + : "Select actions")} )} diff --git a/src/frontend/src/components/core/parameterRenderComponent/components/mcpComponent/index.tsx b/src/frontend/src/components/core/parameterRenderComponent/components/mcpComponent/index.tsx index 87a73dfa0..3356d0b0e 100644 --- a/src/frontend/src/components/core/parameterRenderComponent/components/mcpComponent/index.tsx +++ b/src/frontend/src/components/core/parameterRenderComponent/components/mcpComponent/index.tsx @@ -26,7 +26,9 @@ export default function McpComponent({ description: server.toolsCount === null ? server.error - ? "Error" + ? server.error.startsWith("Timeout") + ? "Timeout" + : "Error" : "Loading..." : !server.toolsCount ? "No actions found" diff --git a/src/frontend/src/controllers/API/queries/nodes/use-post-template-value.ts b/src/frontend/src/controllers/API/queries/nodes/use-post-template-value.ts index 7c86f6318..20302a1cd 100644 --- a/src/frontend/src/controllers/API/queries/nodes/use-post-template-value.ts +++ b/src/frontend/src/controllers/API/queries/nodes/use-post-template-value.ts @@ -1,3 +1,4 @@ +import useFlowStore from "@/stores/flowStore"; import { APIClassType, ResponseErrorDetailAPI, @@ -26,6 +27,7 @@ export const usePostTemplateValue: useMutationFunctionType< ResponseErrorDetailAPI > = ({ parameterId, nodeId, node }, options?) => { const { mutate } = UseRequestProcessor(); + const getNode = useFlowStore((state) => state.getNode); const postTemplateValueFn = async ( payload: IPostTemplateValue, @@ -33,6 +35,7 @@ export const usePostTemplateValue: useMutationFunctionType< const template = node.template; if (!template) return; + const lastUpdated = new Date().toISOString(); const response = await api.post( getURL("CUSTOM_COMPONENT", { update: "update" }), { @@ -43,8 +46,19 @@ export const usePostTemplateValue: useMutationFunctionType< tool_mode: payload.tool_mode, }, ); + const newTemplate = response.data; + newTemplate.last_updated = lastUpdated; + const newNode = getNode(nodeId)?.data?.node as APIClassType | undefined; - return response.data; + if ( + !newNode?.last_updated || + !newTemplate.last_updated || + Date.parse(newNode.last_updated) < Date.parse(newTemplate.last_updated) + ) { + return newTemplate; + } + + return undefined; }; const mutation: UseMutationResult< diff --git a/src/frontend/src/modals/IOModal/components/IOFieldView/components/key-pair-input.tsx b/src/frontend/src/modals/IOModal/components/IOFieldView/components/key-pair-input.tsx index 4713d40d5..702b4f5e2 100644 --- a/src/frontend/src/modals/IOModal/components/IOFieldView/components/key-pair-input.tsx +++ b/src/frontend/src/modals/IOModal/components/IOFieldView/components/key-pair-input.tsx @@ -1,5 +1,5 @@ import _ from "lodash"; -import { useRef } from "react"; +import { useEffect, useState } from "react"; import IconComponent from "../../../../../components/common/genericIconComponent"; import { Input } from "../../../../../components/ui/input"; import { classNames } from "../../../../../utils/utils"; @@ -23,27 +23,38 @@ const IOKeyPairInput = ({ return Array.isArray(value) ? value : [value]; }; - const ref = useRef([]); - ref.current = - !value || value?.length === 0 ? [{ "": "" }] : checkValueType(value); + const [currentData, setCurrentData] = useState(() => { + return !value || value?.length === 0 ? [{ "": "" }] : checkValueType(value); + }); + + // Update internal state when external value changes + useEffect(() => { + const newData = + !value || value?.length === 0 ? [{ "": "" }] : checkValueType(value); + setCurrentData(newData); + }, [value]); const handleChangeKey = (event, idx) => { - const oldKey = Object.keys(ref.current[idx])[0]; - const updatedObj = { [event.target.value]: ref.current[idx][oldKey] }; - ref.current[idx] = updatedObj; - onChange(ref.current); + const oldKey = Object.keys(currentData[idx])[0]; + const updatedObj = { [event.target.value]: currentData[idx][oldKey] }; + const newData = [...currentData]; + newData[idx] = updatedObj; + setCurrentData(newData); + onChange(newData); }; const handleChangeValue = (newValue, idx) => { - const key = Object.keys(ref.current[idx])[0]; - ref.current[idx][key] = newValue; - onChange(ref.current); + const key = Object.keys(currentData[idx])[0]; + const newData = [...currentData]; + newData[idx] = { ...newData[idx], [key]: newValue }; + setCurrentData(newData); + onChange(newData); }; return ( <>
- {ref.current?.map((obj, index) => { + {currentData?.map((obj, index) => { return Object.keys(obj).map((key, idx) => { return (
@@ -66,12 +77,13 @@ const IOKeyPairInput = ({ disabled={!isInputField} /> - {isList && isInputField && index === ref.current.length - 1 ? ( + {isList && isInputField && index === currentData.length - 1 ? (