Skip to content

Commit

Permalink
Add _ViewInstrumentMatch
Browse files Browse the repository at this point in the history
Fixes #2296
  • Loading branch information
ocelotl committed Dec 3, 2021
1 parent c19cf0b commit a7c28cb
Showing 1 changed file with 129 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable, Dict, Iterable, List
from dataclasses import replace
from logging import getLogger
from threading import Lock
from typing import Callable, Dict, Iterable, List, Optional

from opentelemetry.sdk._metrics.aggregation import Aggregation
from opentelemetry.sdk._metrics.export import Metric
from opentelemetry.sdk._metrics.aggregation import Aggregation, _PointVarT
from opentelemetry.sdk._metrics.export import (
AGGREGATION_TEMPORALITY_CUMULATIVE,
AGGREGATION_TEMPORALITY_DELTA,
Gauge,
Metric,
Sum,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk.resources import Resource

_logger = getLogger(__name__)


class _ViewInstrumentMatch:
def __init__(
Expand All @@ -32,10 +43,122 @@ def __init__(
exemplar_reservoir: Callable,
resource: Resource,
):
pass
self._name = name
self._unit = unit
self._description = description

if attribute_keys is None:
self._attribute_keys = set()
else:
self._attribute_keys = set(attribute_keys.items())

self._extra_dimensions = extra_dimensions
self._aggregation = aggregation
self._exemplar_reservoir = exemplar_reservoir
self._attributes_aggregation = {}
self._attributes_previous_value = {}
self._lock = Lock()

def _process(self, measurement: Measurement) -> None:
pass
if measurement.attributes is None:
attributes = {}

else:
attributes = measurement.attributes

attributes = frozenset(
set(attributes).difference(self._attribute_keys)
)

if attributes not in self._attributes_aggregation.keys():
# FIXME how to handle aggregations that support config?
with self._lock:
self._attributes_aggregation[attributes] = self._aggregation[
attributes
]

self._attributes_aggregation[attributes].aggregate(measurement.amount)

def _collect(self, temporality: int) -> Iterable[Metric]:
pass
with self._lock:
for (
attributes,
aggregation,
) in self._attributes_aggregation.items():

previous_point = self._attributes_previous_point.get(
attributes
)

current_point = aggregation.make_point()

if current_point is not None:

yield Metric(
attributes=dict(attributes),
# FIXME check this is right
description=self._description,
# FIXME get instrumentation_info from the instrument
instrumentation_info=self._instrumentation_info,
name=self._name,
resource=self._resource,
unit=self._unit,
point=_convert_aggregation_temporality(
previous_point,
current_point,
AGGREGATION_TEMPORALITY_CUMULATIVE,
),
)


def _convert_aggregation_temporality(
previous_point: Optional[_PointVarT],
current_point: _PointVarT,
aggregation_temporality: int,
) -> _PointVarT:

previous_point_type = type(previous_point)
current_point_type = type(current_point)

if previous_point is not None and type(previous_point) is not type(
current_point
):
_logger.warning(
"convert_aggregation_temporality called with mismatched "
"point types: %s and %s",
previous_point_type,
current_point_type,
)

return current_point

if current_point_type is Sum:
if previous_point is None:

return replace(
current_point, aggregation_temporality=aggregation_temporality
)

if current_point.aggregation_temporality is aggregation_temporality:
return current_point

if aggregation_temporality == AGGREGATION_TEMPORALITY_DELTA:
value = current_point.value - previous_point.value

else:
value = current_point.value + previous_point.value

is_monotonic = (
previous_point.is_monotonic and current_point.is_monotonic
)

return Sum(
aggregation_temporality=aggregation_temporality,
is_monotonic=is_monotonic,
start_time_unix_nano=previous_point.start_time_unix_nano,
time_unix_nano=current_point.time_unix_nano,
value=value,
)

elif current_point_type is Gauge:
return current_point

0 comments on commit a7c28cb

Please sign in to comment.