diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 2ceeb579b..099312ab3 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -251,10 +251,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent input_value=processed_inputs.get("input_value", ""), session_id=conversation_id ) - async def send_progress_updates(): - if not (mcp_config.enable_progress_notifications and server.request_context.meta.progressToken): - return - + async def send_progress_updates(progress_token): try: progress = 0.0 while True: @@ -274,7 +271,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent try: progress_task = None if mcp_config.enable_progress_notifications and server.request_context.meta.progressToken: - progress_task = asyncio.create_task(send_progress_updates()) + progress_task = asyncio.create_task(send_progress_updates(server.request_context.meta.progressToken)) try: try: @@ -284,20 +281,25 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent stream=False, api_key_user=current_user, ) - # Process all outputs and messages + # Process all outputs and messages, ensuring no duplicates + processed_texts = set() + + def add_result(text: str): + if text not in processed_texts: + processed_texts.add(text) + collected_results.append(types.TextContent(type="text", text=text)) + for run_output in result.outputs: for component_output in run_output.outputs: # Handle messages for msg in component_output.messages or []: - text_content = types.TextContent(type="text", text=msg.message) - collected_results.append(text_content) + add_result(msg.message) # Handle results for value in (component_output.results or {}).values(): if isinstance(value, Message): - text_content = types.TextContent(type="text", text=value.get_text()) - collected_results.append(text_content) + add_result(value.get_text()) else: - collected_results.append(types.TextContent(type="text", text=str(value))) + add_result(str(value)) except Exception as e: # noqa: BLE001 error_msg = f"Error Executing the {flow.name} tool. Error: {e!s}" collected_results.append(types.TextContent(type="text", text=error_msg)) @@ -306,9 +308,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent finally: if progress_task: progress_task.cancel() - await asyncio.wait([progress_task]) - if not progress_task.cancelled() and (exc := progress_task.exception()) is not None: - raise exc + await asyncio.gather(progress_task, return_exceptions=True) except Exception: if mcp_config.enable_progress_notifications and ( diff --git a/src/backend/base/langflow/api/v1/mcp_projects.py b/src/backend/base/langflow/api/v1/mcp_projects.py index c64471fc8..39648dd73 100644 --- a/src/backend/base/langflow/api/v1/mcp_projects.py +++ b/src/backend/base/langflow/api/v1/mcp_projects.py @@ -838,20 +838,25 @@ class ProjectMCPServer: stream=False, api_key_user=current_user, ) - # Process all outputs and messages + # Process all outputs and messages, ensuring no duplicates + processed_texts = set() + + def add_result(text: str): + if text not in processed_texts: + processed_texts.add(text) + collected_results.append(types.TextContent(type="text", text=text)) + for run_output in result.outputs: for component_output in run_output.outputs: # Handle messages for msg in component_output.messages or []: - text_content = types.TextContent(type="text", text=msg.message) - collected_results.append(text_content) + add_result(msg.message) # Handle results for value in (component_output.results or {}).values(): if isinstance(value, Message): - text_content = types.TextContent(type="text", text=value.get_text()) - collected_results.append(text_content) + add_result(value.get_text()) else: - collected_results.append(types.TextContent(type="text", text=str(value))) + add_result(str(value)) except Exception as e: # noqa: BLE001 error_msg = f"Error Executing the {flow.name} tool. Error: {e!s}" collected_results.append(types.TextContent(type="text", text=error_msg))