diff --git a/docs/changelog/85932.yaml b/docs/changelog/85932.yaml new file mode 100644 index 0000000000000..db61abcc475a3 --- /dev/null +++ b/docs/changelog/85932.yaml @@ -0,0 +1,5 @@ +pr: 85932 +summary: Add ability to skip the rest of the processors in a pipeline +area: Ingest Node +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java index 2910ab11e8c94..459e6c084e798 100644 --- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer handler) { assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC Processor processor; IngestMetric metric; // iteratively execute any sync processors - while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) { + while (currentProcessor < processorsWithMetrics.size() + && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false + && ingestDocument.isSkipCurrentPipeline() == false) { processorWithMetric = processorsWithMetrics.get(currentProcessor); processor = processorWithMetric.v1(); metric = processorWithMetric.v2(); @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC } assert currentProcessor <= processorsWithMetrics.size(); - if (currentProcessor == processorsWithMetrics.size()) { + if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) { handler.accept(ingestDocument, null); return; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index f471926087ae5..bffecd2cfe990 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -62,6 +62,7 @@ public final class IngestDocument { // Contains all pipelines that have been executed for this document private final Set executedPipelines = new LinkedHashSet<>(); + private boolean skipCurrentPipeline = false; private boolean doNoSelfReferencesCheck = false; @@ -838,6 +839,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer { + skipCurrentPipeline = false; executedPipelines.remove(pipeline.getId()); if (previousPipeline != null) { ingestMetadata.put("pipeline", previousPipeline); @@ -880,6 +882,18 @@ public void doNoSelfReferencesCheck(boolean doNoSelfReferencesCheck) { this.doNoSelfReferencesCheck = doNoSelfReferencesCheck; } + /** + * Skips the remaining processors in the current pipeline, except for on failure processors. + * If the current pipeline is executed via a pipeline processor, the caller pipeline will not be skipped. + */ + public void skipCurrentPipeline() { + this.skipCurrentPipeline = true; + } + + boolean isSkipCurrentPipeline() { + return skipCurrentPipeline; + } + @Override public boolean equals(Object obj) { if (obj == this) { diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java index 4bc581594d8a4..1426200383171 100644 --- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java @@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() { } } + public void testSkipPipeline() throws Exception { + TestProcessor processor1 = new TestProcessor(IngestDocument::skipCurrentPipeline); + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipAsyncProcessor() throws Exception { + TestProcessor processor1 = new TestProcessor(IngestDocument::skipCurrentPipeline) { + @Override + public boolean isAsync() { + return true; + } + }; + TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped")); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor1, processor2), + List.of(), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 0, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testSkipProcessorIgnoreFailure() throws Exception { + TestProcessor processor1 = new TestProcessor(doc -> { + doc.skipCurrentPipeline(); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor processor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor1.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(processor2.getInvokedCounter(), equalTo(0)); + assertStats(1, compoundProcessor, 0, 0, 0, 0); + } + + public void testDontSkipFailureProcessor() throws Exception { + TestProcessor processor = new TestProcessor(doc -> { + doc.skipCurrentPipeline(); + throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()"); + }); + TestProcessor failureProcessor1 = new TestProcessor(doc -> {}); + TestProcessor failureProcessor2 = new TestProcessor(doc -> {}); + LongSupplier relativeTimeProvider = mock(LongSupplier.class); + when(relativeTimeProvider.getAsLong()).thenReturn(0L); + CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + List.of(processor), + List.of(failureProcessor1, failureProcessor2), + relativeTimeProvider + ); + executeCompound(compoundProcessor, ingestDocument, (result, e) -> {}); + assertThat(processor.getInvokedCounter(), equalTo(1)); + assertStats(0, compoundProcessor, 0, 1, 1, 0); + assertThat(failureProcessor1.getInvokedCounter(), equalTo(1)); + assertThat(failureProcessor2.getInvokedCounter(), equalTo(1)); + } + private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) { return new TestProcessor( tag,