From fdee73f63cafc77da8a931ddaddcd05d042ffb41 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 23 Jun 2024 01:10:40 -0300 Subject: [PATCH] feat: Add TelemetryService and log package run time This commit adds the `TelemetryService` class and updates the code in `endpoints.py` to log the run time of package execution. The `TelemetryService` is responsible for handling telemetry-related functionality, including logging package run time. This addition enables the application to collect and analyze telemetry data, providing insights into the performance of package execution. --- src/backend/base/langflow/api/v1/endpoints.py | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index d28348237..668f10482 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -1,9 +1,9 @@ +import time from asyncio import Lock from http import HTTPStatus from typing import TYPE_CHECKING, Annotated, List, Optional, Union from uuid import UUID -from langflow.utils.version import get_version_info import sqlalchemy as sa from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status from loguru import logger @@ -39,9 +39,13 @@ from langflow.services.deps import ( get_session_service, get_settings_service, get_task_service, + get_telemetry_service, ) from langflow.services.session.service import SessionService from langflow.services.task.service import TaskService +from langflow.services.telemetry.schema import RunPayload +from langflow.services.telemetry.service import TelemetryService +from langflow.utils.version import get_version_info if TYPE_CHECKING: from langflow.services.cache.base import CacheService @@ -125,6 +129,8 @@ async def simplified_run_flow( input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(), stream: bool = False, api_key_user: User = Depends(api_key_security), + telemetry_service: "TelemetryService" = Depends(get_telemetry_service), + background_tasks: BackgroundTasks = BackgroundTasks(), ): """ Executes a specified flow by ID with input customization, performance enhancements through caching, and optional data streaming. @@ -174,14 +180,27 @@ async def simplified_run_flow( This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching. """ + start_time = time.perf_counter() try: - return await simple_run_flow( + result = await simple_run_flow( flow=flow, input_request=input_request, stream=stream, api_key_user=api_key_user, ) + end_time = time.perf_counter() + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload(IsWebhook=False, seconds=int(end_time - start_time), success=True, errorMessage=""), + ) + return result + except ValueError as exc: + end_time = time.perf_counter() + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload(IsWebhook=False, seconds=int(end_time - start_time), success=False, errorMessage=str(exc)), + ) if "badly formed hexadecimal UUID string" in str(exc): # This means the Flow ID is not a valid UUID which means it can't find the flow raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc @@ -192,16 +211,19 @@ async def simplified_run_flow( raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc except Exception as exc: logger.exception(exc) + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload(IsWebhook=False, seconds=int(end_time - start_time), success=False, errorMessage=str(exc)), + ) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc @router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) async def webhook_run_flow( - db: Annotated[Session, Depends(get_session)], flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)], request: Request, background_tasks: BackgroundTasks, - session_service: SessionService = Depends(get_session_service), + telemetry_service: "TelemetryService" = Depends(get_telemetry_service), ): """ Run a flow using a webhook request. @@ -220,6 +242,7 @@ async def webhook_run_flow( HTTPException: If the flow is not found or if there is an error processing the request. """ try: + start_time = time.perf_counter() logger.debug("Received webhook request") data = await request.body() if not data: @@ -247,8 +270,18 @@ async def webhook_run_flow( flow=flow, input_request=input_request, ) + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload(IsWebhook=True, seconds=int(time.perf_counter() - start_time), success=True, errorMessage=""), + ) return {"message": "Task started in the background", "status": "in progress"} except Exception as exc: + background_tasks.add_task( + telemetry_service.log_package_run, + RunPayload( + IsWebhook=True, seconds=int(time.perf_counter() - start_time), success=False, errorMessage=str(exc) + ), + ) if "Flow ID is required" in str(exc) or "Request body is empty" in str(exc): raise HTTPException(status_code=400, detail=str(exc)) from exc logger.exception(exc)