Skip to content

Commit

Permalink
Merge branch 'main' into process_resource_attributes
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored May 6, 2022
2 parents 702dd34 + 07a5b64 commit eadf665
Show file tree
Hide file tree
Showing 3 changed files with 596 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
_Aggregation,
_convert_aggregation_temporality,
_PointVarT,
_SumAggregation,
)
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
Expand Down Expand Up @@ -52,6 +53,39 @@ def __init__(
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
self._lock = Lock()
self._instrument_class_aggregation = instrument_class_aggregation
self._name = self._view._name or self._instrument.name
self._description = (
self._view._description or self._instrument.description
)
if not isinstance(self._view._aggregation, DefaultAggregation):
self._aggregation = self._view._aggregation._create_aggregation(
self._instrument
)
else:
self._aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument)

def conflicts(self, other: "_ViewInstrumentMatch") -> bool:
# pylint: disable=protected-access

result = (
self._name == other._name
and self._instrument.unit == other._instrument.unit
# The aggregation class is being used here instead of data point
# type since they are functionally equivalent.
and self._aggregation.__class__ == other._aggregation.__class__
)
if isinstance(self._aggregation, _SumAggregation):
result = (
result
and self._aggregation._instrument_is_monotonic
== other._aggregation._instrument_is_monotonic
and self._aggregation._instrument_temporality
== other._aggregation._instrument_temporality
)

return result

# pylint: disable=protected-access
def consume_measurement(self, measurement: Measurement) -> None:
Expand Down Expand Up @@ -118,14 +152,11 @@ def collect(

yield Metric(
attributes=dict(attributes),
description=(
self._view._description
or self._instrument.description
),
description=self._description,
instrumentation_scope=(
self._instrument.instrumentation_scope
),
name=self._view._name or self._instrument.name,
name=self._name,
resource=self._sdk_config.resource,
unit=self._instrument.unit,
point=_convert_aggregation_temporality(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from logging import getLogger
from threading import RLock
from typing import Dict, Iterable, List

from opentelemetry._metrics import Instrument
from opentelemetry._metrics import Asynchronous, Instrument
from opentelemetry.sdk._metrics._internal._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics._internal.aggregation import Aggregation
from opentelemetry.sdk._metrics._internal.sdk_configuration import (
SdkConfiguration,
)
from opentelemetry.sdk._metrics._internal.view import View
from opentelemetry.sdk._metrics.aggregation import (
Aggregation,
ExplicitBucketHistogramAggregation,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric

_logger = getLogger(__name__)

_DEFAULT_VIEW = View(instrument_name="")


Expand All @@ -40,7 +46,7 @@ def __init__(
) -> None:
self._lock = RLock()
self._sdk_config = sdk_config
self._view_instrument_match: Dict[
self._instrument_view_instrument_matches: Dict[
Instrument, List[_ViewInstrumentMatch]
] = {}
self._instrument_class_aggregation = instrument_class_aggregation
Expand All @@ -51,29 +57,20 @@ def _get_or_init_view_instrument_match(
# Optimistically get the relevant views for the given instrument. Once set for a given
# instrument, the mapping will never change

if instrument in self._view_instrument_match:
return self._view_instrument_match[instrument]
if instrument in self._instrument_view_instrument_matches:
return self._instrument_view_instrument_matches[instrument]

with self._lock:
# double check if it was set before we held the lock
if instrument in self._view_instrument_match:
return self._view_instrument_match[instrument]
if instrument in self._instrument_view_instrument_matches:
return self._instrument_view_instrument_matches[instrument]

# not present, hold the lock and add a new mapping
view_instrument_matches = []
for view in self._sdk_config.views:
# pylint: disable=protected-access
if view._match(instrument):
view_instrument_matches.append(
_ViewInstrumentMatch(
view=view,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_aggregation=(
self._instrument_class_aggregation
),
)
)

self._handle_view_instrument_match(
instrument, view_instrument_matches
)

# if no view targeted the instrument, use the default
if not view_instrument_matches:
Expand All @@ -87,7 +84,10 @@ def _get_or_init_view_instrument_match(
),
)
)
self._view_instrument_match[instrument] = view_instrument_matches
self._instrument_view_instrument_matches[
instrument
] = view_instrument_matches

return view_instrument_matches

def consume_measurement(self, measurement: Measurement) -> None:
Expand All @@ -114,7 +114,7 @@ def collect(
with self._lock:
for (
view_instrument_matches
) in self._view_instrument_match.values():
) in self._instrument_view_instrument_matches.values():
for view_instrument_match in view_instrument_matches:
metrics.extend(
view_instrument_match.collect(
Expand All @@ -123,3 +123,72 @@ def collect(
)

return metrics

def _handle_view_instrument_match(
self,
instrument: Instrument,
view_instrument_matches: List["_ViewInstrumentMatch"],
) -> None:
for view in self._sdk_config.views:
# pylint: disable=protected-access
if not view._match(instrument):
continue

if not self._check_view_instrument_compatibility(view, instrument):
continue

new_view_instrument_match = _ViewInstrumentMatch(
view=view,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_aggregation=(
self._instrument_class_aggregation
),
)

for (
existing_view_instrument_matches
) in self._instrument_view_instrument_matches.values():
for (
existing_view_instrument_match
) in existing_view_instrument_matches:
if existing_view_instrument_match.conflicts(
new_view_instrument_match
):

_logger.warning(
"Views %s and %s will cause conflicting "
"metrics identities",
existing_view_instrument_match._view,
new_view_instrument_match._view,
)

view_instrument_matches.append(new_view_instrument_match)

@staticmethod
def _check_view_instrument_compatibility(
view: View, instrument: Instrument
) -> bool:
"""
Checks if a view and an instrument are compatible.
Returns `true` if they are compatible and a `_ViewInstrumentMatch`
object should be created, `false` otherwise.
"""

result = True

# pylint: disable=protected-access
if isinstance(instrument, Asynchronous) and isinstance(
view._aggregation, ExplicitBucketHistogramAggregation
):
_logger.warning(
"View %s and instrument %s will produce "
"semantic errors when matched, the view "
"has not been applied.",
view,
instrument,
)
result = False

return result
Loading

0 comments on commit eadf665

Please sign in to comment.