diff --git a/docs/changelog/85931.yaml b/docs/changelog/85931.yaml
new file mode 100644
index 0000000000000..5ad9617b21f8f
--- /dev/null
+++ b/docs/changelog/85931.yaml
@@ -0,0 +1,5 @@
+pr: 85931
+summary: Invoke default pipeline of new index
+area: Ingest Node
+type: enhancement
+issues: []
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java
index 98ce3933f9d3c..504d78eaf47c8 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/FinalPipelineIT.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.ingest.AbstractProcessor;
+import org.elasticsearch.ingest.ConfigurationUtils;
 import org.elasticsearch.ingest.IngestDocument;
 import org.elasticsearch.ingest.PipelineConfiguration;
 import org.elasticsearch.ingest.Processor;
@@ -49,6 +50,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
@@ -188,6 +190,70 @@ public void testDefaultPipelineOfNewDestinationIsNotInvoked() {
         assertFalse(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
     }
 
+    public void testDefaultPipelineOfRedirectDestinationIsInvoked() {
+        Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
+        createIndex("index", settings);
+
+        settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
+        createIndex("target", settings);
+
+        BytesReference defaultPipelineBody = new BytesArray("""
+            {"processors": [{"redirect": {}}]}""");
+        client().admin()
+            .cluster()
+            .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
+            .actionGet();
+
+        BytesReference targetPipeline = new BytesArray("""
+            {"processors": [{"final": {}}]}""");
+        client().admin()
+            .cluster()
+            .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
+            .actionGet();
+
+        IndexResponse indexResponse = client().prepareIndex("index")
+            .setId("1")
+            .setSource(Map.of("field", "value"))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+        assertEquals(RestStatus.CREATED, indexResponse.status());
+        SearchResponse target = client().prepareSearch("target").get();
+        assertEquals(1, target.getHits().getTotalHits().value);
+        assertTrue(target.getHits().getAt(0).getSourceAsMap().containsKey("final"));
+    }
+
+    public void testAvoidIndexingLoop() {
+        Settings settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build();
+        createIndex("index", settings);
+
+        settings = Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "target_default_pipeline").build();
+        createIndex("target", settings);
+
+        BytesReference defaultPipelineBody = new BytesArray("""
+            {"processors": [{"redirect": {"dest": "target"}}]}""");
+        client().admin()
+            .cluster()
+            .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON))
+            .actionGet();
+
+        BytesReference targetPipeline = new BytesArray("""
+            {"processors": [{"redirect": {"dest": "index"}}]}""");
+        client().admin()
+            .cluster()
+            .putPipeline(new PutPipelineRequest("target_default_pipeline", targetPipeline, XContentType.JSON))
+            .actionGet();
+
+        IllegalStateException exception = expectThrows(
+            IllegalStateException.class,
+            () -> client().prepareIndex("index")
+                .setId("1")
+                .setSource(Map.of("dest", "index"))
+                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+                .get()
+        );
+        assertThat(exception.getMessage(), containsString("index cycle detected while processing pipelines: [index, target, index]"));
+    }
+
     public void testFinalPipeline() {
         final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build();
         createIndex("index", settings);
@@ -394,6 +460,26 @@ public String getType() {
                         return "changing_dest";
                     }
 
+                },
+                "redirect",
+                (processorFactories, tag, description, config) -> {
+                    final String dest = Objects.requireNonNullElse(
+                        ConfigurationUtils.readOptionalStringProperty(description, tag, config, "dest"),
+                        "target"
+                    );
+                    return new AbstractProcessor(tag, description) {
+                        @Override
+                        public IngestDocument execute(final IngestDocument ingestDocument) throws Exception {
+                            ingestDocument.redirect(dest);
+                            return ingestDocument;
+                        }
+
+                        @Override
+                        public String getType() {
+                            return "redirect";
+                        }
+
+                    };
                 }
             );
         }
diff --git a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
index 2910ab11e8c94..459e6c084e798 100644
--- a/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
+++ b/server/src/main/java/org/elasticsearch/ingest/CompoundProcessor.java
@@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
 
     void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
         assert currentProcessor <= processorsWithMetrics.size();
-        if (currentProcessor == processorsWithMetrics.size()) {
+        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {
             handler.accept(ingestDocument, null);
             return;
         }
@@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
         Processor processor;
         IngestMetric metric;
         // iteratively execute any sync processors
-        while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) {
+        while (currentProcessor < processorsWithMetrics.size()
+            && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
+            && ingestDocument.isSkipCurrentPipeline() == false) {
             processorWithMetric = processorsWithMetrics.get(currentProcessor);
             processor = processorWithMetric.v1();
             metric = processorWithMetric.v2();
@@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
         }
 
         assert currentProcessor <= processorsWithMetrics.size();
-        if (currentProcessor == processorsWithMetrics.size()) {
+        if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {
             handler.accept(ingestDocument, null);
             return;
         }
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
index f471926087ae5..cdbcc4d5e412b 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
@@ -62,8 +62,10 @@ public final class IngestDocument {
 
     // Contains all pipelines that have been executed for this document
     private final Set<String> executedPipelines = new LinkedHashSet<>();
 
+    private boolean skipCurrentPipeline = false;
     private boolean doNoSelfReferencesCheck = false;
+    private boolean invokeDefaultPipelineOfDestination = false;
 
     public IngestDocument(String index, String id, long version, String routing, VersionType versionType, Map<String, Object> source) {
         this.ctxMap = new IngestCtxMap(index, id, version, routing, versionType, ZonedDateTime.now(ZoneOffset.UTC), source);
@@ -80,6 +82,7 @@ public IngestDocument(IngestDocument other) {
             new IngestCtxMap(deepCopyMap(other.ctxMap.getSource()), other.ctxMap.getMetadata().clone()),
             deepCopyMap(other.ingestMetadata)
         );
+        this.invokeDefaultPipelineOfDestination = other.invokeDefaultPipelineOfDestination;
     }
 
     /**
@@ -838,6 +841,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
             Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
             pipeline.execute(this, (result, e) -> {
+                skipCurrentPipeline = false;
                 executedPipelines.remove(pipeline.getId());
                 if (previousPipeline != null) {
                     ingestMetadata.put("pipeline", previousPipeline);
                 }
@@ -903,6 +907,20 @@ public String toString() {
         return "IngestDocument{" + " sourceAndMetadata=" + ctxMap + ", ingestMetadata=" + ingestMetadata + '}';
     }
 
+    public void redirect(String destIndex) {
+        getMetadata().setIndex(destIndex);
+        invokeDefaultPipelineOfDestination = true;
+        skipCurrentPipeline = true;
+    }
+
+    public boolean isInvokeDefaultPipelineOfDestination() {
+        return invokeDefaultPipelineOfDestination;
+    }
+
+    public boolean isSkipCurrentPipeline() {
+        return skipCurrentPipeline;
+    }
+
     public enum Metadata {
         INDEX(IndexFieldMapper.NAME),
         TYPE("_type"),
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 0e53b6a39f0fd..9873fd997fd39 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -68,6 +68,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -714,21 +715,8 @@ protected void doRun() {
                         continue;
                     }
 
-                    final String pipelineId = indexRequest.getPipeline();
-                    indexRequest.setPipeline(NOOP_PIPELINE_NAME);
-                    final String finalPipelineId = indexRequest.getFinalPipeline();
-                    indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
-                    boolean hasFinalPipeline = true;
-                    final List<String> pipelines;
-                    if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false
-                        && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
-                        pipelines = List.of(pipelineId, finalPipelineId);
-                    } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) {
-                        pipelines = List.of(pipelineId);
-                        hasFinalPipeline = false;
-                    } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) {
-                        pipelines = List.of(finalPipelineId);
-                    } else {
+                    Pipelines pipelines = getPipelines(indexRequest);
+                    if (pipelines.isEmpty()) {
                         i++;
                         continue;
                     }
@@ -763,8 +751,16 @@ public void onFailure(Exception e) {
                     });
 
                     IngestDocument ingestDocument = newIngestDocument(indexRequest);
-                    executePipelines(pipelines.iterator(), hasFinalPipeline, indexRequest, ingestDocument, documentListener);
-
+                    LinkedHashSet<String> indexRecursionDetection = new LinkedHashSet<>();
+                    indexRecursionDetection.add(indexRequest.index());
+                    executePipelines(
+                        pipelines.iterator(),
+                        pipelines.hasFinalPipeline(),
+                        indexRequest,
+                        ingestDocument,
+                        documentListener,
+                        indexRecursionDetection
+                    );
                     i++;
                 }
             }
@@ -772,12 +768,61 @@
         });
     }
 
+    private Pipelines getPipelines(IndexRequest indexRequest) {
+        final String pipelineId = indexRequest.getPipeline();
+        indexRequest.setPipeline(NOOP_PIPELINE_NAME);
+        final String finalPipelineId = indexRequest.getFinalPipeline();
+        indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
+        return new Pipelines(pipelineId, finalPipelineId);
+    }
+
+    private static class Pipelines implements Iterable<String> {
+        private String defaultPipeline;
+        private String finalPipeline;
+
+        private Pipelines(String defaultPipeline, String finalPipeline) {
+            if (NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
+                this.defaultPipeline = defaultPipeline;
+            }
+            if (NOOP_PIPELINE_NAME.equals(finalPipeline) == false) {
+                this.finalPipeline = finalPipeline;
+            }
+        }
+
+        public boolean hasFinalPipeline() {
+            return finalPipeline != null;
+        }
+
+        public boolean isEmpty() {
+            return defaultPipeline == null && finalPipeline == null;
+        }
+
+        public void withoutDefaultPipeline() {
+            defaultPipeline = null;
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            if (defaultPipeline != null && finalPipeline != null) {
+                return List.of(defaultPipeline, finalPipeline).iterator();
+            }
+            if (finalPipeline != null) {
+                return List.of(finalPipeline).iterator();
+            }
+            if (defaultPipeline != null) {
+                return List.of(defaultPipeline).iterator();
+            }
+            return Collections.emptyIterator();
+        }
+    }
+
     private void executePipelines(
         final Iterator<String> pipelineIds,
         final boolean hasFinalPipeline,
         final IndexRequest indexRequest,
         final IngestDocument ingestDocument,
-        final ActionListener<Void> listener
+        final ActionListener<Void> listener,
+        final Set<String> indexRecursionDetection
     ) {
         assert pipelineIds.hasNext();
         final String pipelineId = pipelineIds.next();
@@ -840,6 +885,14 @@ private void executePipelines(
                 final String newIndex = indexRequest.indices()[0];
 
                 if (Objects.equals(originalIndex, newIndex) == false) {
+                    if (indexRecursionDetection.add(newIndex) == false) {
+                        List<String> indexRoute = new ArrayList<>(indexRecursionDetection);
+                        indexRoute.add(newIndex);
+                        listener.onFailure(
+                            new IllegalStateException(format("index cycle detected while processing pipelines: %s", indexRoute))
+                        );
+                        return; // document failed!
+                    }
                     if (hasFinalPipeline && pipelineIds.hasNext() == false) {
                         listener.onFailure(
                             new IllegalStateException(
@@ -854,19 +907,21 @@ private void executePipelines(
                        );
                        return; // document failed!
                    } else {
+                        // reset request pipeline that is set to _none which would take precedence over the default pipeline
+                        indexRequest.setPipeline(null);
                        indexRequest.isPipelineResolved(false);
                        resolvePipelines(null, indexRequest, state.metadata());
-                        if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) {
-                            newPipelineIds = Collections.singleton(indexRequest.getFinalPipeline()).iterator();
-                            newHasFinalPipeline = true;
-                        } else {
-                            newPipelineIds = Collections.emptyIterator();
+                        Pipelines pipelines = getPipelines(indexRequest);
+                        if (ingestDocument.isInvokeDefaultPipelineOfDestination() == false) {
+                            pipelines.withoutDefaultPipeline();
                        }
+                        newHasFinalPipeline = pipelines.hasFinalPipeline();
+                        newPipelineIds = pipelines.iterator();
                    }
                }

                if (newPipelineIds.hasNext()) {
-                    executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener);
+                    executePipelines(newPipelineIds, newHasFinalPipeline, indexRequest, ingestDocument, listener, indexRecursionDetection);
                } else {
                    // update the index request's source and (potentially) cache the timestamp for TSDB
                    updateIndexRequestSource(indexRequest, ingestDocument);
diff --git a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java
index 4bc581594d8a4..09b8c206fc135 100644
--- a/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/CompoundProcessorTests.java
@@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() {
         }
     }
 
+    public void testSkipPipeline() {
+        TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo"));
+        TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped"));
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor = new CompoundProcessor(
+            false,
+            List.of(processor1, processor2),
+            List.of(),
+            relativeTimeProvider
+        );
+        executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertStats(0, compoundProcessor, 0, 1, 0, 0);
+        assertThat(processor2.getInvokedCounter(), equalTo(0));
+        assertStats(1, compoundProcessor, 0, 0, 0, 0);
+    }
+
+    public void testSkipAsyncProcessor() {
+        TestProcessor processor1 = new TestProcessor(doc -> doc.redirect("foo")) {
+            @Override
+            public boolean isAsync() {
+                return true;
+            }
+        };
+        TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped"));
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor = new CompoundProcessor(
+            false,
+            List.of(processor1, processor2),
+            List.of(),
+            relativeTimeProvider
+        );
+        executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertStats(0, compoundProcessor, 0, 1, 0, 0);
+        assertThat(processor2.getInvokedCounter(), equalTo(0));
+        assertStats(1, compoundProcessor, 0, 0, 0, 0);
+    }
+
+    public void testSkipProcessorIgnoreFailure() {
+        TestProcessor processor1 = new TestProcessor(doc -> {
+            doc.redirect("foo");
+            throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()");
+        });
+        TestProcessor processor2 = new TestProcessor(doc -> {});
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider);
+        executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
+        assertThat(processor1.getInvokedCounter(), equalTo(1));
+        assertStats(0, compoundProcessor, 0, 1, 1, 0);
+        assertThat(processor2.getInvokedCounter(), equalTo(0));
+        assertStats(1, compoundProcessor, 0, 0, 0, 0);
+    }
+
+    public void testDontSkipFailureProcessor() {
+        TestProcessor processor = new TestProcessor(doc -> {
+            doc.redirect("foo");
+            throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()");
+        });
+        TestProcessor failureProcessor1 = new TestProcessor(doc -> {});
+        TestProcessor failureProcessor2 = new TestProcessor(doc -> {});
+        LongSupplier relativeTimeProvider = mock(LongSupplier.class);
+        when(relativeTimeProvider.getAsLong()).thenReturn(0L);
+        CompoundProcessor compoundProcessor = new CompoundProcessor(
+            false,
+            List.of(processor),
+            List.of(failureProcessor1, failureProcessor2),
+            relativeTimeProvider
+        );
+        executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
+        assertThat(processor.getInvokedCounter(), equalTo(1));
+        assertStats(0, compoundProcessor, 0, 1, 1, 0);
+        assertThat(failureProcessor1.getInvokedCounter(), equalTo(1));
+        assertThat(failureProcessor2.getInvokedCounter(), equalTo(1));
+    }
+
     private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) {
         return new TestProcessor(
             tag,