diff --git a/x-pack/packages/ml/agg_utils/src/fetch_agg_intervals.ts b/x-pack/packages/ml/agg_utils/src/fetch_agg_intervals.ts
index 338f55ad754c2..7986747d34dd3 100644
--- a/x-pack/packages/ml/agg_utils/src/fetch_agg_intervals.ts
+++ b/x-pack/packages/ml/agg_utils/src/fetch_agg_intervals.ts
@@ -29,7 +29,8 @@ export const fetchAggIntervals = async (
   query: estypes.QueryDslQueryContainer,
   fields: HistogramField[],
   samplerShardSize: number,
-  runtimeMappings?: estypes.MappingRuntimeFields
+  runtimeMappings?: estypes.MappingRuntimeFields,
+  abortSignal?: AbortSignal
 ): Promise => {
   const numericColumns = fields.filter((field) => {
     return field.type === KBN_FIELD_TYPES.NUMBER || field.type === KBN_FIELD_TYPES.DATE;
@@ -49,16 +50,19 @@ export const fetchAggIntervals = async (
     return aggs;
   }, {} as Record);
 
-  const body = await client.search({
-    index: indexPattern,
-    size: 0,
-    body: {
-      query,
-      aggs: buildSamplerAggregation(minMaxAggs, samplerShardSize),
+  const body = await client.search(
+    {
+      index: indexPattern,
       size: 0,
-      ...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
+      body: {
+        query,
+        aggs: buildSamplerAggregation(minMaxAggs, samplerShardSize),
+        size: 0,
+        ...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
+      },
     },
-  });
+    { signal: abortSignal, maxRetries: 0 }
+  );
 
   const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
   const aggregations = aggsPath.length > 0 ? get(body.aggregations, aggsPath) : body.aggregations;
diff --git a/x-pack/packages/ml/agg_utils/src/fetch_histograms_for_fields.ts b/x-pack/packages/ml/agg_utils/src/fetch_histograms_for_fields.ts
index a921eaeae370b..70d5f6360155d 100644
--- a/x-pack/packages/ml/agg_utils/src/fetch_histograms_for_fields.ts
+++ b/x-pack/packages/ml/agg_utils/src/fetch_histograms_for_fields.ts
@@ -146,7 +146,8 @@ export const fetchHistogramsForFields = async (
   query: any,
   fields: FieldsForHistograms,
   samplerShardSize: number,
-  runtimeMappings?: estypes.MappingRuntimeFields
+  runtimeMappings?: estypes.MappingRuntimeFields,
+  abortSignal?: AbortSignal
 ) => {
   const aggIntervals = {
     ...(await fetchAggIntervals(
@@ -155,7 +156,8 @@ export const fetchHistogramsForFields = async (
       query,
       fields.filter((f) => !isNumericHistogramFieldWithColumnStats(f)),
       samplerShardSize,
-      runtimeMappings
+      runtimeMappings,
+      abortSignal
     )),
     ...fields.filter(isNumericHistogramFieldWithColumnStats).reduce((p, field) => {
       const { interval, min, max, fieldName } = field;
@@ -209,7 +211,7 @@ export const fetchHistogramsForFields = async (
         ...(isPopulatedObject(runtimeMappings) ? { runtime_mappings: runtimeMappings } : {}),
       },
     },
-    { maxRetries: 0 }
+    { signal: abortSignal, maxRetries: 0 }
   );
 
   const aggsPath = getSamplerAggregationsResponsePath(samplerShardSize);
diff --git a/x-pack/plugins/aiops/server/lib/is_request_aborted_error.test.ts b/x-pack/plugins/aiops/server/lib/is_request_aborted_error.test.ts
new file mode 100644
index 0000000000000..ee9725dedda4b
--- /dev/null
+++ b/x-pack/plugins/aiops/server/lib/is_request_aborted_error.test.ts
@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { isRequestAbortedError } from './is_request_aborted_error';
+
+describe('isRequestAbortedError', () => {
+  it('returns false for a string', () => {
+    expect(isRequestAbortedError('the-error')).toBe(false);
+  });
+  it('returns false for an object without a name field', () => {
+    expect(isRequestAbortedError({ error: 'the-error' })).toBe(false);
+  });
+  it(`returns false for an object with a name field other than 'RequestAbortedError'`, () => {
+    expect(isRequestAbortedError({ name: 'the-error' })).toBe(false);
+  });
+  it(`returns true for an object with a name field that is 'RequestAbortedError'`, () => {
+    expect(isRequestAbortedError({ name: 'RequestAbortedError' })).toBe(true);
+  });
+});
diff --git a/x-pack/plugins/aiops/server/lib/is_request_aborted_error.ts b/x-pack/plugins/aiops/server/lib/is_request_aborted_error.ts
new file mode 100644
index 0000000000000..b80256dcd8639
--- /dev/null
+++ b/x-pack/plugins/aiops/server/lib/is_request_aborted_error.ts
@@ -0,0 +1,16 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { isPopulatedObject } from '@kbn/ml-is-populated-object';
+
+interface RequestAbortedError extends Error {
+  name: 'RequestAbortedError';
+}
+
+export function isRequestAbortedError(arg: unknown): arg is RequestAbortedError {
+  return isPopulatedObject(arg, ['name']) && arg.name === 'RequestAbortedError';
+}
diff --git a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
index c61e1915f68f7..2468df9df8237 100644
--- a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
+++ b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
@@ -34,6 +34,7 @@ import {
 } from '../../common/api/explain_log_rate_spikes';
 import { API_ENDPOINT } from '../../common/api';
 
+import { isRequestAbortedError } from '../lib/is_request_aborted_error';
 import type { AiopsLicense } from '../types';
 
 import { fetchChangePointPValues } from './queries/fetch_change_point_p_values';
@@ -92,6 +93,7 @@ export const defineExplainLogRateSpikesRoute = (
       const client = (await context.core).elasticsearch.client.asCurrentUser;
 
       const controller = new AbortController();
+      const abortSignal = controller.signal;
 
       let isRunning = false;
       let loaded = 0;
@@ -129,9 +131,13 @@ export const defineExplainLogRateSpikesRoute = (
       }
 
       function end() {
-        isRunning = false;
-        logDebugMessage('Ending analysis.');
-        streamEnd();
+        if (isRunning) {
+          isRunning = false;
+          logDebugMessage('Ending analysis.');
+          streamEnd();
+        } else {
+          logDebugMessage('end() was called again with isRunning already being false.');
+        }
       }
 
       function endWithUpdatedLoadingState() {
@@ -178,10 +184,12 @@ export const defineExplainLogRateSpikesRoute = (
 
         let fieldCandidates: Awaited<ReturnType<typeof fetchFieldCandidates>>;
         try {
-          fieldCandidates = await fetchFieldCandidates(client, request.body);
+          fieldCandidates = await fetchFieldCandidates(client, request.body, abortSignal);
        } catch (e) {
-          logger.error(`Failed to fetch field candidates, got: \n${e.toString()}`);
-          pushError(`Failed to fetch field candidates.`);
+          if (!isRequestAbortedError(e)) {
+            logger.error(`Failed to fetch field candidates, got: \n${e.toString()}`);
+            pushError(`Failed to fetch field candidates.`);
+          }
           end();
           return;
         }
@@ -208,16 +216,20 @@ export const defineExplainLogRateSpikesRoute = (
         if (fieldCandidates.length === 0) {
           endWithUpdatedLoadingState();
         } else if (shouldStop) {
+          logDebugMessage('shouldStop after fetching field candidates.');
           end();
           return;
         }
 
         const changePoints: ChangePoint[] = [];
         const fieldsToSample = new Set();
-        const chunkSize = 10;
+
+        // Don't use more than 10 here otherwise Kibana will emit an error
+        // regarding a limit of abort signal listeners of more than 10.
+        const CHUNK_SIZE = 10;
         let chunkCount = 0;
 
-        const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);
+        const fieldCandidatesChunks = chunk(fieldCandidates, CHUNK_SIZE);
 
         logDebugMessage('Fetch p-values.');
 
@@ -233,16 +245,18 @@ export const defineExplainLogRateSpikesRoute = (
             request.body,
             fieldCandidatesChunk,
             logger,
-            pushError
+            pushError,
+            abortSignal
           );
         } catch (e) {
-          logger.error(
-            `Failed to fetch p-values for ${JSON.stringify(
-              fieldCandidatesChunk
-            )}, got: \n${e.toString()}`
-          );
-          pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`);
-          // Still continue the analysis even if chunks of p-value queries fail.
+          if (!isRequestAbortedError(e)) {
+            logger.error(
+              `Failed to fetch p-values for ${JSON.stringify(
+                fieldCandidatesChunk
+              )}, got: \n${e.toString()}`
+            );
+            pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`);
+          } // Still continue the analysis even if chunks of p-value queries fail.
           continue;
         }
 
@@ -267,7 +281,7 @@ export const defineExplainLogRateSpikesRoute = (
               defaultMessage:
                 'Identified {fieldValuePairsCount, plural, one {# significant field/value pair} other {# significant field/value pairs}}.',
              values: {
-                fieldValuePairsCount: changePoints?.length ?? 0,
+                fieldValuePairsCount: changePoints.length,
              },
            }
          ),
@@ -276,13 +290,12 @@ export const defineExplainLogRateSpikesRoute = (
 
          if (shouldStop) {
            logDebugMessage('shouldStop fetching p-values.');
-
            end();
            return;
          }
        }
 
-        if (changePoints?.length === 0) {
+        if (changePoints.length === 0) {
          logDebugMessage('Stopping analysis, did not find change points.');
          endWithUpdatedLoadingState();
          return;
@@ -305,12 +318,15 @@ export const defineExplainLogRateSpikesRoute = (
            histogramFields,
            // samplerShardSize
            -1,
-            undefined
+            undefined,
+            abortSignal
          )) as [NumericChartData]
        )[0];
      } catch (e) {
-        logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`);
-        pushError(`Failed to fetch overall histogram data.`);
+        if (!isRequestAbortedError(e)) {
+          logger.error(`Failed to fetch the overall histogram data, got: \n${e.toString()}`);
+          pushError(`Failed to fetch overall histogram data.`);
+        }
        // Still continue the analysis even if loading the overall histogram fails.
      }
 
@@ -329,6 +345,12 @@ export const defineExplainLogRateSpikesRoute = (
        );
      }
 
+      if (shouldStop) {
+        logDebugMessage('shouldStop after fetching overall histogram.');
+        end();
+        return;
+      }
+
      if (groupingEnabled) {
        logDebugMessage('Group results.');
 
@@ -374,9 +396,16 @@ export const defineExplainLogRateSpikesRoute = (
            request.body.deviationMin,
            request.body.deviationMax,
            logger,
-            pushError
+            pushError,
+            abortSignal
          );
 
+          if (shouldStop) {
+            logDebugMessage('shouldStop after fetching frequent_items.');
+            end();
+            return;
+          }
+
          if (fields.length > 0 && df.length > 0) {
            // The way the `frequent_items` aggregations works could return item sets that include
            // field/value pairs that are not part of the original list of significant change points.
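/*
 * Editor's note — illustrative sketch only, not part of the patch above or below.
 * The hunks above share one `abortSignal` across every Elasticsearch request and
 * cap concurrency via `CHUNK_SIZE = 10`. Each in-flight client request registers
 * an 'abort' listener on that signal, and Node.js warns once more than ten
 * listeners are attached to a single emitter/signal, which is what the comment
 * about the listener limit refers to. The sketch below shows the general pattern
 * under that assumption; `runChunked`, `items`, and `search` are hypothetical
 * names introduced for illustration, not identifiers from this PR.
 */
import { chunk } from 'lodash';

async function runChunked<T>(
  items: T[],
  search: (item: T, signal: AbortSignal) => Promise<void>,
  abortSignal: AbortSignal,
  chunkSize = 10 // stay at or below Node's default listener limit per signal
) {
  for (const itemsChunk of chunk(items, chunkSize)) {
    if (abortSignal.aborted) {
      // stop early once the client disconnected; nothing left to stream
      return;
    }
    // at most `chunkSize` requests (and abort listeners) are active at a time
    await Promise.allSettled(itemsChunk.map((item) => search(item, abortSignal)));
  }
}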
@@ -517,172 +546,208 @@ export const defineExplainLogRateSpikesRoute = (
 
            pushHistogramDataLoadingState();
 
-            logDebugMessage('Fetch group histograms.');
+            if (shouldStop) {
+              logDebugMessage('shouldStop after grouping.');
+              end();
+              return;
+            }
 
-            await asyncForEach(changePointGroups, async (cpg) => {
-              if (overallTimeSeries !== undefined) {
-                const histogramQuery = {
-                  bool: {
-                    filter: cpg.group.map((d) => ({
-                      term: { [d.fieldName]: d.fieldValue },
-                    })),
-                  },
-                };
+            logDebugMessage(`Fetch ${changePointGroups.length} group histograms.`);
 
-                let cpgTimeSeries: NumericChartData;
-                try {
-                  cpgTimeSeries = (
-                    (await fetchHistogramsForFields(
-                      client,
-                      request.body.index,
-                      histogramQuery,
-                      // fields
-                      [
-                        {
-                          fieldName: request.body.timeFieldName,
-                          type: KBN_FIELD_TYPES.DATE,
-                          interval: overallTimeSeries.interval,
-                          min: overallTimeSeries.stats[0],
-                          max: overallTimeSeries.stats[1],
-                        },
-                      ],
-                      // samplerShardSize
-                      -1,
-                      undefined
-                    )) as [NumericChartData]
-                  )[0];
-                } catch (e) {
-                  logger.error(
-                    `Failed to fetch the histogram data for group #${
-                      cpg.id
-                    }, got: \n${e.toString()}`
-                  );
-                  pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
-                  return;
-                }
-                const histogram =
-                  overallTimeSeries.data.map((o, i) => {
-                    const current = cpgTimeSeries.data.find(
-                      (d1) => d1.key_as_string === o.key_as_string
-                    ) ?? {
-                      doc_count: 0,
-                    };
-                    return {
-                      key: o.key,
-                      key_as_string: o.key_as_string ?? '',
-                      doc_count_change_point: current.doc_count,
-                      doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
-                    };
-                  }) ?? [];
+            const changePointGroupsChunks = chunk(changePointGroups, CHUNK_SIZE);
 
-                push(
-                  addChangePointsGroupHistogramAction([
-                    {
-                      id: cpg.id,
-                      histogram,
-                    },
-                  ])
-                );
+            for (const changePointGroupsChunk of changePointGroupsChunks) {
+              if (shouldStop) {
+                logDebugMessage('shouldStop abort fetching group histograms.');
+                end();
+                return;
              }
-            });
+
+              await asyncForEach(changePointGroupsChunk, async (cpg) => {
+                if (overallTimeSeries !== undefined) {
+                  const histogramQuery = {
+                    bool: {
+                      filter: cpg.group.map((d) => ({
+                        term: { [d.fieldName]: d.fieldValue },
+                      })),
+                    },
+                  };
+
+                  let cpgTimeSeries: NumericChartData;
+                  try {
+                    cpgTimeSeries = (
+                      (await fetchHistogramsForFields(
+                        client,
+                        request.body.index,
+                        histogramQuery,
+                        // fields
+                        [
+                          {
+                            fieldName: request.body.timeFieldName,
+                            type: KBN_FIELD_TYPES.DATE,
+                            interval: overallTimeSeries.interval,
+                            min: overallTimeSeries.stats[0],
+                            max: overallTimeSeries.stats[1],
+                          },
+                        ],
+                        // samplerShardSize
+                        -1,
+                        undefined,
+                        abortSignal
+                      )) as [NumericChartData]
+                    )[0];
+                  } catch (e) {
+                    if (!isRequestAbortedError(e)) {
+                      logger.error(
+                        `Failed to fetch the histogram data for group #${
+                          cpg.id
+                        }, got: \n${e.toString()}`
+                      );
+                      pushError(`Failed to fetch the histogram data for group #${cpg.id}.`);
+                    }
+                    return;
+                  }
+                  const histogram =
+                    overallTimeSeries.data.map((o, i) => {
+                      const current = cpgTimeSeries.data.find(
+                        (d1) => d1.key_as_string === o.key_as_string
+                      ) ?? {
+                        doc_count: 0,
+                      };
+                      return {
+                        key: o.key,
+                        key_as_string: o.key_as_string ?? '',
+                        doc_count_change_point: current.doc_count,
+                        doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
+                      };
+                    }) ?? [];
+
+                  push(
+                    addChangePointsGroupHistogramAction([
+                      {
+                        id: cpg.id,
+                        histogram,
+                      },
+                    ])
+                  );
+                }
+              });
+            }
          }
        } catch (e) {
-          logger.error(
-            `Failed to transform field/value pairs into groups, got: \n${e.toString()}`
-          );
-          pushError(`Failed to transform field/value pairs into groups.`);
+          if (!isRequestAbortedError(e)) {
+            logger.error(
+              `Failed to transform field/value pairs into groups, got: \n${e.toString()}`
+            );
+            pushError(`Failed to transform field/value pairs into groups.`);
+          }
        }
      }
 
      loaded += PROGRESS_STEP_HISTOGRAMS_GROUPS;
 
-      logDebugMessage('Fetch field/value histograms.');
+      logDebugMessage(`Fetch ${changePoints.length} field/value histograms.`);
 
      // time series filtered by fields
-      if (changePoints && overallTimeSeries !== undefined) {
-        await asyncForEach(changePoints, async (cp) => {
-          if (overallTimeSeries !== undefined) {
-            const histogramQuery = {
-              bool: {
-                filter: [
-                  {
-                    term: { [cp.fieldName]: cp.fieldValue },
-                  },
-                ],
-              },
-            };
-
-            let cpTimeSeries: NumericChartData;
-
-            try {
-              cpTimeSeries = (
-                (await fetchHistogramsForFields(
-                  client,
-                  request.body.index,
-                  histogramQuery,
-                  // fields
-                  [
+      if (changePoints.length > 0 && overallTimeSeries !== undefined) {
+        const changePointsChunks = chunk(changePoints, CHUNK_SIZE);
+
+        for (const changePointsChunk of changePointsChunks) {
+          if (shouldStop) {
+            logDebugMessage('shouldStop abort fetching field/value histograms.');
+            end();
+            return;
+          }
+
+          await asyncForEach(changePointsChunk, async (cp) => {
+            if (overallTimeSeries !== undefined) {
+              const histogramQuery = {
+                bool: {
+                  filter: [
                    {
-                      fieldName: request.body.timeFieldName,
-                      type: KBN_FIELD_TYPES.DATE,
-                      interval: overallTimeSeries.interval,
-                      min: overallTimeSeries.stats[0],
-                      max: overallTimeSeries.stats[1],
+                      term: { [cp.fieldName]: cp.fieldValue },
                    },
                  ],
-                  // samplerShardSize
-                  -1,
-                  undefined
-                )) as [NumericChartData]
-              )[0];
-            } catch (e) {
-              logger.error(
-                `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
-                  cp.fieldValue
-                }", got: \n${e.toString()}`
-              );
-              pushError(
-                `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
+                },
+              };
+
+              let cpTimeSeries: NumericChartData;
+
+              try {
+                cpTimeSeries = (
+                  (await fetchHistogramsForFields(
+                    client,
+                    request.body.index,
+                    histogramQuery,
+                    // fields
+                    [
+                      {
+                        fieldName: request.body.timeFieldName,
+                        type: KBN_FIELD_TYPES.DATE,
+                        interval: overallTimeSeries.interval,
+                        min: overallTimeSeries.stats[0],
+                        max: overallTimeSeries.stats[1],
+                      },
+                    ],
+                    // samplerShardSize
+                    -1,
+                    undefined,
+                    abortSignal
+                  )) as [NumericChartData]
+                )[0];
+              } catch (e) {
+                logger.error(
+                  `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${
+                    cp.fieldValue
+                  }", got: \n${e.toString()}`
+                );
+                pushError(
+                  `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".`
+                );
+                return;
+              }
+
+              const histogram =
+                overallTimeSeries.data.map((o, i) => {
+                  const current = cpTimeSeries.data.find(
+                    (d1) => d1.key_as_string === o.key_as_string
+                  ) ?? {
+                    doc_count: 0,
+                  };
+                  return {
+                    key: o.key,
+                    key_as_string: o.key_as_string ?? '',
+                    doc_count_change_point: current.doc_count,
+                    doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
+                  };
+                }) ?? [];
+
+              const { fieldName, fieldValue } = cp;
+
+              loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS;
+              pushHistogramDataLoadingState();
+              push(
+                addChangePointsHistogramAction([
+                  {
+                    fieldName,
+                    fieldValue,
+                    histogram,
+                  },
+                ])
              );
-              return;
            }
-
-            const histogram =
-              overallTimeSeries.data.map((o, i) => {
-                const current = cpTimeSeries.data.find(
-                  (d1) => d1.key_as_string === o.key_as_string
-                ) ?? {
-                  doc_count: 0,
-                };
-                return {
-                  key: o.key,
-                  key_as_string: o.key_as_string ?? '',
-                  doc_count_change_point: current.doc_count,
-                  doc_count_overall: Math.max(0, o.doc_count - current.doc_count),
-                };
-              }) ?? [];
-
-            const { fieldName, fieldValue } = cp;
-
-            loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS;
-            pushHistogramDataLoadingState();
-            push(
-              addChangePointsHistogramAction([
-                {
-                  fieldName,
-                  fieldValue,
-                  histogram,
-                },
-              ])
-            );
-          }
-        });
+          });
+        }
      }
    }
 
    endWithUpdatedLoadingState();
  } catch (e) {
-    logger.error(`Explain log rate spikes analysis failed to finish, got: \n${e.toString()}`);
-    pushError(`Explain log rate spikes analysis failed to finish.`);
+    if (!isRequestAbortedError(e)) {
+      logger.error(
+        `Explain log rate spikes analysis failed to finish, got: \n${e.toString()}`
+      );
+      pushError(`Explain log rate spikes analysis failed to finish.`);
+    }
    end();
  }
 }
diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts
index e2440c792592e..08165db084670 100644
--- a/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts
+++ b/x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts
@@ -13,6 +13,8 @@ import { ChangePoint } from '@kbn/ml-agg-utils';
 import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants';
 import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';
 
+import { isRequestAbortedError } from '../../lib/is_request_aborted_error';
+
 import { getQueryWithParams } from './get_query_with_params';
 import { getRequestBase } from './get_request_base';
 
@@ -95,28 +97,34 @@ export const fetchChangePointPValues = async (
   params: AiopsExplainLogRateSpikesSchema,
   fieldNames: string[],
   logger: Logger,
-  emitError: (m: string) => void
+  emitError: (m: string) => void,
+  abortSignal?: AbortSignal
 ): Promise => {
   const result: ChangePoint[] = [];
 
   const settledPromises = await Promise.allSettled(
     fieldNames.map((fieldName) =>
       esClient.search(
-        getChangePointRequest(params, fieldName)
+        getChangePointRequest(params, fieldName),
+        {
+          signal: abortSignal,
+          maxRetries: 0,
+        }
       )
     )
   );
 
   function reportError(fieldName: string, error: unknown) {
-    logger.error(
-      `Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
-        error,
-        null,
-        2
-      )}`
-    );
-    emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
-    // Still continue the analysis even if individual p-value queries fail.
+    if (!isRequestAbortedError(error)) {
+      logger.error(
+        `Failed to fetch p-value aggregation for fieldName "${fieldName}", got: \n${JSON.stringify(
+          error,
+          null,
+          2
+        )}`
+      );
+      emitError(`Failed to fetch p-value aggregation for fieldName "${fieldName}".`);
+    }
   }
 
   for (const [index, settledPromise] of settledPromises.entries()) {
diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts
index 7a761d91c0da5..036d8c0f51fcf 100644
--- a/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts
+++ b/x-pack/plugins/aiops/server/routes/queries/fetch_field_candidates.ts
@@ -47,14 +47,18 @@ export const getRandomDocsRequest = (
 
 export const fetchFieldCandidates = async (
   esClient: ElasticsearchClient,
-  params: AiopsExplainLogRateSpikesSchema
+  params: AiopsExplainLogRateSpikesSchema,
+  abortSignal?: AbortSignal
 ): Promise => {
   const { index } = params;
   // Get all supported fields
-  const respMapping = await esClient.fieldCaps({
-    index,
-    fields: '*',
-  });
+  const respMapping = await esClient.fieldCaps(
+    {
+      index,
+      fields: '*',
+    },
+    { signal: abortSignal, maxRetries: 0 }
+  );
 
   const finalFieldCandidates: Set = new Set([]);
   const acceptableFields: Set = new Set();
@@ -69,7 +73,10 @@ export const fetchFieldCandidates = async (
     }
   });
 
-  const resp = await esClient.search(getRandomDocsRequest(params));
+  const resp = await esClient.search(getRandomDocsRequest(params), {
+    signal: abortSignal,
+    maxRetries: 0,
+  });
   const sampledDocs = resp.hits.hits.map((d) => d.fields ?? {});
 
   // Get all field names for each returned doc and flatten it
diff --git a/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts b/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts
index c9444aaca22af..aaf9af283c3e1 100644
--- a/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts
+++ b/x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts
@@ -56,7 +56,8 @@ export async function fetchFrequentItems(
   deviationMin: number,
   deviationMax: number,
   logger: Logger,
-  emitError: (m: string) => void
+  emitError: (m: string) => void,
+  abortSignal?: AbortSignal
 ) {
   // get unique fields from change points
   const fields = [...new Set(changePoints.map((t) => t.fieldName))];
@@ -127,7 +128,7 @@ export async function fetchFrequentItems(
       track_total_hits: true,
     },
   },
-    { maxRetries: 0 }
+    { signal: abortSignal, maxRetries: 0 }
   );
 
   if (body.aggregations === undefined) {
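/*
 * Editor's note — illustrative sketch only, appended after the patch.
 * The recurring pattern throughout this diff: forward the route's AbortSignal as a
 * transport option ({ signal, maxRetries: 0 }) on each Elasticsearch client call,
 * and use the isRequestAbortedError type guard so that a client disconnect is not
 * logged or streamed back as a failure. A minimal sketch under those assumptions;
 * `searchWithAbort` and the relative import path are hypothetical stand-ins for
 * the values available inside the route handler, not identifiers from this PR.
 */
import type { ElasticsearchClient, Logger } from '@kbn/core/server';
import { isRequestAbortedError } from './lib/is_request_aborted_error';

async function searchWithAbort(
  esClient: ElasticsearchClient,
  searchRequest: Parameters<ElasticsearchClient['search']>[0],
  abortSignal: AbortSignal,
  logger: Logger
) {
  try {
    // `signal` cancels the transport request when the stream's AbortController fires;
    // `maxRetries: 0` avoids re-issuing a request the caller no longer wants.
    return await esClient.search(searchRequest, { signal: abortSignal, maxRetries: 0 });
  } catch (e) {
    if (isRequestAbortedError(e)) {
      // expected when the client disconnects mid-analysis — not an error condition
      return undefined;
    }
    logger.error(`Search failed, got: \n${e.toString()}`);
    throw e;
  }
}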