
Commit

Ran linter
Seefooo committed Sep 27, 2022
1 parent 9c3766f commit b1443be
Showing 7 changed files with 517 additions and 301 deletions.
@@ -7,15 +7,14 @@
 import psutil
 
 from opentelemetry import metrics
-
+from opentelemetry.exporter.prometheus_remote_write import (
+    PrometheusRemoteWriteMetricsExporter,
+)
 from opentelemetry.metrics import (
     Observation,
     get_meter_provider,
     set_meter_provider,
 )
-from opentelemetry.exporter.prometheus_remote_write import (
-    PrometheusRemoteWriteMetricsExporter,
-)
 from opentelemetry.sdk.metrics import MeterProvider
 from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
 
@@ -29,7 +28,7 @@
     endpoint="http://cortex:9009/api/prom/push",
     headers={"X-Scope-Org-ID": "5"},
 )
-reader = PeriodicExportingMetricReader(exporter,1000)
+reader = PeriodicExportingMetricReader(exporter, 1000)
 provider = MeterProvider(metric_readers=[reader])
 metrics.set_meter_provider(provider)
 meter = metrics.get_meter(__name__)
@@ -98,7 +97,7 @@ def get_ram_usage_callback(observer):
         # updown counter
         requests_active.add(num % 7231 + 200, testing_labels)
 
-        request_latency.record(num % 92,testing_labels)
+        request_latency.record(num % 92, testing_labels)
         logger.log(level=INFO, msg="completed metrics collection cycle")
         time.sleep(1)
         num += 9791
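
For orientation, the example app changed above configures the exporter roughly as follows once this commit is applied. This is a consolidated sketch assembled from the hunks shown here, not a verbatim copy of the file; the histogram instrument and its attributes are illustrative stand-ins for the ones the example actually creates.

    import time

    from opentelemetry import metrics
    from opentelemetry.exporter.prometheus_remote_write import (
        PrometheusRemoteWriteMetricsExporter,
    )
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

    # Values taken from the example: a Cortex push endpoint and a tenant header.
    exporter = PrometheusRemoteWriteMetricsExporter(
        endpoint="http://cortex:9009/api/prom/push",
        headers={"X-Scope-Org-ID": "5"},
    )
    # Export every 1000 ms through the periodic reader.
    reader = PeriodicExportingMetricReader(exporter, 1000)
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))
    meter = metrics.get_meter(__name__)

    # Stand-in instrument; the example records latency values in a loop like this.
    request_latency = meter.create_histogram("request_latency")
    while True:
        request_latency.record(42, {"environment": "testing"})
        time.sleep(1)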
@@ -14,10 +14,10 @@
 
 import logging
 import re
-from typing import Dict, Sequence
 
 from collections import defaultdict
 from itertools import chain
+from typing import Dict, Sequence
 
 import requests
 import snappy
@@ -29,29 +29,30 @@
     Sample,
     TimeSeries,
 )
+from opentelemetry.sdk.metrics import Counter
+from opentelemetry.sdk.metrics import Histogram as ClientHistogram
+from opentelemetry.sdk.metrics import (
+    ObservableCounter,
+    ObservableGauge,
+    ObservableUpDownCounter,
+    UpDownCounter,
+)
 from opentelemetry.sdk.metrics.export import (
     AggregationTemporality,
     Gauge,
-    Sum,
     Histogram,
+    Metric,
     MetricExporter,
     MetricExportResult,
     MetricsData,
-    Metric,
-)
-from opentelemetry.sdk.metrics import (
-    Counter,
-    Histogram as ClientHistogram,
-    ObservableCounter,
-    ObservableGauge,
-    ObservableUpDownCounter,
-    UpDownCounter,
+    Sum,
 )
 
 logger = logging.getLogger(__name__)
 
-PROMETHEUS_NAME_REGEX = re.compile(r'[^\w:]')
-PROMETHEUS_LABEL_REGEX = re.compile(r'[^\w]')
+PROMETHEUS_NAME_REGEX = re.compile(r"[^\w:]")
+PROMETHEUS_LABEL_REGEX = re.compile(r"[^\w]")
 
+
 class PrometheusRemoteWriteMetricsExporter(MetricExporter):
     """
@@ -74,7 +75,7 @@ def __init__(
         timeout: int = 30,
         tls_config: Dict = None,
         proxies: Dict = None,
-        resources_as_labels : bool = True,
+        resources_as_labels: bool = True,
         preferred_temporality: Dict[type, AggregationTemporality] = None,
         preferred_aggregation: Dict = None,
     ):
@@ -95,9 +96,8 @@ def __init__(
                 ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
                 ObservableGauge: AggregationTemporality.CUMULATIVE,
             }
-        logger.error("Calling MetricExporter")
 
-        super().__init__(preferred_temporality,preferred_aggregation)
+        super().__init__(preferred_temporality, preferred_aggregation)
 
     @property
     def endpoint(self):
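
As a side note on the PROMETHEUS_NAME_REGEX / PROMETHEUS_LABEL_REGEX patterns introduced a couple of hunks above: they replace any character that is not a word character (or, for metric names, a colon) with an underscore. A tiny illustration with made-up inputs:

    import re

    PROMETHEUS_NAME_REGEX = re.compile(r"[^\w:]")
    PROMETHEUS_LABEL_REGEX = re.compile(r"[^\w]")

    # Dots (and any other disallowed characters) become underscores.
    print(PROMETHEUS_NAME_REGEX.sub("_", "http.server.duration_ms"))  # http_server_duration_ms
    print(PROMETHEUS_LABEL_REGEX.sub("_", "service.name"))            # service_name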
@@ -180,9 +180,9 @@ def headers(self, headers: Dict):
 
     def export(
         self,
-        metrics_data : MetricsData,
+        metrics_data: MetricsData,
         timeout_millis: float = 10_000,
-    ) ->MetricExportResult:
+    ) -> MetricExportResult:
         if not metrics_data:
             return MetricExportResult.SUCCESS
         timeseries = self._translate_data(metrics_data)
@@ -203,121 +203,129 @@ def _translate_data(self, data: MetricsData) -> Sequence[TimeSeries]:
             # OTLP Data model suggests combining some attrs into job/instance
             # Should we do that here?
             if self.resources_as_labels:
-                resource_labels = [ (n,str(v)) for n,v in resource.attributes.items() ]
+                resource_labels = [
+                    (n, str(v)) for n, v in resource.attributes.items()
+                ]
             else:
                 resource_labels = []
             # Scope name/version probably not too useful from a labeling perspective
             for scope_metrics in resource_metrics.scope_metrics:
                 for metric in scope_metrics.metrics:
-                    rw_timeseries.extend( self._parse_metric(metric,resource_labels) )
+                    rw_timeseries.extend(
+                        self._parse_metric(metric, resource_labels)
+                    )
         return rw_timeseries
 
-    def _parse_metric(self, metric: Metric, resource_labels: Sequence) -> Sequence[TimeSeries]:
+    def _parse_metric(
+        self, metric: Metric, resource_labels: Sequence
+    ) -> Sequence[TimeSeries]:
         """
         Parses the Metric & lower objects, then converts the output into
         OM TimeSeries. Returns a List of TimeSeries objects based on one Metric
         """
 
 
         # Create the metric name, will be a label later
         if metric.unit:
-            #Prom. naming guidelines add unit to the name
-            name =f"{metric.name}_{metric.unit}"
+            # Prom. naming guidelines add unit to the name
+            name = f"{metric.name}_{metric.unit}"
         else:
             name = metric.name
 
         # datapoints have attributes associated with them. these would be sent
         # to RW as different metrics: name & labels is a unique time series
         sample_sets = defaultdict(list)
-        if isinstance(metric.data,(Gauge,Sum)):
+        if isinstance(metric.data, (Gauge, Sum)):
             for dp in metric.data.data_points:
-                attrs,sample = self._parse_data_point(dp,name)
+                attrs, sample = self._parse_data_point(dp, name)
                 sample_sets[attrs].append(sample)
-        elif isinstance(metric.data,Histogram):
+        elif isinstance(metric.data, Histogram):
             for dp in metric.data.data_points:
-                dp_result = self._parse_histogram_data_point(dp,name)
-                for attrs,sample in dp_result:
+                dp_result = self._parse_histogram_data_point(dp, name)
+                for attrs, sample in dp_result:
                     sample_sets[attrs].append(sample)
         else:
-            logger.warn("Unsupported Metric Type: %s",type(metric.data))
+            logger.warn("Unsupported Metric Type: %s", type(metric.data))
             return []
 
         timeseries = []
         for labels, samples in sample_sets.items():
             ts = TimeSeries()
-            for label_name,label_value in chain(resource_labels,labels):
+            for label_name, label_value in chain(resource_labels, labels):
                 # Previous implementation did not str() the names...
-                ts.labels.append(self._label(label_name,str(label_value)))
-            for value,timestamp in samples:
-                ts.samples.append(self._sample(value,timestamp))
+                ts.labels.append(self._label(label_name, str(label_value)))
+            for value, timestamp in samples:
+                ts.samples.append(self._sample(value, timestamp))
             timeseries.append(ts)
         return timeseries

-    def _sample(self,value: int,timestamp :int) -> Sample:
+    def _sample(self, value: int, timestamp: int) -> Sample:
         sample = Sample()
         sample.value = value
         sample.timestamp = timestamp
         return sample
 
-    def _label(self,name:str,value:str) -> Label:
+    def _label(self, name: str, value: str) -> Label:
         label = Label()
-        label.name = PROMETHEUS_LABEL_REGEX.sub("_",name)
+        label.name = PROMETHEUS_LABEL_REGEX.sub("_", name)
         label.value = value
         return label
 
-    def _sanitize_name(self,name):
+    def _sanitize_name(self, name):
         # I Think Prometheus requires names to NOT start with a number this
         # would not catch that, but do cover the other cases. The naming rules
         # don't explicit say this, but the supplied regex implies it.
         # Got a little weird trying to do substitution with it, but can be
         # fixed if we allow numeric beginnings to metric names
-        return PROMETHEUS_NAME_REGEX.sub("_",name)
+        return PROMETHEUS_NAME_REGEX.sub("_", name)
 
     def _parse_histogram_data_point(self, data_point, name):
 
-        #if (len(data_point.explicit_bounds)+1) != len(data_point.bucket_counts):
+        # if (len(data_point.explicit_bounds)+1) != len(data_point.bucket_counts):
         #     raise ValueError("Number of buckets must be 1 more than the explicit bounds!")
 
         sample_attr_pairs = []
 
-        base_attrs = [(n,v) for n,v in data_point.attributes.items()]
+        base_attrs = [(n, v) for n, v in data_point.attributes.items()]
         timestamp = data_point.time_unix_nano // 1_000_000
 

-        def handle_bucket(value,bound=None,name_override=None):
+        def handle_bucket(value, bound=None, name_override=None):
             # Metric Level attributes + the bucket boundry attribute + name
             ts_attrs = base_attrs.copy()
-            ts_attrs.append(("__name__",self._sanitize_name(name_override or name)))
+            ts_attrs.append(
+                ("__name__", self._sanitize_name(name_override or name))
+            )
             if bound:
-                ts_attrs.append(("le",str(bound)))
+                ts_attrs.append(("le", str(bound)))
             # Value is count of values in each bucket
-            ts_sample = (value,timestamp)
+            ts_sample = (value, timestamp)
             return tuple(ts_attrs), ts_sample
 
-        for bound_pos,bound in enumerate(data_point.explicit_bounds):
+        for bound_pos, bound in enumerate(data_point.explicit_bounds):
             sample_attr_pairs.append(
-                handle_bucket(data_point.bucket_counts[bound_pos],bound)
+                handle_bucket(data_point.bucket_counts[bound_pos], bound)
             )
 
         # Add the last label for implicit +inf bucket
         sample_attr_pairs.append(
-            handle_bucket(data_point.bucket_counts[-1],bound="+Inf")
+            handle_bucket(data_point.bucket_counts[-1], bound="+Inf")
         )
 
-        #Lastly, add series for count & sum
+        # Lastly, add series for count & sum
         sample_attr_pairs.append(
-            handle_bucket(data_point.sum,name_override=f"{name}_sum")
+            handle_bucket(data_point.sum, name_override=f"{name}_sum")
        )
         sample_attr_pairs.append(
-            handle_bucket(data_point.count,name_override=f"{name}_count")
+            handle_bucket(data_point.count, name_override=f"{name}_count")
         )
         return sample_attr_pairs
 
-    def _parse_data_point(self, data_point,name=None):
+    def _parse_data_point(self, data_point, name=None):
 
-        attrs = tuple(data_point.attributes.items()) + (("__name__",self._sanitize_name(name)),)
-        sample = (data_point.value,(data_point.time_unix_nano // 1_000_000))
-        return attrs,sample
+        attrs = tuple(data_point.attributes.items()) + (
+            ("__name__", self._sanitize_name(name)),
+        )
+        sample = (data_point.value, (data_point.time_unix_nano // 1_000_000))
+        return attrs, sample
 
     # pylint: disable=no-member,no-self-use
     def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
@@ -383,4 +391,3 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
 
     def shutdown(self) -> None:
         pass
-
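
Finally, the histogram handling touched in this commit fans one histogram data point out into several Prometheus time series: one sample per explicit bucket (labelled with "le"), an implicit +Inf bucket, and separate _sum and _count series. Below is a standalone sketch of that fan-out with hypothetical helper and argument names; it is an illustration of the idea, not code from the commit.

    def expand_histogram(name, bounds, bucket_counts, value_sum, value_count, timestamp_ms):
        series = []
        # One sample per explicit bucket, labelled by its upper bound.
        for pos, bound in enumerate(bounds):
            series.append(({"__name__": name, "le": str(bound)}, (bucket_counts[pos], timestamp_ms)))
        # The last bucket count covers the implicit +Inf bucket.
        series.append(({"__name__": name, "le": "+Inf"}, (bucket_counts[-1], timestamp_ms)))
        # Sum and count become their own series.
        series.append(({"__name__": f"{name}_sum"}, (value_sum, timestamp_ms)))
        series.append(({"__name__": f"{name}_count"}, (value_count, timestamp_ms)))
        return series

    # e.g. three bounds and four counts yield six series: three "le" buckets,
    # the "+Inf" bucket, request_latency_ms_sum and request_latency_ms_count.
    expand_histogram("request_latency_ms", [0.1, 0.5, 1.0], [3, 5, 2, 1], 4.2, 11, 1_664_000_000_000)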