Skip to content

Commit

Permalink
Add conversion to TimeSeries methods
Browse files Browse the repository at this point in the history
  • Loading branch information
shovnik committed Nov 24, 2020
1 parent 4b0b438 commit ed1f7d8
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@
((#180)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/180])
- Add Exporter constructor validation methods
((#206)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/206])
- Add conversion to TimeSeries methods
((#207)[https://github.com/open-telemetry/opentelemetry-python-contrib/pull/207])
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from typing import Dict, Sequence

from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
WriteRequest,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
Label,
Sample,
Expand All @@ -24,6 +28,13 @@
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)


class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
Expand Down Expand Up @@ -145,31 +156,88 @@ def shutdown(self) -> None:
def convert_to_timeseries(
self, export_records: Sequence[ExportRecord]
) -> Sequence[TimeSeries]:
raise NotImplementedError()
converter_map = {
MinMaxSumCountAggregator: self.convert_from_min_max_sum_count,
SumAggregator: self.convert_from_sum,
HistogramAggregator: self.convert_from_histogram,
LastValueAggregator: self.convert_from_last_value,
ValueObserverAggregator: self.convert_from_last_value,
}
timeseries = []
for export_record in export_records:
aggregator_type = type(export_record.aggregator)
converter = converter_map.get(aggregator_type)
if not converter:
raise ValueError(
str(aggregator_type) + " conversion is not supported"
)
timeseries.extend(converter(export_record))
return timeseries

def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries:
raise NotImplementedError()
name = sum_record.instrument.name
value = sum_record.aggregator.checkpoint
return [self.create_timeseries(sum_record, name, value)]

def convert_from_min_max_sum_count(
self, min_max_sum_count_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
timeseries = []
agg_types = ["min", "max", "sum", "count"]
for agg_type in agg_types:
name = min_max_sum_count_record.instrument.name + "_" + agg_type
value = getattr(
min_max_sum_count_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self.create_timeseries(min_max_sum_count_record, name, value)
)
return timeseries

def convert_from_histogram(
self, histogram_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
count = 0
timeseries = []
for bound in histogram_record.aggregator.checkpoint.keys():
bb = "+Inf" if bound == float("inf") else str(bound)
name = (
histogram_record.instrument.name + '_bucket{le="' + bb + '"}'
)
value = histogram_record.aggregator.checkpoint[bound]
timeseries.append(
self.create_timeseries(histogram_record, name, value)
)
count += value
name = histogram_record.instrument.name + "_count"
timeseries.append(
self.create_timeseries(histogram_record, name, float(count))
)
return timeseries

def convert_from_last_value(
self, last_value_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()
name = last_value_record.instrument.name
value = last_value_record.aggregator.checkpoint
return [self.create_timeseries(last_value_record, name, value)]

def convert_from_value_observer(
self, value_observer_record: ExportRecord
) -> TimeSeries:
raise NotImplementedError()

timeseries = []
agg_types = ["min", "max", "sum", "count", "last"]
for agg_type in agg_types:
name = value_observer_record.instrument.name + "_" + agg_type
value = getattr(
value_observer_record.aggregator.checkpoint, agg_type
)
timeseries.append(
self.create_timeseries(value_observer_record, name, value)
)
return timeseries

# TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries
def convert_from_quantile(
self, summary_record: ExportRecord
) -> TimeSeries:
Expand All @@ -179,13 +247,37 @@ def convert_from_quantile(
def create_timeseries(
self, export_record: ExportRecord, name, value: float
) -> TimeSeries:
raise NotImplementedError()
timeseries = TimeSeries()
# Add name label, record labels and resource labels
timeseries.labels.append(self.create_label("__name__", name))
resource_attributes = export_record.resource.attributes
for label_name, label_value in resource_attributes.items():
timeseries.labels.append(
self.create_label(label_name, label_value)
)
for label in export_record.labels:
if label[0] not in resource_attributes.keys():
timeseries.labels.append(self.create_label(label[0], label[1]))
# Add sample
timeseries.samples.append(
self.create_sample(
export_record.aggregator.last_update_timestamp, value
)
)
return timeseries

def create_sample(self, timestamp: int, value: float) -> Sample:
raise NotImplementedError()
sample = Sample()
sample.timestamp = int(timestamp / 1000000)
sample.value = value
return sample

def create_label(self, name: str, value: str) -> Label:
raise NotImplementedError()
label = Label()
# Label name must contain only alphanumeric characters and underscores
label.name = re.sub("[^0-9a-zA-Z_]+", "_", name)
label.value = value
return label

def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
raise NotImplementedError()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,25 @@
# limitations under the License.

import unittest
from unittest import mock

from opentelemetry.exporter.prometheus_remote_write import (
PrometheusRemoteWriteMetricsExporter,
)
from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import (
TimeSeries,
)
from opentelemetry.sdk.metrics import Counter
from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult
from opentelemetry.sdk.metrics.export.aggregate import (
HistogramAggregator,
LastValueAggregator,
MinMaxSumCountAggregator,
SumAggregator,
ValueObserverAggregator,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import get_dict_as_key


class TestValidation(unittest.TestCase):
Expand Down Expand Up @@ -111,35 +126,151 @@ def test_invalid_tls_config_key_only_param(self):
class TestConversion(unittest.TestCase):
# Initializes test data that is reused across tests
def setUp(self):
pass
self._test_metric = Counter(
"testname", "testdesc", "testunit", int, None
)
self._exporter = PrometheusRemoteWriteMetricsExporter(
endpoint="/prom/test_endpoint"
)

def generate_record(aggregator_type):
return ExportRecord(
self._test_metric, None, aggregator_type(), Resource({}),
)

self._generate_record = generate_record

def converter_method(record, name, value):
return (type(record.aggregator), name, value)

self._converter_mock = mock.MagicMock(return_value=converter_method)

# Ensures conversion to timeseries function works with valid aggregation types
def test_valid_convert_to_timeseries(self):
pass
timeseries_mock_method = mock.Mock(return_value=["test_value"])
self._exporter.convert_from_sum = timeseries_mock_method
self._exporter.convert_from_min_max_sum_count = timeseries_mock_method
self._exporter.convert_from_histogram = timeseries_mock_method
self._exporter.convert_from_last_value = timeseries_mock_method
self._exporter.convert_from_value_observer = timeseries_mock_method
test_records = [
self._generate_record(SumAggregator),
self._generate_record(MinMaxSumCountAggregator),
self._generate_record(HistogramAggregator),
self._generate_record(LastValueAggregator),
self._generate_record(ValueObserverAggregator),
]
data = self._exporter.convert_to_timeseries(test_records)
self.assertEqual(len(data), 5)
for timeseries in data:
self.assertEqual(timeseries, "test_value")

no_type_records = [self._generate_record(lambda: None)]
with self.assertRaises(ValueError):
self._exporter.convert_to_timeseries(no_type_records)

# Ensures conversion to timeseries fails for unsupported aggregation types
def test_invalid_convert_to_timeseries(self):
pass
no_type_records = [self._generate_record(lambda: None)]
with self.assertRaises(ValueError):
self._exporter.convert_to_timeseries(no_type_records)

# Ensures sum aggregator is correctly converted to timeseries
def test_convert_from_sum(self):
pass
sum_record = self._generate_record(SumAggregator)
sum_record.aggregator.update(3)
sum_record.aggregator.update(2)
sum_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_sum(sum_record)
self.assertEqual(timeseries[0], (SumAggregator, "testname", 5))

# Ensures sum min_max_count aggregator is correctly converted to timeseries
def test_convert_from_min_max_sum_count(self):
pass
min_max_sum_count_record = self._generate_record(
MinMaxSumCountAggregator
)
min_max_sum_count_record.aggregator.update(5)
min_max_sum_count_record.aggregator.update(1)
min_max_sum_count_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_min_max_sum_count(
min_max_sum_count_record
)
self.assertEqual(
timeseries[0], (MinMaxSumCountAggregator, "testname_min", 1)
)
self.assertEqual(
timeseries[1], (MinMaxSumCountAggregator, "testname_max", 5)
)
self.assertEqual(
timeseries[2], (MinMaxSumCountAggregator, "testname_sum", 6)
)
self.assertEqual(
timeseries[3], (MinMaxSumCountAggregator, "testname_count", 2)
)

# Ensures histogram aggregator is correctly converted to timeseries
def test_convert_from_histogram(self):
pass
histogram_record = self._generate_record(HistogramAggregator)
histogram_record.aggregator.update(5)
histogram_record.aggregator.update(2)
histogram_record.aggregator.update(-1)
histogram_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_histogram(histogram_record)
self.assertEqual(
timeseries[0], (HistogramAggregator, 'testname_bucket{le="0"}', 1)
)
self.assertEqual(
timeseries[1],
(HistogramAggregator, 'testname_bucket{le="+Inf"}', 2),
)
self.assertEqual(
timeseries[2], (HistogramAggregator, "testname_count", 3)
)

# Ensures last value aggregator is correctly converted to timeseries
def test_convert_from_last_value(self):
pass
last_value_record = self._generate_record(LastValueAggregator)
last_value_record.aggregator.update(1)
last_value_record.aggregator.update(5)
last_value_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_last_value(last_value_record)
self.assertEqual(timeseries[0], (LastValueAggregator, "testname", 5))

# Ensures value observer aggregator is correctly converted to timeseries
def test_convert_from_value_observer(self):
pass
value_observer_record = self._generate_record(ValueObserverAggregator)
value_observer_record.aggregator.update(5)
value_observer_record.aggregator.update(1)
value_observer_record.aggregator.update(2)
value_observer_record.aggregator.take_checkpoint()

self._exporter.create_timeseries = self._converter_mock()
timeseries = self._exporter.convert_from_value_observer(
value_observer_record
)
self.assertEqual(
timeseries[0], (ValueObserverAggregator, "testname_min", 1)
)
self.assertEqual(
timeseries[1], (ValueObserverAggregator, "testname_max", 5)
)
self.assertEqual(
timeseries[2], (ValueObserverAggregator, "testname_sum", 8)
)
self.assertEqual(
timeseries[3], (ValueObserverAggregator, "testname_count", 3)
)
self.assertEqual(
timeseries[4], (ValueObserverAggregator, "testname_last", 2)
)

# Ensures quantile aggregator is correctly converted to timeseries
# TODO: Add test once method is implemented
Expand All @@ -148,7 +279,34 @@ def test_convert_from_quantile(self):

# Ensures timeseries produced contains appropriate sample and labels
def test_create_timeseries(self):
pass
sum_aggregator = SumAggregator()
sum_aggregator.update(5)
sum_aggregator.take_checkpoint()
sum_aggregator.last_update_timestamp = 10
export_record = ExportRecord(
self._test_metric,
get_dict_as_key({"record_name": "record_value"}),
sum_aggregator,
Resource({"resource_name": "resource_value"}),
)

expected_timeseries = TimeSeries()
expected_timeseries.labels.append(
self._exporter.create_label("__name__", "testname")
)
expected_timeseries.labels.append(
self._exporter.create_label("resource_name", "resource_value")
)
expected_timeseries.labels.append(
self._exporter.create_label("record_name", "record_value")
)
expected_timeseries.samples.append(
self._exporter.create_sample(10, 5.0),
)
timeseries = self._exporter.create_timeseries(
export_record, "testname", 5.0,
)
self.assertEqual(timeseries, expected_timeseries)


class TestExport(unittest.TestCase):
Expand Down

0 comments on commit ed1f7d8

Please sign in to comment.