diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml index bdcfdc7f6b18b..65e888d8caa32 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml @@ -641,7 +641,6 @@ teardown: - match: { acknowledged: true } - do: - catch: /illegal_state_exception/ ingest.simulate: verbose: true body: > @@ -667,8 +666,10 @@ teardown: } ] } -- match: { error.root_cause.0.type: "illegal_state_exception" } -- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" } +- length: { docs: 1 } +- length: { docs.0.processor_results: 1 } +- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: outer" } +- match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" } --- "Test verbose simulate with Pipeline Processor with Multiple Pipelines": diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index c081707f4dbda..e2b44ae2a7a60 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -21,17 +21,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.CompoundProcessor; -import org.elasticsearch.ingest.PipelineProcessor; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; -import java.util.Collections; -import java.util.IdentityHashMap; import java.util.List; -import java.util.Set; import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; @@ -46,11 +42,9 @@ class SimulateExecutionService { } SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) { - // Prevent cycles in pipeline decoration - final Set pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); if (verbose) { List processorResultList = new ArrayList<>(); - CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen); + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); try { Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), verbosePipelineProcessor); diff --git a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java index 9078dc86c1b07..2493f291bcddf 100644 --- a/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java @@ -62,10 +62,7 @@ public class ConditionalProcessor extends AbstractProcessor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { - IngestConditionalScript script = - scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); - if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) { - // Only record metric if the script evaluates to true + if (evaluate(ingestDocument)) { long startTimeInNanos = relativeTimeProvider.getAsLong(); try { metric.preIngest(); @@ -81,6 +78,12 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception { return ingestDocument; } + boolean evaluate(IngestDocument ingestDocument) { + IngestConditionalScript script = + scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams()); + return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata())); + } + Processor getProcessor() { return processor; } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 41a984be5adad..4b78715144649 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -19,11 +19,11 @@ package org.elasticsearch.ingest; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; import java.util.ArrayList; import java.util.List; -import java.util.Set; /** * Processor to be used within Simulate API to keep track of processors executed in pipeline. @@ -42,14 +42,46 @@ public final class TrackingResultProcessor implements Processor { @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + Processor processor = actualProcessor; try { - actualProcessor.execute(ingestDocument); - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument))); + if (processor instanceof ConditionalProcessor) { + ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor; + if (conditionalProcessor.evaluate(ingestDocument) == false) { + return ingestDocument; + } + if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) { + processor = conditionalProcessor.getProcessor(); + } + } + if (processor instanceof PipelineProcessor) { + PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); + Pipeline pipeline = pipelineProcessor.getPipeline(); + //runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines + try { + IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument); + ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline()); + } catch (ElasticsearchException elasticsearchException) { + if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) { + throw elasticsearchException; + } + //else do nothing, let the tracking processors throw the exception while recording the path up to the failure + } catch (Exception e) { + // do nothing, let the tracking processors throw the exception while recording the path up to the failure + } + //now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it + CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList); + Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(), + verbosePipelineProcessor); + ingestDocument.executePipeline(verbosePipeline); + } else { + processor.execute(ingestDocument); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument))); + } } catch (Exception e) { if (ignoreFailure) { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e)); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e)); } else { - processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e)); + processorResultList.add(new SimulateProcessorResult(processor.getTag(), e)); } throw e; } @@ -66,35 +98,19 @@ public String getTag() { return actualProcessor.getTag(); } - public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList, - Set pipelinesSeen) { + public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List processorResultList) { List processors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getProcessors()) { - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); - } - processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen)); - pipelinesSeen.remove(pipelineProcessor); - } else if (processor instanceof CompoundProcessor) { - processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); + if (processor instanceof CompoundProcessor) { + processors.add(decorate((CompoundProcessor) processor, processorResultList)); } else { processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } } List onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size()); for (Processor processor : compoundProcessor.getOnFailureProcessors()) { - if (processor instanceof PipelineProcessor) { - PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor); - if (pipelinesSeen.add(pipelineProcessor) == false) { - throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId()); - } - onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, - pipelinesSeen)); - pipelinesSeen.remove(pipelineProcessor); - } else if (processor instanceof CompoundProcessor) { - onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen)); + if (processor instanceof CompoundProcessor) { + onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList)); } else { onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList)); } diff --git a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java index fbb46fc8787af..2c047283ed1bb 100644 --- a/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/TrackingResultProcessorTests.java @@ -21,17 +21,22 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ingest.SimulateProcessorResult; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.test.ESTestCase; import org.junit.Before; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_MESSAGE_FIELD; import static org.elasticsearch.ingest.CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD; @@ -39,10 +44,11 @@ import static org.elasticsearch.ingest.TrackingResultProcessor.decorate; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -50,13 +56,11 @@ public class TrackingResultProcessorTests extends ESTestCase { private IngestDocument ingestDocument; private List resultList; - private Set pipelinesSeen; @Before public void init() { ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); resultList = new ArrayList<>(); - pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>()); } public void testActualProcessor() throws Exception { @@ -76,9 +80,9 @@ public void testActualProcessor() throws Exception { public void testActualCompoundProcessorWithoutOnFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(testProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); try { trackingProcessor.execute(ingestDocument); @@ -97,14 +101,14 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception { public void testActualCompoundProcessorWithOnFailure() throws Exception { RuntimeException exception = new RuntimeException("fail"); - TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); + TestProcessor failProcessor = new TestProcessor("fail", "test", ingestDocument -> { throw exception; }); TestProcessor onFailureProcessor = new TestProcessor("success", "test", ingestDocument -> {}); CompoundProcessor actualProcessor = new CompoundProcessor(false, Arrays.asList(new CompoundProcessor(false, Arrays.asList(failProcessor, onFailureProcessor), Arrays.asList(onFailureProcessor, failProcessor))), Arrays.asList(onFailureProcessor)); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedFailResult = new SimulateProcessorResult(failProcessor.getTag(), ingestDocument); @@ -139,10 +143,10 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception { public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); - TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); + TestProcessor testProcessor = new TestProcessor(ingestDocument -> { throw exception; }); CompoundProcessor actualProcessor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList()); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); @@ -154,6 +158,45 @@ public void testActualCompoundProcessorWithIgnoreFailure() throws Exception { assertThat(resultList.get(0).getProcessorTag(), equalTo(expectedResult.getProcessorTag())); } + public void testActualCompoundProcessorWithFalseConditional() throws Exception { + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + CompoundProcessor compoundProcessor = new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key2, randomInt()); })), + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key3, randomInt()); })); + + CompoundProcessor trackingProcessor = decorate(compoundProcessor, resultList); + trackingProcessor.execute(ingestDocument); + SimulateProcessorResult expectedResult = new SimulateProcessorResult(compoundProcessor.getTag(), ingestDocument); + + //the step for key 2 is never executed due to conditional and thus not part of the result set + assertThat(resultList.size(), equalTo(2)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + } + public void testActualPipelineProcessor() throws Exception { String pipelineId = "pipeline1"; IngestService ingestService = mock(IngestService.class); @@ -176,13 +219,13 @@ pipelineId, null, null, new CompoundProcessor( PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(3)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -198,6 +241,142 @@ pipelineId, null, null, new CompoundProcessor( assertThat(resultList.get(2).getProcessorTag(), nullValue()); } + public void testActualPipelineProcessorWithTrueConditional() throws Exception { + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("name", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("name", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("name", pipelineId2); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> true), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + factory.create(Collections.emptyMap(), null, pipelineConfig2)), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) + ) + ); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))); + + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + trackingProcessor.execute(ingestDocument); + + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId2); + assertThat(resultList.size(), equalTo(3)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(2).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(2).getFailure(), nullValue()); + assertThat(resultList.get(2).getProcessorTag(), nullValue()); + } + + public void testActualPipelineProcessorWithFalseConditional() throws Exception { + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; + IngestService ingestService = mock(IngestService.class); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("name", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("name", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("name", pipelineId2); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + + String key1 = randomAlphaOfLength(10); + String key2 = randomAlphaOfLength(10); + String key3 = randomAlphaOfLength(10); + + String scriptName = "conditionalScript"; + + ScriptService scriptService = new ScriptService(Settings.builder().build(), Collections.singletonMap(Script.DEFAULT_SCRIPT_LANG, + new MockScriptEngine(Script.DEFAULT_SCRIPT_LANG, Collections.singletonMap(scriptName, ctx -> false), Collections.emptyMap())), + new HashMap<>(ScriptModule.CORE_CONTEXTS) + ); + + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key1, randomInt()); }), + new ConditionalProcessor( + randomAlphaOfLength(10), + new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, scriptName, Collections.emptyMap()), scriptService, + factory.create(Collections.emptyMap(), null, pipelineConfig2)), + new TestProcessor(ingestDocument -> {ingestDocument.setFieldValue(key3, randomInt()); }) + ) + ); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor( + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key2, randomInt()); }))); + + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); + CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); + + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + + trackingProcessor.execute(ingestDocument); + + + SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); + + verify(ingestService, Mockito.atLeast(1)).getPipeline(pipelineId1); + verify(ingestService, Mockito.never()).getPipeline(pipelineId2); + assertThat(resultList.size(), equalTo(2)); + + assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key2)); + assertFalse(resultList.get(0).getIngestDocument().hasField(key3)); + + assertTrue(resultList.get(1).getIngestDocument().hasField(key1)); + assertFalse(resultList.get(1).getIngestDocument().hasField(key2)); + assertTrue(resultList.get(1).getIngestDocument().hasField(key3)); + + assertThat(resultList.get(1).getIngestDocument(), equalTo(expectedResult.getIngestDocument())); + assertThat(resultList.get(1).getFailure(), nullValue()); + assertThat(resultList.get(1).getProcessorTag(), nullValue()); + } + public void testActualPipelineProcessorWithHandledFailure() throws Exception { RuntimeException exception = new RuntimeException("processor failed"); @@ -226,13 +405,13 @@ pipelineId, null, null, new CompoundProcessor( PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(4)); assertTrue(resultList.get(0).getIngestDocument().hasField(key1)); @@ -253,25 +432,36 @@ pipelineId, null, null, new CompoundProcessor( } public void testActualPipelineProcessorWithCycle() throws Exception { - String pipelineId = "pipeline1"; + String pipelineId1 = "pipeline1"; + String pipelineId2 = "pipeline2"; IngestService ingestService = mock(IngestService.class); - Map pipelineConfig = new HashMap<>(); - pipelineConfig.put("name", pipelineId); + Map pipelineConfig0 = new HashMap<>(); + pipelineConfig0.put("name", pipelineId1); + Map pipelineConfig1 = new HashMap<>(); + pipelineConfig1.put("name", pipelineId1); + Map pipelineConfig2 = new HashMap<>(); + pipelineConfig2.put("name", pipelineId2); PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); - PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); - Pipeline pipeline = new Pipeline( - pipelineId, null, null, new CompoundProcessor(pipelineProcessor) - ); - when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + Pipeline pipeline1 = new Pipeline( + pipelineId1, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig2))); + + Pipeline pipeline2 = new Pipeline( + pipelineId2, null, null, new CompoundProcessor(factory.create(Collections.emptyMap(), null, pipelineConfig1))); + when(ingestService.getPipeline(pipelineId1)).thenReturn(pipeline1); + when(ingestService.getPipeline(pipelineId2)).thenReturn(pipeline2); + + PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig0); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor); - IllegalStateException exception = expectThrows(IllegalStateException.class, - () -> decorate(actualProcessor, resultList, pipelinesSeen)); - assertThat(exception.getMessage(), equalTo("Cycle detected for pipeline: pipeline1")); - } + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); + ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> trackingProcessor.execute(ingestDocument)); + assertThat(exception.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(exception.getCause().getCause(), instanceOf(IllegalStateException.class)); + assertThat(exception.getMessage(), containsString("Cycle detected for pipeline: pipeline1")); + } public void testActualPipelineProcessorRepeatedInvocation() throws Exception { String pipelineId = "pipeline1"; @@ -284,19 +474,19 @@ public void testActualPipelineProcessorRepeatedInvocation() throws Exception { PipelineProcessor pipelineProcessor = factory.create(Collections.emptyMap(), null, pipelineConfig); Pipeline pipeline = new Pipeline( pipelineId, null, null, new CompoundProcessor( - new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) + new TestProcessor(ingestDocument -> { ingestDocument.setFieldValue(key1, randomInt()); })) ); when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); CompoundProcessor actualProcessor = new CompoundProcessor(pipelineProcessor, pipelineProcessor); - CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList, pipelinesSeen); + CompoundProcessor trackingProcessor = decorate(actualProcessor, resultList); trackingProcessor.execute(ingestDocument); SimulateProcessorResult expectedResult = new SimulateProcessorResult(actualProcessor.getTag(), ingestDocument); - verify(ingestService, times(2)).getPipeline(pipelineId); + verify(ingestService, Mockito.atLeast(2)).getPipeline(pipelineId); assertThat(resultList.size(), equalTo(2)); assertThat(resultList.get(0).getIngestDocument(), not(equalTo(expectedResult.getIngestDocument())));