Skip to content

Commit

Permalink
Merge branch 'master' into metric
Browse files Browse the repository at this point in the history
  • Loading branch information
lzchen authored Sep 25, 2020
2 parents d24043a + c1ec444 commit be3687c
Show file tree
Hide file tree
Showing 37 changed files with 636 additions and 178 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
name: Test

on: [push, pull_request]
on:
push:
branches-ignore:
- 'release/*'
pull_request:

jobs:
build:
Expand Down
5 changes: 5 additions & 0 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ sphinx~=2.4
sphinx-rtd-theme~=0.4
sphinx-autodoc-typehints~=1.10.2

# Need to install the api/sdk in the venv for autodoc. Modifying sys.path
# doesn't work for pkg_resources.
./opentelemetry-api
./opentelemetry-sdk

# Required by ext packages
asgiref~=3.0
asyncpg>=0.12.0
Expand Down
2 changes: 0 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@


source_dirs = [
os.path.abspath("../opentelemetry-api/src/"),
os.path.abspath("../opentelemetry-sdk/src/"),
os.path.abspath("../opentelemetry-instrumentation/src/"),
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,22 @@
import collections
import logging
import re
from typing import Sequence
from typing import Iterable, Optional, Sequence, Union

from prometheus_client import start_http_server
from prometheus_client.core import (
REGISTRY,
CollectorRegistry,
CounterMetricFamily,
SummaryMetricFamily,
UnknownMetricFamily,
)

from opentelemetry.metrics import Counter, Metric, ValueRecorder
from opentelemetry.metrics import Counter, ValueRecorder
from opentelemetry.sdk.metrics.export import (
MetricRecord,
MetricsExporter,
MetricsExportResult,
)
from opentelemetry.sdk.metrics.export.aggregate import MinMaxSumCountAggregator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -110,8 +110,8 @@ def shutdown(self) -> None:


class CustomCollector:
""" CustomCollector represents the Prometheus Collector object
https://github.com/prometheus/client_python#custom-collectors
"""CustomCollector represents the Prometheus Collector object
https://github.com/prometheus/client_python#custom-collectors
"""

def __init__(self, prefix: str = ""):
Expand All @@ -121,7 +121,7 @@ def __init__(self, prefix: str = ""):
r"[^\w]", re.UNICODE | re.IGNORECASE
)

def add_metrics_data(self, metric_records: Sequence[MetricRecord]):
def add_metrics_data(self, metric_records: Sequence[MetricRecord]) -> None:
    """Store one batch of metric records for later export by this collector."""
    batch = metric_records
    self._metrics_to_export.append(batch)

def collect(self):
Expand Down Expand Up @@ -152,34 +152,44 @@ def _translate_to_prometheus(self, metric_record: MetricRecord):
metric_name = self._prefix + "_"
metric_name += self._sanitize(metric_record.instrument.name)

description = getattr(metric_record.instrument, "description", "")
if isinstance(metric_record.instrument, Counter):
prometheus_metric = CounterMetricFamily(
name=metric_name,
documentation=metric_record.instrument.description,
labels=label_keys,
name=metric_name, documentation=description, labels=label_keys
)
prometheus_metric.add_metric(
labels=label_values, value=metric_record.aggregator.checkpoint
)
# TODO: Add support for histograms when supported in OT
elif isinstance(metric_record.instrument, ValueRecorder):
prometheus_metric = UnknownMetricFamily(
name=metric_name,
documentation=metric_record.instrument.description,
labels=label_keys,
)
prometheus_metric.add_metric(
labels=label_values, value=metric_record.aggregator.checkpoint
)
value = metric_record.aggregator.checkpoint
if isinstance(metric_record.aggregator, MinMaxSumCountAggregator):
prometheus_metric = SummaryMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
)
prometheus_metric.add_metric(
labels=label_values,
count_value=value.count,
sum_value=value.sum,
)
else:
prometheus_metric = UnknownMetricFamily(
name=metric_name,
documentation=description,
labels=label_keys,
)
prometheus_metric.add_metric(labels=label_values, value=value)

else:
logger.warning(
"Unsupported metric type. %s", type(metric_record.instrument)
)
return prometheus_metric

def _sanitize(self, key):
""" sanitize the given metric name or label according to Prometheus rule.
def _sanitize(self, key: str) -> str:
"""sanitize the given metric name or label according to Prometheus rule.
Replace all characters other than [A-Za-z0-9_] with '_'.
"""
return self._non_letters_nor_digits_re.sub("_", key)
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import unittest
from unittest import mock

from prometheus_client import generate_latest
from prometheus_client.core import CounterMetricFamily

from opentelemetry.exporter.prometheus import (
Expand All @@ -24,7 +25,11 @@
from opentelemetry.metrics import get_meter_provider, set_meter_provider
from opentelemetry.sdk import metrics
from opentelemetry.sdk.metrics.export import MetricRecord, MetricsExportResult
from opentelemetry.sdk.metrics.export.aggregate import SumAggregator
from opentelemetry.sdk.metrics.export.aggregate import (
MinMaxSumCountAggregator,
SumAggregator,
)
from opentelemetry.sdk.util import get_dict_as_key


class TestPrometheusMetricExporter(unittest.TestCase):
Expand All @@ -35,7 +40,7 @@ def setUp(self):
"testname", "testdesc", "unit", int, metrics.Counter,
)
labels = {"environment": "staging"}
self._labels_key = metrics.get_dict_as_key(labels)
self._labels_key = get_dict_as_key(labels)

self._mock_registry_register = mock.Mock()
self._registry_register_patch = mock.patch(
Expand Down Expand Up @@ -70,13 +75,32 @@ def test_export(self):
self.assertEqual(len(exporter._collector._metrics_to_export), 1)
self.assertIs(result, MetricsExportResult.SUCCESS)

def test_min_max_sum_aggregator_to_prometheus(self):
    """A MinMaxSumCount record should surface as Prometheus summary samples."""
    meter = get_meter_provider().get_meter(__name__)
    metric = meter.create_metric(
        "test@name", "testdesc", "unit", int, metrics.ValueRecorder, []
    )
    aggregator = MinMaxSumCountAggregator()
    for measurement in (123, 456):
        aggregator.update(measurement)
    aggregator.take_checkpoint()
    empty_label_key = get_dict_as_key({})
    record = MetricRecord(metric, empty_label_key, aggregator)
    collector = CustomCollector("testprefix")
    collector.add_metrics_data([record])
    # The '@' in the instrument name must have been sanitized to '_'.
    exposition = generate_latest(collector).decode("utf-8")
    self.assertIn("testprefix_test_name_count 2.0", exposition)
    self.assertIn("testprefix_test_name_sum 579.0", exposition)

def test_counter_to_prometheus(self):
meter = get_meter_provider().get_meter(__name__)
metric = meter.create_metric(
"test@name", "testdesc", "unit", int, metrics.Counter,
)
labels = {"environment@": "staging", "os": "Windows"}
key_labels = metrics.get_dict_as_key(labels)
key_labels = get_dict_as_key(labels)
aggregator = SumAggregator()
aggregator.update(123)
aggregator.take_checkpoint()
Expand Down Expand Up @@ -107,7 +131,7 @@ def test_invalid_metric(self):
"tesname", "testdesc", "unit", int, StubMetric
)
labels = {"environment": "staging"}
key_labels = metrics.get_dict_as_key(labels)
key_labels = get_dict_as_key(labels)
record = MetricRecord(metric, key_labels, None)
collector = CustomCollector("testprefix")
collector.add_metrics_data([record])
Expand Down
3 changes: 3 additions & 0 deletions exporter/opentelemetry-exporter-zipkin/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Zipkin exporter now accepts a ``max_tag_value_length`` attribute to customize the
maximum allowed size a tag value can have. ([#1151](https://github.com/open-telemetry/opentelemetry-python/pull/1151))

## Version 0.13b0

Released 2020-09-17
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

DEFAULT_RETRY = False
DEFAULT_URL = "http://localhost:9411/api/v2/spans"
DEFAULT_MAX_TAG_VALUE_LENGTH = 128
ZIPKIN_HEADERS = {"Content-Type": "application/json"}

SPAN_KIND_MAP = {
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(
ipv4: Optional[str] = None,
ipv6: Optional[str] = None,
retry: Optional[str] = DEFAULT_RETRY,
max_tag_value_length: Optional[int] = DEFAULT_MAX_TAG_VALUE_LENGTH,
):
self.service_name = service_name
if url is None:
Expand All @@ -122,6 +124,7 @@ def __init__(
self.ipv4 = ipv4
self.ipv6 = ipv6
self.retry = retry
self.max_tag_value_length = max_tag_value_length

def export(self, spans: Sequence[Span]) -> SpanExportResult:
zipkin_spans = self._translate_to_zipkin(spans)
Expand All @@ -141,6 +144,9 @@ def export(self, spans: Sequence[Span]) -> SpanExportResult:
return SpanExportResult.FAILURE
return SpanExportResult.SUCCESS

def shutdown(self) -> None:
    """Exporter shutdown hook; this exporter keeps no open resources, so no-op."""

def _translate_to_zipkin(self, spans: Sequence[Span]):

local_endpoint = {"serviceName": self.service_name, "port": self.port}
Expand Down Expand Up @@ -171,8 +177,10 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
"duration": duration_mus,
"localEndpoint": local_endpoint,
"kind": SPAN_KIND_MAP[span.kind],
"tags": _extract_tags_from_span(span),
"annotations": _extract_annotations_from_events(span.events),
"tags": self._extract_tags_from_span(span),
"annotations": self._extract_annotations_from_events(
span.events
),
}

if span.instrumentation_info is not None:
Expand All @@ -184,9 +192,9 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
] = span.instrumentation_info.version

if span.status is not None:
zipkin_span["tags"][
"otel.status_code"
] = span.status.canonical_code.value
zipkin_span["tags"]["otel.status_code"] = str(
span.status.canonical_code.value
)
if span.status.description is not None:
zipkin_span["tags"][
"otel.status_description"
Expand All @@ -205,42 +213,44 @@ def _translate_to_zipkin(self, spans: Sequence[Span]):
zipkin_spans.append(zipkin_span)
return zipkin_spans

def shutdown(self) -> None:
pass

def _extract_tags_from_dict(self, tags_dict):
    """Convert an attribute mapping into Zipkin string tags.

    str values pass through, int/bool/float values are stringified, and any
    other type is skipped with a warning.  When ``self.max_tag_value_length``
    is positive, each tag value is truncated to that many characters.
    """
    if not tags_dict:
        return {}
    limit = self.max_tag_value_length
    tags = {}
    for tag_key, raw_value in tags_dict.items():
        if isinstance(raw_value, str):
            text = raw_value
        elif isinstance(raw_value, (int, bool, float)):
            text = str(raw_value)
        else:
            logger.warning("Could not serialize tag %s", tag_key)
            continue
        tags[tag_key] = text[:limit] if limit > 0 else text
    return tags

def _extract_tags_from_dict(tags_dict):
tags = {}
if not tags_dict:
def _extract_tags_from_span(self, span: Span):
tags = self._extract_tags_from_dict(getattr(span, "attributes", None))
if span.resource:
tags.update(self._extract_tags_from_dict(span.resource.attributes))
return tags
for attribute_key, attribute_value in tags_dict.items():
if isinstance(attribute_value, (int, bool, float)):
value = str(attribute_value)
elif isinstance(attribute_value, str):
value = attribute_value[:128]
else:
logger.warning("Could not serialize tag %s", attribute_key)
continue
tags[attribute_key] = value
return tags


def _extract_tags_from_span(span: Span):
tags = _extract_tags_from_dict(getattr(span, "attributes", None))
if span.resource:
tags.update(_extract_tags_from_dict(span.resource.attributes))
return tags


def _extract_annotations_from_events(events):
return (
[
{"timestamp": _nsec_to_usec_round(e.timestamp), "value": e.name}
for e in events
]
if events
else None
)

def _extract_annotations_from_events(
    self, events
):  # pylint: disable=R0201
    """Map span events to Zipkin annotations; returns None when there are none."""
    if not events:
        return None
    annotations = []
    for event in events:
        annotations.append(
            {
                "timestamp": _nsec_to_usec_round(event.timestamp),
                "value": event.name,
            }
        )
    return annotations


def _nsec_to_usec_round(nsec):
Expand Down
Loading

0 comments on commit be3687c

Please sign in to comment.