feat: Add TelemetryService and log package run time

This commit adds the `TelemetryService` class and updates the code in `endpoints.py` to log the run time of package execution. The `TelemetryService` is responsible for handling telemetry-related functionality, including logging package run time. This addition enables the application to collect and analyze telemetry data, providing insights into the performance of package execution.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-23 01:10:40 -03:00
commit fdee73f63c

View file

@ -1,9 +1,9 @@
import time
from asyncio import Lock
from http import HTTPStatus
from typing import TYPE_CHECKING, Annotated, List, Optional, Union
from uuid import UUID
from langflow.utils.version import get_version_info
import sqlalchemy as sa
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status
from loguru import logger
@ -39,9 +39,13 @@ from langflow.services.deps import (
get_session_service,
get_settings_service,
get_task_service,
get_telemetry_service,
)
from langflow.services.session.service import SessionService
from langflow.services.task.service import TaskService
from langflow.services.telemetry.schema import RunPayload
from langflow.services.telemetry.service import TelemetryService
from langflow.utils.version import get_version_info
if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
@ -125,6 +129,8 @@ async def simplified_run_flow(
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
stream: bool = False,
api_key_user: User = Depends(api_key_security),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
background_tasks: BackgroundTasks = BackgroundTasks(),
):
"""
Executes a specified flow by ID with input customization, performance enhancements through caching, and optional data streaming.
@ -174,14 +180,27 @@ async def simplified_run_flow(
This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching.
"""
start_time = time.perf_counter()
try:
return await simple_run_flow(
result = await simple_run_flow(
flow=flow,
input_request=input_request,
stream=stream,
api_key_user=api_key_user,
)
end_time = time.perf_counter()
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(IsWebhook=False, seconds=int(end_time - start_time), success=True, errorMessage=""),
)
return result
except ValueError as exc:
end_time = time.perf_counter()
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(IsWebhook=False, seconds=int(end_time - start_time), success=False, errorMessage=str(exc)),
)
if "badly formed hexadecimal UUID string" in str(exc):
# This means the Flow ID is not a valid UUID which means it can't find the flow
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
@ -192,16 +211,19 @@ async def simplified_run_flow(
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
except Exception as exc:
logger.exception(exc)
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(IsWebhook=False, seconds=int(end_time - start_time), success=False, errorMessage=str(exc)),
)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED)
async def webhook_run_flow(
db: Annotated[Session, Depends(get_session)],
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
request: Request,
background_tasks: BackgroundTasks,
session_service: SessionService = Depends(get_session_service),
telemetry_service: "TelemetryService" = Depends(get_telemetry_service),
):
"""
Run a flow using a webhook request.
@ -220,6 +242,7 @@ async def webhook_run_flow(
HTTPException: If the flow is not found or if there is an error processing the request.
"""
try:
start_time = time.perf_counter()
logger.debug("Received webhook request")
data = await request.body()
if not data:
@ -247,8 +270,18 @@ async def webhook_run_flow(
flow=flow,
input_request=input_request,
)
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(IsWebhook=True, seconds=int(time.perf_counter() - start_time), success=True, errorMessage=""),
)
return {"message": "Task started in the background", "status": "in progress"}
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_run,
RunPayload(
IsWebhook=True, seconds=int(time.perf_counter() - start_time), success=False, errorMessage=str(exc)
),
)
if "Flow ID is required" in str(exc) or "Request body is empty" in str(exc):
raise HTTPException(status_code=400, detail=str(exc)) from exc
logger.exception(exc)