Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to skip the rest of the processors in a pipeline #85932

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/85932.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 85932
summary: Add ability to skip the rest of the processors in a pipeline
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex

void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiConsumer<IngestDocument, Exception> handler) {
assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {
handler.accept(ingestDocument, null);
return;
}
Expand All @@ -150,7 +150,9 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
Processor processor;
IngestMetric metric;
// iteratively execute any sync processors
while (currentProcessor < processorsWithMetrics.size() && processorsWithMetrics.get(currentProcessor).v1().isAsync() == false) {
while (currentProcessor < processorsWithMetrics.size()
&& processorsWithMetrics.get(currentProcessor).v1().isAsync() == false
&& ingestDocument.isSkipCurrentPipeline() == false) {
processorWithMetric = processorsWithMetrics.get(currentProcessor);
processor = processorWithMetric.v1();
metric = processorWithMetric.v2();
Expand All @@ -176,7 +178,7 @@ void innerExecute(int currentProcessor, IngestDocument ingestDocument, final BiC
}

assert currentProcessor <= processorsWithMetrics.size();
if (currentProcessor == processorsWithMetrics.size()) {
if (currentProcessor == processorsWithMetrics.size() || ingestDocument.isSkipCurrentPipeline()) {
handler.accept(ingestDocument, null);
return;
}
Expand Down
14 changes: 14 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/IngestDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public final class IngestDocument {

// Contains all pipelines that have been executed for this document
private final Set<String> executedPipelines = new LinkedHashSet<>();
private boolean skipCurrentPipeline = false;

private boolean doNoSelfReferencesCheck = false;

Expand Down Expand Up @@ -838,6 +839,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
skipCurrentPipeline = false;
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
Expand Down Expand Up @@ -880,6 +882,18 @@ public void doNoSelfReferencesCheck(boolean doNoSelfReferencesCheck) {
this.doNoSelfReferencesCheck = doNoSelfReferencesCheck;
}

/**
* Skips the remaining processors in the current pipeline, except for on failure processors.
* If the current pipeline is executed via a pipeline processor, the caller pipeline will not be skipped.
*/
public void skipCurrentPipeline() {
this.skipCurrentPipeline = true;
}

boolean isSkipCurrentPipeline() {
return skipCurrentPipeline;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,85 @@ public void testMultipleProcessorsDoNotIgnoreFailures() {
}
}

public void testSkipPipeline() throws Exception {
TestProcessor processor1 = new TestProcessor(IngestDocument::skipCurrentPipeline);
TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped"));
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(
false,
List.of(processor1, processor2),
List.of(),
relativeTimeProvider
);
executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 0, 0);
assertThat(processor2.getInvokedCounter(), equalTo(0));
assertStats(1, compoundProcessor, 0, 0, 0, 0);
}

public void testSkipAsyncProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor(IngestDocument::skipCurrentPipeline) {
@Override
public boolean isAsync() {
return true;
}
};
TestProcessor processor2 = new TestProcessor(new RuntimeException("this processor was expected to be skipped"));
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(
false,
List.of(processor1, processor2),
List.of(),
relativeTimeProvider
);
executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 0, 0);
assertThat(processor2.getInvokedCounter(), equalTo(0));
assertStats(1, compoundProcessor, 0, 0, 0, 0);
}

public void testSkipProcessorIgnoreFailure() throws Exception {
TestProcessor processor1 = new TestProcessor(doc -> {
doc.skipCurrentPipeline();
throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()");
});
TestProcessor processor2 = new TestProcessor(doc -> {});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(true, List.of(processor1, processor2), List.of(), relativeTimeProvider);
executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
assertThat(processor1.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0);
assertThat(processor2.getInvokedCounter(), equalTo(0));
assertStats(1, compoundProcessor, 0, 0, 0, 0);
}

public void testDontSkipFailureProcessor() throws Exception {
TestProcessor processor = new TestProcessor(doc -> {
doc.skipCurrentPipeline();
throw new RuntimeException("simulate processor failure after calling skipCurrentPipeline()");
});
TestProcessor failureProcessor1 = new TestProcessor(doc -> {});
TestProcessor failureProcessor2 = new TestProcessor(doc -> {});
LongSupplier relativeTimeProvider = mock(LongSupplier.class);
when(relativeTimeProvider.getAsLong()).thenReturn(0L);
CompoundProcessor compoundProcessor = new CompoundProcessor(
false,
List.of(processor),
List.of(failureProcessor1, failureProcessor2),
relativeTimeProvider
);
executeCompound(compoundProcessor, ingestDocument, (result, e) -> {});
assertThat(processor.getInvokedCounter(), equalTo(1));
assertStats(0, compoundProcessor, 0, 1, 1, 0);
assertThat(failureProcessor1.getInvokedCounter(), equalTo(1));
assertThat(failureProcessor2.getInvokedCounter(), equalTo(1));
}

private TestProcessor getTestProcessor(String tag, boolean isAsync, boolean shouldThrowException) {
return new TestProcessor(
tag,
Expand Down