Skip to content

Commit

Permalink
[ML] New Platform server shim: update results service routes to use n…
Browse files Browse the repository at this point in the history
…ew platform router (elastic#56886)

* migrate resultsService routes to NP. begin conversion of model file to TS

* add schema validation to routes

* add types to results service model file

* add docs for routes

* update route description and add routes to doc json file
  • Loading branch information
alvarezmelissa87 committed Feb 6, 2020
1 parent f6913a1 commit ff85c7d
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { AnomalyRecordDoc } from '../../../common/types/anomalies';

/**
 * One row of the anomalies table; `buildAnomalyTableItems` returns an array
 * of these.
 */
export interface AnomaliesTableRecord {
// NOTE(review): presumably a timestamp in epoch milliseconds — confirm against the implementation.
time: number;
// Anomaly record document associated with this row.
source: AnomalyRecordDoc;
rowId: string;
// The results service groups category IDs by this value when loading category examples.
jobId: string;
detectorIndex: number;
severity: number;
// The results service compares this against 'mlcategory' to pick out categorization anomalies.
entityName?: string;
entityValue?: any;
influencers?: Array<{ [key: string]: any }>;
actual?: number[];
// NOTE(review): the *Sort fields are presumably sortable projections of their sibling values — confirm.
actualSort?: any;
typical?: number[];
typicalSort?: any;
metricDescriptionSort?: number;
}

/**
 * Type declaration for `buildAnomalyTableItems`. No body is given here, so the
 * implementation is supplied elsewhere.
 * NOTE(review): presumably a JavaScript module of the same name — confirm.
 *
 * @param anomalyRecords - Anomaly records to convert into table rows (untyped).
 * @param aggregationInterval - Interval used when aggregating the records (untyped).
 * @param dateFormatTz - Time zone identifier; presumably used for date bucketing/formatting — confirm.
 * @returns The rows for the anomalies table.
 */
export function buildAnomalyTableItems(
anomalyRecords: any,
aggregationInterval: any,
dateFormatTz: string
): AnomaliesTableRecord[];
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@
import Boom from 'boom';
import { ML_RESULTS_INDEX_PATTERN } from '../../../common/constants/index_patterns';
import { callWithRequestType } from '../../../common/types/kibana';

interface CriteriaField {
fieldName: string;
fieldValue: any;
}
import { CriteriaField } from './results_service';

const PARTITION_FIELDS = ['partition_field', 'over_field', 'by_field'] as const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,55 @@

import _ from 'lodash';
import moment from 'moment';

import { buildAnomalyTableItems } from './build_anomaly_table_items';
import { SearchResponse } from 'elasticsearch';
import { RequestHandlerContext } from 'kibana/server';
import { buildAnomalyTableItems, AnomaliesTableRecord } from './build_anomaly_table_items';
import { ML_RESULTS_INDEX_PATTERN } from '../../../common/constants/index_patterns';
import { ANOMALIES_TABLE_DEFAULT_QUERY_SIZE } from '../../../common/constants/search';
import { getPartitionFieldsValuesFactory } from './get_partition_fields_values';
import { AnomalyRecordDoc } from '../../../common/types/anomalies';

// Service for carrying out Elasticsearch queries to obtain data for the
// ML Results dashboards.

const DEFAULT_MAX_EXAMPLES = 500;

export function resultsServiceProvider(callWithRequest) {
/**
 * A field name/value pair; `getAnomaliesTableData` accepts an array of these
 * as its `criteriaFields` argument.
 */
export interface CriteriaField {
// Optional; NOTE(review): meaning is not visible in this file — confirm with callers.
fieldType?: string;
fieldName: string;
fieldValue: any;
}

/**
 * A field name/value pair; element type of the `influencers` argument of
 * `getAnomaliesTableData`.
 */
interface Influencer {
fieldName: string;
fieldValue: any;
}

export function resultsServiceProvider(client: RequestHandlerContext | (() => any)) {
const callAsCurrentUser =
typeof client === 'object' ? client.ml!.mlClient.callAsCurrentUser : client;
// Obtains data for the anomalies table, aggregating anomalies by day or hour as requested.
// Return an Object with properties 'anomalies' and 'interval' (interval used to aggregate anomalies,
// one of day, hour or second. Note 'auto' can be provided as the aggregationInterval in the request,
// in which case the interval is determined according to the time span between the first and
// last anomalies), plus an examplesByJobId property if any of the
// anomalies are categorization anomalies in mlcategory.
async function getAnomaliesTableData(
jobIds,
criteriaFields,
influencers,
aggregationInterval,
threshold,
earliestMs,
latestMs,
dateFormatTz,
maxRecords = ANOMALIES_TABLE_DEFAULT_QUERY_SIZE,
maxExamples = DEFAULT_MAX_EXAMPLES,
influencersFilterQuery
jobIds: string[],
criteriaFields: CriteriaField[],
influencers: Influencer[],
aggregationInterval: string,
threshold: number,
earliestMs: number,
latestMs: number,
dateFormatTz: string,
maxRecords: number = ANOMALIES_TABLE_DEFAULT_QUERY_SIZE,
maxExamples: number = DEFAULT_MAX_EXAMPLES,
influencersFilterQuery: any
) {
// Build the query to return the matching anomaly record results.
// Add criteria for the time range, record score, plus any specified job IDs.
const boolCriteria = [
const boolCriteria: object[] = [
{
range: {
timestamp: {
Expand Down Expand Up @@ -120,7 +135,7 @@ export function resultsServiceProvider(callWithRequest) {
});
}

const resp = await callWithRequest('search', {
const resp: SearchResponse<any> = await callAsCurrentUser('search', {
index: ML_RESULTS_INDEX_PATTERN,
rest_total_hits_as_int: true,
size: maxRecords,
Expand All @@ -146,9 +161,16 @@ export function resultsServiceProvider(callWithRequest) {
},
});

const tableData = { anomalies: [], interval: 'second' };
const tableData: {
anomalies: AnomaliesTableRecord[];
interval: string;
examplesByJobId?: { [key: string]: any };
} = {
anomalies: [],
interval: 'second',
};
if (resp.hits.total !== 0) {
let records = [];
let records: AnomalyRecordDoc[] = [];
resp.hits.hits.forEach(hit => {
records.push(hit._source);
});
Expand All @@ -169,12 +191,12 @@ export function resultsServiceProvider(callWithRequest) {

// Load examples for any categorization anomalies.
const categoryAnomalies = tableData.anomalies.filter(
item => item.entityName === 'mlcategory'
(item: any) => item.entityName === 'mlcategory'
);
if (categoryAnomalies.length > 0) {
tableData.examplesByJobId = {};

const categoryIdsByJobId = {};
const categoryIdsByJobId: { [key: string]: any } = {};
categoryAnomalies.forEach(anomaly => {
if (!_.has(categoryIdsByJobId, anomaly.jobId)) {
categoryIdsByJobId[anomaly.jobId] = [];
Expand All @@ -192,7 +214,9 @@ export function resultsServiceProvider(callWithRequest) {
categoryIdsByJobId[jobId],
maxExamples
);
tableData.examplesByJobId[jobId] = examplesByCategoryId;
if (tableData.examplesByJobId !== undefined) {
tableData.examplesByJobId[jobId] = examplesByCategoryId;
}
})
);
}
Expand All @@ -202,10 +226,10 @@ export function resultsServiceProvider(callWithRequest) {
}

// Returns the maximum anomaly_score for result_type:bucket over jobIds for the interval passed in
async function getMaxAnomalyScore(jobIds = [], earliestMs, latestMs) {
async function getMaxAnomalyScore(jobIds: string[] = [], earliestMs: number, latestMs: number) {
// Build the criteria to use in the bool filter part of the request.
// Adds criteria for the time range plus any specified job IDs.
const boolCriteria = [
const boolCriteria: object[] = [
{
range: {
timestamp: {
Expand Down Expand Up @@ -265,7 +289,7 @@ export function resultsServiceProvider(callWithRequest) {
},
};

const resp = await callWithRequest('search', query);
const resp = await callAsCurrentUser('search', query);
const maxScore = _.get(resp, ['aggregations', 'max_score', 'value'], null);

return { maxScore };
Expand All @@ -275,7 +299,7 @@ export function resultsServiceProvider(callWithRequest) {
// Returns data over all jobs unless an optional list of job IDs of interest is supplied.
// Returned response consists of latest bucket timestamps (ms since Jan 1 1970) against job ID
async function getLatestBucketTimestampByJob(jobIds = []) {
const filter = [
const filter: object[] = [
{
term: {
result_type: 'bucket',
Expand Down Expand Up @@ -303,7 +327,7 @@ export function resultsServiceProvider(callWithRequest) {
// Size of job terms agg, consistent with maximum number of jobs supported by Java endpoints.
const maxJobs = 10000;

const resp = await callWithRequest('search', {
const resp = await callAsCurrentUser('search', {
index: ML_RESULTS_INDEX_PATTERN,
size: 0,
body: {
Expand All @@ -330,8 +354,12 @@ export function resultsServiceProvider(callWithRequest) {
},
});

const bucketsByJobId = _.get(resp, ['aggregations', 'byJobId', 'buckets'], []);
const timestampByJobId = {};
const bucketsByJobId: Array<{ key: string; maxTimestamp: { value?: number } }> = _.get(
resp,
['aggregations', 'byJobId', 'buckets'],
[]
);
const timestampByJobId: { [key: string]: number | undefined } = {};
bucketsByJobId.forEach(bucket => {
timestampByJobId[bucket.key] = bucket.maxTimestamp.value;
});
Expand All @@ -342,8 +370,8 @@ export function resultsServiceProvider(callWithRequest) {
// Obtains the categorization examples for the categories with the specified IDs
// from the given index and job ID.
// Returned response consists of a list of examples against category ID.
async function getCategoryExamples(jobId, categoryIds, maxExamples) {
const resp = await callWithRequest('search', {
async function getCategoryExamples(jobId: string, categoryIds: any, maxExamples: number) {
const resp = await callAsCurrentUser('search', {
index: ML_RESULTS_INDEX_PATTERN,
rest_total_hits_as_int: true,
size: ANOMALIES_TABLE_DEFAULT_QUERY_SIZE, // Matches size of records in anomaly summary table.
Expand All @@ -356,9 +384,9 @@ export function resultsServiceProvider(callWithRequest) {
},
});

const examplesByCategoryId = {};
const examplesByCategoryId: { [key: string]: any } = {};
if (resp.hits.total !== 0) {
resp.hits.hits.forEach(hit => {
resp.hits.hits.forEach((hit: any) => {
if (maxExamples) {
examplesByCategoryId[hit._source.category_id] = _.slice(
hit._source.examples,
Expand All @@ -377,8 +405,8 @@ export function resultsServiceProvider(callWithRequest) {
// Obtains the definition of the category with the specified ID and job ID.
// Returned response contains four properties - categoryId, regex, examples
// and terms (space delimited String of the common tokens matched in values of the category).
async function getCategoryDefinition(jobId, categoryId) {
const resp = await callWithRequest('search', {
async function getCategoryDefinition(jobId: string, categoryId: string) {
const resp = await callAsCurrentUser('search', {
index: ML_RESULTS_INDEX_PATTERN,
rest_total_hits_as_int: true,
size: 1,
Expand Down Expand Up @@ -409,6 +437,6 @@ export function resultsServiceProvider(callWithRequest) {
getCategoryExamples,
getLatestBucketTimestampByJob,
getMaxAnomalyScore,
getPartitionFieldsValues: getPartitionFieldsValuesFactory(callWithRequest),
getPartitionFieldsValues: getPartitionFieldsValuesFactory(callAsCurrentUser),
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { schema } from '@kbn/config-schema';

/**
 * Validation schema for a single criteria field: an optional string
 * `fieldType`, a required string `fieldName`, and a `fieldValue` of any type.
 */
const criteriaFieldSchema = schema.object({
fieldType: schema.maybe(schema.string()),
fieldName: schema.string(),
fieldValue: schema.any(),
});

/**
 * Field validators for an anomalies table data request.
 *
 * The keys mirror the arguments of `getAnomaliesTableData` in the results
 * service. `maxExamples` and `influencersFilterQuery` are optional; every
 * other field is required.
 */
export const anomaliesTableDataSchema = {
  jobIds: schema.arrayOf(schema.string()),
  criteriaFields: schema.arrayOf(criteriaFieldSchema),
  // The results service types influencers as { fieldName, fieldValue } objects,
  // so validate that shape. Validating them as plain strings would reject
  // well-formed requests and let through values the service cannot use.
  influencers: schema.arrayOf(
    schema.maybe(
      schema.object({
        fieldName: schema.string(),
        fieldValue: schema.any(),
      })
    )
  ),
  aggregationInterval: schema.string(),
  threshold: schema.number(),
  earliestMs: schema.number(),
  latestMs: schema.number(),
  dateFormatTz: schema.string(),
  maxRecords: schema.number(),
  maxExamples: schema.maybe(schema.number()),
  // Free-form query object; not validated beyond being optional.
  influencersFilterQuery: schema.maybe(schema.any()),
};

/**
 * Field validators for a partition field values request.
 * NOTE(review): this is a plain map of validators rather than a
 * `schema.object`; presumably it is wrapped where the route is defined — confirm.
 */
export const partitionFieldValuesSchema = {
jobId: schema.string(),
// Optional and accepted as any type.
searchTerm: schema.maybe(schema.any()),
criteriaFields: schema.arrayOf(criteriaFieldSchema),
earliestMs: schema.number(),
latestMs: schema.number(),
};
8 changes: 7 additions & 1 deletion x-pack/legacy/plugins/ml/server/routes/apidoc.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
"ValidateAnomalyDetector",
"ForecastAnomalyDetector",
"GetOverallBuckets",
"GetCategories"
"GetCategories",
"ResultsService",
"GetAnomaliesTableData",
"GetCategoryDefinition",
"GetMaxAnomalyScore",
"GetCategoryExamples",
"GetPartitionFieldsValues"
]
}
Loading

0 comments on commit ff85c7d

Please sign in to comment.