refactor: update OpenTelemetry initialization to avoid multiple calls (#4306)
* Ensure single initialization of OpenTelemetry and optimize meter provider setup * Simplify condition by removing redundant class attribute
This commit is contained in:
parent
ccba14bec4
commit
b39ee0ae2d
2 changed files with 23 additions and 12 deletions
Binary file not shown.
|
|
@ -1,5 +1,4 @@
|
|||
import threading
|
||||
import warnings
|
||||
from collections.abc import Mapping
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
|
@ -110,6 +109,8 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
|
|||
_metrics_registry: dict[str, Metric] = {}
|
||||
_metrics: dict[str, Counter | ObservableGaugeWrapper | Histogram | UpDownCounter] = {}
|
||||
_meter_provider: MeterProvider | None = None
|
||||
_initialized: bool = False # Add initialization flag
|
||||
prometheus_enabled: bool = True
|
||||
|
||||
def _add_metric(
|
||||
self, name: str, description: str, unit: str, metric_type: MetricType, labels: dict[str, bool]
|
||||
|
|
@ -141,20 +142,29 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
|
|||
)
|
||||
|
||||
def __init__(self, *, prometheus_enabled: bool = True):
|
||||
# Only initialize once
|
||||
self.prometheus_enabled = prometheus_enabled
|
||||
if OpenTelemetry._initialized:
|
||||
return
|
||||
|
||||
if not self._metrics_registry:
|
||||
self._register_metric()
|
||||
|
||||
if self._meter_provider is None:
|
||||
resource = Resource.create({"service.name": "langflow"})
|
||||
metric_readers = []
|
||||
# Get existing meter provider if any
|
||||
existing_provider = metrics.get_meter_provider()
|
||||
|
||||
# configure prometheus exporter
|
||||
self.prometheus_enabled = prometheus_enabled
|
||||
if prometheus_enabled:
|
||||
metric_readers.append(PrometheusMetricReader())
|
||||
# Check if FastAPI instrumentation is already set up
|
||||
if hasattr(existing_provider, "get_meter") and existing_provider.get_meter("http.server"):
|
||||
self._meter_provider = existing_provider
|
||||
else:
|
||||
resource = Resource.create({"service.name": "langflow"})
|
||||
metric_readers = []
|
||||
if self.prometheus_enabled:
|
||||
metric_readers.append(PrometheusMetricReader())
|
||||
|
||||
self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
|
||||
metrics.set_meter_provider(self._meter_provider)
|
||||
self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
|
||||
metrics.set_meter_provider(self._meter_provider)
|
||||
|
||||
self.meter = self._meter_provider.get_meter(langflow_meter_name)
|
||||
|
||||
|
|
@ -163,11 +173,12 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
|
|||
msg = f"Key '{name}' does not match metric name '{metric.name}'"
|
||||
raise ValueError(msg)
|
||||
if name not in self._metrics:
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore")
|
||||
self._metrics[metric.name] = self._create_metric(metric)
|
||||
self._metrics[metric.name] = self._create_metric(metric)
|
||||
|
||||
OpenTelemetry._initialized = True
|
||||
|
||||
def _create_metric(self, metric):
|
||||
# Remove _created_instruments check
|
||||
if metric.name in self._metrics:
|
||||
return self._metrics[metric.name]
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue