Skip to content

Commit

Permalink
Handle error conditions when simulating ingest pipelines with verbosi…
Browse files Browse the repository at this point in the history
…ty enabled (elastic#63327)
  • Loading branch information
danhermann committed Oct 13, 2020
1 parent d838e9e commit 70134d3
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,62 @@ teardown:
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline

---
"Test verbose simulate with error in pipeline":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"rename" : {
"field" : "does_not_exist",
"target_field" : "_value"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"description": "_description",
"processors": [
{
"pipeline" : {
"name" : "my_pipeline"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar",
"bar": "hello"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 2 }
- match: { docs.0.processor_results.0.processor_type: "pipeline" }
- match: { docs.0.processor_results.0.status: "success" }
- match: { docs.0.processor_results.1.processor_type: "rename" }
- match: { docs.0.processor_results.1.status: "error" }
- match: { docs.0.processor_results.1.error.root_cause.0.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.error.root_cause.0.reason: "field [does_not_exist] doesn't exist" }
- match: { docs.0.processor_results.1.error.type: "illegal_argument_exception" }
- match: { docs.0.processor_results.1.error.reason: "field [does_not_exist] doesn't exist" }

---
"Test verbose simulate with on_failure":
- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
public final class IngestDocument {

public static final String INGEST_KEY = "_ingest";
public static final String PIPELINE_CYCLE_ERROR_MESSAGE = "Cycle detected for pipeline: ";
private static final String INGEST_KEY_PREFIX = INGEST_KEY + ".";
private static final String SOURCE_PREFIX = SourceFieldMapper.NAME + ".";

Expand Down Expand Up @@ -748,7 +749,7 @@ public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Except
handler.accept(result, e);
});
} else {
handler.accept(null, new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId()));
handler.accept(null, new IllegalStateException(PIPELINE_CYCLE_ERROR_MESSAGE + pipeline.getId()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.function.BiConsumer;

import static org.elasticsearch.ingest.IngestDocument.PIPELINE_CYCLE_ERROR_MESSAGE;

/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
*/
Expand Down Expand Up @@ -73,20 +75,18 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
pipelineProcessor.getPipelineToCallName(ingestDocument) + ']');
}
ingestDocumentCopy.executePipeline(pipelineToCall, (result, e) -> {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
if (elasticsearchException.getCause() instanceof IllegalStateException) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
} else {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), e, conditionalWithResult));
}
handler.accept(null, elasticsearchException);
// special handling for pipeline cycle errors
if (e instanceof ElasticsearchException &&
e.getCause() instanceof IllegalStateException &&
e.getCause().getMessage().startsWith(PIPELINE_CYCLE_ERROR_MESSAGE)) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), new IngestDocument(ingestDocument), e, conditionalWithResult));
} else {
processorResultList.add(new SimulateProcessorResult(pipelineProcessor.getType(), pipelineProcessor.getTag(),
pipelineProcessor.getDescription(), e, conditionalWithResult));
}
handler.accept(null, e);
} else {
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,43 @@ pipelineId, null, null, new CompoundProcessor(
assertThat(resultList.get(4).getProcessorTag(), nullValue());
}

public void testActualPipelineProcessorWithUnhandledFailure() throws Exception {
String pipelineId = "pipeline1";
IngestService ingestService = createIngestService();
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put("name", pipelineId);
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);

String key1 = randomAlphaOfLength(10);
IllegalStateException exception = new IllegalStateException("Not a pipeline cycle error");

Pipeline pipeline = new Pipeline(
pipelineId, null, null, new CompoundProcessor(
new TestProcessor(ingestDocument -> ingestDocument.setFieldValue(key1, randomInt())),
new TestProcessor(ingestDocument -> { throw exception; }))
);
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);

PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, null, pipelineConfig);
CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor);

CompoundProcessor trackingProcessor = decorate(actualProcessor, null, resultList);

trackingProcessor.execute(ingestDocument, (result, e) -> {});

SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), ingestDocument, null);
expectedResult.getIngestDocument().getIngestMetadata().put("pipeline", pipelineId);

verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId);

assertThat(resultList.size(), equalTo(3));
assertNull(resultList.get(0).getConditionalWithResult());
assertThat(resultList.get(0).getType(), equalTo("pipeline"));
assertTrue(resultList.get(1).getIngestDocument().hasField(key1));
assertThat(resultList.get(2).getFailure(), equalTo(exception));
}

public void testActualPipelineProcessorWithCycle() throws Exception {
String pipelineId1 = "pipeline1";
String pipelineId2 = "pipeline2";
Expand Down

0 comments on commit 70134d3

Please sign in to comment.