Skip to content

Commit

Permalink
[Usage Collection] Reuse concurrent collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
afharo committed Feb 20, 2023
1 parent 9109fd5 commit 5497ec3
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -601,5 +601,57 @@ describe('CollectorSet', () => {
expect.any(Function)
);
});

it('reuses ongoing collectors for subsequent calls', async () => {
const fetchMock = jest.fn(
() => new Promise((resolve) => setTimeout(() => resolve({ test: 1000 }), 100))
);

collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'slow_collector',
isReady: () => true,
schema: { test: { type: 'long' } },
fetch: fetchMock,
})
);

const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();

// Call bulkFetch twice concurrently
await Promise.all([
collectorSet.bulkFetch(mockEsClient, mockSoClient),
collectorSet.bulkFetch(mockEsClient, mockSoClient),
]);

// It should be called once
expect(fetchMock).toHaveBeenCalledTimes(1);
});

it('calls completed collectors on subsequent calls', async () => {
const fetchMock = jest.fn(
() => new Promise((resolve) => setTimeout(() => resolve({ test: 1000 }), 100))
);

collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'slow_collector',
isReady: () => true,
schema: { test: { type: 'long' } },
fetch: fetchMock,
})
);

const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();

// Call bulkFetch twice sequentially
await collectorSet.bulkFetch(mockEsClient, mockSoClient);
await collectorSet.bulkFetch(mockEsClient, mockSoClient);

// It should be called once
expect(fetchMock).toHaveBeenCalledTimes(2);
});
});
});
33 changes: 23 additions & 10 deletions src/plugins/usage_collection/server/collector/collector_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ interface CollectorWithStatus {
collector: AnyCollector;
}

interface FetchCollectorOutput {
result?: unknown;
status: 'failed' | 'success';
type: string;
}

export interface CollectorSetConfig {
logger: Logger;
executionContext: ExecutionContextSetup;
Expand All @@ -43,6 +49,7 @@ export class CollectorSet {
private readonly executionContext: ExecutionContextSetup;
private readonly maximumWaitTimeForAllCollectorsInS: number;
private readonly collectors: Map<string, AnyCollector>;
private readonly fetchingCollectors = new WeakMap<AnyCollector, Promise<FetchCollectorOutput>>();
constructor({
logger,
executionContext,
Expand Down Expand Up @@ -190,11 +197,7 @@ export class CollectorSet {
private fetchCollector = async (
collector: AnyCollector,
context: CollectorFetchContext
): Promise<{
result?: unknown;
status: 'failed' | 'success';
type: string;
}> => {
): Promise<FetchCollectorOutput> => {
const { type } = collector;
this.logger.debug(`Fetching data from ${type} collector`);
const executionContext: KibanaExecutionContext = {
Expand Down Expand Up @@ -231,12 +234,22 @@ export class CollectorSet {

const fetchExecutions = await Promise.all(
readyCollectors.map(async (collector) => {
const wrappedPromise = perfTimerify(
`fetch_${collector.type}`,
async () => await this.fetchCollector(collector, context)
);
// If the collector is processing from a concurrent request, reuse it.
let wrappedPromise = this.fetchingCollectors.get(collector);

if (!wrappedPromise) {
// Otherwise, call it
wrappedPromise = perfTimerify(
`fetch_${collector.type}`,
async () => await this.fetchCollector(collector, context)
)();
}

this.fetchingCollectors.set(collector, wrappedPromise);

wrappedPromise.finally(() => this.fetchingCollectors.delete(collector));

return await wrappedPromise();
return await wrappedPromise;
})
);
const durationMarks = getMarks();
Expand Down

0 comments on commit 5497ec3

Please sign in to comment.