From 7bd105007e68aaef1425c29d13ef1b4cb551ad3d Mon Sep 17 00:00:00 2001 From: ming Date: Wed, 10 Jul 2024 15:01:38 -0400 Subject: [PATCH] feat: add opentelemetry utility functions and unit tests (#2570) * add opentelemetry utility functions and unit tests * review comments * add label validation --- .vscode/launch.json | 3 +- .../services/telemetry/opentelemetry.py | 223 +++++++++++++++++- tests/unit/test_telemetry.py | 112 +++++++++ 3 files changed, 324 insertions(+), 14 deletions(-) create mode 100644 tests/unit/test_telemetry.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 5929f0eff..bd9f47334 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -23,7 +23,8 @@ "jinja": true, "justMyCode": false, "env": { - "LANGFLOW_LOG_LEVEL": "debug" + "LANGFLOW_LOG_LEVEL": "debug", + "LANGFLOW_PROMETHEUS_PORT": "9090" }, "envFile": "${workspaceFolder}/.env" }, diff --git a/src/backend/base/langflow/services/telemetry/opentelemetry.py b/src/backend/base/langflow/services/telemetry/opentelemetry.py index 50236dd96..8e6bd0284 100644 --- a/src/backend/base/langflow/services/telemetry/opentelemetry.py +++ b/src/backend/base/langflow/services/telemetry/opentelemetry.py @@ -1,30 +1,227 @@ +from enum import Enum from opentelemetry import metrics from opentelemetry.exporter.prometheus import PrometheusMetricReader +from opentelemetry.metrics import Observation, CallbackOptions +from opentelemetry.metrics._internal.instrument import Counter, Histogram, UpDownCounter from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource +from typing import Any, Dict, Mapping, Tuple, Union +from weakref import WeakValueDictionary + +import threading + +# a default OpenTelelmetry meter name +langflow_meter_name = "langflow" + +""" +If the measurement values are non-additive, use an Asynchronous Gauge. + ObservableGauge reports the current absolute value when observed. +If the measurement values are additive: If the value is monotonically increasing - use an Asynchronous Counter. +If the value is NOT monotonically increasing - use an Asynchronous UpDownCounter. + UpDownCounter reports changes/deltas to the last observed value. +If the measurement values are additive and you want to observe the distribution of the values - use a Histogram. +""" -class OpenTelemetry: +class MetricType(Enum): + COUNTER = "counter" + OBSERVABLE_GAUGE = "observable_gauge" + HISTOGRAM = "histogram" + UP_DOWN_COUNTER = "up_down_counter" + + +mandatory_label = True +optional_label = False + + +class ObservableGaugeWrapper: + """ + Wrapper class for ObservableGauge + Since OpenTelemetry does not provide a way to set the value of an ObservableGauge, + instead it uses a callback function to get the value, we need to create a wrapper class. + """ + + def __init__(self, name: str, description: str, unit: str): + self._values: Dict[Tuple[Tuple[str, str], ...], float] = {} + self._meter = metrics.get_meter(langflow_meter_name) + self._gauge = self._meter.create_observable_gauge( + name=name, description=description, unit=unit, callbacks=[self._callback] + ) + + def _callback(self, options: CallbackOptions): + return [Observation(value, attributes=dict(labels)) for labels, value in self._values.items()] + + # return [Observation(self._value)] + + def set_value(self, value: float, labels: Mapping[str, str]): + self._values[tuple(sorted(labels.items()))] = value + + +class Metric: + def __init__( + self, + name: str, + description: str, + type: MetricType, + labels: Dict[str, bool], + unit: str = "", + ): + self.name = name + self.description = description + self.type = type + self.unit = unit + self.labels = labels + self.mandatory_labels = [label for label, required in labels.items() if required] + self.allowed_labels = [label for label in labels.keys()] + + def validate_labels(self, labels: Mapping[str, str]): + """ + Validate if the labels provided are valid + """ + if labels is None or len(labels) == 0: + raise ValueError("Labels must be provided for the metric") + + missing_labels = set(self.mandatory_labels) - set(labels.keys()) + if missing_labels: + raise ValueError(f"Missing required labels: {missing_labels}") + + def __repr__(self): + return f"Metric(name='{self.name}', description='{self.description}', type={self.type}, unit='{self.unit}')" + + +class ThreadSafeSingletonMetaUsingWeakref(type): + """ + Thread-safe Singleton metaclass using WeakValueDictionary + """ + + _instances: WeakValueDictionary[Any, Any] = WeakValueDictionary() + _lock: threading.Lock = threading.Lock() + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + with cls._lock: + if cls not in cls._instances: + instance = super(ThreadSafeSingletonMetaUsingWeakref, cls).__call__(*args, **kwargs) + cls._instances[cls] = instance + return cls._instances[cls] + + +class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref): + _metrics_registry: Dict[str, Metric] = dict() + + def _add_metric(self, name: str, description: str, unit: str, metric_type: MetricType, labels: Dict[str, bool]): + metric = Metric(name=name, description=description, type=metric_type, unit=unit, labels=labels) + self._metrics_registry[name] = metric + if labels is None or len(labels) == 0: + raise ValueError("Labels must be provided for the metric upon registration") + + def _register_metric(self): + """ + Define any custom metrics here + A thread safe singleton class to manage metrics + """ + self._add_metric( + name="file_uploads", + description="The uploaded file size in bytes", + unit="bytes", + metric_type=MetricType.OBSERVABLE_GAUGE, + labels={"flow_id": mandatory_label}, + ) + self._add_metric( + name="num_files_uploaded", + description="The number of file uploaded", + unit="", + metric_type=MetricType.COUNTER, + labels={"flow_id": mandatory_label}, + ) + + _metrics: Dict[str, Union[Counter, ObservableGaugeWrapper, Histogram, UpDownCounter]] = {} + def __init__(self, prometheus_enabled: bool = True): + self._register_metric() + resource = Resource.create({"service.name": "langflow"}) meter_provider = MeterProvider(resource=resource) + + # configure prometheus exporter self.prometheus_enabled = prometheus_enabled if prometheus_enabled: reader = PrometheusMetricReader() meter_provider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(meter_provider) - self.meter = meter_provider.get_meter("langflow") + self.meter = meter_provider.get_meter(langflow_meter_name) - self._register_metrics() + for name, metric in self._metrics_registry.items(): + # enforce the key in the mapping and metric's name are the same + # this error can get caught at unit test + if name != metric.name: + raise ValueError(f"Key '{name}' does not match metric name '{metric.name}'") + if metric.type == MetricType.COUNTER: + counter = self.meter.create_counter( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + self._metrics[metric.name] = counter + elif metric.type == MetricType.OBSERVABLE_GAUGE: + gauge = ObservableGaugeWrapper( + name=metric.name, + description=metric.description, + unit=metric.unit, + ) + self._metrics[metric.name] = gauge + elif metric.type == MetricType.UP_DOWN_COUNTER: + up_down_counter = self.meter.create_up_down_counter( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + self._metrics[metric.name] = up_down_counter + elif metric.type == MetricType.HISTOGRAM: + histogram = self.meter.create_histogram( + name=metric.name, + unit=metric.unit, + description=metric.description, + ) + self._metrics[metric.name] = histogram + else: + raise ValueError(f"Unknown metric type: {metric.type}") - def _register_metrics(self): - pass - """ - metrics can be registered in this function - self.counter = self.meter.create_counter( - name = "requests", - unit = "bytes", - description="The number of requests", - ) - """ + def validate_labels(self, metric_name: str, labels: Mapping[str, str]): + reg = self._metrics_registry.get(metric_name) + if reg is None: + raise ValueError(f"Metric '{metric_name}' is not registered") + reg.validate_labels(labels) + + def increment_counter(self, metric_name: str, labels: Mapping[str, str], value: float = 1.0): + self.validate_labels(metric_name, labels) + counter = self._metrics.get(metric_name) + if isinstance(counter, Counter): + counter.add(value, labels) + else: + raise ValueError(f"Metric '{metric_name}' is not a counter") + + def up_down_counter(self, metric_name: str, value: float, labels: Mapping[str, str]): + self.validate_labels(metric_name, labels) + up_down_counter = self._metrics.get(metric_name) + if isinstance(up_down_counter, UpDownCounter): + up_down_counter.add(value, labels) + else: + raise ValueError(f"Metric '{metric_name}' is not an up down counter") + + def update_gauge(self, metric_name: str, value: float, labels: Mapping[str, str]): + self.validate_labels(metric_name, labels) + gauge = self._metrics.get(metric_name) + if isinstance(gauge, ObservableGaugeWrapper): + gauge.set_value(value, labels) + else: + raise ValueError(f"Metric '{metric_name}' is not a gauge") + + def observe_histogram(self, metric_name: str, value: float, labels: Mapping[str, str]): + self.validate_labels(metric_name, labels) + histogram = self._metrics.get(metric_name) + if isinstance(histogram, Histogram): + histogram.record(value, labels) + else: + raise ValueError(f"Metric '{metric_name}' is not a histogram") diff --git a/tests/unit/test_telemetry.py b/tests/unit/test_telemetry.py new file mode 100644 index 000000000..abef3bfd6 --- /dev/null +++ b/tests/unit/test_telemetry.py @@ -0,0 +1,112 @@ +import pytest +import threading +from langflow.services.telemetry.opentelemetry import OpenTelemetry +from concurrent.futures import ThreadPoolExecutor, as_completed + + +fixed_labels = {"flow_id": "this_flow_id", "service": "this", "user": "that"} + + +@pytest.fixture +def opentelemetry_instance(): + return OpenTelemetry() + + +def test_init(opentelemetry_instance): + assert isinstance(opentelemetry_instance, OpenTelemetry) + assert len(opentelemetry_instance._metrics) > 1 + assert len(opentelemetry_instance._metrics) == len(opentelemetry_instance._metrics_registry) == 2 + assert "file_uploads" in opentelemetry_instance._metrics + + +def test_gauge(opentelemetry_instance): + opentelemetry_instance.update_gauge("file_uploads", 1024, fixed_labels) + + +def test_gauge_with_counter_method(opentelemetry_instance): + with pytest.raises(ValueError, match="Metric 'file_uploads' is not a counter"): + opentelemetry_instance.increment_counter(metric_name="file_uploads", value=1, labels=fixed_labels) + + +def test_gauge_with_historgram_method(opentelemetry_instance): + with pytest.raises(ValueError, match="Metric 'file_uploads' is not a histogram"): + opentelemetry_instance.observe_histogram("file_uploads", 1, fixed_labels) + + +def test_gauge_with_up_down_counter_method(opentelemetry_instance): + with pytest.raises(ValueError, match="Metric 'file_uploads' is not an up down counter"): + opentelemetry_instance.up_down_counter("file_uploads", 1, labels=fixed_labels) + + +def test_increment_counter(opentelemetry_instance): + opentelemetry_instance.increment_counter(metric_name="num_files_uploaded", value=5, labels=fixed_labels) + + +def test_increment_counter_empty_label(opentelemetry_instance): + with pytest.raises(ValueError, match="Labels must be provided for the metric"): + opentelemetry_instance.increment_counter(metric_name="num_files_uploaded", value=5, labels={}) + + +def test_increment_counter_missing_mandatory_label(opentelemetry_instance): + with pytest.raises(ValueError, match="Missing required labels: {'flow_id'}"): + opentelemetry_instance.increment_counter(metric_name="num_files_uploaded", value=5, labels={"service": "one"}) + + +def test_increment_counter_unregisted_metric(opentelemetry_instance): + with pytest.raises(ValueError, match="Metric 'num_files_uploaded_1' is not registered"): + opentelemetry_instance.increment_counter(metric_name="num_files_uploaded_1", value=5, labels=fixed_labels) + + +def test_opentelementry_singleton(opentelemetry_instance): + opentelemetry_instance_2 = OpenTelemetry() + assert opentelemetry_instance is opentelemetry_instance_2 + + opentelemetry_instance_3 = OpenTelemetry(prometheus_enabled=False) + assert opentelemetry_instance is opentelemetry_instance_3 + assert opentelemetry_instance.prometheus_enabled == opentelemetry_instance_3.prometheus_enabled + + +def test_missing_labels(opentelemetry_instance): + with pytest.raises(ValueError, match="Labels must be provided for the metric"): + opentelemetry_instance.increment_counter(metric_name="num_files_uploaded", labels=None, value=1.0) + with pytest.raises(ValueError, match="Labels must be provided for the metric"): + opentelemetry_instance.up_down_counter("num_files_uploaded", 1, None) + with pytest.raises(ValueError, match="Labels must be provided for the metric"): + opentelemetry_instance.update_gauge(metric_name="num_files_uploaded", value=1.0, labels=dict()) + with pytest.raises(ValueError, match="Labels must be provided for the metric"): + opentelemetry_instance.observe_histogram("num_files_uploaded", 1, dict()) + + +def test_multithreaded_singleton(): + def create_instance(): + return OpenTelemetry() + + # Create instances in multiple threads + with ThreadPoolExecutor(max_workers=10) as executor: + futures = [executor.submit(create_instance) for _ in range(100)] + instances = [future.result() for future in as_completed(futures)] + + # Check that all instances are the same + first_instance = instances[0] + for instance in instances[1:]: + assert instance is first_instance + + +def test_multithreaded_singleton_race_condition(): + # This test simulates a potential race condition + start_event = threading.Event() + + def create_instance(): + start_event.wait() # Wait for all threads to be ready + return OpenTelemetry() + + # Create instances in multiple threads, all starting at the same time + with ThreadPoolExecutor(max_workers=100) as executor: + futures = [executor.submit(create_instance) for _ in range(100)] + start_event.set() # Start all threads simultaneously + instances = [future.result() for future in as_completed(futures)] + + # Check that all instances are the same + first_instance = instances[0] + for instance in instances[1:]: + assert instance is first_instance