diff --git a/src/backend/base/langflow/services/deps.py b/src/backend/base/langflow/services/deps.py index 304fce1bc..588d0ced5 100644 --- a/src/backend/base/langflow/services/deps.py +++ b/src/backend/base/langflow/services/deps.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from langflow.services.storage.service import StorageService from langflow.services.store.service import StoreService from langflow.services.task.service import TaskService + from langflow.services.telemetry.service import TelemetryService from langflow.services.tracing.service import TracingService from langflow.services.variable.service import VariableService @@ -42,6 +43,18 @@ def get_service(service_type: ServiceType, default=None): return service_manager.get(service_type, default) # type: ignore +def get_telemetry_service() -> "TelemetryService": + """ + Retrieves the TelemetryService instance from the service manager. + + Returns: + TelemetryService: The TelemetryService instance. + """ + from langflow.services.telemetry.factory import TelemetryServiceFactory + + return get_service(ServiceType.TELEMETRY_SERVICE, TelemetryServiceFactory()) # type: ignore + + def get_tracing_service() -> "TracingService": """ Retrieves the TracingService instance from the service manager. diff --git a/src/backend/base/langflow/services/schema.py b/src/backend/base/langflow/services/schema.py index d3e9ca048..eafc0c085 100644 --- a/src/backend/base/langflow/services/schema.py +++ b/src/backend/base/langflow/services/schema.py @@ -22,3 +22,4 @@ class ServiceType(str, Enum): # SOCKETIO_SERVICE = "socket_service" STATE_SERVICE = "state_service" TRACING_SERVICE = "tracing_service" + TELEMETRY_SERVICE = "telemetry_service" diff --git a/src/backend/base/langflow/services/telemetry/__init__.py b/src/backend/base/langflow/services/telemetry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/base/langflow/services/telemetry/factory.py b/src/backend/base/langflow/services/telemetry/factory.py new file mode 100644 index 000000000..86f048112 --- /dev/null +++ b/src/backend/base/langflow/services/telemetry/factory.py @@ -0,0 +1,15 @@ +from typing import TYPE_CHECKING + +from langflow.services.factory import ServiceFactory +from langflow.services.telemetry.service import TelemetryService + +if TYPE_CHECKING: + from langflow.services.settings.service import SettingsService + + +class TelemetryServiceFactory(ServiceFactory): + def __init__(self): + super().__init__(TelemetryService) + + def create(self, settings_service: "SettingsService"): + return TelemetryService(settings_service) diff --git a/src/backend/base/langflow/services/telemetry/service.py b/src/backend/base/langflow/services/telemetry/service.py new file mode 100644 index 000000000..822b40b21 --- /dev/null +++ b/src/backend/base/langflow/services/telemetry/service.py @@ -0,0 +1,151 @@ +import asyncio +import os +import platform +from typing import TYPE_CHECKING + +import httpx +from loguru import logger +from pydantic import BaseModel + +from langflow.services.base import Service +from langflow.utils.version import get_version_info + +if TYPE_CHECKING: + from langflow.services.settings.service import SettingsService + + +class RunPayload(BaseModel): + isEndpointName: str + isWebhook: bool + seconds: int + success: bool + errorMessage: str + + +class ShutdownPayload(BaseModel): + timeRunning: int + startingFlowsCount: int + finalFlowsCount: int + startingCompCount: int + finalCompCount: int + + +class VersionPayload(BaseModel): + version: str + platform: str + python: str + cacheType: str + backendOnly: bool + + +class PlaygroundPayload(BaseModel): + seconds: int + componentCount: int + success: bool + + +class ComponentPayload(BaseModel): + name: str + seconds: int + success: bool + errorMessage: str + + +class TelemetryService(Service): + name = "telemetry_service" + + def __init__(self, settings_service: "SettingsService"): + super().__init__() + self.settings_service = settings_service + self.base_url = settings_service.settings.telemetry_base_url + self.telemetry_queue = asyncio.Queue() + self.client = httpx.AsyncClient(timeout=None) + self.running = False + self.package = get_version_info()["package"] + + # Check for do-not-track settings + self.do_not_track = ( + os.getenv("DO_NOT_TRACK", "False").lower() == "true" or settings_service.settings.do_not_track + ) + + async def telemetry_worker(self): + while self.running: + func, payload, path = await self.telemetry_queue.get() + try: + await func(payload, path) + except Exception as e: + logger.error(f"Error sending telemetry data: {e}") + finally: + self.telemetry_queue.task_done() + + async def send_telemetry_data(self, payload: BaseModel, path: str | None = None): + if self.do_not_track: + logger.debug("Telemetry tracking is disabled.") + return + + url = f"{self.base_url}/{self.package}" + if path: + url = f"{url}/{path}" + try: + response = await self.client.get(url, params=payload.model_dump()) + if response.status_code != 200: + logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}") + else: + logger.debug("Telemetry data sent successfully.") + except Exception as e: + logger.error(f"Failed to send telemetry data due to: {e}") + + async def log_package_run(self, payload: RunPayload): + await self.telemetry_queue.put((self.send_telemetry_data, payload, "run")) + + async def log_package_shutdown(self, payload: ShutdownPayload): + await self.telemetry_queue.put((self.send_telemetry_data, payload, "shutdown")) + + async def log_package_version(self): + python_version = ".".join(platform.python_version().split(".")[:2]) + version_info = get_version_info() + payload = VersionPayload( + version=version_info["version"], + platform=platform.platform(), + python=python_version, + cacheType=self.settings_service.settings.cache_type, + backendOnly=self.settings_service.settings.backend_only, + ) + await self.telemetry_queue.put((self.send_telemetry_data, payload, None)) + + async def log_package_playground(self, payload: PlaygroundPayload): + await self.telemetry_queue.put((self.send_telemetry_data, payload, "playground")) + + async def log_package_component(self, payload: ComponentPayload): + await self.telemetry_queue.put((self.send_telemetry_data, payload, "component")) + + async def start(self): + if self.running or self.do_not_track: + return + try: + self.running = True + self.worker_task = asyncio.create_task(self.telemetry_worker()) + await self.worker_task + await self.log_package_version() + except Exception as e: + logger.error(f"Error starting telemetry service: {e}") + + async def flush(self): + if self.do_not_track: + return + try: + await self.telemetry_queue.join() + except Exception as e: + logger.error(f"Error flushing logs: {e}") + + async def stop(self): + if self.do_not_track: + return + try: + self.running = False + await self.flush() + self.worker_task.cancel() + if self.worker_task: + await self.worker_task + except Exception as e: + logger.error(f"Error stopping tracing service: {e}")