From 4b617042f3155ec2d63cb6e97b0e11062fd18f83 Mon Sep 17 00:00:00 2001 From: Walter Rafelsberger Date: Fri, 4 Nov 2022 16:19:08 +0100 Subject: [PATCH] [ML] Explain Log Rate Spikes: Replace chunks of queries with concurrent queue. (#144220) The queries for p-values and histograms were done in chunks of 10 parallel queries. The drawback with this approach was that if just one of these 10 queries was a lot slower, we'd still have to wait for it to finish before we could start the next chunk. This PR replaces the chunking approach with an async concurrent queue of up to 10 queries. The difference is that as soon as the first of the 10 first queries finishes, we can start another query and don't have to wait for the slower ones to finish. For this PR the `async` library is added to `package.json`, however it's not a completely new library being added since it was already used as a dependency of other packages we use in Kibana. --- package.json | 2 + .../server/routes/explain_log_rate_spikes.ts | 341 +++++++++--------- .../apis/aiops/explain_log_rate_spikes.ts | 4 +- yarn.lock | 5 + 4 files changed, 180 insertions(+), 172 deletions(-) diff --git a/package.json b/package.json index e95bd6e7d8116..1be185c0ea385 100644 --- a/package.json +++ b/package.json @@ -459,6 +459,7 @@ "adm-zip": "^0.5.9", "antlr4ts": "^0.5.0-alpha.3", "archiver": "^5.3.1", + "async": "^3.2.3", "axios": "^0.27.2", "base64-js": "^1.3.1", "bitmap-sdf": "^1.0.3", @@ -804,6 +805,7 @@ "@testing-library/user-event": "^13.5.0", "@types/apidoc": "^0.22.3", "@types/archiver": "^5.3.1", + "@types/async": "^3.2.3", "@types/babel__core": "^7.1.19", "@types/babel__generator": "^7.6.4", "@types/babel__helper-plugin-utils": "^7.10.0", diff --git a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts index 1e1c85d5e4b30..9324b70d7225b 100644 --- a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts +++ b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts @@ -5,18 +5,22 @@ * 2.0. */ -import { chunk } from 'lodash'; +import { queue } from 'async'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { i18n } from '@kbn/i18n'; -import { asyncForEach } from '@kbn/std'; import type { IRouter } from '@kbn/core/server'; import { KBN_FIELD_TYPES } from '@kbn/field-types'; import type { Logger } from '@kbn/logging'; import type { DataRequestHandlerContext } from '@kbn/data-plugin/server'; import { streamFactory } from '@kbn/aiops-utils'; -import type { ChangePoint, NumericChartData, NumericHistogramField } from '@kbn/ml-agg-utils'; +import type { + ChangePoint, + ChangePointGroup, + NumericChartData, + NumericHistogramField, +} from '@kbn/ml-agg-utils'; import { fetchHistogramsForFields } from '@kbn/ml-agg-utils'; import { stringHash } from '@kbn/ml-string-hash'; @@ -209,6 +213,8 @@ export const defineExplainLogRateSpikesRoute = ( loaded += LOADED_FIELD_CANDIDATES; + const fieldCandidatesCount = fieldCandidates.length; + push( updateLoadingStateAction({ ccsWarning: false, @@ -219,14 +225,14 @@ export const defineExplainLogRateSpikesRoute = ( defaultMessage: 'Identified {fieldCandidatesCount, plural, one {# field candidate} other {# field candidates}}.', values: { - fieldCandidatesCount: fieldCandidates.length, + fieldCandidatesCount, }, } ), }) ); - if (fieldCandidates.length === 0) { + if (fieldCandidatesCount === 0) { endWithUpdatedLoadingState(); } else if (shouldStop) { logDebugMessage('shouldStop after fetching field candidates.'); @@ -239,24 +245,20 @@ export const defineExplainLogRateSpikesRoute = ( // Don't use more than 10 here otherwise Kibana will emit an error // regarding a limit of abort signal listeners of more than 10. - const CHUNK_SIZE = 10; - let chunkCount = 0; - - const fieldCandidatesChunks = chunk(fieldCandidates, CHUNK_SIZE); + const MAX_CONCURRENT_QUERIES = 10; logDebugMessage('Fetch p-values.'); - for (const fieldCandidatesChunk of fieldCandidatesChunks) { - chunkCount++; - logDebugMessage( - `Fetch p-values. Chunk ${chunkCount} of ${fieldCandidatesChunks.length}` - ); + const pValuesQueue = queue(async function (fieldCandidate: string) { + loaded += (1 / fieldCandidatesCount) * PROGRESS_STEP_P_VALUES; + let pValues: Awaited>; + try { pValues = await fetchChangePointPValues( client, request.body, - fieldCandidatesChunk, + [fieldCandidate], logger, sampleProbability, pushError, @@ -265,13 +267,11 @@ export const defineExplainLogRateSpikesRoute = ( } catch (e) { if (!isRequestAbortedError(e)) { logger.error( - `Failed to fetch p-values for ${JSON.stringify( - fieldCandidatesChunk - )}, got: \n${e.toString()}` + `Failed to fetch p-values for '${fieldCandidate}', got: \n${e.toString()}` ); - pushError(`Failed to fetch p-values for ${JSON.stringify(fieldCandidatesChunk)}.`); - } // Still continue the analysis even if chunks of p-value queries fail. - continue; + pushError(`Failed to fetch p-values for '${fieldCandidate}'.`); + } + return; } if (pValues.length > 0) { @@ -279,12 +279,10 @@ export const defineExplainLogRateSpikesRoute = ( fieldsToSample.add(d.fieldName); }); changePoints.push(...pValues); - } - loaded += (1 / fieldCandidatesChunks.length) * PROGRESS_STEP_P_VALUES; - if (pValues.length > 0) { push(addChangePointsAction(pValues)); } + push( updateLoadingStateAction({ ccsWarning: false, @@ -304,10 +302,13 @@ export const defineExplainLogRateSpikesRoute = ( if (shouldStop) { logDebugMessage('shouldStop fetching p-values.'); + pValuesQueue.kill(); end(); - return; } - } + }, MAX_CONCURRENT_QUERIES); + + pValuesQueue.push(fieldCandidates); + await pValuesQueue.drain(); if (changePoints.length === 0) { logDebugMessage('Stopping analysis, did not find change points.'); @@ -572,84 +573,84 @@ export const defineExplainLogRateSpikesRoute = ( logDebugMessage(`Fetch ${changePointGroups.length} group histograms.`); - const changePointGroupsChunks = chunk(changePointGroups, CHUNK_SIZE); - - for (const changePointGroupsChunk of changePointGroupsChunks) { + const groupHistogramQueue = queue(async function (cpg: ChangePointGroup) { if (shouldStop) { logDebugMessage('shouldStop abort fetching group histograms.'); + groupHistogramQueue.kill(); end(); return; } - await asyncForEach(changePointGroupsChunk, async (cpg) => { - if (overallTimeSeries !== undefined) { - const histogramQuery = getHistogramQuery( - request.body, - cpg.group.map((d) => ({ - term: { [d.fieldName]: d.fieldValue }, - })) - ); + if (overallTimeSeries !== undefined) { + const histogramQuery = getHistogramQuery( + request.body, + cpg.group.map((d) => ({ + term: { [d.fieldName]: d.fieldValue }, + })) + ); - let cpgTimeSeries: NumericChartData; - try { - cpgTimeSeries = ( - (await fetchHistogramsForFields( - client, - request.body.index, - histogramQuery, - // fields - [ - { - fieldName: request.body.timeFieldName, - type: KBN_FIELD_TYPES.DATE, - interval: overallTimeSeries.interval, - min: overallTimeSeries.stats[0], - max: overallTimeSeries.stats[1], - }, - ], - // samplerShardSize - -1, - undefined, - abortSignal - )) as [NumericChartData] - )[0]; - } catch (e) { - if (!isRequestAbortedError(e)) { - logger.error( - `Failed to fetch the histogram data for group #${ - cpg.id - }, got: \n${e.toString()}` - ); - pushError(`Failed to fetch the histogram data for group #${cpg.id}.`); - } - return; + let cpgTimeSeries: NumericChartData; + try { + cpgTimeSeries = ( + (await fetchHistogramsForFields( + client, + request.body.index, + histogramQuery, + // fields + [ + { + fieldName: request.body.timeFieldName, + type: KBN_FIELD_TYPES.DATE, + interval: overallTimeSeries.interval, + min: overallTimeSeries.stats[0], + max: overallTimeSeries.stats[1], + }, + ], + // samplerShardSize + -1, + undefined, + abortSignal + )) as [NumericChartData] + )[0]; + } catch (e) { + if (!isRequestAbortedError(e)) { + logger.error( + `Failed to fetch the histogram data for group #${ + cpg.id + }, got: \n${e.toString()}` + ); + pushError(`Failed to fetch the histogram data for group #${cpg.id}.`); } - const histogram = - overallTimeSeries.data.map((o, i) => { - const current = cpgTimeSeries.data.find( - (d1) => d1.key_as_string === o.key_as_string - ) ?? { - doc_count: 0, - }; - return { - key: o.key, - key_as_string: o.key_as_string ?? '', - doc_count_change_point: current.doc_count, - doc_count_overall: Math.max(0, o.doc_count - current.doc_count), - }; - }) ?? []; - - push( - addChangePointsGroupHistogramAction([ - { - id: cpg.id, - histogram, - }, - ]) - ); + return; } - }); - } + const histogram = + overallTimeSeries.data.map((o, i) => { + const current = cpgTimeSeries.data.find( + (d1) => d1.key_as_string === o.key_as_string + ) ?? { + doc_count: 0, + }; + return { + key: o.key, + key_as_string: o.key_as_string ?? '', + doc_count_change_point: current.doc_count, + doc_count_overall: Math.max(0, o.doc_count - current.doc_count), + }; + }) ?? []; + + push( + addChangePointsGroupHistogramAction([ + { + id: cpg.id, + histogram, + }, + ]) + ); + } + }, MAX_CONCURRENT_QUERIES); + + groupHistogramQueue.push(changePointGroups); + await groupHistogramQueue.drain(); } } catch (e) { if (!isRequestAbortedError(e)) { @@ -667,90 +668,90 @@ export const defineExplainLogRateSpikesRoute = ( // time series filtered by fields if (changePoints.length > 0 && overallTimeSeries !== undefined) { - const changePointsChunks = chunk(changePoints, CHUNK_SIZE); - - for (const changePointsChunk of changePointsChunks) { + const fieldValueHistogramQueue = queue(async function (cp: ChangePoint) { if (shouldStop) { logDebugMessage('shouldStop abort fetching field/value histograms.'); + fieldValueHistogramQueue.kill(); end(); return; } - await asyncForEach(changePointsChunk, async (cp) => { - if (overallTimeSeries !== undefined) { - const histogramQuery = getHistogramQuery(request.body, [ + if (overallTimeSeries !== undefined) { + const histogramQuery = getHistogramQuery(request.body, [ + { + term: { [cp.fieldName]: cp.fieldValue }, + }, + ]); + + let cpTimeSeries: NumericChartData; + + try { + cpTimeSeries = ( + (await fetchHistogramsForFields( + client, + request.body.index, + histogramQuery, + // fields + [ + { + fieldName: request.body.timeFieldName, + type: KBN_FIELD_TYPES.DATE, + interval: overallTimeSeries.interval, + min: overallTimeSeries.stats[0], + max: overallTimeSeries.stats[1], + }, + ], + // samplerShardSize + -1, + undefined, + abortSignal + )) as [NumericChartData] + )[0]; + } catch (e) { + logger.error( + `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${ + cp.fieldValue + }", got: \n${e.toString()}` + ); + pushError( + `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".` + ); + return; + } + + const histogram = + overallTimeSeries.data.map((o, i) => { + const current = cpTimeSeries.data.find( + (d1) => d1.key_as_string === o.key_as_string + ) ?? { + doc_count: 0, + }; + return { + key: o.key, + key_as_string: o.key_as_string ?? '', + doc_count_change_point: current.doc_count, + doc_count_overall: Math.max(0, o.doc_count - current.doc_count), + }; + }) ?? []; + + const { fieldName, fieldValue } = cp; + + loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS; + pushHistogramDataLoadingState(); + push( + addChangePointsHistogramAction([ { - term: { [cp.fieldName]: cp.fieldValue }, + fieldName, + fieldValue, + histogram, }, - ]); - - let cpTimeSeries: NumericChartData; - - try { - cpTimeSeries = ( - (await fetchHistogramsForFields( - client, - request.body.index, - histogramQuery, - // fields - [ - { - fieldName: request.body.timeFieldName, - type: KBN_FIELD_TYPES.DATE, - interval: overallTimeSeries.interval, - min: overallTimeSeries.stats[0], - max: overallTimeSeries.stats[1], - }, - ], - // samplerShardSize - -1, - undefined, - abortSignal - )) as [NumericChartData] - )[0]; - } catch (e) { - logger.error( - `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${ - cp.fieldValue - }", got: \n${e.toString()}` - ); - pushError( - `Failed to fetch the histogram data for field/value pair "${cp.fieldName}:${cp.fieldValue}".` - ); - return; - } + ]) + ); + } + }, MAX_CONCURRENT_QUERIES); - const histogram = - overallTimeSeries.data.map((o, i) => { - const current = cpTimeSeries.data.find( - (d1) => d1.key_as_string === o.key_as_string - ) ?? { - doc_count: 0, - }; - return { - key: o.key, - key_as_string: o.key_as_string ?? '', - doc_count_change_point: current.doc_count, - doc_count_overall: Math.max(0, o.doc_count - current.doc_count), - }; - }) ?? []; - - const { fieldName, fieldValue } = cp; - - loaded += (1 / changePoints.length) * PROGRESS_STEP_HISTOGRAMS; - pushHistogramDataLoadingState(); - push( - addChangePointsHistogramAction([ - { - fieldName, - fieldValue, - histogram, - }, - ]) - ); - } - }); - } + fieldValueHistogramQueue.push(changePoints); + await fieldValueHistogramQueue.drain(); } endWithUpdatedLoadingState(); diff --git a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts index 58fe4a3c67fd3..79c5be4a93a34 100644 --- a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts +++ b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts @@ -34,8 +34,8 @@ export default ({ getService }: FtrProviderContext) => { }; const expected = { - chunksLength: 13, - actionsLength: 12, + chunksLength: 34, + actionsLength: 33, noIndexChunksLength: 4, noIndexActionsLength: 3, changePointFilter: 'add_change_points', diff --git a/yarn.lock b/yarn.lock index ca156c451e3c2..cbed8a4c3432c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6140,6 +6140,11 @@ resolved "https://registry.yarnpkg.com/@types/aria-query/-/aria-query-4.2.0.tgz#14264692a9d6e2fa4db3df5e56e94b5e25647ac0" integrity sha512-iIgQNzCm0v7QMhhe4Jjn9uRh+I6GoPmt03CbEtwx3ao8/EfoQcmgtqH4vQ5Db/lxiIGaWDv6nwvunuh0RyX0+A== +"@types/async@^3.2.3": + version "3.2.15" + resolved "https://registry.yarnpkg.com/@types/async/-/async-3.2.15.tgz#26d4768fdda0e466f18d6c9918ca28cc89a4e1fe" + integrity sha512-PAmPfzvFA31mRoqZyTVsgJMsvbynR429UTTxhmfsUCrWGh3/fxOrzqBtaTPJsn4UtzTv4Vb0+/O7CARWb69N4g== + "@types/babel__core@*", "@types/babel__core@^7.1.14", "@types/babel__core@^7.1.19": version "7.1.19" resolved "https://registry.yarnpkg.com/@types/babel__core/-/babel__core-7.1.19.tgz#7b497495b7d1b4812bdb9d02804d0576f43ee460"