feat: Add TelemetryService and TelemetryServiceFactory

This commit adds the `TelemetryService` class and `TelemetryServiceFactory` class to the `langflow.services.telemetry` module. The `TelemetryService` is responsible for handling telemetry-related functionality, while the `TelemetryServiceFactory` is used to create instances of the `TelemetryService` class. This addition enables the application to collect and analyze telemetry data.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-23 00:35:38 -03:00
commit 609b99e9f3
5 changed files with 180 additions and 0 deletions

View file

@ -18,6 +18,7 @@ if TYPE_CHECKING:
from langflow.services.storage.service import StorageService
from langflow.services.store.service import StoreService
from langflow.services.task.service import TaskService
from langflow.services.telemetry.service import TelemetryService
from langflow.services.tracing.service import TracingService
from langflow.services.variable.service import VariableService
@ -42,6 +43,18 @@ def get_service(service_type: ServiceType, default=None):
return service_manager.get(service_type, default) # type: ignore
def get_telemetry_service() -> "TelemetryService":
"""
Retrieves the TelemetryService instance from the service manager.
Returns:
TelemetryService: The TelemetryService instance.
"""
from langflow.services.telemetry.factory import TelemetryServiceFactory
return get_service(ServiceType.TELEMETRY_SERVICE, TelemetryServiceFactory()) # type: ignore
def get_tracing_service() -> "TracingService":
"""
Retrieves the TracingService instance from the service manager.

View file

@ -22,3 +22,4 @@ class ServiceType(str, Enum):
# SOCKETIO_SERVICE = "socket_service"
STATE_SERVICE = "state_service"
TRACING_SERVICE = "tracing_service"
TELEMETRY_SERVICE = "telemetry_service"

View file

@ -0,0 +1,15 @@
from typing import TYPE_CHECKING
from langflow.services.factory import ServiceFactory
from langflow.services.telemetry.service import TelemetryService
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
class TelemetryServiceFactory(ServiceFactory):
def __init__(self):
super().__init__(TelemetryService)
def create(self, settings_service: "SettingsService"):
return TelemetryService(settings_service)

View file

@ -0,0 +1,151 @@
import asyncio
import os
import platform
from typing import TYPE_CHECKING
import httpx
from loguru import logger
from pydantic import BaseModel
from langflow.services.base import Service
from langflow.utils.version import get_version_info
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
class RunPayload(BaseModel):
isEndpointName: str
isWebhook: bool
seconds: int
success: bool
errorMessage: str
class ShutdownPayload(BaseModel):
timeRunning: int
startingFlowsCount: int
finalFlowsCount: int
startingCompCount: int
finalCompCount: int
class VersionPayload(BaseModel):
version: str
platform: str
python: str
cacheType: str
backendOnly: bool
class PlaygroundPayload(BaseModel):
seconds: int
componentCount: int
success: bool
class ComponentPayload(BaseModel):
name: str
seconds: int
success: bool
errorMessage: str
class TelemetryService(Service):
name = "telemetry_service"
def __init__(self, settings_service: "SettingsService"):
super().__init__()
self.settings_service = settings_service
self.base_url = settings_service.settings.telemetry_base_url
self.telemetry_queue = asyncio.Queue()
self.client = httpx.AsyncClient(timeout=None)
self.running = False
self.package = get_version_info()["package"]
# Check for do-not-track settings
self.do_not_track = (
os.getenv("DO_NOT_TRACK", "False").lower() == "true" or settings_service.settings.do_not_track
)
async def telemetry_worker(self):
while self.running:
func, payload, path = await self.telemetry_queue.get()
try:
await func(payload, path)
except Exception as e:
logger.error(f"Error sending telemetry data: {e}")
finally:
self.telemetry_queue.task_done()
async def send_telemetry_data(self, payload: BaseModel, path: str | None = None):
if self.do_not_track:
logger.debug("Telemetry tracking is disabled.")
return
url = f"{self.base_url}/{self.package}"
if path:
url = f"{url}/{path}"
try:
response = await self.client.get(url, params=payload.model_dump())
if response.status_code != 200:
logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
else:
logger.debug("Telemetry data sent successfully.")
except Exception as e:
logger.error(f"Failed to send telemetry data due to: {e}")
async def log_package_run(self, payload: RunPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "run"))
async def log_package_shutdown(self, payload: ShutdownPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "shutdown"))
async def log_package_version(self):
python_version = ".".join(platform.python_version().split(".")[:2])
version_info = get_version_info()
payload = VersionPayload(
version=version_info["version"],
platform=platform.platform(),
python=python_version,
cacheType=self.settings_service.settings.cache_type,
backendOnly=self.settings_service.settings.backend_only,
)
await self.telemetry_queue.put((self.send_telemetry_data, payload, None))
async def log_package_playground(self, payload: PlaygroundPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "playground"))
async def log_package_component(self, payload: ComponentPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "component"))
async def start(self):
if self.running or self.do_not_track:
return
try:
self.running = True
self.worker_task = asyncio.create_task(self.telemetry_worker())
await self.worker_task
await self.log_package_version()
except Exception as e:
logger.error(f"Error starting telemetry service: {e}")
async def flush(self):
if self.do_not_track:
return
try:
await self.telemetry_queue.join()
except Exception as e:
logger.error(f"Error flushing logs: {e}")
async def stop(self):
if self.do_not_track:
return
try:
self.running = False
await self.flush()
self.worker_task.cancel()
if self.worker_task:
await self.worker_task
except Exception as e:
logger.error(f"Error stopping tracing service: {e}")