From b39ee0ae2d16968516acc7af6b9dda3e72012cb5 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 29 Oct 2024 12:04:22 -0300 Subject: [PATCH] refactor: update OpenTelemetry initialization to avoid multiple calls (#4306) * Ensure single initialization of OpenTelemetry and optimize meter provider setup * Simplify condition by removing redundant class attribute --- localhost-19-2.db | Bin 4096 -> 0 bytes .../services/telemetry/opentelemetry.py | 35 ++++++++++++------ 2 files changed, 23 insertions(+), 12 deletions(-) delete mode 100644 localhost-19-2.db diff --git a/localhost-19-2.db b/localhost-19-2.db deleted file mode 100644 index 7ee7c113a09428e4daafacb6e70a35d18573e608..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 4096 zcmWFz^vNtqRY=P(%1ta$FlG>7U}9o$P*7lCU|@t|AVoG{WYDWB;00+HAlr;ljiVtj n8UmvsFd71*Aut*OqaiRF0;3@?8UmvsFd71*Aut*O6ovo*4{!$i diff --git a/src/backend/base/langflow/services/telemetry/opentelemetry.py b/src/backend/base/langflow/services/telemetry/opentelemetry.py index 4fe30d675..9daf19a8a 100644 --- a/src/backend/base/langflow/services/telemetry/opentelemetry.py +++ b/src/backend/base/langflow/services/telemetry/opentelemetry.py @@ -1,5 +1,4 @@ import threading -import warnings from collections.abc import Mapping from enum import Enum from typing import Any @@ -110,6 +109,8 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref): _metrics_registry: dict[str, Metric] = {} _metrics: dict[str, Counter | ObservableGaugeWrapper | Histogram | UpDownCounter] = {} _meter_provider: MeterProvider | None = None + _initialized: bool = False # Add initialization flag + prometheus_enabled: bool = True def _add_metric( self, name: str, description: str, unit: str, metric_type: MetricType, labels: dict[str, bool] @@ -141,20 +142,29 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref): ) def __init__(self, *, prometheus_enabled: bool = True): + # Only initialize once + self.prometheus_enabled = prometheus_enabled + if OpenTelemetry._initialized: + return + if not self._metrics_registry: self._register_metric() if self._meter_provider is None: - resource = Resource.create({"service.name": "langflow"}) - metric_readers = [] + # Get existing meter provider if any + existing_provider = metrics.get_meter_provider() - # configure prometheus exporter - self.prometheus_enabled = prometheus_enabled - if prometheus_enabled: - metric_readers.append(PrometheusMetricReader()) + # Check if FastAPI instrumentation is already set up + if hasattr(existing_provider, "get_meter") and existing_provider.get_meter("http.server"): + self._meter_provider = existing_provider + else: + resource = Resource.create({"service.name": "langflow"}) + metric_readers = [] + if self.prometheus_enabled: + metric_readers.append(PrometheusMetricReader()) - self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers) - metrics.set_meter_provider(self._meter_provider) + self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers) + metrics.set_meter_provider(self._meter_provider) self.meter = self._meter_provider.get_meter(langflow_meter_name) @@ -163,11 +173,12 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref): msg = f"Key '{name}' does not match metric name '{metric.name}'" raise ValueError(msg) if name not in self._metrics: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - self._metrics[metric.name] = self._create_metric(metric) + self._metrics[metric.name] = self._create_metric(metric) + + OpenTelemetry._initialized = True def _create_metric(self, metric): + # Remove _created_instruments check if metric.name in self._metrics: return self._metrics[metric.name]