Skip to content

Commit

Permalink
[ML] APM Latency Correlations: Field/value candidates prioritization (#…
Browse files Browse the repository at this point in the history
…107370)

- Makes sure fields defined in `FIELDS_TO_ADD_AS_CANDIDATE` and prefixed with one of `FIELD_PREFIX_TO_ADD_AS_CANDIDATE` get queried first when retrieving the `correlation` and `ks-test` value.
- Correctly consider the `includeFrozen` parameter.
- The bulk of the PR is a refactor:
  - Moves `query_*` files to `queries` directory
  - Introduces `asyncSearchServiceStateProvider` to manage the state of the async search service in isolation so that we no longer mutate individual vars or plain objects.
  - Introduces `asyncSearchServiceLogProvider` and extends the log to not only store messages but original error messages retrieved from ES too.
  - Refactors some more functions in separate files and adds unit tests.
  - Removes some deprecated code no longer needed.
  • Loading branch information
walterra authored Aug 11, 2021
1 parent a444d8a commit 86c17da
Show file tree
Hide file tree
Showing 44 changed files with 1,089 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export interface SearchServiceParams {

export interface SearchServiceFetchParams extends SearchServiceParams {
index: string;
includeFrozen?: boolean;
}

export interface SearchServiceValue {
Expand All @@ -50,5 +51,4 @@ export interface AsyncSearchProviderProgress {
loadedFieldCanditates: number;
loadedFieldValuePairs: number;
loadedHistograms: number;
getOverallProgress: () => number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,45 @@
* 2.0.
*/

import { shuffle, range } from 'lodash';
import { range } from 'lodash';
import type { ElasticsearchClient } from 'src/core/server';
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import { fetchTransactionDurationFieldCandidates } from './query_field_candidates';
import { fetchTransactionDurationFieldValuePairs } from './query_field_value_pairs';
import { fetchTransactionDurationPercentiles } from './query_percentiles';
import { fetchTransactionDurationCorrelation } from './query_correlation';
import { fetchTransactionDurationHistogramRangeSteps } from './query_histogram_range_steps';
import { fetchTransactionDurationRanges, HistogramItem } from './query_ranges';
import type {
AsyncSearchProviderProgress,
SearchServiceParams,
SearchServiceFetchParams,
SearchServiceValue,
} from '../../../../common/search_strategies/correlations/types';
import { computeExpectationsAndRanges } from './utils/aggregation_utils';
import { fetchTransactionDurationFractions } from './query_fractions';

const CORRELATION_THRESHOLD = 0.3;
const KS_TEST_THRESHOLD = 0.1;

const currentTimeAsString = () => new Date().toISOString();
import type { ApmIndicesConfig } from '../../settings/apm_indices/get_apm_indices';
import {
fetchTransactionDurationFieldCandidates,
fetchTransactionDurationFieldValuePairs,
fetchTransactionDurationFractions,
fetchTransactionDurationPercentiles,
fetchTransactionDurationHistograms,
fetchTransactionDurationHistogramRangeSteps,
fetchTransactionDurationRanges,
} from './queries';
import { computeExpectationsAndRanges } from './utils';
import { asyncSearchServiceLogProvider } from './async_search_service_log';
import { asyncSearchServiceStateProvider } from './async_search_service_state';

export const asyncSearchServiceProvider = (
esClient: ElasticsearchClient,
getApmIndices: () => Promise<ApmIndicesConfig>,
searchServiceParams: SearchServiceParams,
includeFrozen: boolean
) => {
let isCancelled = false;
let isRunning = true;
let error: Error;
let ccsWarning = false;
const log: string[] = [];
const logMessage = (message: string) =>
log.push(`${currentTimeAsString()}: ${message}`);

const progress: AsyncSearchProviderProgress = {
started: Date.now(),
loadedHistogramStepsize: 0,
loadedOverallHistogram: 0,
loadedFieldCanditates: 0,
loadedFieldValuePairs: 0,
loadedHistograms: 0,
getOverallProgress: () =>
progress.loadedHistogramStepsize * 0.025 +
progress.loadedOverallHistogram * 0.025 +
progress.loadedFieldCanditates * 0.025 +
progress.loadedFieldValuePairs * 0.025 +
progress.loadedHistograms * 0.9,
};
const { addLogMessage, getLogMessages } = asyncSearchServiceLogProvider();

const values: SearchServiceValue[] = [];
let overallHistogram: HistogramItem[] | undefined;
const state = asyncSearchServiceStateProvider();

let percentileThresholdValue: number;

const cancel = () => {
logMessage(`Service cancelled.`);
isCancelled = true;
};

const fetchCorrelations = async () => {
async function fetchCorrelations() {
let params: SearchServiceFetchParams | undefined;

try {
const indices = await getApmIndices();
params = {
...searchServiceParams,
index: indices['apm_oss.transactionIndices'],
includeFrozen,
};

// 95th percentile to be displayed as a marker in the log log chart
Expand All @@ -86,37 +55,40 @@ export const asyncSearchServiceProvider = (
params,
params.percentileThreshold ? [params.percentileThreshold] : undefined
);
percentileThresholdValue =
const percentileThresholdValue =
percentileThreshold[`${params.percentileThreshold}.0`];
state.setPercentileThresholdValue(percentileThresholdValue);

logMessage(
addLogMessage(
`Fetched ${params.percentileThreshold}th percentile value of ${percentileThresholdValue} based on ${totalDocs} documents.`
);

// finish early if we weren't able to identify the percentileThresholdValue.
if (percentileThresholdValue === undefined) {
logMessage(
addLogMessage(
`Abort service since percentileThresholdValue could not be determined.`
);
progress.loadedHistogramStepsize = 1;
progress.loadedOverallHistogram = 1;
progress.loadedFieldCanditates = 1;
progress.loadedFieldValuePairs = 1;
progress.loadedHistograms = 1;
isRunning = false;
state.setProgress({
loadedHistogramStepsize: 1,
loadedOverallHistogram: 1,
loadedFieldCanditates: 1,
loadedFieldValuePairs: 1,
loadedHistograms: 1,
});
state.setIsRunning(false);
return;
}

const histogramRangeSteps = await fetchTransactionDurationHistogramRangeSteps(
esClient,
params
);
progress.loadedHistogramStepsize = 1;
state.setProgress({ loadedHistogramStepsize: 1 });

logMessage(`Loaded histogram range steps.`);
addLogMessage(`Loaded histogram range steps.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -125,13 +97,13 @@ export const asyncSearchServiceProvider = (
params,
histogramRangeSteps
);
progress.loadedOverallHistogram = 1;
overallHistogram = overallLogHistogramChartData;
state.setProgress({ loadedOverallHistogram: 1 });
state.setOverallHistogram(overallLogHistogramChartData);

logMessage(`Loaded overall histogram chart data.`);
addLogMessage(`Loaded overall histogram chart data.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -142,10 +114,10 @@ export const asyncSearchServiceProvider = (
} = await fetchTransactionDurationPercentiles(esClient, params, percents);
const percentiles = Object.values(percentilesRecords);

logMessage(`Loaded percentiles.`);
addLogMessage(`Loaded percentiles.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -154,21 +126,22 @@ export const asyncSearchServiceProvider = (
params
);

logMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);
addLogMessage(`Identified ${fieldCandidates.length} fieldCandidates.`);

progress.loadedFieldCanditates = 1;
state.setProgress({ loadedFieldCanditates: 1 });

const fieldValuePairs = await fetchTransactionDurationFieldValuePairs(
esClient,
params,
fieldCandidates,
progress
state,
addLogMessage
);

logMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);
addLogMessage(`Identified ${fieldValuePairs.length} fieldValuePairs.`);

if (isCancelled) {
isRunning = false;
if (state.getIsCancelled()) {
state.setIsRunning(false);
return;
}

Expand All @@ -181,114 +154,75 @@ export const asyncSearchServiceProvider = (
totalDocCount,
} = await fetchTransactionDurationFractions(esClient, params, ranges);

logMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

async function* fetchTransactionDurationHistograms() {
for (const item of shuffle(fieldValuePairs)) {
if (params === undefined || item === undefined || isCancelled) {
isRunning = false;
return;
}

// If one of the fields have an error
// We don't want to stop the whole process
try {
const {
correlation,
ksTest,
} = await fetchTransactionDurationCorrelation(
esClient,
params,
expectations,
ranges,
fractions,
totalDocCount,
item.field,
item.value
);

if (isCancelled) {
isRunning = false;
return;
}

if (
correlation !== null &&
correlation > CORRELATION_THRESHOLD &&
ksTest !== null &&
ksTest < KS_TEST_THRESHOLD
) {
const logHistogram = await fetchTransactionDurationRanges(
esClient,
params,
histogramRangeSteps,
item.field,
item.value
);
yield {
...item,
correlation,
ksTest,
histogram: logHistogram,
};
} else {
yield undefined;
}
} catch (e) {
// don't fail the whole process for individual correlation queries,
// just add the error to the internal log and check if we'd want to set the
// cross-cluster search compatibility warning to true.
logMessage(
`Failed to fetch correlation/kstest for '${item.field}/${item.value}'`
);
if (params?.index.includes(':')) {
ccsWarning = true;
}
yield undefined;
}
}
}
addLogMessage(`Loaded fractions and totalDocCount of ${totalDocCount}.`);

let loadedHistograms = 0;
for await (const item of fetchTransactionDurationHistograms()) {
for await (const item of fetchTransactionDurationHistograms(
esClient,
addLogMessage,
params,
state,
expectations,
ranges,
fractions,
histogramRangeSteps,
totalDocCount,
fieldValuePairs
)) {
if (item !== undefined) {
values.push(item);
state.addValue(item);
}
loadedHistograms++;
progress.loadedHistograms = loadedHistograms / fieldValuePairs.length;
state.setProgress({
loadedHistograms: loadedHistograms / fieldValuePairs.length,
});
}

logMessage(
`Identified ${values.length} significant correlations out of ${fieldValuePairs.length} field/value pairs.`
addLogMessage(
`Identified ${
state.getState().values.length
} significant correlations out of ${
fieldValuePairs.length
} field/value pairs.`
);
} catch (e) {
error = e;
state.setError(e);
}

if (error !== undefined && params?.index.includes(':')) {
ccsWarning = true;
if (state.getState().error !== undefined && params?.index.includes(':')) {
state.setCcsWarning(true);
}

isRunning = false;
};
state.setIsRunning(false);
}

fetchCorrelations();

return () => {
const sortedValues = values.sort((a, b) => b.correlation - a.correlation);
const {
ccsWarning,
error,
isRunning,
overallHistogram,
percentileThresholdValue,
progress,
} = state.getState();

return {
ccsWarning,
error,
log,
log: getLogMessages(),
isRunning,
loaded: Math.round(progress.getOverallProgress() * 100),
loaded: Math.round(state.getOverallProgress() * 100),
overallHistogram,
started: progress.started,
total: 100,
values: sortedValues,
values: state.getValuesSortedByCorrelation(),
percentileThresholdValue,
cancel,
cancel: () => {
addLogMessage(`Service cancelled.`);
state.setIsCancelled(true);
},
};
};
};
Loading

0 comments on commit 86c17da

Please sign in to comment.