Skip to content

Commit

Permalink
Multi field terms (#116928)
Browse files Browse the repository at this point in the history
  • Loading branch information
flash1293 authored Nov 23, 2021
1 parent 7bd903b commit 079db96
Show file tree
Hide file tree
Showing 31 changed files with 1,133 additions and 225 deletions.
2 changes: 2 additions & 0 deletions src/plugins/data/common/search/aggs/agg_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export const getAggTypes = () => ({
{ name: BUCKET_TYPES.DATE_RANGE, fn: buckets.getDateRangeBucketAgg },
{ name: BUCKET_TYPES.IP_RANGE, fn: buckets.getIpRangeBucketAgg },
{ name: BUCKET_TYPES.TERMS, fn: buckets.getTermsBucketAgg },
{ name: BUCKET_TYPES.MULTI_TERMS, fn: buckets.getMultiTermsBucketAgg },
{ name: BUCKET_TYPES.FILTER, fn: buckets.getFilterBucketAgg },
{ name: BUCKET_TYPES.FILTERS, fn: buckets.getFiltersBucketAgg },
{ name: BUCKET_TYPES.SIGNIFICANT_TERMS, fn: buckets.getSignificantTermsBucketAgg },
Expand All @@ -77,6 +78,7 @@ export const getAggTypesFunctions = () => [
buckets.aggHistogram,
buckets.aggDateHistogram,
buckets.aggTerms,
buckets.aggMultiTerms,
metrics.aggAvg,
metrics.aggBucketAvg,
metrics.aggBucketMax,
Expand Down
2 changes: 2 additions & 0 deletions src/plugins/data/common/search/aggs/aggs_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ describe('Aggs service', () => {
"date_range",
"ip_range",
"terms",
"multi_terms",
"filter",
"filters",
"significant_terms",
Expand Down Expand Up @@ -115,6 +116,7 @@ describe('Aggs service', () => {
"date_range",
"ip_range",
"terms",
"multi_terms",
"filter",
"filters",
"significant_terms",
Expand Down
120 changes: 120 additions & 0 deletions src/plugins/data/common/search/aggs/buckets/_terms_order_helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import moment from 'moment';
import { IBucketAggConfig, BucketAggParam } from './bucket_agg_type';

/**
 * Agg-type filter used as `allowedAggs` for the terms order agg.
 * Every entry is an agg type name prefixed with `!`.
 */
export const termsAggFilter = [
  'top_hits',
  'percentiles',
  'std_dev',
  'derivative',
  'moving_avg',
  'serial_diff',
  'cumulative_sum',
  'avg_bucket',
  'max_bucket',
  'min_bucket',
  'sum_bucket',
].map((aggTypeName) => `!${aggTypeName}`);

/**
 * Param definition for the `orderAgg` param of a terms-style bucket agg.
 *
 * NOTE(review): lives in a standalone helper module, presumably so that both the
 * `terms` and `multi_terms` agg types can share it — confirm against the callers.
 */
export const termsOrderAggParamDefinition: Partial<BucketAggParam<IBucketAggConfig>> = {
  name: 'orderAgg',
  type: 'agg',
  allowedAggs: termsAggFilter,
  default: null,
  /**
   * Creates the agg config used for ordering. It is not added to the parent's
   * agg configs and its id is derived from the owning agg's id.
   *
   * @param termsAgg the agg that owns this param
   * @param state serialized state of the order agg; defaults to a `count` agg.
   *              Its `schema` is overwritten with `'orderAgg'`.
   */
  makeAgg(termsAgg, state = { type: 'count' }) {
    state.schema = 'orderAgg';
    const orderAgg = termsAgg.aggConfigs.createAggConfig<IBucketAggConfig>(state, {
      addToAggConfigs: false,
    });
    orderAgg.id = termsAgg.id + '-orderAgg';

    return orderAgg;
  },
  /**
   * Writes the `order` clause (and, if needed, the sub agg it refers to) into `output`.
   *
   * @param agg the agg whose order is being written
   * @param output the DSL output being assembled; `output.params.order` is always (re)set
   * @param aggs the full set of agg configs, used to resolve `orderBy` and time shifts
   */
  write(agg, output, aggs) {
    const dir = agg.params.order.value;
    const order: Record<string, any> = (output.params.order = {});

    let orderAgg = agg.params.orderAgg || aggs!.getResponseAggById(agg.params.orderBy);

    // TODO: This works around an Elasticsearch bug that always casts terms agg scripts to strings
    // thus causing issues with filtering. This probably causes other issues since float might not
    // be able to contain the number on the elasticsearch side
    if (output.params.script) {
      output.params.value_type = agg.getField().type === 'number' ? 'float' : agg.getField().type;
    }

    // `field` may be absent on the agg's params, so guard the access instead of throwing.
    if (agg.params.missingBucket && agg.params.field?.type === 'string') {
      output.params.missing = '__missing__';
    }

    // No order agg could be resolved: order directly by the `orderBy` key, or by doc count.
    if (!orderAgg) {
      order[agg.params.orderBy || '_count'] = dir;
      return;
    }

    // With more than one time shift in play, wrap the order agg in a `filtered_metric`
    // whose filter restricts it to the time range moved back by the order agg's own shift.
    if (aggs?.hasTimeShifts() && Object.keys(aggs?.getTimeShifts()).length > 1 && aggs.timeRange) {
      const shift = orderAgg.getTimeShift();
      orderAgg = aggs.createAggConfig(
        {
          type: 'filtered_metric',
          id: orderAgg.id,
          params: {
            customBucket: aggs
              .createAggConfig(
                {
                  type: 'filter',
                  id: 'shift',
                  params: {
                    filter: {
                      language: 'lucene',
                      query: {
                        range: {
                          [aggs.timeFields![0]]: {
                            gte: moment(aggs.timeRange.from)
                              .subtract(shift || 0)
                              .toISOString(),
                            lte: moment(aggs.timeRange.to)
                              .subtract(shift || 0)
                              .toISOString(),
                          },
                        },
                      },
                    },
                  },
                },
                {
                  addToAggConfigs: false,
                }
              )
              .serialize(),
            customMetric: orderAgg.serialize(),
          },
          enabled: false,
        },
        {
          addToAggConfigs: false,
        }
      );
    }
    // Ordering by a count agg needs no sub agg.
    if (orderAgg.type.name === 'count') {
      order._count = dir;
      return;
    }

    // Resolve the bucket path before possibly swapping in the parent agg below.
    const orderAggPath = orderAgg.getValueBucketPath();

    if (orderAgg.parentId && aggs) {
      orderAgg = aggs.byId(orderAgg.parentId);
    }

    output.subAggs = (output.subAggs || []).concat(orderAgg);
    order[orderAggPath] = dir;
  },
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
mergeOtherBucketAggResponse,
updateMissingBucket,
OTHER_BUCKET_SEPARATOR as SEP,
constructSingleTermOtherFilter,
} from './_terms_other_bucket_helper';
import { AggConfigs, CreateAggConfigParams } from '../agg_configs';
import { BUCKET_TYPES } from './bucket_agg_types';
Expand Down Expand Up @@ -573,7 +574,8 @@ describe('Terms Agg Other bucket helper', () => {
singleTermResponse,
singleOtherResponse,
aggConfigs.aggs[0] as IBucketAggConfig,
otherAggConfig()
otherAggConfig(),
constructSingleTermOtherFilter
);
expect((mergedResponse!.aggregations!['1'] as any).buckets[3].key).toEqual('__other__');
}
Expand All @@ -594,7 +596,8 @@ describe('Terms Agg Other bucket helper', () => {
nestedTermResponse,
nestedOtherResponse,
aggConfigs.aggs[1] as IBucketAggConfig,
otherAggConfig()
otherAggConfig(),
constructSingleTermOtherFilter
);

expect((mergedResponse!.aggregations!['1'] as any).buckets[1]['2'].buckets[3].key).toEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@
*/

import { isNumber, keys, values, find, each, cloneDeep, flatten } from 'lodash';
import { i18n } from '@kbn/i18n';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { buildExistsFilter, buildPhrasesFilter, buildQueryFromFilters } from '@kbn/es-query';
import {
buildExistsFilter,
buildPhrasesFilter,
buildQueryFromFilters,
Filter,
} from '@kbn/es-query';
import { AggGroupNames } from '../agg_groups';
import { IAggConfigs } from '../agg_configs';
import { IBucketAggConfig } from './bucket_agg_type';
import { IAggType } from '../agg_type';
import { IAggConfig } from '../agg_config';

export const OTHER_BUCKET_SEPARATOR = '╰┄►';

Expand Down Expand Up @@ -44,7 +51,7 @@ const getNestedAggDSL = (aggNestedDsl: Record<string, any>, startFromAggId: stri
const getAggResultBuckets = (
aggConfigs: IAggConfigs,
response: estypes.SearchResponse<any>['aggregations'],
aggWithOtherBucket: IBucketAggConfig,
aggWithOtherBucket: IAggConfig,
key: string
) => {
const keyParts = key.split(OTHER_BUCKET_SEPARATOR);
Expand Down Expand Up @@ -111,11 +118,7 @@ const getAggConfigResultMissingBuckets = (responseAggs: any, aggId: string) => {
* @param key: the key for this specific other bucket
* @param otherAgg: AggConfig of the aggregation with other bucket
*/
const getOtherAggTerms = (
requestAgg: Record<string, any>,
key: string,
otherAgg: IBucketAggConfig
) => {
const getOtherAggTerms = (requestAgg: Record<string, any>, key: string, otherAgg: IAggConfig) => {
return requestAgg['other-filter'].filters.filters[key].bool.must_not
.filter(
(filter: Record<string, any>) =>
Expand All @@ -126,7 +129,7 @@ const getOtherAggTerms = (

export const buildOtherBucketAgg = (
aggConfigs: IAggConfigs,
aggWithOtherBucket: IBucketAggConfig,
aggWithOtherBucket: IAggConfig,
response: any
) => {
const bucketAggs = aggConfigs.aggs.filter(
Expand Down Expand Up @@ -200,12 +203,16 @@ export const buildOtherBucketAgg = (
return;
}

const hasScriptedField = !!aggWithOtherBucket.params.field.scripted;
const hasScriptedField = !!aggWithOtherBucket.params.field?.scripted;
const hasMissingBucket = !!aggWithOtherBucket.params.missingBucket;
const hasMissingBucketKey = agg.buckets.some(
(bucket: { key: string }) => bucket.key === '__missing__'
);
if (!hasScriptedField && (!hasMissingBucket || hasMissingBucketKey)) {
if (
aggWithOtherBucket.params.field &&
!hasScriptedField &&
(!hasMissingBucket || hasMissingBucketKey)
) {
filters.push(
buildExistsFilter(
aggWithOtherBucket.params.field,
Expand All @@ -217,7 +224,7 @@ export const buildOtherBucketAgg = (
// create not filters for all the buckets
each(agg.buckets, (bucket) => {
if (bucket.key === '__missing__') return;
const filter = currentAgg.createFilter(bucket.key);
const filter = currentAgg.createFilter(currentAgg.getKey(bucket, bucket.key));
filter.meta.negate = true;
filters.push(filter);
});
Expand All @@ -244,8 +251,9 @@ export const mergeOtherBucketAggResponse = (
aggsConfig: IAggConfigs,
response: estypes.SearchResponse<any>,
otherResponse: any,
otherAgg: IBucketAggConfig,
requestAgg: Record<string, any>
otherAgg: IAggConfig,
requestAgg: Record<string, any>,
otherFilterBuilder: (requestAgg: Record<string, any>, key: string, otherAgg: IAggConfig) => Filter
): estypes.SearchResponse<any> => {
const updatedResponse = cloneDeep(response);
each(otherResponse.aggregations['other-filter'].buckets, (bucket, key) => {
Expand All @@ -257,15 +265,8 @@ export const mergeOtherBucketAggResponse = (
otherAgg,
bucketKey
);
const requestFilterTerms = getOtherAggTerms(requestAgg, key, otherAgg);

const phraseFilter = buildPhrasesFilter(
otherAgg.params.field,
requestFilterTerms,
otherAgg.aggConfigs.indexPattern
);
phraseFilter.meta.negate = true;
bucket.filters = [phraseFilter];
const otherFilter = otherFilterBuilder(requestAgg, key, otherAgg);
bucket.filters = [otherFilter];
bucket.key = '__other__';

if (
Expand All @@ -285,7 +286,7 @@ export const mergeOtherBucketAggResponse = (
export const updateMissingBucket = (
response: estypes.SearchResponse<any>,
aggConfigs: IAggConfigs,
agg: IBucketAggConfig
agg: IAggConfig
) => {
const updatedResponse = cloneDeep(response);
const aggResultBuckets = getAggConfigResultMissingBuckets(updatedResponse.aggregations, agg.id);
Expand All @@ -294,3 +295,84 @@ export const updateMissingBucket = (
});
return updatedResponse;
};

/**
 * Builds the filter attached to the "other" bucket of a single-field terms agg:
 * a negated phrases filter over the terms collected from the other-bucket request.
 *
 * @param requestAgg the other-bucket request DSL
 * @param key key of the other bucket inside the request's filters
 * @param otherAgg agg config of the aggregation with the other bucket
 * @returns the phrases filter with `meta.negate` set to `true`
 */
export function constructSingleTermOtherFilter(
  requestAgg: Record<string, any>,
  key: string,
  otherAgg: IAggConfig
) {
  const excludedTerms = getOtherAggTerms(requestAgg, key, otherAgg);
  const { field } = otherAgg.params;
  const { indexPattern } = otherAgg.aggConfigs;

  const negatedPhrases = buildPhrasesFilter(field, excludedTerms, indexPattern);
  negatedPhrases.meta.negate = true;

  return negatedPhrases;
}

/**
 * Builds the filter attached to the "other" bucket of a multi-field terms agg by
 * reusing, as-is, the query stored under `key` in the other-bucket request.
 *
 * @param requestAgg the other-bucket request DSL
 * @param key key of the other bucket inside the request's filters
 * @returns a filter whose `query` is that request entry and whose `meta` is empty
 */
export function constructMultiTermOtherFilter(
  requestAgg: Record<string, any>,
  key: string
): Filter {
  const otherBucketQueries = requestAgg['other-filter'].filters.filters;
  const query = otherBucketQueries[key];

  return { query, meta: {} };
}

/**
 * Creates a `postFlightRequest` hook that post-processes a search response for an
 * agg with `otherBucket` and/or `missingBucket` enabled.
 *
 * @param otherFilterBuilder builds the filter attached to each merged "other" bucket
 * @returns the hook; it resolves to the (possibly updated) response
 */
export const createOtherBucketPostFlightRequest = (
  otherFilterBuilder: (requestAgg: Record<string, any>, key: string, otherAgg: IAggConfig) => Filter
) => {
  const postFlightRequest: IAggType['postFlightRequest'] = async (
    resp,
    aggConfigs,
    aggConfig,
    searchSource,
    inspectorRequestAdapter,
    abortSignal,
    searchSessionId
  ) => {
    // A response without aggregations has nothing to post-process.
    if (!resp.aggregations) {
      return resp;
    }

    const childSearchSource = searchSource.createChild();
    let result = resp;

    if (aggConfig.params.otherBucket) {
      const otherBucketAgg = buildOtherBucketAgg(aggConfigs, aggConfig, result);
      // No other-bucket request could be built: hand back the response untouched.
      if (!otherBucketAgg) {
        return result;
      }

      childSearchSource.setField('aggs', otherBucketAgg);

      const inspector = {
        adapter: inspectorRequestAdapter,
        title: i18n.translate('data.search.aggs.buckets.terms.otherBucketTitle', {
          defaultMessage: 'Other bucket',
        }),
        description: i18n.translate('data.search.aggs.buckets.terms.otherBucketDescription', {
          defaultMessage:
            'This request counts the number of documents that fall ' +
            'outside the criterion of the data buckets.',
        }),
      };

      const { rawResponse: otherBucketResponse } = await childSearchSource
        .fetch$({ abortSignal, sessionId: searchSessionId, inspector })
        .toPromise();

      // `otherBucketAgg` is callable; invoking it yields the request DSL used for merging.
      result = mergeOtherBucketAggResponse(
        aggConfigs,
        result,
        otherBucketResponse,
        aggConfig,
        otherBucketAgg(),
        otherFilterBuilder
      );
    }

    if (aggConfig.params.missingBucket) {
      result = updateMissingBucket(result, aggConfigs, aggConfig);
    }

    return result;
  };

  return postFlightRequest;
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export enum BUCKET_TYPES {
DATE_RANGE = 'date_range',
RANGE = 'range',
TERMS = 'terms',
MULTI_TERMS = 'multi_terms',
SIGNIFICANT_TERMS = 'significant_terms',
GEOHASH_GRID = 'geohash_grid',
GEOTILE_GRID = 'geotile_grid',
Expand Down
Loading

0 comments on commit 079db96

Please sign in to comment.