From b260f891ee1c4852bddc9cef4c639c935ee5f131 Mon Sep 17 00:00:00 2001 From: Dobes Vandermeer Date: Thu, 10 Dec 2020 10:21:12 -0800 Subject: [PATCH] feat(exporter-collector): implement concurrencyLimit option (#1708) This adds an option to the collector exporters `concurrencyLimit`. If this is set and the number of export operations is equal to the limit, additional export operations will fail immediately. This should be set in combination with the batch span processor be set such that the concurrency limit would not be reached under "normal" circumstances - only if there is an issue would spans start to be dropped. This helps us cap the amount of memory & sockets used by the exporter if it is not able to keep up with the data it is being provided. This could happen if the local network (e.g. in a browser) or the remote collector are too slow to handle all the activity. If we do not have this cap, and the exporter cannot keep up, resources such as memory and network sockets can be consumed without limit, causing crashes and other undesirable outcomes far worse than losing some telemetry data. This also updates the examples to use `BatchSpanProcessor` as I couldn't really think of any reason why you would want to use SimpleSpanProcessor in combination with the collector exporter. Co-authored-by: Valentin Marchaud Co-authored-by: Bartlomiej Obecny --- .../README.md | 28 ++++++++++---- .../src/CollectorExporterBase.ts | 14 +++++++ .../src/types.ts | 1 + .../common/CollectorTraceExporter.test.ts | 38 ++++++++++++++++++- 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/packages/opentelemetry-exporter-collector/README.md b/packages/opentelemetry-exporter-collector/README.md index d0a6ffb90f0..473f7b62611 100644 --- a/packages/opentelemetry-exporter-collector/README.md +++ b/packages/opentelemetry-exporter-collector/README.md @@ -19,18 +19,24 @@ npm install --save @opentelemetry/exporter-collector The CollectorTraceExporter in Web expects the endpoint to end in `/v1/trace`. ```js -import { SimpleSpanProcessor } from '@opentelemetry/tracing'; +import { BatchSpanProcessor } from '@opentelemetry/tracing'; import { WebTracerProvider } from '@opentelemetry/web'; import { CollectorTraceExporter } from '@opentelemetry/exporter-collector'; const collectorOptions = { url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/trace - headers: {}, //an optional object containing custom headers to be sent with each request + headers: {}, // an optional object containing custom headers to be sent with each request + concurrencyLimit: 10, // an optional limit on pending requests }; const provider = new WebTracerProvider(); const exporter = new CollectorTraceExporter(collectorOptions); -provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); +provider.addSpanProcessor(new BatchSpanProcessor(exporter, { + // send spans as soon as we have this many + bufferSize: 10, + // send spans if we have buffered spans older than this + bufferTimeout: 500, +})); provider.register(); @@ -45,7 +51,8 @@ import { MetricProvider } from '@opentelemetry/metrics'; import { CollectorMetricExporter } from '@opentelemetry/exporter-collector'; const collectorOptions = { url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics - headers: {}, //an optional object containing custom headers to be sent with each request + headers: {}, // an optional object containing custom headers to be sent with each request + concurrencyLimit: 1, // an optional limit on pending requests }; const exporter = new CollectorMetricExporter(collectorOptions); @@ -64,7 +71,7 @@ counter.add(10, { 'key': 'value' }); ## Traces in Node - JSON over http ```js -const { BasicTracerProvider, SimpleSpanProcessor } = require('@opentelemetry/tracing'); +const { BasicTracerProvider, BatchSpanProcessor } = require('@opentelemetry/tracing'); const { CollectorTraceExporter } = require('@opentelemetry/exporter-collector'); const collectorOptions = { @@ -72,12 +79,18 @@ const collectorOptions = { url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/trace headers: { foo: 'bar' - }, //an optional object containing custom headers to be sent with each request will only work with http + }, // an optional object containing custom headers to be sent with each request will only work with http + concurrencyLimit: 10, // an optional limit on pending requests }; const provider = new BasicTracerProvider(); const exporter = new CollectorTraceExporter(collectorOptions); -provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); +provider.addSpanProcessor(new BatchSpanProcessor(exporter, { + // send spans as soon as we have this many + bufferSize: 1000, + // send spans if we have buffered spans older than this + bufferTimeout: 30000, +})); provider.register(); @@ -91,6 +104,7 @@ const { CollectorMetricExporter } = require('@opentelemetry/exporter-collector' const collectorOptions = { serviceName: 'basic-service', url: '', // url is optional and can be omitted - default is http://localhost:55681/v1/metrics + concurrencyLimit: 1, // an optional limit on pending requests }; const exporter = new CollectorMetricExporter(collectorOptions); diff --git a/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts b/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts index 4b2775abb7c..cb4d535d941 100644 --- a/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts +++ b/packages/opentelemetry-exporter-collector/src/CollectorExporterBase.ts @@ -39,6 +39,7 @@ export abstract class CollectorExporterBase< public readonly logger: Logger; public readonly hostname: string | undefined; public readonly attributes?: Attributes; + protected _concurrencyLimit: number; protected _isShutdown: boolean = false; private _shuttingDownPromise: Promise = Promise.resolve(); protected _sendingPromises: Promise[] = []; @@ -59,6 +60,11 @@ export abstract class CollectorExporterBase< this.shutdown = this.shutdown.bind(this); + this._concurrencyLimit = + typeof config.concurrencyLimit === 'number' + ? config.concurrencyLimit + : Infinity; + // platform dependent this.onInit(config); } @@ -77,6 +83,14 @@ export abstract class CollectorExporterBase< return; } + if (this._sendingPromises.length >= this._concurrencyLimit) { + resultCallback({ + code: ExportResultCode.FAILED, + error: new Error('Concurrent export limit reached'), + }); + return; + } + this._export(items) .then(() => { resultCallback({ code: ExportResultCode.SUCCESS }); diff --git a/packages/opentelemetry-exporter-collector/src/types.ts b/packages/opentelemetry-exporter-collector/src/types.ts index 694c60d8d9a..ab6423b8f94 100644 --- a/packages/opentelemetry-exporter-collector/src/types.ts +++ b/packages/opentelemetry-exporter-collector/src/types.ts @@ -341,6 +341,7 @@ export interface CollectorExporterConfigBase { serviceName?: string; attributes?: Attributes; url?: string; + concurrencyLimit?: number; } /** diff --git a/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts b/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts index 03aedf7ca4d..fa08f1b185d 100644 --- a/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts +++ b/packages/opentelemetry-exporter-collector/test/common/CollectorTraceExporter.test.ts @@ -31,7 +31,18 @@ class CollectorTraceExporter extends CollectorExporterBase< > { onInit() {} onShutdown() {} - send() {} + send( + items: any[], + onSuccess: () => void, + onError: (error: collectorTypes.CollectorExporterError) => void + ) { + const promise = Promise.resolve(null); + this._sendingPromises.push( + promise.then(() => + this._sendingPromises.splice(this._sendingPromises.indexOf(promise), 1) + ) + ); + } getDefaultUrl(config: CollectorExporterConfig): string { return config.url || ''; } @@ -187,7 +198,32 @@ describe('CollectorTraceExporter - common', () => { }); }); }); + describe('export - concurrency limit', () => { + it('should error if too many concurrent exports are queued', done => { + const collectorExporterWithConcurrencyLimit = new CollectorTraceExporter({ + ...collectorExporterConfig, + concurrencyLimit: 3, + }); + const spans: ReadableSpan[] = [{ ...mockedReadableSpan }]; + const callbackSpy = sinon.spy(); + for (let i = 0; i < 7; i++) { + collectorExporterWithConcurrencyLimit.export(spans, callbackSpy); + } + setTimeout(() => { + // Expect 4 failures + assert.strictEqual(callbackSpy.args.length, 4); + callbackSpy.args.forEach(([result]) => { + assert.strictEqual(result.code, ExportResultCode.FAILED); + assert.strictEqual( + result.error!.message, + 'Concurrent export limit reached' + ); + }); + done(); + }); + }); + }); describe('shutdown', () => { let onShutdownSpy: any; beforeEach(() => {