Skip to content

Commit

Permalink
check collectors ready before sending
Browse files Browse the repository at this point in the history
  • Loading branch information
Bamieh committed Oct 4, 2020
1 parent acedb35 commit 32ad765
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 16 deletions.
37 changes: 34 additions & 3 deletions src/plugins/telemetry/server/fetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,46 @@ describe('FetcherTask', () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const mockError = new Error('Some message.');
fetcherTask['getCurrentConfigs'] = async () => {
throw mockError;
};
const getCurrentConfigs = jest.fn().mockRejectedValue(mockError);
Object.assign(fetcherTask, {
getCurrentConfigs,
});
const result = await fetcherTask['sendIfDue']();
expect(result).toBe(undefined);
expect(getCurrentConfigs).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toHaveBeenCalledWith(
`Error fetching telemetry configs: ${mockError}`
);
});

it('fails when all collectors are not ready', async () => {
const initializerContext = coreMock.createPluginInitializerContext({});
const fetcherTask = new FetcherTask(initializerContext);
const getCurrentConfigs = jest.fn().mockResolvedValue({});
const areAllCollectorsReady = jest.fn().mockResolvedValue(false);
const shouldSendReport = jest.fn().mockReturnValue(true);
const fetchTelemetry = jest.fn();
const updateReportFailure = jest.fn();

Object.assign(fetcherTask, {
getCurrentConfigs,
areAllCollectorsReady,
shouldSendReport,
fetchTelemetry,
updateReportFailure,
});

await fetcherTask['sendIfDue']();

expect(fetchTelemetry).toBeCalledTimes(0);

expect(areAllCollectorsReady).toBeCalledTimes(1);
expect(updateReportFailure).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toBeCalledTimes(1);
expect(fetcherTask['logger'].warn).toHaveBeenCalledWith(
`Error sending telemetry usage data. (Error: Not all collectors are ready.)`
);
});
});
});
11 changes: 10 additions & 1 deletion src/plugins/telemetry/server/fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ export class FetcherTask {
}
}

private async areAllCollectorsReady() {
return await this.telemetryCollectionManager!.areAllCollectorsReady();
}

private async sendIfDue() {
if (this.isSending) {
return;
Expand All @@ -113,6 +117,11 @@ export class FetcherTask {

try {
this.isSending = true;
const allCollectorsReady = await this.areAllCollectorsReady();

if (!allCollectorsReady) {
throw new Error('Not all collectors are ready.');
}
const clusters = await this.fetchTelemetry();
const { telemetryUrl } = telemetryConfig;
for (const cluster of clusters) {
Expand All @@ -123,7 +132,7 @@ export class FetcherTask {
} catch (err) {
await this.updateReportFailure(telemetryConfig);

this.logger.warn(`Error sending telemetry usage data: ${err}`);
this.logger.warn(`Error sending telemetry usage data. (${err})`);
}
this.isSending = false;
}
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/telemetry_collection_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export class TelemetryCollectionManagerPlugin
setCollection: this.setCollection.bind(this),
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}

Expand All @@ -75,6 +76,7 @@ export class TelemetryCollectionManagerPlugin
setCollection: this.setCollection.bind(this),
getOptInStats: this.getOptInStats.bind(this),
getStats: this.getStats.bind(this),
areAllCollectorsReady: this.areAllCollectorsReady.bind(this),
};
}

Expand Down Expand Up @@ -185,6 +187,10 @@ export class TelemetryCollectionManagerPlugin
return [];
}

private areAllCollectorsReady = async () => {
return await this.usageCollection?.areAllCollectorsReady();
};

private getOptInStatsForCollection = async (
collection: Collection,
optInStatus: boolean,
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/telemetry_collection_manager/server/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export interface TelemetryCollectionManagerPluginSetup {
) => void;
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}

export interface TelemetryCollectionManagerPluginStart {
Expand All @@ -42,6 +43,7 @@ export interface TelemetryCollectionManagerPluginStart {
) => void;
getOptInStats: TelemetryCollectionManagerPlugin['getOptInStats'];
getStats: TelemetryCollectionManagerPlugin['getStats'];
areAllCollectorsReady: TelemetryCollectionManagerPlugin['areAllCollectorsReady'];
}

export interface TelemetryOptInStats {
Expand Down
30 changes: 18 additions & 12 deletions src/plugins/usage_collection/server/collector/collector_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { snakeCase } from 'lodash';
import { Logger, LegacyAPICaller, ElasticsearchClient } from 'kibana/server';
import { Collector, CollectorOptions } from './collector';
import { UsageCollector } from './usage_collector';
import { awaitBefore } from './await_before';

interface CollectorSetConfig {
logger: Logger;
Expand Down Expand Up @@ -76,23 +77,27 @@ export class CollectorSet {
};

public areAllCollectorsReady = async (collectorSet: CollectorSet = this) => {
// Kept this for runtime validation in JS code.
if (!(collectorSet instanceof CollectorSet)) {
throw new Error(
`areAllCollectorsReady method given bad collectorSet parameter: ` + typeof collectorSet
);
}

const collectorTypesNotReady = (
await Promise.all(
[...collectorSet.collectors.values()].map(async (collector) => {
if (!(await collector.isReady())) {
return collector.type;
}
})
)
).filter((collectorType): collectorType is string => !!collectorType);
const allReady = collectorTypesNotReady.length === 0;
const collectors = [...collectorSet.collectors.values()];
const collectorsWithStatus = await Promise.all(
collectors.map(async (collector) => {
return {
isReady: await collector.isReady(),
collector,
};
})
);

const collectorsTypesNotReady = collectorsWithStatus
.filter((collectorWithStatus) => collectorWithStatus.isReady === false)
.map((collectorWithStatus) => collectorWithStatus.collector.type);

const allReady = collectorsTypesNotReady.length === 0;

if (!allReady && this.maximumWaitTimeForAllCollectorsInS >= 0) {
const nowTimestamp = +new Date();
Expand All @@ -102,10 +107,11 @@ export class CollectorSet {
const timeLeftInMS = this.maximumWaitTimeForAllCollectorsInS * 1000 - timeWaitedInMS;
if (timeLeftInMS <= 0) {
this.logger.debug(
`All collectors are not ready (waiting for ${collectorTypesNotReady.join(',')}) ` +
`All collectors are not ready (waiting for ${collectorsTypesNotReady.join(',')}) ` +
`but we have waited the required ` +
`${this.maximumWaitTimeForAllCollectorsInS}s and will return data from all collectors that are ready.`
);

return true;
} else {
this.logger.debug(`All collectors are not ready. Waiting for ${timeLeftInMS}ms longer.`);
Expand Down

0 comments on commit 32ad765

Please sign in to comment.