From 5497ec34bff000993dafd797566cafd222a741e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Fern=C3=A1ndez=20Haro?= Date: Mon, 20 Feb 2023 15:26:21 +0100 Subject: [PATCH] [Usage Collection] Reuse concurrent collectors --- .../server/collector/collector_set.test.ts | 52 +++++++++++++++++++ .../server/collector/collector_set.ts | 33 ++++++++---- 2 files changed, 75 insertions(+), 10 deletions(-) diff --git a/src/plugins/usage_collection/server/collector/collector_set.test.ts b/src/plugins/usage_collection/server/collector/collector_set.test.ts index bf3381c4510d3..56b4e55eccfc4 100644 --- a/src/plugins/usage_collection/server/collector/collector_set.test.ts +++ b/src/plugins/usage_collection/server/collector/collector_set.test.ts @@ -601,5 +601,57 @@ describe('CollectorSet', () => { expect.any(Function) ); }); + + it('reuses ongoing collectors for subsequent calls', async () => { + const fetchMock = jest.fn( + () => new Promise((resolve) => setTimeout(() => resolve({ test: 1000 }), 100)) + ); + + collectorSet.registerCollector( + collectorSet.makeUsageCollector({ + type: 'slow_collector', + isReady: () => true, + schema: { test: { type: 'long' } }, + fetch: fetchMock, + }) + ); + + const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + const mockSoClient = savedObjectsClientMock.create(); + + // Call bulkFetch twice concurrently + await Promise.all([ + collectorSet.bulkFetch(mockEsClient, mockSoClient), + collectorSet.bulkFetch(mockEsClient, mockSoClient), + ]); + + // It should be called once + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('calls completed collectors on subsequent calls', async () => { + const fetchMock = jest.fn( + () => new Promise((resolve) => setTimeout(() => resolve({ test: 1000 }), 100)) + ); + + collectorSet.registerCollector( + collectorSet.makeUsageCollector({ + type: 'slow_collector', + isReady: () => true, + schema: { test: { type: 'long' } }, + fetch: fetchMock, + }) + ); + + const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + const mockSoClient = savedObjectsClientMock.create(); + + // Call bulkFetch twice sequentially + await collectorSet.bulkFetch(mockEsClient, mockSoClient); + await collectorSet.bulkFetch(mockEsClient, mockSoClient); + + // It should be called once + expect(fetchMock).toHaveBeenCalledTimes(2); + }); }); }); diff --git a/src/plugins/usage_collection/server/collector/collector_set.ts b/src/plugins/usage_collection/server/collector/collector_set.ts index 8251b95a1beb8..d1b06e63b3b95 100644 --- a/src/plugins/usage_collection/server/collector/collector_set.ts +++ b/src/plugins/usage_collection/server/collector/collector_set.ts @@ -31,6 +31,12 @@ interface CollectorWithStatus { collector: AnyCollector; } +interface FetchCollectorOutput { + result?: unknown; + status: 'failed' | 'success'; + type: string; +} + export interface CollectorSetConfig { logger: Logger; executionContext: ExecutionContextSetup; @@ -43,6 +49,7 @@ export class CollectorSet { private readonly executionContext: ExecutionContextSetup; private readonly maximumWaitTimeForAllCollectorsInS: number; private readonly collectors: Map; + private readonly fetchingCollectors = new WeakMap>(); constructor({ logger, executionContext, @@ -190,11 +197,7 @@ export class CollectorSet { private fetchCollector = async ( collector: AnyCollector, context: CollectorFetchContext - ): Promise<{ - result?: unknown; - status: 'failed' | 'success'; - type: string; - }> => { + ): Promise => { const { type } = collector; this.logger.debug(`Fetching data from ${type} collector`); const executionContext: KibanaExecutionContext = { @@ -231,12 +234,22 @@ export class CollectorSet { const fetchExecutions = await Promise.all( readyCollectors.map(async (collector) => { - const wrappedPromise = perfTimerify( - `fetch_${collector.type}`, - async () => await this.fetchCollector(collector, context) - ); + // If the collector is processing from a concurrent request, reuse it. + let wrappedPromise = this.fetchingCollectors.get(collector); + + if (!wrappedPromise) { + // Otherwise, call it + wrappedPromise = perfTimerify( + `fetch_${collector.type}`, + async () => await this.fetchCollector(collector, context) + )(); + } + + this.fetchingCollectors.set(collector, wrappedPromise); + + wrappedPromise.finally(() => this.fetchingCollectors.delete(collector)); - return await wrappedPromise(); + return await wrappedPromise; }) ); const durationMarks = getMarks();