fix: prevent message duplication in ProjectMCPServer and handle_call_tool (#8918)
* Updated the message processing logic in both ProjectMCPServer and handle_call_tool to ensure that duplicate messages are not added to the collected results. * Introduced a helper function to manage the addition of results, enhancing code clarity and maintainability. Co-authored-by: Cristhian Zanforlin Lousa <cristhian.lousa@gmail.com>
This commit is contained in:
parent
114a70eebd
commit
fd92d16553
2 changed files with 25 additions and 20 deletions
|
|
@ -251,10 +251,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent
|
|||
input_value=processed_inputs.get("input_value", ""), session_id=conversation_id
|
||||
)
|
||||
|
||||
async def send_progress_updates():
|
||||
if not (mcp_config.enable_progress_notifications and server.request_context.meta.progressToken):
|
||||
return
|
||||
|
||||
async def send_progress_updates(progress_token):
|
||||
try:
|
||||
progress = 0.0
|
||||
while True:
|
||||
|
|
@ -274,7 +271,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent
|
|||
try:
|
||||
progress_task = None
|
||||
if mcp_config.enable_progress_notifications and server.request_context.meta.progressToken:
|
||||
progress_task = asyncio.create_task(send_progress_updates())
|
||||
progress_task = asyncio.create_task(send_progress_updates(server.request_context.meta.progressToken))
|
||||
|
||||
try:
|
||||
try:
|
||||
|
|
@ -284,20 +281,25 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent
|
|||
stream=False,
|
||||
api_key_user=current_user,
|
||||
)
|
||||
# Process all outputs and messages
|
||||
# Process all outputs and messages, ensuring no duplicates
|
||||
processed_texts = set()
|
||||
|
||||
def add_result(text: str):
|
||||
if text not in processed_texts:
|
||||
processed_texts.add(text)
|
||||
collected_results.append(types.TextContent(type="text", text=text))
|
||||
|
||||
for run_output in result.outputs:
|
||||
for component_output in run_output.outputs:
|
||||
# Handle messages
|
||||
for msg in component_output.messages or []:
|
||||
text_content = types.TextContent(type="text", text=msg.message)
|
||||
collected_results.append(text_content)
|
||||
add_result(msg.message)
|
||||
# Handle results
|
||||
for value in (component_output.results or {}).values():
|
||||
if isinstance(value, Message):
|
||||
text_content = types.TextContent(type="text", text=value.get_text())
|
||||
collected_results.append(text_content)
|
||||
add_result(value.get_text())
|
||||
else:
|
||||
collected_results.append(types.TextContent(type="text", text=str(value)))
|
||||
add_result(str(value))
|
||||
except Exception as e: # noqa: BLE001
|
||||
error_msg = f"Error Executing the {flow.name} tool. Error: {e!s}"
|
||||
collected_results.append(types.TextContent(type="text", text=error_msg))
|
||||
|
|
@ -306,9 +308,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent
|
|||
finally:
|
||||
if progress_task:
|
||||
progress_task.cancel()
|
||||
await asyncio.wait([progress_task])
|
||||
if not progress_task.cancelled() and (exc := progress_task.exception()) is not None:
|
||||
raise exc
|
||||
await asyncio.gather(progress_task, return_exceptions=True)
|
||||
|
||||
except Exception:
|
||||
if mcp_config.enable_progress_notifications and (
|
||||
|
|
|
|||
|
|
@ -838,20 +838,25 @@ class ProjectMCPServer:
|
|||
stream=False,
|
||||
api_key_user=current_user,
|
||||
)
|
||||
# Process all outputs and messages
|
||||
# Process all outputs and messages, ensuring no duplicates
|
||||
processed_texts = set()
|
||||
|
||||
def add_result(text: str):
|
||||
if text not in processed_texts:
|
||||
processed_texts.add(text)
|
||||
collected_results.append(types.TextContent(type="text", text=text))
|
||||
|
||||
for run_output in result.outputs:
|
||||
for component_output in run_output.outputs:
|
||||
# Handle messages
|
||||
for msg in component_output.messages or []:
|
||||
text_content = types.TextContent(type="text", text=msg.message)
|
||||
collected_results.append(text_content)
|
||||
add_result(msg.message)
|
||||
# Handle results
|
||||
for value in (component_output.results or {}).values():
|
||||
if isinstance(value, Message):
|
||||
text_content = types.TextContent(type="text", text=value.get_text())
|
||||
collected_results.append(text_content)
|
||||
add_result(value.get_text())
|
||||
else:
|
||||
collected_results.append(types.TextContent(type="text", text=str(value)))
|
||||
add_result(str(value))
|
||||
except Exception as e: # noqa: BLE001
|
||||
error_msg = f"Error Executing the {flow.name} tool. Error: {e!s}"
|
||||
collected_results.append(types.TextContent(type="text", text=error_msg))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue