Skip to content

Commit

Permalink
Add ability to override datafeeds and job config for partition field
Browse files Browse the repository at this point in the history
  • Loading branch information
phillipb committed Sep 29, 2020
1 parent 4f6df62 commit f6fdbb4
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@ import {
bucketSpan,
partitionField,
} from '../../../../../common/infra_ml';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import MemoryJob from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_hosts/ml/hosts_memory_usage.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import MemoryDatafeed from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_hosts/ml/datafeed_hosts_memory_usage.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkInJob from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_hosts/ml/hosts_network_in.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkInDatafeed from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_hosts/ml/datafeed_hosts_network_in.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkOutJob from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_hosts/ml/hosts_network_out.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkOutDatafeed from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_hosts/ml/datafeed_hosts_network_out.json';

type JobType = 'hosts_memory_usage' | 'hosts_network_in' | 'hosts_network_out';
const moduleId = 'metrics_ui_hosts';
const moduleName = i18n.translate('xpack.infra.ml.metricsModuleName', {
defaultMessage: 'Metrics anomanly detection',
Expand Down Expand Up @@ -57,20 +70,66 @@ const setUpModule = async (
pField?: string
) => {
const indexNamePattern = indices.join(',');
const jobIds = ['hosts_memory_usage', 'hosts_network_in', 'hosts_network_out'];
const jobOverrides = jobIds.map((id) => ({
job_id: id,
data_description: {
time_field: timestampField,
},
custom_settings: {
metrics_source_config: {
indexPattern: indexNamePattern,
timestampField,
bucketSpan,
const jobIds: JobType[] = ['hosts_memory_usage', 'hosts_network_in', 'hosts_network_out'];

const jobOverrides = jobIds.map((id) => {
const { job: defaultJobConfig } = getDefaultJobConfigs(id);

// eslint-disable-next-line @typescript-eslint/naming-convention
const analysis_config = {
...defaultJobConfig.analysis_config,
};

if (pField) {
analysis_config.detectors[0].partition_field_name = pField;
if (analysis_config.influencers.indexOf(pField) === -1) {
analysis_config.influencers.push(pField);
}
}

return {
job_id: id,
data_description: {
time_field: timestampField,
},
},
}));
analysis_config,
custom_settings: {
metrics_source_config: {
indexPattern: indexNamePattern,
timestampField,
bucketSpan,
},
},
};
});

const datafeedOverrides = jobIds.map((id) => {
const { datafeed: defaultDatafeedConfig } = getDefaultJobConfigs(id);

if (!pField || id === 'hosts_memory_usage') {
// Since the host memory usage doesn't have custom aggs, we don't need to do anything to add a partition field
return defaultDatafeedConfig;
}

// If we have a partition field, we need to change the aggregation to do a terms agg at the top level
const aggregations = {
[pField]: {
terms: {
field: pField,
},
aggregations: {
...defaultDatafeedConfig.aggregations,
},
},
};

// console.log(JSON.stringify(aggregations, null, 2))
return {
...defaultDatafeedConfig,
job_id: id,
aggregations,
};
});

return callSetupMlModuleAPI(
moduleId,
Expand All @@ -80,10 +139,30 @@ const setUpModule = async (
sourceId,
indexNamePattern,
jobOverrides,
[]
datafeedOverrides
);
};

const getDefaultJobConfigs = (jobId: JobType) => {
switch (jobId) {
case 'hosts_memory_usage':
return {
datafeed: MemoryDatafeed,
job: MemoryJob,
};
case 'hosts_network_in':
return {
datafeed: NetworkInDatafeed,
job: NetworkInJob,
};
case 'hosts_network_out':
return {
datafeed: NetworkOutDatafeed,
job: NetworkOutJob,
};
}
};

const cleanUpModule = async (spaceId: string, sourceId: string) => {
return await cleanUpJobsAndDatafeeds(spaceId, sourceId, metricsHostsJobTypes);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,21 @@ import {
bucketSpan,
partitionField,
} from '../../../../../common/infra_ml';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import MemoryJob from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_k8s/ml/k8s_memory_usage.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import MemoryDatafeed from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_k8s/ml/datafeed_k8s_memory_usage.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkInJob from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_k8s/ml/k8s_network_in.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkInDatafeed from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_k8s/ml/datafeed_k8s_network_in.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkOutJob from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_k8s/ml/k8s_network_out.json';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import NetworkOutDatafeed from '../../../../../../ml/server/models/data_recognizer/modules/metrics_ui_k8s/ml/datafeed_k8s_network_out.json';

type JobType = 'k8s_memory_usage' | 'k8s_network_in' | 'k8s_network_out';
export const DEFAULT_K8S_PARTITION_FIELD = 'kubernetes.namespace';
const moduleId = 'metrics_ui_k8s';
const moduleName = i18n.translate('xpack.infra.ml.metricsModuleName', {
defaultMessage: 'Metrics anomanly detection',
Expand Down Expand Up @@ -57,23 +71,69 @@ const setUpModule = async (
pField?: string
) => {
const indexNamePattern = indices.join(',');
const jobIds = ['k8s_memory_usage', 'k8s_network_in', 'k8s_network_out'];
const jobOverrides = jobIds.map((id) => ({
job_id: id,
analysis_config: {
bucket_span: `${bucketSpan}ms`,
},
data_description: {
time_field: timestampField,
},
custom_settings: {
metrics_source_config: {
indexPattern: indexNamePattern,
timestampField,
bucketSpan,
const jobIds: JobType[] = ['k8s_memory_usage', 'k8s_network_in', 'k8s_network_out'];
const jobOverrides = jobIds.map((id) => {
const { job: defaultJobConfig } = getDefaultJobConfigs(id);

// eslint-disable-next-line @typescript-eslint/naming-convention
const analysis_config = {
...defaultJobConfig.analysis_config,
};

if (pField) {
analysis_config.detectors[0].partition_field_name = pField;
if (analysis_config.influencers.indexOf(pField) === -1) {
analysis_config.influencers.push(pField);
}
}

return {
job_id: id,
data_description: {
time_field: timestampField,
},
},
}));
analysis_config,
custom_settings: {
metrics_source_config: {
indexPattern: indexNamePattern,
timestampField,
bucketSpan,
},
},
};
});

const datafeedOverrides = jobIds.map((id) => {
const { datafeed: defaultDatafeedConfig } = getDefaultJobConfigs(id);

if (!pField || id === 'k8s_memory_usage') {
// Since the host memory usage doesn't have custom aggs, we don't need to do anything to add a partition field
return defaultDatafeedConfig;
}

// Because the ML K8s jobs ship with a default partition field of {kubernetes.namespace}, ignore that agg and wrap it in our own agg.
const innerAggregation =
defaultDatafeedConfig.aggregations[DEFAULT_K8S_PARTITION_FIELD].aggregations;

// If we have a partition field, we need to change the aggregation to do a terms agg to partition the data at the top level
const aggregations = {
[pField]: {
terms: {
field: pField,
size: 25, // 25 is arbitratry and only used to keep the number of buckets to a managable level in the event that the user choose a high cardinality partition field.
},
aggregations: {
...innerAggregation,
},
},
};

return {
...defaultDatafeedConfig,
job_id: id,
aggregations,
};
});

return callSetupMlModuleAPI(
moduleId,
Expand All @@ -83,10 +143,30 @@ const setUpModule = async (
sourceId,
indexNamePattern,
jobOverrides,
[]
datafeedOverrides
);
};

const getDefaultJobConfigs = (jobId: JobType) => {
switch (jobId) {
case 'k8s_memory_usage':
return {
datafeed: MemoryDatafeed,
job: MemoryJob,
};
case 'k8s_network_in':
return {
datafeed: NetworkInDatafeed,
job: NetworkInJob,
};
case 'k8s_network_out':
return {
datafeed: NetworkOutDatafeed,
job: NetworkOutJob,
};
}
};

const cleanUpModule = async (spaceId: string, sourceId: string) => {
return await cleanUpJobsAndDatafeeds(spaceId, sourceId, metricsK8SJobTypes);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { useSourceViaHttp } from '../../../../../../containers/source/use_source
import { useMetricK8sModuleContext } from '../../../../../../containers/ml/modules/metrics_k8s/module';
import { useMetricHostsModuleContext } from '../../../../../../containers/ml/modules/metrics_hosts/module';
import { FixedDatePicker } from '../../../../../../components/fixed_datepicker';
import { DEFAULT_K8S_PARTITION_FIELD } from '../../../../../../containers/ml/modules/metrics_k8s/module_descriptor';

interface Props {
jobType: 'hosts' | 'kubernetes';
Expand Down Expand Up @@ -107,7 +108,7 @@ export const JobSetupScreen = (props: Props) => {

useEffect(() => {
if (props.jobType === 'kubernetes') {
setPartitionField(['kubernetes.namespace']);
setPartitionField([DEFAULT_K8S_PARTITION_FIELD]);
}
}, [props.jobType]);

Expand Down

0 comments on commit f6fdbb4

Please sign in to comment.