diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index c0feb621f..d6e54e99a 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -29,8 +29,10 @@ from langflow.graph.graph.base import Graph from langflow.schema.schema import OutputLog from langflow.services.auth.utils import get_current_active_user from langflow.services.chat.service import ChatService -from langflow.services.deps import get_chat_service, get_session, get_session_service +from langflow.services.deps import get_chat_service, get_session, get_session_service, get_telemetry_service from langflow.services.monitor.utils import log_vertex_build +from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload +from langflow.services.telemetry.service import TelemetryService if TYPE_CHECKING: from langflow.graph.vertex.types import InterfaceVertex @@ -58,11 +60,13 @@ async def try_running_celery_task(vertex, user_id): @router.post("/build/{flow_id}/vertices", response_model=VerticesOrderResponse) async def retrieve_vertices_order( flow_id: uuid.UUID, + background_tasks: BackgroundTasks, data: Optional[Annotated[Optional[FlowDataRequest], Body(embed=True)]] = None, stop_component_id: Optional[str] = None, start_component_id: Optional[str] = None, chat_service: "ChatService" = Depends(get_chat_service), session=Depends(get_session), + telemetry_service: "TelemetryService" = Depends(get_telemetry_service), ): """ Retrieve the vertices order for a given flow. @@ -81,6 +85,7 @@ async def retrieve_vertices_order( Raises: HTTPException: If there is an error checking the build status. """ + start_time = time.perf_counter() try: flow_id_str = str(flow_id) # First, we need to check if the flow_id is in the cache @@ -109,11 +114,28 @@ async def retrieve_vertices_order( # Now vertices is a list of lists # We need to get the id of each vertex # and return the same structure but only with the ids + components_count = len(graph.vertices) vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run) await chat_service.set_cache(str(flow_id), graph) + background_tasks.add_task( + telemetry_service.log_package_playground, + PlaygroundPayload( + seconds=int(time.perf_counter() - start_time), + componentCount=components_count, + success=True, + ), + ) return VerticesOrderResponse(ids=first_layer, run_id=graph._run_id, vertices_to_run=vertices_to_run) - except Exception as exc: + background_tasks.add_task( + telemetry_service.log_package_playground, + PlaygroundPayload( + seconds=int(time.perf_counter() - start_time), + componentCount=components_count, + success=False, + errorMessage=str(exc), + ), + ) if "stream or streaming set to True" in str(exc): raise HTTPException(status_code=400, detail=str(exc)) logger.error(f"Error checking build status: {exc}") @@ -130,6 +152,7 @@ async def build_vertex( files: Optional[list[str]] = None, chat_service: "ChatService" = Depends(get_chat_service), current_user=Depends(get_current_active_user), + telemetry_service: "TelemetryService" = Depends(get_telemetry_service), ): """Build a vertex instead of the entire graph. @@ -152,8 +175,8 @@ async def build_vertex( next_runnable_vertices = [] top_level_vertices = [] + start_time = time.perf_counter() try: - start_time = time.perf_counter() cache = await chat_service.get_cache(flow_id_str) if not cache: # If there's no cache @@ -253,8 +276,26 @@ async def build_vertex( id=vertex.id, data=result_data_response, ) + background_tasks.add_task( + telemetry_service.log_package_component, + ComponentPayload( + name=vertex_id, + seconds=int(time.perf_counter() - start_time), + success=valid, + errorMessage=params, + ), + ) return build_response except Exception as exc: + background_tasks.add_task( + telemetry_service.log_package_component, + ComponentPayload( + name=vertex_id, + seconds=int(time.perf_counter() - start_time), + success=False, + errorMessage=str(exc), + ), + ) logger.error(f"Error building Component:\n\n{exc}") logger.exception(exc) message = parse_exception(exc) diff --git a/src/backend/base/langflow/services/telemetry/schema.py b/src/backend/base/langflow/services/telemetry/schema.py index d9ac221b0..6884a2ee4 100644 --- a/src/backend/base/langflow/services/telemetry/schema.py +++ b/src/backend/base/langflow/services/telemetry/schema.py @@ -1,3 +1,4 @@ +from os import error from pydantic import BaseModel @@ -26,6 +27,7 @@ class PlaygroundPayload(BaseModel): seconds: int componentCount: int success: bool + errorMessage: str = "" class ComponentPayload(BaseModel):