Skip to content

Commit

Permalink
feat: Metrics SDK - aggregator, batcher, controller (open-telemetry#738)
Browse files Browse the repository at this point in the history
* add Batcher and Aggregator

* JSDoc comments

* final patch

* Descriptor to MetricDescriptor

* merge createMeasure PR and fix the tests

* move Aggregator interface to types
  • Loading branch information
mayurkale22 authored Feb 18, 2020
1 parent 014b324 commit d35ac4f
Show file tree
Hide file tree
Showing 15 changed files with 674 additions and 737 deletions.
89 changes: 43 additions & 46 deletions packages/opentelemetry-exporter-prometheus/src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
import { ExportResult } from '@opentelemetry/base';
import { NoopLogger } from '@opentelemetry/core';
import {
LabelValue,
MetricDescriptor,
MetricDescriptorType,
MetricExporter,
ReadableMetric,
MetricRecord,
MetricDescriptor,
LastValue,
MetricKind,
Sum,
} from '@opentelemetry/metrics';
import * as types from '@opentelemetry/api';
import { createServer, IncomingMessage, Server, ServerResponse } from 'http';
import { Counter, Gauge, labelValues, Metric, Registry } from 'prom-client';
import * as url from 'url';
import { ExporterConfig } from './export/types';
import { LabelSet } from '@opentelemetry/metrics/build/src/LabelSet';
import { CounterSumAggregator } from '@opentelemetry/metrics/build/src/export/Aggregator';

export class PrometheusExporter implements MetricExporter {
static readonly DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -81,13 +84,10 @@ export class PrometheusExporter implements MetricExporter {
* be a no-op and the exporter should reach into the metrics when the export endpoint is
* called. As there is currently no interface to do this, this is our only option.
*
* @param readableMetrics Metrics to be sent to the prometheus backend
* @param records Metrics to be sent to the prometheus backend
* @param cb result callback to be called on finish
*/
export(
readableMetrics: ReadableMetric[],
cb: (result: ExportResult) => void
) {
export(records: MetricRecord[], cb: (result: ExportResult) => void) {
if (!this._server) {
// It is conceivable that the _server may not be started as it is an async startup
// However unlikely, if this happens the caller may retry the export
Expand All @@ -97,8 +97,8 @@ export class PrometheusExporter implements MetricExporter {

this._logger.debug('Prometheus exporter export');

for (const readableMetric of readableMetrics) {
this._updateMetric(readableMetric);
for (const record of records) {
this._updateMetric(record);
}

cb(ExportResult.SUCCESS);
Expand All @@ -117,51 +117,53 @@ export class PrometheusExporter implements MetricExporter {
/**
* Updates the value of a single metric in the registry
*
* @param readableMetric Metric value to be saved
* @param record Metric value to be saved
*/
private _updateMetric(readableMetric: ReadableMetric) {
const metric = this._registerMetric(readableMetric);
private _updateMetric(record: MetricRecord) {
const metric = this._registerMetric(record);
if (!metric) return;

const labelKeys = readableMetric.descriptor.labelKeys;
const labelKeys = record.descriptor.labelKeys;
const value = record.aggregator.value();

if (metric instanceof Counter) {
for (const ts of readableMetric.timeseries) {
// Prometheus counter saves internal state and increments by given value.
// ReadableMetric value is the current state, not the delta to be incremented by.
// Currently, _registerMetric creates a new counter every time the value changes,
// so the increment here behaves as a set value (increment from 0)
metric.inc(
this._getLabelValues(labelKeys, ts.labelValues),
ts.points[0].value as number
);
}
// Prometheus counter saves internal state and increments by given value.
// ReadableMetric value is the current state, not the delta to be incremented by.
// Currently, _registerMetric creates a new counter every time the value changes,
// so the increment here behaves as a set value (increment from 0)
metric.inc(this._getLabelValues(labelKeys, record.labels), value as Sum);
}

if (metric instanceof Gauge) {
for (const ts of readableMetric.timeseries) {
if (record.aggregator instanceof CounterSumAggregator) {
metric.set(
this._getLabelValues(labelKeys, record.labels),
value as Sum
);
} else {
metric.set(
this._getLabelValues(labelKeys, ts.labelValues),
ts.points[0].value as number
this._getLabelValues(labelKeys, record.labels),
(value as LastValue).value
);
}
}

// TODO: only counter and gauge are implemented in metrics so far
}

private _getLabelValues(keys: string[], values: LabelValue[]) {
private _getLabelValues(keys: string[], values: LabelSet) {
const labelValues: labelValues = {};
const labels = values.labels;
for (let i = 0; i < keys.length; i++) {
if (values[i].value !== null) {
labelValues[keys[i]] = values[i].value!;
if (labels[keys[i]] !== null) {
labelValues[keys[i]] = labels[keys[i]];
}
}
return labelValues;
}

private _registerMetric(readableMetric: ReadableMetric): Metric | undefined {
const metricName = this._getPrometheusMetricName(readableMetric.descriptor);
private _registerMetric(record: MetricRecord): Metric | undefined {
const metricName = this._getPrometheusMetricName(record.descriptor);
const metric = this._registry.getSingleMetric(metricName);

/**
Expand All @@ -177,31 +179,26 @@ export class PrometheusExporter implements MetricExporter {
this._registry.removeSingleMetric(metricName);
} else if (metric) return metric;

return this._newMetric(readableMetric, metricName);
return this._newMetric(record, metricName);
}

private _newMetric(
readableMetric: ReadableMetric,
name: string
): Metric | undefined {
private _newMetric(record: MetricRecord, name: string): Metric | undefined {
const metricObject = {
name,
// prom-client throws with empty description which is our default
help: readableMetric.descriptor.description || 'description missing',
labelNames: readableMetric.descriptor.labelKeys,
help: record.descriptor.description || 'description missing',
labelNames: record.descriptor.labelKeys,
// list of registries to register the newly created metric
registers: [this._registry],
};

switch (readableMetric.descriptor.type) {
case MetricDescriptorType.COUNTER_DOUBLE:
case MetricDescriptorType.COUNTER_INT64:
switch (record.descriptor.metricKind) {
case MetricKind.COUNTER:
// there is no such thing as a non-monotonic counter in prometheus
return readableMetric.descriptor.monotonic
return record.descriptor.monotonic
? new Counter(metricObject)
: new Gauge(metricObject);
case MetricDescriptorType.GAUGE_DOUBLE:
case MetricDescriptorType.GAUGE_INT64:
case MetricKind.GAUGE:
return new Gauge(metricObject);
default:
// Other metric types are currently unimplemented
Expand Down
33 changes: 21 additions & 12 deletions packages/opentelemetry-exporter-prometheus/test/prometheus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,13 @@ describe('PrometheusExporter', () => {

const boundCounter = counter.bind(meter.labels({ key1: 'labelValue1' }));
boundCounter.add(10);
exporter.export(meter.getMetrics(), () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
// This is to test the special case where counters are destroyed
// and recreated in the exporter in order to get around prom-client's
// aggregation and use ours.
boundCounter.add(10);
exporter.export(meter.getMetrics(), () => {
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -227,7 +228,8 @@ describe('PrometheusExporter', () => {

const boundGauge = gauge.bind(meter.labels({ key1: 'labelValue1' }));
boundGauge.set(10);
exporter.export([gauge.get()!], () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -259,9 +261,10 @@ describe('PrometheusExporter', () => {
labelKeys: ['counterKey1'],
}) as CounterMetric;

gauge.bind(meter.labels({ key1: 'labelValue1' })).set(10);
counter.bind(meter.labels({ key1: 'labelValue1' })).add(10);
exporter.export([gauge.get()!, counter.get()!], () => {
gauge.bind(meter.labels({ gaugeKey1: 'labelValue1' })).set(10);
counter.bind(meter.labels({ counterKey1: 'labelValue1' })).add(10);
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -308,7 +311,8 @@ describe('PrometheusExporter', () => {

const boundGauge = gauge.bind(meter.labels({ key1: 'labelValue1' }));
boundGauge.set(10);
exporter.export([gauge.get()!], () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand All @@ -333,7 +337,8 @@ describe('PrometheusExporter', () => {
const gauge = meter.createGauge('gauge.bad-name') as GaugeMetric;
const boundGauge = gauge.bind(meter.labels({ key1: 'labelValue1' }));
boundGauge.set(10);
exporter.export([gauge.get()!], () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -362,7 +367,8 @@ describe('PrometheusExporter', () => {
});

counter.bind(meter.labels({ key1: 'labelValue1' })).add(20);
exporter.export(meter.getMetrics(), () => {
meter.collect();
exporter.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -407,7 +413,8 @@ describe('PrometheusExporter', () => {
});

exporter.startServer(() => {
exporter!.export(meter.getMetrics(), () => {
meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -435,7 +442,8 @@ describe('PrometheusExporter', () => {
});

exporter.startServer(() => {
exporter!.export(meter.getMetrics(), () => {
meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:8080/metrics', res => {
res.on('data', chunk => {
Expand Down Expand Up @@ -463,7 +471,8 @@ describe('PrometheusExporter', () => {
});

exporter.startServer(() => {
exporter!.export(meter.getMetrics(), () => {
meter.collect();
exporter!.export(meter.getBatcher().checkPointSet(), () => {
http
.get('http://localhost:9464/test', res => {
res.on('data', chunk => {
Expand Down
Loading

0 comments on commit d35ac4f

Please sign in to comment.