Skip to content

Commit

Permalink
[ML] Explain Log Rate Spikes: Make use of abort signal for ES queries. (
Browse files Browse the repository at this point in the history
#143683)

So far we passed on the abort signal from the client to possibly cancel the analysis, but the signal was not passed on to the ES queries to cancel those. That means the analysis could be cancelled after each step but it did not cancel ES queries that were already running. This PR takes the already existing abort signal and passes it on to all ES queries.

This surfaced an issue with running too many queries in parallel: We didn't have a limit so far when fetching the histogram data. With using the abort signals, Kibana would report a warning if more than 10 queries were run at once. The PR updates fetching histogram data to also do it in chunks of 10 queries like we already do for the p-value aggregation. So the larger bulk of the file diff is the result of wrapping the histogram queries inside a for of to iterate over the chunks of queries.
  • Loading branch information
walterra authored Oct 25, 2022
1 parent 13bb47a commit df9dbcd
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 198 deletions.
22 changes: 13 additions & 9 deletions x-pack/packages/ml/agg_utils/src/fetch_agg_intervals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ export const fetchAggIntervals = async (
query: estypes.QueryDslQueryContainer,
fields: HistogramField[],
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal
): Promise<NumericColumnStatsMap> => {
const numericColumns = fields.filter((field) => {
return field.type === KBN_FIELD_TYPES.NUMBER || field.type === KBN_FIELD_TYPES.DATE;
Expand All @@ -49,16 +50,19 @@ export const fetchAggIntervals = async (
return aggs;
}, {} as Record<string, object>);

const body = await client.search({
index: indexPattern,
size: 0,
body: {
query,
aggs: buildSamplerAggregation(minMaxAggs, samplerShardSize),
const body = await client.search(
{
index: indexPattern,
size: 0,
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
body: {
query,
aggs: buildSamplerAggregation(minMaxAggs, samplerShardSize),
size: 0,
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
},
},
});
{ signal: abortSignal, maxRetries: 0 }
);

const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
const aggregations = aggsPath.length > 0 ? get(body.aggregations, aggsPath) : body.aggregations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ export const fetchHistogramsForFields = async (
query: any,
fields: FieldsForHistograms,
samplerShardSize: number,
runtimeMappings?: estypes.MappingRuntimeFields
runtimeMappings?: estypes.MappingRuntimeFields,
abortSignal?: AbortSignal
) => {
const aggIntervals = {
...(await fetchAggIntervals(
Expand All @@ -155,7 +156,8 @@ export const fetchHistogramsForFields = async (
query,
fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
samplerShardSize,
runtimeMappings
runtimeMappings,
abortSignal
)),
...fields.filter(isNumericHistogramFieldWithColumnStats).reduce((p, field) => {
const { interval, min, max, fieldName } = field;
Expand Down Expand Up @@ -209,7 +211,7 @@ export const fetchHistogramsForFields = async (
...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
},
},
{ maxRetries: 0 }
{ signal: abortSignal, maxRetries: 0 }
);

const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
Expand Down
23 changes: 23 additions & 0 deletions x-pack/plugins/aiops/server/lib/is_request_aborted_error.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { isRequestAbortedError } from './is_request_aborted_error';

describe('isRequestAbortedError', () => {
it('returns false for a string', () => {
expect(isRequestAbortedError('the-error')).toBe(false);
});
it('returns false for a an object without a name field', () => {
expect(isRequestAbortedError({ error: 'the-error' })).toBe(false);
});
it(`returns false for a an object with a name field other than 'RequestAbortedError'`, () => {
expect(isRequestAbortedError({ name: 'the-error' })).toBe(false);
});
it(`returns true for a an object with a name field that contains 'RequestAbortedError'`, () => {
expect(isRequestAbortedError({ name: 'RequestAbortedError' })).toBe(true);
});
});
16 changes: 16 additions & 0 deletions x-pack/plugins/aiops/server/lib/is_request_aborted_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { isPopulatedObject } from '@kbn/ml-is-populated-object';

interface RequestAbortedError extends Error {
name: 'RequestAbortedError';
}

export function isRequestAbortedError(arg: unknown): arg is RequestAbortedError {
return isPopulatedObject(arg, ['name']) && arg.name === 'RequestAbortedError';
}
Loading

0 comments on commit df9dbcd

Please sign in to comment.