diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py index ebbc79507d..b841c6600a 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py @@ -11,9 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import re from typing import Dict, Sequence +from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import ( + WriteRequest, +) from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import ( Label, Sample, @@ -24,6 +27,13 @@ MetricsExporter, MetricsExportResult, ) +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + LastValueAggregator, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) class PrometheusRemoteWriteMetricsExporter(MetricsExporter): @@ -131,31 +141,88 @@ def shutdown(self) -> None: def convert_to_timeseries( self, export_records: Sequence[ExportRecord] ) -> Sequence[TimeSeries]: - raise NotImplementedError() + converter_map = { + MinMaxSumCountAggregator: self.convert_from_min_max_sum_count, + SumAggregator: self.convert_from_sum, + HistogramAggregator: self.convert_from_histogram, + LastValueAggregator: self.convert_from_last_value, + ValueObserverAggregator: self.convert_from_last_value, + } + timeseries = [] + for export_record in export_records: + aggregator_type = type(export_record.aggregator) + converter = converter_map.get(aggregator_type) + if not converter: + raise ValueError( + str(aggregator_type) + " conversion is not supported" + ) + timeseries.extend(converter(export_record)) + return timeseries def convert_from_sum(self, sum_record: ExportRecord) -> TimeSeries: - raise NotImplementedError() + name = sum_record.instrument.name + value = sum_record.aggregator.checkpoint + return [self.create_timeseries(sum_record, name, value)] def convert_from_min_max_sum_count( self, min_max_sum_count_record: ExportRecord ) -> TimeSeries: - raise NotImplementedError() + timeseries = [] + agg_types = ["min", "max", "sum", "count"] + for agg_type in agg_types: + name = min_max_sum_count_record.instrument.name + "_" + agg_type + value = getattr( + min_max_sum_count_record.aggregator.checkpoint, agg_type + ) + timeseries.append( + self.create_timeseries(min_max_sum_count_record, name, value) + ) + return timeseries def convert_from_histogram( self, histogram_record: ExportRecord ) -> TimeSeries: - raise NotImplementedError() + count = 0 + timeseries = [] + for bound in histogram_record.aggregator.checkpoint.keys(): + bb = "+Inf" if bound == float("inf") else str(bound) + name = ( + histogram_record.instrument.name + '_bucket{le="' + bb + '"}' + ) + value = histogram_record.aggregator.checkpoint[bound] + timeseries.append( + self.create_timeseries(histogram_record, name, value) + ) + count += value + name = histogram_record.instrument.name + "_count" + timeseries.append( + self.create_timeseries(histogram_record, name, float(count)) + ) + return timeseries def convert_from_last_value( self, last_value_record: ExportRecord ) -> TimeSeries: - raise NotImplementedError() + name = last_value_record.instrument.name + value = last_value_record.aggregator.checkpoint + return [self.create_timeseries(last_value_record, name, value)] def convert_from_value_observer( self, value_observer_record: ExportRecord ) -> TimeSeries: - raise NotImplementedError() + timeseries = [] + agg_types = ["min", "max", "sum", "count", "last"] + for agg_type in agg_types: + name = value_observer_record.instrument.name + "_" + agg_type + value = getattr( + value_observer_record.aggregator.checkpoint, agg_type + ) + timeseries.append( + self.create_timeseries(value_observer_record, name, value) + ) + return timeseries + # TODO: Implement convert from quantile once supported by SDK for Prometheus Summaries def convert_from_quantile( self, summary_record: ExportRecord ) -> TimeSeries: @@ -165,13 +232,37 @@ def convert_from_quantile( def create_timeseries( self, export_record: ExportRecord, name, value: float ) -> TimeSeries: - raise NotImplementedError() + timeseries = TimeSeries() + # Add name label, record labels and resource labels + timeseries.labels.append(self.create_label("__name__", name)) + resource_attributes = export_record.resource.attributes + for label_name, label_value in resource_attributes.items(): + timeseries.labels.append( + self.create_label(label_name, label_value) + ) + for label in export_record.labels: + if label[0] not in resource_attributes.keys(): + timeseries.labels.append(self.create_label(label[0], label[1])) + # Add sample + timeseries.samples.append( + self.create_sample( + export_record.aggregator.last_update_timestamp, value + ) + ) + return timeseries def create_sample(self, timestamp: int, value: float) -> Sample: - raise NotImplementedError() + sample = Sample() + sample.timestamp = int(timestamp / 1000000) + sample.value = value + return sample def create_label(self, name: str, value: str) -> Label: - raise NotImplementedError() + label = Label() + # Label name must contain only alphanumeric characters and underscores + label.name = re.sub("[^0-9a-zA-Z_]+", "_", name) + label.value = value + return label def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: raise NotImplementedError() diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py index fa4de62a63..8cdd3b46b4 100644 --- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py @@ -13,10 +13,25 @@ # limitations under the License. import unittest +from unittest import mock from opentelemetry.exporter.prometheus_remote_write import ( PrometheusRemoteWriteMetricsExporter, ) +from opentelemetry.exporter.prometheus_remote_write.gen.types_pb2 import ( + TimeSeries, +) +from opentelemetry.sdk.metrics import Counter +from opentelemetry.sdk.metrics.export import ExportRecord, MetricsExportResult +from opentelemetry.sdk.metrics.export.aggregate import ( + HistogramAggregator, + LastValueAggregator, + MinMaxSumCountAggregator, + SumAggregator, + ValueObserverAggregator, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util import get_dict_as_key class TestValidation(unittest.TestCase): @@ -90,35 +105,151 @@ def test_invalid_conflicting_auth_param(self): class TestConversion(unittest.TestCase): # Initializes test data that is reused across tests def setUp(self): - pass + self._test_metric = Counter( + "testname", "testdesc", "testunit", int, None + ) + self._exporter = PrometheusRemoteWriteMetricsExporter( + endpoint="/prom/test_endpoint" + ) + + def generate_record(aggregator_type): + return ExportRecord( + self._test_metric, None, aggregator_type(), Resource({}), + ) + + self._generate_record = generate_record + + def converter_method(record, name, value): + return (type(record.aggregator), name, value) + + self._converter_mock = mock.MagicMock(return_value=converter_method) # Ensures conversion to timeseries function works with valid aggregation types def test_valid_convert_to_timeseries(self): - pass + timeseries_mock_method = mock.Mock(return_value=["test_value"]) + self._exporter.convert_from_sum = timeseries_mock_method + self._exporter.convert_from_min_max_sum_count = timeseries_mock_method + self._exporter.convert_from_histogram = timeseries_mock_method + self._exporter.convert_from_last_value = timeseries_mock_method + self._exporter.convert_from_value_observer = timeseries_mock_method + test_records = [ + self._generate_record(SumAggregator), + self._generate_record(MinMaxSumCountAggregator), + self._generate_record(HistogramAggregator), + self._generate_record(LastValueAggregator), + self._generate_record(ValueObserverAggregator), + ] + data = self._exporter.convert_to_timeseries(test_records) + self.assertEqual(len(data), 5) + for timeseries in data: + self.assertEqual(timeseries, "test_value") + + no_type_records = [self._generate_record(lambda: None)] + with self.assertRaises(ValueError): + self._exporter.convert_to_timeseries(no_type_records) # Ensures conversion to timeseries fails for unsupported aggregation types def test_invalid_convert_to_timeseries(self): - pass + no_type_records = [self._generate_record(lambda: None)] + with self.assertRaises(ValueError): + self._exporter.convert_to_timeseries(no_type_records) # Ensures sum aggregator is correctly converted to timeseries def test_convert_from_sum(self): - pass + sum_record = self._generate_record(SumAggregator) + sum_record.aggregator.update(3) + sum_record.aggregator.update(2) + sum_record.aggregator.take_checkpoint() + + self._exporter.create_timeseries = self._converter_mock() + timeseries = self._exporter.convert_from_sum(sum_record) + self.assertEqual(timeseries[0], (SumAggregator, "testname", 5)) # Ensures sum min_max_count aggregator is correctly converted to timeseries def test_convert_from_min_max_sum_count(self): - pass + min_max_sum_count_record = self._generate_record( + MinMaxSumCountAggregator + ) + min_max_sum_count_record.aggregator.update(5) + min_max_sum_count_record.aggregator.update(1) + min_max_sum_count_record.aggregator.take_checkpoint() + + self._exporter.create_timeseries = self._converter_mock() + timeseries = self._exporter.convert_from_min_max_sum_count( + min_max_sum_count_record + ) + self.assertEqual( + timeseries[0], (MinMaxSumCountAggregator, "testname_min", 1) + ) + self.assertEqual( + timeseries[1], (MinMaxSumCountAggregator, "testname_max", 5) + ) + self.assertEqual( + timeseries[2], (MinMaxSumCountAggregator, "testname_sum", 6) + ) + self.assertEqual( + timeseries[3], (MinMaxSumCountAggregator, "testname_count", 2) + ) # Ensures histogram aggregator is correctly converted to timeseries def test_convert_from_histogram(self): - pass + histogram_record = self._generate_record(HistogramAggregator) + histogram_record.aggregator.update(5) + histogram_record.aggregator.update(2) + histogram_record.aggregator.update(-1) + histogram_record.aggregator.take_checkpoint() + + self._exporter.create_timeseries = self._converter_mock() + timeseries = self._exporter.convert_from_histogram(histogram_record) + self.assertEqual( + timeseries[0], (HistogramAggregator, 'testname_bucket{le="0"}', 1) + ) + self.assertEqual( + timeseries[1], + (HistogramAggregator, 'testname_bucket{le="+Inf"}', 2), + ) + self.assertEqual( + timeseries[2], (HistogramAggregator, "testname_count", 3) + ) # Ensures last value aggregator is correctly converted to timeseries def test_convert_from_last_value(self): - pass + last_value_record = self._generate_record(LastValueAggregator) + last_value_record.aggregator.update(1) + last_value_record.aggregator.update(5) + last_value_record.aggregator.take_checkpoint() + + self._exporter.create_timeseries = self._converter_mock() + timeseries = self._exporter.convert_from_last_value(last_value_record) + self.assertEqual(timeseries[0], (LastValueAggregator, "testname", 5)) # Ensures value observer aggregator is correctly converted to timeseries def test_convert_from_value_observer(self): - pass + value_observer_record = self._generate_record(ValueObserverAggregator) + value_observer_record.aggregator.update(5) + value_observer_record.aggregator.update(1) + value_observer_record.aggregator.update(2) + value_observer_record.aggregator.take_checkpoint() + + self._exporter.create_timeseries = self._converter_mock() + timeseries = self._exporter.convert_from_value_observer( + value_observer_record + ) + self.assertEqual( + timeseries[0], (ValueObserverAggregator, "testname_min", 1) + ) + self.assertEqual( + timeseries[1], (ValueObserverAggregator, "testname_max", 5) + ) + self.assertEqual( + timeseries[2], (ValueObserverAggregator, "testname_sum", 8) + ) + self.assertEqual( + timeseries[3], (ValueObserverAggregator, "testname_count", 3) + ) + self.assertEqual( + timeseries[4], (ValueObserverAggregator, "testname_last", 2) + ) # Ensures quantile aggregator is correctly converted to timeseries # TODO: Add test once method is implemented @@ -127,7 +258,34 @@ def test_convert_from_quantile(self): # Ensures timeseries produced contains appropriate sample and labels def test_create_timeseries(self): - pass + sum_aggregator = SumAggregator() + sum_aggregator.update(5) + sum_aggregator.take_checkpoint() + sum_aggregator.last_update_timestamp = 10 + export_record = ExportRecord( + self._test_metric, + get_dict_as_key({"record_name": "record_value"}), + sum_aggregator, + Resource({"resource_name": "resource_value"}), + ) + + expected_timeseries = TimeSeries() + expected_timeseries.labels.append( + self._exporter.create_label("__name__", "testname") + ) + expected_timeseries.labels.append( + self._exporter.create_label("resource_name", "resource_value") + ) + expected_timeseries.labels.append( + self._exporter.create_label("record_name", "record_value") + ) + expected_timeseries.samples.append( + self._exporter.create_sample(10, 5.0), + ) + timeseries = self._exporter.create_timeseries( + export_record, "testname", 5.0, + ) + self.assertEqual(timeseries, expected_timeseries) class TestExport(unittest.TestCase):