Skip to content

Commit

Permalink
refactor(metric-reader): move callWithTimeout and TimeoutError to src…
Browse files Browse the repository at this point in the history
…/utils.ts
  • Loading branch information
pichlermarc committed Jan 5, 2022
1 parent 8021fb3 commit 1ec3150
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import * as api from '@opentelemetry/api';
import { AggregationTemporality } from './AggregationTemporality';
import { MetricProducer } from './MetricProducer';
import { MetricData } from './MetricData';
import { callWithTimeout } from '../utils';

export type ReaderOptions = {
timeoutMillis?: number
Expand All @@ -29,47 +30,6 @@ export type ReaderShutdownOptions = ReaderOptions;

export type ReaderForceFlushOptions = ReaderOptions;

/**
* Error that is thrown on timeouts (i.e. timeout on forceFlush or shutdown)
*/
export class ReaderTimeoutError extends Error {
constructor(message?: string) {
super(message);
Object.setPrototypeOf(this, ReaderTimeoutError.prototype);
}
}

/**
* Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise
* rejects, and resolves if the specified promise resolves.
*
* <p> NOTE: this operation will continue even after it throws a {@link ReaderTimeoutError}.
*
* @param promise promise to use with timeout.
* @param timeout the timeout in milliseconds until the returned promise is rejected.
*/
export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
let timeoutHandle: ReturnType<typeof setTimeout>;

const timeoutPromise = new Promise<never>(function timeoutFunction(_resolve, reject) {
timeoutHandle = setTimeout(
function timeoutHandler() {
reject(new ReaderTimeoutError('Operation timed out.'));
},
timeout
);
});

return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle);
return result;
},
reason => {
clearTimeout(timeoutHandle);
throw reason;
});
}

// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricreader

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/

import * as api from '@opentelemetry/api';
import { callWithTimeout, MetricReader, ReaderTimeoutError } from './MetricReader';
import { MetricReader } from './MetricReader';
import { MetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';

export type PeriodicExportingMetricReaderOptions = {
exporter: MetricExporter
Expand Down Expand Up @@ -70,7 +71,7 @@ export class PeriodicExportingMetricReader extends MetricReader {
try {
await callWithTimeout(this._runOnce(), this._exportTimeout);
} catch (err) {
if (err instanceof ReaderTimeoutError) {
if (err instanceof TimeoutError) {
api.diag.error('Export took longer than %s milliseconds and timed out.', this._exportTimeout);
return;
}
Expand Down
41 changes: 41 additions & 0 deletions experimental/packages/opentelemetry-sdk-metrics-base/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,44 @@ export function hashAttributes(attributes: Attributes): string {
return (result += key + ':' + attributes[key]);
}, '|#');
}

/**
* Error that is thrown on timeouts.
*/
export class TimeoutError extends Error {
constructor(message?: string) {
super(message);
Object.setPrototypeOf(this, TimeoutError.prototype);
}
}

/**
* Adds a timeout to a promise and rejects if the specified timeout has elapsed. Also rejects if the specified promise
* rejects, and resolves if the specified promise resolves.
*
* <p> NOTE: this operation will continue even after it throws a {@link TimeoutError}.
*
* @param promise promise to use with timeout.
* @param timeout the timeout in milliseconds until the returned promise is rejected.
*/
export function callWithTimeout<T>(promise: Promise<T>, timeout: number): Promise<T> {
let timeoutHandle: ReturnType<typeof setTimeout>;

const timeoutPromise = new Promise<never>(function timeoutFunction(_resolve, reject) {
timeoutHandle = setTimeout(
function timeoutHandler() {
reject(new TimeoutError('Operation timed out.'));
},
timeout
);
});

return Promise.race([promise, timeoutPromise]).then(result => {
clearTimeout(timeoutHandle);
return result;
},
reason => {
clearTimeout(timeoutHandle);
throw reason;
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

import { PeriodicExportingMetricReader } from '../../src/export/PeriodicExportingMetricReader';
import { AggregationTemporality } from '../../src/export/AggregationTemporality';
import { MetricExporter, ReaderTimeoutError } from '../../src';
import { MetricExporter } from '../../src';
import { MetricData } from '../../src/export/MetricData';
import * as assert from 'assert';
import * as sinon from 'sinon';
import { MetricProducer } from '../../src/export/MetricProducer';
import { TimeoutError } from '../../src/utils';

const MAX_32_BIT_INT = 2 ** 31 - 1

Expand Down Expand Up @@ -205,7 +206,7 @@ describe('PeriodicExportingMetricReader', () => {

reader.setMetricProducer(new TestMetricProducer());
await assert.rejects(() => reader.forceFlush({ timeoutMillis: 20 }),
thrown => thrown instanceof ReaderTimeoutError);
thrown => thrown instanceof TimeoutError);
await reader.shutdown({});
});

Expand Down Expand Up @@ -267,7 +268,7 @@ describe('PeriodicExportingMetricReader', () => {

reader.setMetricProducer(new TestMetricProducer());
await assert.rejects(() => reader.shutdown({ timeoutMillis: 20 }),
thrown => thrown instanceof ReaderTimeoutError);
thrown => thrown instanceof TimeoutError);
});

it('called twice should call export shutdown only once', async () => {
Expand Down

0 comments on commit 1ec3150

Please sign in to comment.