Skip to content

Commit

Permalink
feat(metric-reader): add metric-reader (#2681)
Browse files Browse the repository at this point in the history
  • Loading branch information
pichlermarc authored Jan 7, 2022
1 parent d61f7be commit 354c002
Show file tree
Hide file tree
Showing 7 changed files with 658 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,143 @@
* limitations under the License.
*/

import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricExporter } from './MetricExporter';
import { MetricProducer } from './MetricProducer';
import { MetricData } from './MetricData';
import { callWithTimeout } from '../utils';

export type ReaderOptions = {
timeoutMillis?: number
}

export type ReaderCollectionOptions = ReaderOptions;

export type ReaderShutdownOptions = ReaderOptions;

export type ReaderForceFlushOptions = ReaderOptions;

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader

/**
* A registered reader of metrics that, when linked to a {@link MetricProducer}, offers global
* control over metrics.
*/
export abstract class MetricReader {
// Tracks the shutdown state.
// TODO: use BindOncePromise here once a new version of @opentelemetry/core is available.
private _shutdown = false;
// MetricProducer used by this instance.
private _metricProducer?: MetricProducer;

constructor(private _exporter: MetricExporter) {}
constructor(private readonly _preferredAggregationTemporality = AggregationTemporality.CUMULATIVE) {
}

/**
* Set the {@link MetricProducer} used by this instance.
*
* @param metricProducer
*/
setMetricProducer(metricProducer: MetricProducer) {
this._metricProducer = metricProducer;
this.onInitialized();
}

/**
* Get the {@link AggregationTemporality} preferred by this {@link MetricReader}
*/
getPreferredAggregationTemporality(): AggregationTemporality {
return this._exporter.getPreferredAggregationTemporality();
return this._preferredAggregationTemporality;
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
* Overriding this method is optional.
*/
protected onInitialized(): void {
// Default implementation is empty.
}

async collect(): Promise<void> {
/**
* Handle a shutdown signal by the SDK.
*
* <p> For push exporters, this should shut down any intervals and close any open connections.
* @protected
*/
protected abstract onShutdown(): Promise<void>;

/**
* Handle a force flush signal by the SDK.
*
* <p> In all scenarios metrics should be collected via {@link collect()}.
* <p> For push exporters, this should collect and report metrics.
* @protected
*/
protected abstract onForceFlush(): Promise<void>;

/**
* Collect all metrics from the associated {@link MetricProducer}
*/
async collect(options?: ReaderCollectionOptions): Promise<MetricData[]> {
if (this._metricProducer === undefined) {
throw new Error('MetricReader is not bound to a MeterProvider');
throw new Error('MetricReader is not bound to a MetricProducer');
}
const metrics = await this._metricProducer.collect();

// errors thrown to caller
await this._exporter.export(metrics);
// Subsequent invocations to collect are not allowed. SDKs SHOULD return some failure for these calls.
if (this._shutdown) {
api.diag.warn('Collection is not allowed after shutdown');
return [];
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
return await this._metricProducer.collect();
}

return await callWithTimeout(this._metricProducer.collect(), options.timeoutMillis);
}

async shutdown(): Promise<void> {
/**
* Shuts down the metric reader, the promise will reject after the optional timeout or resolve after completion.
*
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async shutdown(options?: ReaderShutdownOptions): Promise<void> {
// Do not call shutdown again if it has already been called.
if (this._shutdown) {
api.diag.error('Cannot call shutdown twice.');
return;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
await this.onShutdown();
} else {
await callWithTimeout(this.onShutdown(), options.timeoutMillis);
}

this._shutdown = true;
// errors thrown to caller
await this._exporter.shutdown();
}

async forceFlush(): Promise<void> {
/**
* Flushes metrics read by this reader, the promise will reject after the optional timeout or resolve after completion.
*
* <p> NOTE: this operation will continue even after the promise rejects due to a timeout.
* @param options options with timeout.
*/
async forceFlush(options?: ReaderForceFlushOptions): Promise<void> {
if (this._shutdown) {
api.diag.warn('Cannot forceFlush on already shutdown MetricReader.');
return;
}

// No timeout if timeoutMillis is undefined or null.
if (options?.timeoutMillis == null) {
await this.onForceFlush();
return;
}

// errors thrown to caller
await this._exporter.forceFlush();
await callWithTimeout(this.onForceFlush(), options.timeoutMillis);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as api from '@opentelemetry/api';
import { MetricReader } from './MetricReader';
import { MetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';

export type PeriodicExportingMetricReaderOptions = {
exporter: MetricExporter
exportIntervalMillis?: number,
exportTimeoutMillis?: number
}

/**
* {@link MetricReader} which collects metrics based on a user-configurable time interval, and passes the metrics to
* the configured {@link MetricExporter}
*/
export class PeriodicExportingMetricReader extends MetricReader {
private _interval?: ReturnType<typeof setInterval>;

private _exporter: MetricExporter;

private readonly _exportInterval: number;

private readonly _exportTimeout: number;

constructor(options: PeriodicExportingMetricReaderOptions) {
super(options.exporter.getPreferredAggregationTemporality());

if (options.exportIntervalMillis !== undefined && options.exportIntervalMillis <= 0) {
throw Error('exportIntervalMillis must be greater than 0');
}

if (options.exportTimeoutMillis !== undefined && options.exportTimeoutMillis <= 0) {
throw Error('exportTimeoutMillis must be greater than 0');
}

if (options.exportTimeoutMillis !== undefined &&
options.exportIntervalMillis !== undefined &&
options.exportIntervalMillis < options.exportTimeoutMillis) {
throw Error('exportIntervalMillis must be greater than or equal to exportTimeoutMillis');
}

this._exportInterval = options.exportIntervalMillis ?? 60000;
this._exportTimeout = options.exportTimeoutMillis ?? 30000;
this._exporter = options.exporter;
}

private async _runOnce(): Promise<void> {
const metrics = await this.collect({});
await this._exporter.export(metrics);
}

protected override onInitialized(): void {
// start running the interval as soon as this reader is initialized and keep handle for shutdown.
this._interval = setInterval(async () => {
try {
await callWithTimeout(this._runOnce(), this._exportTimeout);
} catch (err) {
if (err instanceof TimeoutError) {
api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout);
return;
}

api.diag.error('Unexpected error during export: %s', err);
}
}, this._exportInterval);
}

protected async onForceFlush(): Promise<void> {
await this._exporter.forceFlush();
}

protected async onShutdown(): Promise<void> {
if (this._interval) {
clearInterval(this._interval);
}

await this._exporter.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ export class MetricCollector implements MetricProducer {
* Delegates for MetricReader.forceFlush.
*/
async forceFlush(): Promise<void> {
return this._metricReader.forceFlush();
await this._metricReader.forceFlush();
}

/**
* Delegates for MetricReader.shutdown.
*/
async shutdown(): Promise<void> {
return this._metricReader.shutdown();
await this._metricReader.shutdown();
}
}

Expand Down
44 changes: 44 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,47 @@ export function hashAttributes(attributes: Attributes): string {
return (result += key + ':' + attributes[key]);
}, '|#');
}

/**
* Error that is thrown on timeouts.
*/
export class TimeoutError extends Error {
constructor(message?: string) {
super(message);

// manually adjust prototype to retain `instanceof` functionality when targeting ES5, see:
// https://github.com/Microsoft/TypeScript-wiki/blob/main/Breaking-Changes.md#extending-built-ins-like-error-array-and-map-may-no-longer-work
Object.setPrototypeOf(this, TimeoutError.prototype);
}
}

/**
* Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise
* rejects, and resolves if the specified promise resolves.
*
* <p> NOTE: this operation will continue even after it throws a {@link TimeoutError}.
*
* @param promise promise to use with timeout.
* @param timeout the timeout in milliseconds until the returned promise is rejected.
*/
export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
let timeoutHandle: ReturnType<typeof setTimeout>;

const timeoutPromise = new Promise<never>(function timeoutFunction(_resolve, reject) {
timeoutHandle = setTimeout(
function timeoutHandler() {
reject(new TimeoutError('Operation timed out.'));
},
timeout
);
});

return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle);
return result;
},
reason => {
clearTimeout(timeoutHandle);
throw reason;
});
}
Loading

0 comments on commit 354c002

Please sign in to comment.