Skip to content

Commit

Permalink
Factor out getAllCompositeAggregationData<T>()
Browse files Browse the repository at this point in the history
  • Loading branch information
skh committed Apr 18, 2019
1 parent 3fa6213 commit 78b03a2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 37 deletions.
4 changes: 2 additions & 2 deletions x-pack/plugins/infra/server/lib/snapshot/response_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ export interface InfraSnapshotBucketWithValues {

export type InfraSnapshotMetricsBucket = InfraSnapshotBucketWithKey & InfraSnapshotBucketWithValues;

export interface InfraSnaphotNodeGroupByBucket {
export interface InfraSnapshotNodeGroupByBucket {
key: {
node: string;
[groupByField: string]: string;
};
}

export const getNodePath = (
groupBucket: InfraSnaphotNodeGroupByBucket,
groupBucket: InfraSnapshotNodeGroupByBucket,
options: InfraSnapshotRequestOptions
): InfraSnapshotNodePath[] => {
const node = groupBucket.key;
Expand Down
90 changes: 55 additions & 35 deletions x-pack/plugins/infra/server/lib/snapshot/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ import { InfraSources } from '../sources';
import { JsonObject } from '../../../common/typed_json';
import { SNAPSHOT_COMPOSITE_REQUEST_SIZE } from './constants';
import { getGroupedNodesSources, getMetricsAggregations, getMetricsSources } from './query_helpers';
import { getNodeMetrics, getNodeMetricsForLookup, getNodePath } from './response_helpers';
import {
getNodeMetrics,
getNodeMetricsForLookup,
getNodePath,
InfraSnapshotNodeGroupByBucket,
InfraSnapshotNodeMetricsBucket,
} from './response_helpers';

export interface InfraSnapshotRequestOptions {
nodeType: InfraNodeType;
Expand Down Expand Up @@ -56,9 +62,8 @@ const requestGroupedNodes = async (
request: InfraFrameworkRequest,
options: InfraSnapshotRequestOptions,
framework: InfraBackendFrameworkAdapter
) => {
// This needs to be typed as 'any' as the query will be altered below to add the 'after_key' field.
const query: any = {
): Promise<InfraSnapshotNodeGroupByBucket[]> => {
const query = {
allowNoIndices: true,
index: `${options.sourceConfiguration.logAlias},${options.sourceConfiguration.metricAlias}`,
ignoreUnavailable: true,
Expand All @@ -80,7 +85,7 @@ const requestGroupedNodes = async (
},
},
size: 0,
aggs: {
aggregations: {
nodes: {
composite: {
size: SNAPSHOT_COMPOSITE_REQUEST_SIZE,
Expand All @@ -91,35 +96,24 @@ const requestGroupedNodes = async (
},
};

let response = await framework.callWithRequest<any, any>(request, 'search', query);

if (response.hits.total.value === 0) {
return [];
}

let buckets = response.aggregations.nodes.buckets;

// Getting an empty response back is the only way to find out that there are no further results.
while (response.aggregations.nodes.buckets.length > 0) {
query.body.aggs.nodes.composite.after = response.aggregations.nodes.after_key;
response = await framework.callWithRequest<any, any>(request, 'search', query);
buckets = buckets.concat(response.aggregations.nodes.buckets);
}
return buckets;
return await getAllCompositeAggregationData<InfraSnapshotNodeGroupByBucket>(
framework,
request,
query
);
};

const requestNodeMetrics = async (
request: InfraFrameworkRequest,
options: InfraSnapshotRequestOptions,
framework: InfraBackendFrameworkAdapter
) => {
): Promise<InfraSnapshotNodeMetricsBucket[]> => {
const index =
options.metric.type === 'logRate'
? `${options.sourceConfiguration.logAlias}`
: `${options.sourceConfiguration.metricAlias}`;

// This needs to be typed as 'any' as the query will be altered below to add the 'after_key' field.
const query: any = {
const query = {
allowNoIndices: true,
index,
ignoreUnavailable: true,
Expand All @@ -141,7 +135,7 @@ const requestNodeMetrics = async (
},
},
size: 0,
aggs: {
aggregations: {
nodes: {
composite: {
size: SNAPSHOT_COMPOSITE_REQUEST_SIZE,
Expand All @@ -161,26 +155,52 @@ const requestNodeMetrics = async (
},
};

let response = await framework.callWithRequest<any, any>(request, 'search', query);
return await getAllCompositeAggregationData<InfraSnapshotNodeMetricsBucket>(
framework,
request,
query
);
};

const getAllCompositeAggregationData = async <BucketType>(
framework: InfraBackendFrameworkAdapter,
request: InfraFrameworkRequest,
query: any,
previousBuckets: BucketType[] = []
): Promise<BucketType[]> => {
const response = await framework.callWithRequest<any, any>(request, 'search', query);

// Nothing available, return the previous buckets.
if (response.hits.total.value === 0) {
return [];
return previousBuckets;
}

let buckets = response.aggregations.nodes.buckets;
// if ES doesn't return an aggregations key, something went seriously wrong.
if (!response.aggregations) {
throw new Error('Whoops!, `aggregations` key must always be returned.');
}

const currentBuckets = response.aggregations.nodes.buckets;

// Getting an empty response back is the only way to find out that there are no further results.
while (response.aggregations.nodes.buckets.length > 0) {
query.body.aggs.nodes.composite.after = response.aggregations.nodes.after_key;
response = await framework.callWithRequest<any, any>(request, 'search', query);
buckets = buckets.concat(response.aggregations.nodes.buckets);
// if there are no currentBuckets then we are finished paginating through the results
if (currentBuckets.length === 0) {
return previousBuckets;
}
return buckets;

// There is possibly more data, concat previous and current buckets and call ourselves recursively.
const newQuery = { ...query };
newQuery.body.aggregations.nodes.composite.after = response.aggregations.nodes.after_key;
return getAllCompositeAggregationData(
framework,
request,
query,
previousBuckets.concat(currentBuckets)
);
};

const mergeNodeMetrics = (
nodes: any[],
metrics: any[],
nodes: InfraSnapshotNodeGroupByBucket[],
metrics: InfraSnapshotNodeMetricsBucket[],
options: InfraSnapshotRequestOptions
): InfraSnapshotNode[] => {
const result: any[] = [];
Expand Down

0 comments on commit 78b03a2

Please sign in to comment.