fix: make end_all_traces be called at the correct moment (#2516)

* fix(tracing/service.py): remove wait_for_all_tracers call

* refactor: set reasonable timeout for TelemetryService client

* fix: handle HTTP and request errors in TelemetryService

Handle HTTPStatusError and RequestError exceptions separately in the send_telemetry_data method of TelemetryService to provide more specific error messages. Also, catch any unexpected exceptions and log them with an appropriate error message.

* fix: cancel worker task and close client in TelemetryService stop method

Cancel the worker task and close the client in the stop method of TelemetryService to ensure proper cleanup and prevent potential resource leaks. Also handle any exceptions that may occur during the cleanup process and log appropriate error messages.

* style(telemetry/service.py): fix indentation issue in await statement to comply with PEP8 guidelines

* feat(graph): add method to remove vertex from runnables in Graph class

* fix(chat.py): fix issue where vertex was not being removed from runnables list to prevent duplication of results

* fix(chat.py): defines when the end_all_traces call should happen
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-07-04 09:24:45 -03:00 committed by GitHub
commit 15aa68a342
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 17 additions and 10 deletions

View file

@ -111,6 +111,7 @@ async def retrieve_vertices_order(
# which duplicates the results
for vertex_id in first_layer:
graph.remove_from_predecessors(vertex_id)
graph.remove_vertex_from_runnables(vertex_id)
# Now vertices is a list of lists
# We need to get the id of each vertex
@ -160,7 +161,7 @@ async def build_vertex(
Args:
flow_id (str): The ID of the flow.
vertex_id (str): The ID of the vertex to build.
background_tasks (BackgroundTasks): The background tasks object for logging.
background_tasks (BackgroundTasks): The background tasks dependency.
inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
@ -211,8 +212,6 @@ async def build_vertex(
)
top_level_vertices = graph.run_manager.get_top_level_vertices(graph, next_runnable_vertices)
result_data_response = ResultDataResponse(**result_dict.model_dump())
result_data_response = ResultDataResponse.model_validate(result_dict, from_attributes=True)
except Exception as exc:
if isinstance(exc, ComponentBuildException):
@ -265,7 +264,7 @@ async def build_vertex(
if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
next_runnable_vertices = [graph.stop_vertex]
if not next_runnable_vertices:
if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
background_tasks.add_task(graph.end_all_traces)
build_response = VertexBuildResponse(

View file

@ -1469,6 +1469,9 @@ class Graph:
def remove_from_predecessors(self, vertex_id: str):
self.run_manager.remove_from_predecessors(vertex_id)
def remove_vertex_from_runnables(self, vertex_id: str):
self.run_manager.remove_vertex_from_runnables(vertex_id)
def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
in_degree: Dict[str, int] = defaultdict(int)
for edge in edges:

View file

@ -1,4 +1,5 @@
import asyncio
import contextlib
import os
import platform
from datetime import datetime, timezone
@ -30,7 +31,7 @@ class TelemetryService(Service):
self.settings_service = settings_service
self.base_url = settings_service.settings.telemetry_base_url
self.telemetry_queue: asyncio.Queue = asyncio.Queue()
self.client = httpx.AsyncClient(timeout=None)
self.client = httpx.AsyncClient(timeout=10.0) # Set a reasonable timeout
self.running = False
self.package = get_version_info()["package"]
@ -63,8 +64,12 @@ class TelemetryService(Service):
logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
else:
logger.debug("Telemetry data sent successfully.")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
except httpx.RequestError as e:
logger.error(f"Request error occurred: {e}")
except Exception as e:
logger.error(f"Failed to send telemetry data due to: {e}")
logger.error(f"Unexpected error occurred: {e}")
async def log_package_run(self, payload: RunPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "run"))
@ -119,8 +124,10 @@ class TelemetryService(Service):
try:
self.running = False
await self.flush()
self.worker_task.cancel()
if self.worker_task:
await self.worker_task
self.worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.worker_task
await self.client.aclose()
except Exception as e:
logger.error(f"Error stopping tracing service: {e}")

View file

@ -7,7 +7,6 @@ from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID
from langchain.callbacks.tracers.langchain import wait_for_all_tracers
from loguru import logger
from langflow.schema.data import Data
@ -292,5 +291,4 @@ class LangSmithTracer(BaseTracer):
self._run_tree.add_metadata(metadata)
self._run_tree.end(outputs=outputs, error=error)
self._run_tree.post()
wait_for_all_tracers()
self._run_link = self._run_tree.get_url()