From 819a3f624faf82b43391e41c2008a7822b7b03e6 Mon Sep 17 00:00:00 2001 From: Brian McGue Date: Fri, 4 Nov 2022 16:14:54 -0700 Subject: [PATCH 1/3] Add failure handling for set processors Also add a remove processor and text_classification and text_embedding types. --- .../ml_inference_pipeline/index.test.ts | 105 +++++++++++++++++- .../common/ml_inference_pipeline/index.ts | 41 ++++++- 2 files changed, 140 insertions(+), 6 deletions(-) diff --git a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts index b2616ed7615ba..32a2d8d377d9c 100644 --- a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts +++ b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts @@ -5,7 +5,11 @@ * 2.0. */ -import { IngestSetProcessor, MlTrainedModelConfig } from '@elastic/elasticsearch/lib/api/types'; +import { + IngestRemoveProcessor, + IngestSetProcessor, + MlTrainedModelConfig, +} from '@elastic/elasticsearch/lib/api/types'; import { BUILT_IN_MODEL_TAG } from '@kbn/ml-plugin/common/constants/data_frame_analytics'; import { SUPPORTED_PYTORCH_TASKS } from '@kbn/ml-plugin/common/constants/trained_models'; @@ -18,6 +22,7 @@ import { getSetProcessorForInferenceType, SUPPORTED_PYTORCH_TASKS as LOCAL_SUPPORTED_PYTORCH_TASKS, parseMlInferenceParametersFromPipeline, + getRemoveProcessorForInferenceType, } from '.'; const mockModel: MlTrainedModelConfig = { @@ -63,6 +68,38 @@ describe('getMlModelTypesForModelConfig lib function', () => { }); }); +describe('getRemoveProcessorForInferenceType lib function', () => { + const destinationField = 'dest'; + + it('should return expected value for TEXT_CLASSIFICATION', () => { + const inferenceType = SUPPORTED_PYTORCH_TASKS.TEXT_CLASSIFICATION; + + const expected: IngestRemoveProcessor = { + field: destinationField, + ignore_missing: true, + }; + + expect(getRemoveProcessorForInferenceType(destinationField, inferenceType)).toEqual(expected); + }); + + it('should return expected value for TEXT_EMBEDDING', () => { + const inferenceType = SUPPORTED_PYTORCH_TASKS.TEXT_EMBEDDING; + + const expected: IngestRemoveProcessor = { + field: destinationField, + ignore_missing: true, + }; + + expect(getRemoveProcessorForInferenceType(destinationField, inferenceType)).toEqual(expected); + }); + + it('should return undefined for unknown inferenceType', () => { + const inferenceType = 'wrongInferenceType'; + + expect(getRemoveProcessorForInferenceType(destinationField, inferenceType)).toBeUndefined(); + }); +}); + describe('getSetProcessorForInferenceType lib function', () => { const destinationField = 'dest'; @@ -79,10 +116,28 @@ describe('getSetProcessorForInferenceType lib function', () => { "Copy the predicted_value to 'dest' if the prediction_probability is greater than 0.5", field: destinationField, if: 'ml.inference.dest.prediction_probability > 0.5', + on_failure: [ + { + append: { + field: '_source._ingest.set_errors', + ignore_failure: true, + value: [ + { + message: + "Processor 'set' in pipeline 'my-pipeline' failed with message '{{ _ingest.on_failure_message }}'", + pipeline: 'my-pipeline', + timestamp: '{{{ _ingest.timestamp }}}', + }, + ], + }, + }, + ], value: undefined, }; - expect(getSetProcessorForInferenceType(destinationField, inferenceType)).toEqual(expected); + expect(getSetProcessorForInferenceType(destinationField, inferenceType, 'my-pipeline')).toEqual( + expected + ); }); it('should return expected value for TEXT_EMBEDDING', () => { @@ -92,16 +147,36 @@ describe('getSetProcessorForInferenceType lib function', () => { copy_from: 'ml.inference.dest.predicted_value', description: "Copy the predicted_value to 'dest'", field: destinationField, + on_failure: [ + { + append: { + field: '_source._ingest.set_errors', + ignore_failure: true, + value: [ + { + message: + "Processor 'set' in pipeline 'my-pipeline' failed with message '{{ _ingest.on_failure_message }}'", + pipeline: 'my-pipeline', + timestamp: '{{{ _ingest.timestamp }}}', + }, + ], + }, + }, + ], value: undefined, }; - expect(getSetProcessorForInferenceType(destinationField, inferenceType)).toEqual(expected); + expect(getSetProcessorForInferenceType(destinationField, inferenceType, 'my-pipeline')).toEqual( + expected + ); }); it('should return undefined for unknown inferenceType', () => { const inferenceType = 'wrongInferenceType'; - expect(getSetProcessorForInferenceType(destinationField, inferenceType)).toBeUndefined(); + expect( + getSetProcessorForInferenceType(destinationField, inferenceType, 'my-pipeline') + ).toBeUndefined(); }); }); @@ -185,6 +260,12 @@ describe('generateMlInferencePipelineBody lib function', () => { expect.objectContaining({ description: expect.any(String), processors: expect.arrayContaining([ + expect.objectContaining({ + remove: { + field: 'my-destination-field', + ignore_missing: true, + }, + }), expect.objectContaining({ set: { copy_from: 'ml.inference.my-destination-field.predicted_value', @@ -192,6 +273,22 @@ describe('generateMlInferencePipelineBody lib function', () => { "Copy the predicted_value to 'my-destination-field' if the prediction_probability is greater than 0.5", field: 'my-destination-field', if: 'ml.inference.my-destination-field.prediction_probability > 0.5', + on_failure: [ + { + append: { + field: '_source._ingest.set_errors', + ignore_failure: true, + value: [ + { + message: + "Processor 'set' in pipeline 'my-pipeline' failed with message '{{ _ingest.on_failure_message }}'", + pipeline: 'my-pipeline', + timestamp: '{{{ _ingest.timestamp }}}', + }, + ], + }, + }, + ], }, }), ]), diff --git a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts index 4e5b124f8dff0..f378f7cf111a0 100644 --- a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts +++ b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts @@ -7,6 +7,8 @@ import { IngestPipeline, + IngestProcessorContainer, + IngestRemoveProcessor, IngestSetProcessor, MlTrainedModelConfig, } from '@elastic/elasticsearch/lib/api/types'; @@ -53,7 +55,8 @@ export const generateMlInferencePipelineBody = ({ model.input?.field_names?.length > 0 ? model.input.field_names[0] : 'MODEL_INPUT_FIELD'; const inferenceType = Object.keys(model.inference_config)[0]; - const set = getSetProcessorForInferenceType(destinationField, inferenceType); + const remove = getRemoveProcessorForInferenceType(destinationField, inferenceType); + const set = getSetProcessorForInferenceType(destinationField, inferenceType, pipelineName); return { description: description ?? '', @@ -64,6 +67,7 @@ export const generateMlInferencePipelineBody = ({ ignore_missing: true, }, }, + ...(remove ? [{ remove }] : []), { inference: { field_map: { @@ -108,10 +112,26 @@ export const generateMlInferencePipelineBody = ({ export const getSetProcessorForInferenceType = ( destinationField: string, - inferenceType: string + inferenceType: string, + pipelineName: string ): IngestSetProcessor | undefined => { let set: IngestSetProcessor | undefined; const prefixedDestinationField = `ml.inference.${destinationField}`; + const onFailure: IngestProcessorContainer[] = [ + { + append: { + field: '_source._ingest.set_errors', + ignore_failure: true, + value: [ + { + message: `Processor 'set' in pipeline '${pipelineName}' failed with message '{{ _ingest.on_failure_message }}'`, + pipeline: pipelineName, + timestamp: '{{{ _ingest.timestamp }}}', + }, + ], + }, + }, + ]; if (inferenceType === SUPPORTED_PYTORCH_TASKS.TEXT_CLASSIFICATION) { set = { @@ -119,6 +139,7 @@ export const getSetProcessorForInferenceType = ( description: `Copy the predicted_value to '${destinationField}' if the prediction_probability is greater than 0.5`, field: destinationField, if: `${prefixedDestinationField}.prediction_probability > 0.5`, + on_failure: onFailure, value: undefined, }; } else if (inferenceType === SUPPORTED_PYTORCH_TASKS.TEXT_EMBEDDING) { @@ -126,6 +147,7 @@ export const getSetProcessorForInferenceType = ( copy_from: `${prefixedDestinationField}.predicted_value`, description: `Copy the predicted_value to '${destinationField}'`, field: destinationField, + on_failure: onFailure, value: undefined, }; } @@ -133,6 +155,21 @@ export const getSetProcessorForInferenceType = ( return set; }; +export const getRemoveProcessorForInferenceType = ( + destinationField: string, + inferenceType: string +): IngestRemoveProcessor | undefined => { + if ( + inferenceType === SUPPORTED_PYTORCH_TASKS.TEXT_CLASSIFICATION || + inferenceType === SUPPORTED_PYTORCH_TASKS.TEXT_EMBEDDING + ) { + return { + field: destinationField, + ignore_missing: true, + }; + } +}; + /** * Parses model types list from the given configuration of a trained machine learning model * @param trainedModel configuration for a trained machine learning model From 84518ea2c9369fa62f4498a421785a27b5c2da7d Mon Sep 17 00:00:00 2001 From: Brian McGue Date: Tue, 15 Nov 2022 13:13:52 -0800 Subject: [PATCH 2/3] Use bracket style getter in painless script Fields with dashes would not compile otherwise. --- .../common/ml_inference_pipeline/index.test.ts | 4 ++-- .../enterprise_search/common/ml_inference_pipeline/index.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts index e5c49ce8d8e26..f697b917b9342 100644 --- a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts +++ b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts @@ -115,7 +115,7 @@ describe('getSetProcessorForInferenceType lib function', () => { description: "Copy the predicted_value to 'dest' if the prediction_probability is greater than 0.5", field: destinationField, - if: 'ctx.ml.inference.dest.prediction_probability > 0.5', + if: "ctx.ml.inference['dest'].prediction_probability > 0.5", on_failure: [ { append: { @@ -272,7 +272,7 @@ describe('generateMlInferencePipelineBody lib function', () => { description: "Copy the predicted_value to 'my-destination-field' if the prediction_probability is greater than 0.5", field: 'my-destination-field', - if: 'ctx.ml.inference.my-destination-field.prediction_probability > 0.5', + if: "ctx.ml.inference['my-destination-field'].prediction_probability > 0.5", on_failure: [ { append: { diff --git a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts index dcc11c7748e03..3b72d0a376659 100644 --- a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts +++ b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts @@ -138,7 +138,7 @@ export const getSetProcessorForInferenceType = ( copy_from: `${prefixedDestinationField}.predicted_value`, description: `Copy the predicted_value to '${destinationField}' if the prediction_probability is greater than 0.5`, field: destinationField, - if: `ctx.${prefixedDestinationField}.prediction_probability > 0.5`, + if: `ctx.ml.inference['${destinationField}'].prediction_probability > 0.5`, on_failure: onFailure, value: undefined, }; From e5eb975cd16737a470915f4cd177269c76c1bc04 Mon Sep 17 00:00:00 2001 From: Brian McGue Date: Tue, 15 Nov 2022 13:56:03 -0800 Subject: [PATCH 3/3] Replace set processor on_failure with null checks --- .../ml_inference_pipeline/index.test.ts | 65 ++----------------- .../common/ml_inference_pipeline/index.ts | 26 ++------ 2 files changed, 10 insertions(+), 81 deletions(-) diff --git a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts index f697b917b9342..0cf88337a8a8c 100644 --- a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts +++ b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.test.ts @@ -115,29 +115,11 @@ describe('getSetProcessorForInferenceType lib function', () => { description: "Copy the predicted_value to 'dest' if the prediction_probability is greater than 0.5", field: destinationField, - if: "ctx.ml.inference['dest'].prediction_probability > 0.5", - on_failure: [ - { - append: { - field: '_source._ingest.set_errors', - ignore_failure: true, - value: [ - { - message: - "Processor 'set' in pipeline 'my-pipeline' failed with message '{{ _ingest.on_failure_message }}'", - pipeline: 'my-pipeline', - timestamp: '{{{ _ingest.timestamp }}}', - }, - ], - }, - }, - ], + if: "ctx?.ml?.inference != null && ctx.ml.inference['dest'] != null && ctx.ml.inference['dest'].prediction_probability > 0.5", value: undefined, }; - expect(getSetProcessorForInferenceType(destinationField, inferenceType, 'my-pipeline')).toEqual( - expected - ); + expect(getSetProcessorForInferenceType(destinationField, inferenceType)).toEqual(expected); }); it('should return expected value for TEXT_EMBEDDING', () => { @@ -147,36 +129,17 @@ describe('getSetProcessorForInferenceType lib function', () => { copy_from: 'ml.inference.dest.predicted_value', description: "Copy the predicted_value to 'dest'", field: destinationField, - on_failure: [ - { - append: { - field: '_source._ingest.set_errors', - ignore_failure: true, - value: [ - { - message: - "Processor 'set' in pipeline 'my-pipeline' failed with message '{{ _ingest.on_failure_message }}'", - pipeline: 'my-pipeline', - timestamp: '{{{ _ingest.timestamp }}}', - }, - ], - }, - }, - ], + if: "ctx?.ml?.inference != null && ctx.ml.inference['dest'] != null", value: undefined, }; - expect(getSetProcessorForInferenceType(destinationField, inferenceType, 'my-pipeline')).toEqual( - expected - ); + expect(getSetProcessorForInferenceType(destinationField, inferenceType)).toEqual(expected); }); it('should return undefined for unknown inferenceType', () => { const inferenceType = 'wrongInferenceType'; - expect( - getSetProcessorForInferenceType(destinationField, inferenceType, 'my-pipeline') - ).toBeUndefined(); + expect(getSetProcessorForInferenceType(destinationField, inferenceType)).toBeUndefined(); }); }); @@ -272,23 +235,7 @@ describe('generateMlInferencePipelineBody lib function', () => { description: "Copy the predicted_value to 'my-destination-field' if the prediction_probability is greater than 0.5", field: 'my-destination-field', - if: "ctx.ml.inference['my-destination-field'].prediction_probability > 0.5", - on_failure: [ - { - append: { - field: '_source._ingest.set_errors', - ignore_failure: true, - value: [ - { - message: - "Processor 'set' in pipeline 'my-pipeline' failed with message '{{ _ingest.on_failure_message }}'", - pipeline: 'my-pipeline', - timestamp: '{{{ _ingest.timestamp }}}', - }, - ], - }, - }, - ], + if: "ctx?.ml?.inference != null && ctx.ml.inference['my-destination-field'] != null && ctx.ml.inference['my-destination-field'].prediction_probability > 0.5", }, }), ]), diff --git a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts index 3b72d0a376659..26aaf98c0edea 100644 --- a/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts +++ b/x-pack/plugins/enterprise_search/common/ml_inference_pipeline/index.ts @@ -7,7 +7,6 @@ import { IngestPipeline, - IngestProcessorContainer, IngestRemoveProcessor, IngestSetProcessor, MlTrainedModelConfig, @@ -56,7 +55,7 @@ export const generateMlInferencePipelineBody = ({ const inferenceType = Object.keys(model.inference_config)[0]; const remove = getRemoveProcessorForInferenceType(destinationField, inferenceType); - const set = getSetProcessorForInferenceType(destinationField, inferenceType, pipelineName); + const set = getSetProcessorForInferenceType(destinationField, inferenceType); return { description: description ?? '', @@ -112,34 +111,17 @@ export const generateMlInferencePipelineBody = ({ export const getSetProcessorForInferenceType = ( destinationField: string, - inferenceType: string, - pipelineName: string + inferenceType: string ): IngestSetProcessor | undefined => { let set: IngestSetProcessor | undefined; const prefixedDestinationField = `ml.inference.${destinationField}`; - const onFailure: IngestProcessorContainer[] = [ - { - append: { - field: '_source._ingest.set_errors', - ignore_failure: true, - value: [ - { - message: `Processor 'set' in pipeline '${pipelineName}' failed with message '{{ _ingest.on_failure_message }}'`, - pipeline: pipelineName, - timestamp: '{{{ _ingest.timestamp }}}', - }, - ], - }, - }, - ]; if (inferenceType === SUPPORTED_PYTORCH_TASKS.TEXT_CLASSIFICATION) { set = { copy_from: `${prefixedDestinationField}.predicted_value`, description: `Copy the predicted_value to '${destinationField}' if the prediction_probability is greater than 0.5`, field: destinationField, - if: `ctx.ml.inference['${destinationField}'].prediction_probability > 0.5`, - on_failure: onFailure, + if: `ctx?.ml?.inference != null && ctx.ml.inference['${destinationField}'] != null && ctx.ml.inference['${destinationField}'].prediction_probability > 0.5`, value: undefined, }; } else if (inferenceType === SUPPORTED_PYTORCH_TASKS.TEXT_EMBEDDING) { @@ -147,7 +129,7 @@ export const getSetProcessorForInferenceType = ( copy_from: `${prefixedDestinationField}.predicted_value`, description: `Copy the predicted_value to '${destinationField}'`, field: destinationField, - on_failure: onFailure, + if: `ctx?.ml?.inference != null && ctx.ml.inference['${destinationField}'] != null`, value: undefined, }; }