Skip to content

Commit

Permalink
[Usage collection] Update bulkFetch logic (elastic#121437)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bamieh authored Dec 20, 2021
1 parent 64f0e39 commit 46a0999
Show file tree
Hide file tree
Showing 15 changed files with 231 additions and 134 deletions.
35 changes: 0 additions & 35 deletions src/plugins/telemetry/server/fetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,38 +35,6 @@ describe('FetcherTask', () => {
);
});

it('stops when all collectors are not ready', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const getCurrentConfigs = jest.fn().mockResolvedValue({});
const areAllCollectorsReady = jest.fn().mockResolvedValue(false);
const shouldSendReport = jest.fn().mockReturnValue(true);
const fetchTelemetry = jest.fn();
const sendTelemetry = jest.fn();
const updateReportFailure = jest.fn();

Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
sendTelemetry,
});

await fetcherTask['sendIfDue']();

expect(fetchTelemetry).toBeCalledTimes(0);
expect(sendTelemetry).toBeCalledTimes(0);

expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(updateReportFailure).toBeCalledTimes(0);
expect(fetcherTask['logger'].warn).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toHaveBeenCalledWith(
`Error fetching usage. (Error: Not all collectors are ready.)`
);
});

it('fetches usage and send telemetry', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
Expand All @@ -79,7 +47,6 @@ describe('FetcherTask', () => {
const getCurrentConfigs = jest.fn().mockResolvedValue({
telemetryUrl: mockTelemetryUrl,
});
const areAllCollectorsReady = jest.fn().mockResolvedValue(true);
const shouldSendReport = jest.fn().mockReturnValue(true);

const fetchTelemetry = jest.fn().mockResolvedValue(mockClusters);
Expand All @@ -88,7 +55,6 @@ describe('FetcherTask', () => {

Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
Expand All @@ -97,7 +63,6 @@ describe('FetcherTask', () => {

await fetcherTask['sendIfDue']();

expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(fetchTelemetry).toBeCalledTimes(1);
expect(sendTelemetry).toBeCalledTimes(1);
expect(sendTelemetry).toHaveBeenNthCalledWith(1, mockTelemetryUrl, mockClusters);
Expand Down
8 changes: 0 additions & 8 deletions src/plugins/telemetry/server/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ export class FetcherTask {
}
}

private async areAllCollectorsReady() {
return (await this.telemetryCollectionManager?.areAllCollectorsReady()) ?? false;
}

private async sendIfDue() {
if (this.isSending) {
return;
Expand All @@ -102,10 +98,6 @@ export class FetcherTask {
this.isSending = true;

try {
const allCollectorsReady = await this.areAllCollectorsReady();
if (!allCollectorsReady) {
throw new Error('Not all collectors are ready.');
}
clusters = await this.fetchTelemetry();
} catch (err) {
this.logger.warn(`Error fetching usage. (${err})`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ describe('Telemetry Collection Manager', () => {
describe('everything works when no collection mechanisms are registered', () => {
const telemetryCollectionManager = new TelemetryCollectionManagerPlugin(initializerContext);
const setupApi = telemetryCollectionManager.setup(coreMock.createSetup(), { usageCollection });
test('All collectors are ready (there are none)', async () => {
await expect(setupApi.areAllCollectorsReady()).resolves.toBe(true);
});
test('getStats returns empty', async () => {
const config: StatsGetterConfig = { unencrypted: false };
await expect(setupApi.getStats(config)).resolves.toStrictEqual([]);
Expand Down
6 changes: 0 additions & 6 deletions src/plugins/telemetry_collection_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ export class TelemetryCollectionManagerPlugin
setCollectionStrategy: this.setCollectionStrategy.bind(this),
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}

Expand All @@ -80,7 +79,6 @@ export class TelemetryCollectionManagerPlugin
return {
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}

Expand Down Expand Up @@ -221,10 +219,6 @@ export class TelemetryCollectionManagerPlugin
return [];
}

private async areAllCollectorsReady() {
return await this.usageCollection?.areAllCollectorsReady();
}

private getOptInStatsForCollection = async (
collection: CollectionStrategy,
optInStatus: boolean,
Expand Down
2 changes: 0 additions & 2 deletions src/plugins/telemetry_collection_manager/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ export interface TelemetryCollectionManagerPluginSetup {
) => void;
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}

export interface TelemetryCollectionManagerPluginStart {
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}

export interface TelemetryOptInStats {
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/usage_collection/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@
*/

export const KIBANA_STATS_TYPE = 'kibana_stats';
export const DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S = 60;
export const DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S = 1;
export const MAIN_APP_DEFAULT_VIEW_ID = 'main';
164 changes: 157 additions & 7 deletions src/plugins/usage_collection/server/collector/collector_set.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ import { noop } from 'lodash';
import { Collector } from './collector';
import { CollectorSet } from './collector_set';
import { UsageCollector } from './usage_collector';

import {
elasticsearchServiceMock,
loggingSystemMock,
savedObjectsClientMock,
httpServerMock,
} from '../../../../core/server/mocks';

const logger = loggingSystemMock.createLogger();
describe('CollectorSet', () => {
const logger = loggingSystemMock.createLogger();

const loggerSpies = {
debug: jest.spyOn(logger, 'debug'),
warn: jest.spyOn(logger, 'warn'),
};
const loggerSpies = {
debug: jest.spyOn(logger, 'debug'),
warn: jest.spyOn(logger, 'warn'),
};

describe('CollectorSet', () => {
describe('registers a collector set and runs lifecycle events', () => {
let fetch: Function;
beforeEach(() => {
Expand Down Expand Up @@ -83,7 +85,8 @@ describe('CollectorSet', () => {
);

const result = await collectors.bulkFetch(mockEsClient, mockSoClient, req);
expect(loggerSpies.debug).toHaveBeenCalledTimes(1);
expect(loggerSpies.debug).toHaveBeenCalledTimes(2);
expect(loggerSpies.debug).toHaveBeenCalledWith('Getting ready collectors');
expect(loggerSpies.debug).toHaveBeenCalledWith(
'Fetching data from MY_TEST_COLLECTOR collector'
);
Expand Down Expand Up @@ -487,4 +490,151 @@ describe('CollectorSet', () => {
).toStrictEqual({ test: 1 });
});
});

describe('bulkFetch', () => {
const collectorSetConfig = { logger, maximumWaitTimeForAllCollectorsInS: 1 };
let collectorSet = new CollectorSet(collectorSetConfig);
afterEach(() => {
collectorSet = new CollectorSet(collectorSetConfig);
});

it('skips collectors that are not ready', async () => {
const mockIsReady = jest.fn().mockReturnValue(true);
const mockIsNotReady = jest.fn().mockResolvedValue(false);
const mockNonReadyFetch = jest.fn().mockResolvedValue({});
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: mockIsReady,
schema: {},
fetch: mockReadyFetch,
})
);
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'not_ready_col',
isReady: mockIsNotReady,
schema: {},
fetch: mockNonReadyFetch,
})
);

const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);

expect(mockIsReady).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockIsNotReady).toBeCalledTimes(1);
expect(mockNonReadyFetch).toBeCalledTimes(0);

expect(results).toMatchInlineSnapshot(`
Array [
Object {
"result": Object {},
"type": "ready_col",
},
]
`);
});

it('skips collectors that have timed out', async () => {
const mockFastReady = jest.fn().mockImplementation(async () => {
return new Promise((res) => {
setTimeout(() => res(true), 0.5 * 1000);
});
});
const mockTimedOutReady = jest.fn().mockImplementation(async () => {
return new Promise((res) => {
setTimeout(() => res(true), 2 * 1000);
});
});
const mockNonReadyFetch = jest.fn().mockResolvedValue({});
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: mockFastReady,
schema: {},
fetch: mockReadyFetch,
})
);
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'timeout_col',
isReady: mockTimedOutReady,
schema: {},
fetch: mockNonReadyFetch,
})
);

const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);

expect(mockFastReady).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockTimedOutReady).toBeCalledTimes(1);
expect(mockNonReadyFetch).toBeCalledTimes(0);

expect(results).toMatchInlineSnapshot(`
Array [
Object {
"result": Object {},
"type": "ready_col",
},
]
`);
});

it('passes context to fetch', async () => {
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: () => true,
schema: {},
fetch: mockReadyFetch,
})
);

const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, undefined);

expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledWith({
esClient: mockEsClient,
soClient: mockSoClient,
});
expect(results).toHaveLength(1);
});

it('adds extra context to collectors with extendFetchContext config', async () => {
const mockReadyFetch = jest.fn().mockResolvedValue({});
collectorSet.registerCollector(
collectorSet.makeUsageCollector({
type: 'ready_col',
isReady: () => true,
schema: {},
fetch: mockReadyFetch,
extendFetchContext: { kibanaRequest: true },
})
);

const mockEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
const mockSoClient = savedObjectsClientMock.create();
const request = httpServerMock.createKibanaRequest();
const results = await collectorSet.bulkFetch(mockEsClient, mockSoClient, request);

expect(mockReadyFetch).toBeCalledTimes(1);
expect(mockReadyFetch).toBeCalledWith({
esClient: mockEsClient,
soClient: mockSoClient,
kibanaRequest: request,
});
expect(results).toHaveLength(1);
});
});
});
Loading

0 comments on commit 46a0999

Please sign in to comment.