Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Usage Collection] Ensure no type duplicates #70946

Merged
merged 3 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/plugins/usage_collection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ All you need to provide is a `type` for organizing your fields, `schema` field t

// create usage collector
const myCollector = usageCollection.makeUsageCollector<Usage>({
type: MY_USAGE_TYPE,
type: 'MY_USAGE_TYPE',
schema: {
my_objects: {
total: 'long',
Expand All @@ -84,7 +84,11 @@ All you need to provide is a `type` for organizing your fields, `schema` field t
}
```

Some background: The `callCluster` that gets passed to the `fetch` method is created in a way that's a bit tricky, to support multiple contexts the `fetch` method could be called. Your `fetch` method could get called as a result of an HTTP API request: in this case, the `callCluster` function wraps `callWithRequest`, and the request headers are expected to have read privilege on the entire `.kibana` index. The use case for this is stats pulled from a Kibana Metricbeat module, where the Beat calls Kibana's stats API in Kibana to invoke collection.
Some background:

- `MY_USAGE_TYPE` can be any string. It usually matches the plugin name. As a safety mechanism, we double check there are no duplicates at the moment of registering the collector.
- The `fetch` method needs to support multiple contexts in which it is called. For example, when stats are pulled from a Kibana Metricbeat module, the Beat calls Kibana's stats API to invoke usage collection.
In this case, the `fetch` method is called as a result of an HTTP API request and `callCluster` wraps `callWithRequest`, where the request headers are expected to have read privilege on the entire `.kibana' index.

Note: there will be many cases where you won't need to use the `callCluster` function that gets passed in to your `fetch` method at all. Your feature might have an accumulating value in server memory, or read something from the OS, or use other clients like a custom SavedObjects client. In that case it's up to the plugin to initialize those clients like the example below:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('CollectorSet', () => {
loggerSpies.warn.mockRestore();
});

const mockCallCluster = () => Promise.resolve({ passTest: 1000 });
const mockCallCluster = jest.fn().mockResolvedValue({ passTest: 1000 });

it('should throw an error if non-Collector type of object is registered', () => {
const collectors = new CollectorSet({ logger });
Expand All @@ -58,6 +58,23 @@ describe('CollectorSet', () => {
);
});

it('should throw when 2 collectors with the same type are registered', () => {
const collectorSet = new CollectorSet({ logger });
collectorSet.registerCollector(
new Collector(logger, { type: 'test_duplicated', fetch: () => 1, isReady: () => true })
);
expect(() =>
collectorSet.registerCollector(
// Even for Collector vs. UsageCollector
new UsageCollector(logger, {
type: 'test_duplicated',
fetch: () => 2,
isReady: () => false,
})
)
).toThrowError(`Usage collector's type "test_duplicated" is duplicated.`);
});

it('should log debug status of fetching from the collector', async () => {
const collectors = new CollectorSet({ logger });
collectors.registerCollector(
Expand All @@ -68,7 +85,7 @@ describe('CollectorSet', () => {
})
);

const result = await collectors.bulkFetch(mockCallCluster as any);
const result = await collectors.bulkFetch(mockCallCluster);
expect(loggerSpies.debug).toHaveBeenCalledTimes(1);
expect(loggerSpies.debug).toHaveBeenCalledWith(
'Fetching data from MY_TEST_COLLECTOR collector'
Expand All @@ -93,7 +110,7 @@ describe('CollectorSet', () => {

let result;
try {
result = await collectors.bulkFetch(mockCallCluster as any);
result = await collectors.bulkFetch(mockCallCluster);
} catch (err) {
// Do nothing
}
Expand All @@ -111,7 +128,7 @@ describe('CollectorSet', () => {
})
);

const result = await collectors.bulkFetch(mockCallCluster as any);
const result = await collectors.bulkFetch(mockCallCluster);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
Expand All @@ -129,7 +146,7 @@ describe('CollectorSet', () => {
} as any)
);

const result = await collectors.bulkFetch(mockCallCluster as any);
const result = await collectors.bulkFetch(mockCallCluster);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
Expand All @@ -152,7 +169,7 @@ describe('CollectorSet', () => {
})
);

const result = await collectors.bulkFetch(mockCallCluster as any);
const result = await collectors.bulkFetch(mockCallCluster);
expect(result).toStrictEqual([
{
type: 'MY_TEST_COLLECTOR',
Expand Down
24 changes: 14 additions & 10 deletions src/plugins/usage_collection/server/collector/collector_set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ export class CollectorSet {
private _waitingForAllCollectorsTimestamp?: number;
private readonly logger: Logger;
private readonly maximumWaitTimeForAllCollectorsInS: number;
private collectors: Array<Collector<any, any>> = [];
private readonly collectors: Map<string, Collector<any, any>>;
constructor({ logger, maximumWaitTimeForAllCollectorsInS, collectors = [] }: CollectorSetConfig) {
this.logger = logger;
this.collectors = collectors;
this.collectors = new Map(collectors.map((collector) => [collector.type, collector]));
this.maximumWaitTimeForAllCollectorsInS = maximumWaitTimeForAllCollectorsInS || 60;
}

Expand All @@ -55,7 +55,11 @@ export class CollectorSet {
throw new Error('CollectorSet can only have Collector instances registered');
}

this.collectors.push(collector);
if (this.collectors.get(collector.type)) {
throw new Error(`Usage collector's type "${collector.type}" is duplicated.`);
}

this.collectors.set(collector.type, collector);

if (collector.init) {
this.logger.debug(`Initializing ${collector.type} collector`);
Expand All @@ -64,7 +68,7 @@ export class CollectorSet {
};

public getCollectorByType = (type: string) => {
return this.collectors.find((c) => c.type === type);
return [...this.collectors.values()].find((c) => c.type === type);
};

public isUsageCollector = (x: UsageCollector | any): x is UsageCollector => {
Expand All @@ -81,7 +85,7 @@ export class CollectorSet {

const collectorTypesNotReady: string[] = [];
let allReady = true;
for (const collector of collectorSet.collectors) {
for (const collector of collectorSet.collectors.values()) {
if (!(await collector.isReady())) {
allReady = false;
collectorTypesNotReady.push(collector.type);
Expand Down Expand Up @@ -113,10 +117,10 @@ export class CollectorSet {

public bulkFetch = async (
callCluster: LegacyAPICaller,
collectors: Array<Collector<any, any>> = this.collectors
collectors: Map<string, Collector<any, any>> = this.collectors
) => {
const responses = [];
for (const collector of collectors) {
for (const collector of collectors.values()) {
this.logger.debug(`Fetching data from ${collector.type} collector`);
try {
responses.push({
Expand All @@ -136,7 +140,7 @@ export class CollectorSet {
* @return {new CollectorSet}
*/
public getFilteredCollectorSet = (filter: (col: Collector) => boolean) => {
const filtered = this.collectors.filter(filter);
const filtered = [...this.collectors.values()].filter(filter);
return this.makeCollectorSetFromArray(filtered);
};

Expand Down Expand Up @@ -188,12 +192,12 @@ export class CollectorSet {

// TODO: remove
public map = (mapFn: any) => {
return this.collectors.map(mapFn);
return [...this.collectors.values()].map(mapFn);
};

// TODO: remove
public some = (someFn: any) => {
return this.collectors.some(someFn);
return [...this.collectors.values()].some(someFn);
};

private makeCollectorSetFromArray = (collectors: Collector[]) => {
Expand Down