Merge branch 'main' into feature-fleet-server-host-ui
kibanamachine authored Oct 28, 2022
2 parents cde1379 + 31fc33a commit 234bb69
Showing 27 changed files with 235 additions and 109 deletions.
30 changes: 22 additions & 8 deletions x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts
@@ -38,7 +38,7 @@ import { isRequestAbortedError } from '../lib/is_request_aborted_error';
import type { AiopsLicense } from '../types';

import { fetchChangePointPValues } from './queries/fetch_change_point_p_values';
import { fetchFieldCandidates } from './queries/fetch_field_candidates';
import { fetchIndexInfo } from './queries/fetch_index_info';
import {
dropDuplicates,
fetchFrequentItems,
@@ -168,32 +168,44 @@ export const defineExplainLogRateSpikesRoute = (
logDebugMessage('Reset.');
push(resetAction());
pushPingWithTimeout();
logDebugMessage('Load field candidates.');

// Step 1: Index Info: Field candidates, total doc count, sample probability

const fieldCandidates: Awaited<ReturnType<typeof fetchIndexInfo>>['fieldCandidates'] = [];
let sampleProbability = 1;
let totalDocCount = 0;

logDebugMessage('Fetch index information.');
push(
updateLoadingStateAction({
ccsWarning: false,
loaded,
loadingState: i18n.translate(
'xpack.aiops.explainLogRateSpikes.loadingState.loadingFieldCandidates',
'xpack.aiops.explainLogRateSpikes.loadingState.loadingIndexInformation',
{
defaultMessage: 'Loading field candidates.',
defaultMessage: 'Loading index information.',
}
),
})
);

let fieldCandidates: Awaited<ReturnType<typeof fetchFieldCandidates>>;
try {
fieldCandidates = await fetchFieldCandidates(client, request.body, abortSignal);
const indexInfo = await fetchIndexInfo(client, request.body, abortSignal);
fieldCandidates.push(...indexInfo.fieldCandidates);
sampleProbability = indexInfo.sampleProbability;
totalDocCount = indexInfo.totalDocCount;
} catch (e) {
if (!isRequestAbortedError(e)) {
logger.error(`Failed to fetch field candidates, got: \n${e.toString()}`);
pushError(`Failed to fetch field candidates.`);
logger.error(`Failed to fetch index information, got: \n${e.toString()}`);
pushError(`Failed to fetch index information.`);
}
end();
return;
}

logDebugMessage(`Total document count: ${totalDocCount}`);
logDebugMessage(`Sample probability: ${sampleProbability}`);

loaded += LOADED_FIELD_CANDIDATES;

push(
@@ -245,6 +257,7 @@
request.body,
fieldCandidatesChunk,
logger,
sampleProbability,
pushError,
abortSignal
);
@@ -396,6 +409,7 @@
request.body.deviationMin,
request.body.deviationMax,
logger,
sampleProbability,
pushError,
abortSignal
);
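Note: for orientation, here is a minimal sketch of the result shape the route consumes from fetchIndexInfo, inferred from the destructuring above and from the unit test at the end of this diff. The interface name IndexInfo is an assumption; the actual declaration is not shown in this commit.

// Hypothetical shape, assembled from how the route uses the result.
interface IndexInfo {
  // Candidate fields (keyword, ip, etc.) for the significant_terms analysis.
  fieldCandidates: string[];
  // 1 disables sampling; values below 1 make downstream queries wrap their
  // aggregations in a random_sampler with this probability.
  sampleProbability: number;
  // Exact hit count for the query, enabled by track_total_hits: true.
  totalDocCount: number;
}

The route then threads sampleProbability into both fetchChangePointPValues and fetchFrequentItems, as the hunks below show.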
x-pack/plugins/aiops/server/routes/queries/fetch_change_point_p_values.ts
@@ -10,6 +10,7 @@ import { ElasticsearchClient } from '@kbn/core/server';

import type { Logger } from '@kbn/logging';
import { ChangePoint } from '@kbn/ml-agg-utils';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';
import { SPIKE_ANALYSIS_THRESHOLD } from '../../../common/constants';
import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';

@@ -23,7 +24,9 @@ import { getRequestBase } from './get_request_base';

export const getChangePointRequest = (
params: AiopsExplainLogRateSpikesSchema,
fieldName: string
fieldName: string,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1
): estypes.SearchRequest => {
const query = getQueryWithParams({
params,
@@ -50,36 +53,49 @@
];
}

const body = {
query,
size: 0,
aggs: {
change_point_p_value: {
significant_terms: {
field: fieldName,
background_filter: {
bool: {
filter: [
...filter,
{
range: {
[timeFieldName]: {
gte: params.baselineMin,
lt: params.baselineMax,
format: 'epoch_millis',
},
const pValueAgg: Record<string, estypes.AggregationsAggregationContainer> = {
change_point_p_value: {
significant_terms: {
field: fieldName,
background_filter: {
bool: {
filter: [
...filter,
{
range: {
[timeFieldName]: {
gte: params.baselineMin,
lt: params.baselineMax,
format: 'epoch_millis',
},
},
],
},
},
],
},
p_value: { background_is_superset: false },
size: 1000,
},
// @ts-expect-error `p_value` is not yet part of `AggregationsAggregationContainer`
p_value: { background_is_superset: false },
size: 1000,
},
},
};

const randomSamplerAgg: Record<string, estypes.AggregationsAggregationContainer> = {
sample: {
// @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer`
random_sampler: {
probability: sampleProbability,
},
aggs: pValueAgg,
},
};

const body = {
query,
size: 0,
aggs: sampleProbability < 1 ? randomSamplerAgg : pValueAgg,
};

return {
...getRequestBase(params),
body,
@@ -92,24 +108,35 @@ interface Aggs extends estypes.AggregationsSignificantLongTermsAggregate {
buckets: estypes.AggregationsSignificantLongTermsBucket[];
}

interface PValuesAggregation extends estypes.AggregationsSamplerAggregation {
change_point_p_value: Aggs;
}

interface RandomSamplerAggregation {
sample: PValuesAggregation;
}

function isRandomSamplerAggregation(arg: unknown): arg is RandomSamplerAggregation {
return isPopulatedObject(arg, ['sample']);
}

export const fetchChangePointPValues = async (
esClient: ElasticsearchClient,
params: AiopsExplainLogRateSpikesSchema,
fieldNames: string[],
logger: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
): Promise<ChangePoint[]> => {
const result: ChangePoint[] = [];

const settledPromises = await Promise.allSettled(
fieldNames.map((fieldName) =>
esClient.search<unknown, { change_point_p_value: Aggs }>(
getChangePointRequest(params, fieldName),
{
signal: abortSignal,
maxRetries: 0,
}
esClient.search<unknown, { sample: PValuesAggregation } | { change_point_p_value: Aggs }>(
getChangePointRequest(params, fieldName, sampleProbability),
{ signal: abortSignal, maxRetries: 0 }
)
)
);
@@ -144,7 +171,9 @@ export const fetchChangePointPValues = async (
continue;
}

const overallResult = resp.aggregations.change_point_p_value;
const overallResult = isRandomSamplerAggregation(resp.aggregations)
? resp.aggregations.sample.change_point_p_value
: resp.aggregations.change_point_p_value;

for (const bucket of overallResult.buckets) {
const pValue = Math.exp(-bucket.score);
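Note: the same pattern repeats in the next file, so a condensed sketch may help: wrap the real aggregation in a random_sampler only when sampling is requested, then unwrap the response through a type guard. This is a hedged restatement of the diff, not a drop-in; the probability value and field name are placeholders, and the background_filter is omitted for brevity.

import { isPopulatedObject } from '@kbn/ml-is-populated-object';

const sampleProbability = 0.01; // placeholder; normally supplied by fetchIndexInfo

const pValueAgg = {
  change_point_p_value: {
    significant_terms: {
      field: 'response_code', // placeholder field name
      p_value: { background_is_superset: false },
      size: 1000,
    },
  },
};

// Only pay the wrapping cost when sampling is actually on.
const aggs =
  sampleProbability < 1
    ? { sample: { random_sampler: { probability: sampleProbability }, aggs: pValueAgg } }
    : pValueAgg;

// On the way out, the type guard picks the matching response shape:
function unwrap(aggregations: unknown) {
  return isPopulatedObject(aggregations, ['sample'])
    ? (aggregations as { sample: { change_point_p_value: unknown } }).sample.change_point_p_value
    : (aggregations as { change_point_p_value: unknown }).change_point_p_value;
}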
61 changes: 36 additions & 25 deletions x-pack/plugins/aiops/server/routes/queries/fetch_frequent_items.ts
@@ -12,6 +12,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { Logger } from '@kbn/logging';
import type { ChangePoint, FieldValuePair } from '@kbn/ml-agg-utils';
import { isPopulatedObject } from '@kbn/ml-is-populated-object';

const FREQUENT_ITEMS_FIELDS_LIMIT = 15;

@@ -21,6 +22,14 @@ interface FrequentItemsAggregation extends estypes.AggregationsSamplerAggregation {
};
}

interface RandomSamplerAggregation {
sample: FrequentItemsAggregation;
}

function isRandomSamplerAggregation(arg: unknown): arg is RandomSamplerAggregation {
return isPopulatedObject(arg, ['sample']);
}

export function dropDuplicates(cps: ChangePoint[], uniqueFields: Array<keyof ChangePoint>) {
return uniqWith(cps, (a, b) => isEqual(pick(a, uniqueFields), pick(b, uniqueFields)));
}
@@ -58,6 +67,8 @@ export async function fetchFrequentItems(
deviationMin: number,
deviationMax: number,
logger: Logger,
// The default value of 1 means no sampling will be used
sampleProbability: number = 1,
emitError: (m: string) => void,
abortSignal?: AbortSignal
) {
@@ -98,44 +109,40 @@
field,
}));

const totalDocCount = changePoints[0].total_doc_count;
const minDocCount = 50000;
let sampleProbability = 1;

if (totalDocCount > minDocCount) {
sampleProbability = Math.min(0.5, minDocCount / totalDocCount);
}

logger.debug(`frequent_items sample probability: ${sampleProbability}`);
const frequentItemsAgg: Record<string, estypes.AggregationsAggregationContainer> = {
fi: {
// @ts-expect-error `frequent_items` is not yet part of `AggregationsAggregationContainer`
frequent_items: {
minimum_set_size: 2,
size: 200,
minimum_support: 0.1,
fields: aggFields,
},
},
};

// frequent items can be slow, so sample and use 10% min_support
const aggs: Record<string, estypes.AggregationsAggregationContainer> = {
const randomSamplerAgg: Record<string, estypes.AggregationsAggregationContainer> = {
sample: {
// @ts-expect-error `random_sampler` is not yet part of `AggregationsAggregationContainer`
random_sampler: {
probability: sampleProbability,
},
aggs: {
fi: {
// @ts-expect-error `frequent_items` is not yet part of `AggregationsAggregationContainer`
frequent_items: {
minimum_set_size: 2,
size: 200,
minimum_support: 0.1,
fields: aggFields,
},
},
},
aggs: frequentItemsAgg,
},
};

const esBody = {
query,
aggs,
aggs: sampleProbability < 1 ? randomSamplerAgg : frequentItemsAgg,
size: 0,
track_total_hits: true,
};

const body = await client.search<unknown, { sample: FrequentItemsAggregation }>(
const body = await client.search<
unknown,
{ sample: FrequentItemsAggregation } | FrequentItemsAggregation
>(
{
index,
size: 0,
@@ -156,13 +163,17 @@

const totalDocCountFi = (body.hits.total as estypes.SearchTotalHits).value;

const shape = body.aggregations.sample.fi.buckets.length;
const frequentItems = isRandomSamplerAggregation(body.aggregations)
? body.aggregations.sample.fi
: body.aggregations.fi;

const shape = frequentItems.buckets.length;
let maximum = shape;
if (maximum > 50000) {
maximum = 50000;
}

const fiss = body.aggregations.sample.fi.buckets;
const fiss = frequentItems.buckets;
fiss.length = maximum;

const results: ItemsetResult[] = [];
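Note: the inline sampling heuristic deleted above (minDocCount = 50000, capped probability) has presumably moved into fetchIndexInfo, whose body this commit does not show. For reference, a sketch of the deleted logic as a standalone helper; the function name is hypothetical:

// Sketch of the heuristic removed from this file: skip sampling for small
// result sets, otherwise aim for ~50k sampled docs, never above 0.5.
function getSampleProbability(totalDocCount: number): number {
  const minDocCount = 50_000;
  if (totalDocCount <= minDocCount) {
    return 1; // no sampling needed
  }
  return Math.min(0.5, minDocCount / totalDocCount);
}

// getSampleProbability(5_000_000) === 0.01 — the value the updated unit
// test below expects from fetchIndexInfo.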
x-pack/plugins/aiops/server/routes/queries/fetch_index_info.test.ts
@@ -11,7 +11,7 @@ import type { ElasticsearchClient } from '@kbn/core/server';

import type { AiopsExplainLogRateSpikesSchema } from '../../../common/api/explain_log_rate_spikes';

import { fetchFieldCandidates, getRandomDocsRequest } from './fetch_field_candidates';
import { fetchIndexInfo, getRandomDocsRequest } from './fetch_index_info';

const params: AiopsExplainLogRateSpikesSchema = {
index: 'the-index',
@@ -26,7 +26,7 @@
searchQuery: '{"bool":{"filter":[],"must":[{"match_all":{}}],"must_not":[]}}',
};

describe('query_field_candidates', () => {
describe('fetch_index_info', () => {
describe('getRandomDocsRequest', () => {
it('returns the most basic request body for a sample of random documents', () => {
const req = getRandomDocsRequest(params);
Expand Down Expand Up @@ -57,6 +57,7 @@ describe('query_field_candidates', () => {
},
},
size: 1000,
track_total_hits: true,
},
index: params.index,
ignore_throttled: undefined,
Expand Down Expand Up @@ -87,6 +88,7 @@ describe('query_field_candidates', () => {
},
},
],
total: { value: 5000000 },
},
} as unknown as estypes.SearchResponse;
});
@@ -96,9 +98,14 @@
search: esClientSearchMock,
} as unknown as ElasticsearchClient;

const resp = await fetchFieldCandidates(esClientMock, params);
const { totalDocCount, sampleProbability, fieldCandidates } = await fetchIndexInfo(
esClientMock,
params
);

expect(resp).toEqual(['myIpFieldName', 'myKeywordFieldName']);
expect(fieldCandidates).toEqual(['myIpFieldName', 'myKeywordFieldName']);
expect(sampleProbability).toEqual(0.01);
expect(totalDocCount).toEqual(5000000);
expect(esClientFieldCapsMock).toHaveBeenCalledTimes(1);
expect(esClientSearchMock).toHaveBeenCalledTimes(1);
});
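Note: the new track_total_hits: true asserted above is what lets fetchIndexInfo report an exact totalDocCount. A hedged sketch of the extraction, mirroring the cast already used in fetch_frequent_items.ts; the helper name is made up:

import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

// With track_total_hits: true, hits.total is an exact SearchTotalHits
// object rather than a lower-bound estimate, so the cast is safe.
function getTotalDocCount(resp: estypes.SearchResponse): number {
  return (resp.hits.total as estypes.SearchTotalHits).value;
}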