Skip to content

Commit

Permalink
Return proxy instruments from ProxyMeter (open-telemetry#2169)
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass authored and ocelotl committed Nov 17, 2021
1 parent 4e0b705 commit c2a59ff
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 53 deletions.
192 changes: 140 additions & 52 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from abc import ABC, abstractmethod
from logging import getLogger
from os import environ
from typing import Optional, cast
from threading import Lock
from typing import List, Optional, cast

from opentelemetry.environment_variables import OTEL_PYTHON_METER_PROVIDER
from opentelemetry.metrics.instrument import (
Expand All @@ -41,7 +42,15 @@
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
_ProxyCounter,
_ProxyHistogram,
_ProxyInstrument,
_ProxyObservableCounter,
_ProxyObservableGauge,
_ProxyObservableUpDownCounter,
_ProxyUpDownCounter,
)
from opentelemetry.util._once import Once
from opentelemetry.util._providers import _load_provider

_logger = getLogger(__name__)
Expand Down Expand Up @@ -70,18 +79,33 @@ def get_meter(
return _DefaultMeter(name, version=version, schema_url=schema_url)


class ProxyMeterProvider(MeterProvider):
class _ProxyMeterProvider(MeterProvider):
def __init__(self) -> None:
self._lock = Lock()
self._meters: List[_ProxyMeter] = []
self._real_meter_provider: Optional[MeterProvider] = None

def get_meter(
self,
name,
version=None,
schema_url=None,
) -> "Meter":
if _METER_PROVIDER:
return _METER_PROVIDER.get_meter(
name, version=version, schema_url=schema_url
)
return ProxyMeter(name, version=version, schema_url=schema_url)
with self._lock:
if self._real_meter_provider is not None:
return self._real_meter_provider.get_meter(
name, version, schema_url
)

meter = _ProxyMeter(name, version=version, schema_url=schema_url)
self._meters.append(meter)
return meter

def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
with self._lock:
self._real_meter_provider = meter_provider
for meter in self._meters:
meter.on_set_meter_provider(meter_provider)


class Meter(ABC):
Expand Down Expand Up @@ -225,51 +249,109 @@ def create_observable_up_down_counter(
self._secure_instrument_name(name)


class ProxyMeter(Meter):
class _ProxyMeter(Meter):
def __init__(
self,
name,
version=None,
schema_url=None,
):
super().__init__(name, version=version, schema_url=schema_url)
self._lock = Lock()
self._instruments: List[_ProxyInstrument] = []
self._real_meter: Optional[Meter] = None
self._noop_meter = _DefaultMeter(
name, version=version, schema_url=schema_url

def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
"""Called when a real meter provider is set on the creating _ProxyMeterProvider
Creates a real backing meter for this instance and notifies all created
instruments so they can create real backing instruments.
"""
real_meter = meter_provider.get_meter(
self._name, self._version, self._schema_url
)

@property
def _meter(self) -> Meter:
if self._real_meter is not None:
return self._real_meter

if _METER_PROVIDER:
self._real_meter = _METER_PROVIDER.get_meter(
self._name,
self._version,
)
return self._real_meter
return self._noop_meter
with self._lock:
self._real_meter = real_meter
# notify all proxy instruments of the new meter so they can create
# real instruments to back themselves
for instrument in self._instruments:
instrument.on_meter_set(real_meter)

def create_counter(self, *args, **kwargs) -> Counter:
return self._meter.create_counter(*args, **kwargs)
def create_counter(self, name, unit="", description="") -> Counter:
with self._lock:
if self._real_meter:
return self._real_meter.create_counter(name, unit, description)
proxy = _ProxyCounter(name, unit, description)
self._instruments.append(proxy)
return proxy

def create_up_down_counter(self, *args, **kwargs) -> UpDownCounter:
return self._meter.create_up_down_counter(*args, **kwargs)
def create_up_down_counter(
self, name, unit="", description=""
) -> UpDownCounter:
with self._lock:
if self._real_meter:
return self._real_meter.create_up_down_counter(
name, unit, description
)
proxy = _ProxyUpDownCounter(name, unit, description)
self._instruments.append(proxy)
return proxy

def create_observable_counter(self, *args, **kwargs) -> ObservableCounter:
return self._meter.create_observable_counter(*args, **kwargs)
def create_observable_counter(
self, name, callback, unit="", description=""
) -> ObservableCounter:
with self._lock:
if self._real_meter:
return self._real_meter.create_observable_counter(
name, callback, unit, description
)
proxy = _ProxyObservableCounter(
name, callback, unit=unit, description=description
)
self._instruments.append(proxy)
return proxy

def create_histogram(self, *args, **kwargs) -> Histogram:
return self._meter.create_histogram(*args, **kwargs)
def create_histogram(self, name, unit="", description="") -> Histogram:
with self._lock:
if self._real_meter:
return self._real_meter.create_histogram(
name, unit, description
)
proxy = _ProxyHistogram(name, unit, description)
self._instruments.append(proxy)
return proxy

def create_observable_gauge(self, *args, **kwargs) -> ObservableGauge:
return self._meter.create_observable_gauge(*args, **kwargs)
def create_observable_gauge(
self, name, callback, unit="", description=""
) -> ObservableGauge:
with self._lock:
if self._real_meter:
return self._real_meter.create_observable_gauge(
name, callback, unit, description
)
proxy = _ProxyObservableGauge(
name, callback, unit=unit, description=description
)
self._instruments.append(proxy)
return proxy

def create_observable_up_down_counter(
self, *args, **kwargs
self, name, callback, unit="", description=""
) -> ObservableUpDownCounter:
return self._meter.create_observable_up_down_counter(*args, **kwargs)
with self._lock:
if self._real_meter:
return self._real_meter.create_observable_up_down_counter(
name,
callback,
unit,
description,
)
proxy = _ProxyObservableUpDownCounter(
name, callback, unit=unit, description=description
)
self._instruments.append(proxy)
return proxy


class _DefaultMeter(Meter):
Expand Down Expand Up @@ -329,8 +411,9 @@ def create_observable_up_down_counter(
)


_METER_PROVIDER = None
_PROXY_METER_PROVIDER = None
_METER_PROVIDER_SET_ONCE = Once()
_METER_PROVIDER: Optional[MeterProvider] = None
_PROXY_METER_PROVIDER = _ProxyMeterProvider()


def get_meter(
Expand All @@ -350,35 +433,40 @@ def get_meter(
return meter_provider.get_meter(name, version)


def _set_meter_provider(meter_provider: MeterProvider, log: bool) -> None:
def set_mp() -> None:
global _METER_PROVIDER # pylint: disable=global-statement
_METER_PROVIDER = meter_provider

# gives all proxies real instruments off the newly set meter provider
_PROXY_METER_PROVIDER.on_set_meter_provider(meter_provider)

did_set = _METER_PROVIDER_SET_ONCE.do_once(set_mp)

if log and not did_set:
_logger.warning("Overriding of current MeterProvider is not allowed")


def set_meter_provider(meter_provider: MeterProvider) -> None:
"""Sets the current global :class:`~.MeterProvider` object.
This can only be done once, a warning will be logged if any furter attempt
is made.
"""
global _METER_PROVIDER # pylint: disable=global-statement

if _METER_PROVIDER is not None:
_logger.warning("Overriding of current MeterProvider is not allowed")
return

_METER_PROVIDER = meter_provider
_set_meter_provider(meter_provider, log=True)


def get_meter_provider() -> MeterProvider:
"""Gets the current global :class:`~.MeterProvider` object."""
# pylint: disable=global-statement
global _METER_PROVIDER
global _PROXY_METER_PROVIDER

if _METER_PROVIDER is None:
if OTEL_PYTHON_METER_PROVIDER not in environ.keys():
if _PROXY_METER_PROVIDER is None:
_PROXY_METER_PROVIDER = ProxyMeterProvider()
return _PROXY_METER_PROVIDER

_METER_PROVIDER = cast(
"MeterProvider",
_load_provider(OTEL_PYTHON_METER_PROVIDER, "meter_provider"),
meter_provider: MeterProvider = _load_provider(
OTEL_PYTHON_METER_PROVIDER, "meter_provider"
)
return _METER_PROVIDER
_set_meter_provider(meter_provider, log=False)

# _METER_PROVIDER will have been set by one thread
return cast("MeterProvider", _METER_PROVIDER)
Loading

0 comments on commit c2a59ff

Please sign in to comment.