From bcfa96d0fb0edbc5dce85d2548fb4dd531be8a04 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 20 Feb 2018 13:29:57 -0800 Subject: [PATCH 1/8] Continue registering pipelines after one pipeline parse failure. Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successfull pipelines are loaded 2. replace failed pipelines with a placeholder pipeline called `Pipeline.EMPTY` If the pipeline execution service encounters `Pipeline.EMPTY`, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes #28269. --- .../ingest/common/IngestRestartIT.java | 52 +++++++++++++++++++ .../elasticsearch/ingest/IngestService.java | 4 -- .../org/elasticsearch/ingest/Pipeline.java | 1 + .../ingest/PipelineExecutionService.java | 3 ++ .../elasticsearch/ingest/PipelineStore.java | 8 ++- .../ingest/PipelineExecutionServiceTests.java | 18 +++++++ 6 files changed, 80 insertions(+), 6 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index c62a8fd237148..357d67f253147 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +65,57 @@ protected Map, Object>> pluginScripts() { } } + public void testScriptDisabled() throws Exception { + String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10); + String pipelineIdWithScript = pipelineIdWithoutScript + "_script"; + internalCluster().startNode(); + + BytesReference pipelineWithScript = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" + + " ]\n" + + "}"); + BytesReference pipelineWithoutScript = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" + + " ]\n" + + "}"); + + Consumer checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id) + .get().pipelines().get(0).getId(), equalTo(id)); + + client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get(); + client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get(); + + checkPipelineExists.accept(pipelineIdWithScript); + checkPipelineExists.accept(pipelineIdWithoutScript); + + internalCluster().stopCurrentMasterNode(); + internalCluster().startNode(Settings.builder().put("script.allowed_types", "none")); + + checkPipelineExists.accept(pipelineIdWithoutScript); + checkPipelineExists.accept(pipelineIdWithScript); + + client().prepareIndex("index", "doc", "1") + .setSource("x", 0) + .setPipeline(pipelineIdWithoutScript) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> client().prepareIndex("index", "doc", "2") + .setSource("x", 0) + .setPipeline(pipelineIdWithScript) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get()); + assertThat(exception.getMessage(), + equalTo("pipeline with id [" + pipelineIdWithScript + "] was not parsed successfully, check logs at start-up for exceptions")); + + Map source = client().prepareGet("index", "doc", "1").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + } + public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception { internalCluster().startNode(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4a018ca025896..ad2b8643f7ae3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -34,8 +32,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; -import static org.elasticsearch.common.settings.Setting.Property; - /** * Holder class for several ingest related services. */ diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 473b555c05d22..20464d240aa6f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -32,6 +32,7 @@ */ public final class Pipeline { + static final Pipeline EMPTY = new Pipeline("_empty", null, null, new CompoundProcessor()); static final String DESCRIPTION_KEY = "description"; static final String PROCESSORS_KEY = "processors"; static final String VERSION_KEY = "version"; diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index cec622f4a2587..1d9c5a0b64143 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -192,6 +192,9 @@ private Pipeline getPipeline(String pipelineId) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } else if (pipeline == Pipeline.EMPTY) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] was not parsed successfully," + + " check logs at start-up for exceptions"); } return pipeline; } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 21372e46e5f3d..962a0d1d963b9 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -81,16 +81,20 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) { } Map pipelines = new HashMap<>(); + ArrayList exceptions = new ArrayList<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); } catch (ElasticsearchParseException e) { - throw e; + pipelines.put(pipeline.getId(), Pipeline.EMPTY); + exceptions.add(e); } catch (Exception e) { - throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), Pipeline.EMPTY); + exceptions.add(new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e)); } } this.pipelines = Collections.unmodifiableMap(pipelines); + ExceptionsHelper.rethrowAndSuppress(exceptions); } /** diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 44c8e78bef703..4acae978a848b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -91,6 +91,24 @@ public void testExecuteIndexPipelineDoesNotExist() { verify(completionHandler, never()).accept(anyBoolean()); } + public void testExecuteIndexPipelineExistsButFailedParsing() { + when(store.get("_id")).thenReturn(Pipeline.EMPTY); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + Consumer failureHandler = mock(Consumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + try { + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + fail("IllegalArgumentException expected"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), + equalTo("pipeline with id [_id] was not parsed successfully, check logs at start-up for exceptions")); + } + verify(failureHandler, never()).accept(any(Exception.class)); + verify(completionHandler, never()).accept(anyBoolean()); + } + public void testExecuteBulkPipelineDoesNotExist() { CompoundProcessor processor = mock(CompoundProcessor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); From 1680c0f62ae27e630a4a97d81f072f52aa1a093e Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 21 Feb 2018 10:02:15 -0800 Subject: [PATCH 2/8] move from Pipeline.Empty to storing exception in description! --- .../org/elasticsearch/ingest/common/IngestRestartIT.java | 3 ++- server/src/main/java/org/elasticsearch/ingest/Pipeline.java | 1 - .../org/elasticsearch/ingest/PipelineExecutionService.java | 6 +++--- .../main/java/org/elasticsearch/ingest/PipelineStore.java | 6 ++++-- .../elasticsearch/ingest/PipelineExecutionServiceTests.java | 4 ++-- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index 357d67f253147..0b2c629c3ec0a 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -109,7 +109,8 @@ public void testScriptDisabled() throws Exception { .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get()); assertThat(exception.getMessage(), - equalTo("pipeline with id [" + pipelineIdWithScript + "] was not parsed successfully, check logs at start-up for exceptions")); + equalTo("pipeline with id [" + pipelineIdWithScript + "] was not parsed successfully." + + " java.lang.IllegalArgumentException: cannot execute [inline] scripts")); Map source = client().prepareGet("index", "doc", "1").get().getSource(); assertThat(source.get("x"), equalTo(0)); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 20464d240aa6f..473b555c05d22 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -32,7 +32,6 @@ */ public final class Pipeline { - static final Pipeline EMPTY = new Pipeline("_empty", null, null, new CompoundProcessor()); static final String DESCRIPTION_KEY = "description"; static final String PROCESSORS_KEY = "processors"; static final String VERSION_KEY = "version"; diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index 8a54d70388ba3..ef9104e34bdca 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -199,9 +199,9 @@ private Pipeline getPipeline(String pipelineId) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } else if (pipeline == Pipeline.EMPTY) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] was not parsed successfully," + - " check logs at start-up for exceptions"); + } else if (pipeline.getId().equals("invalid_" + pipelineId)) { + throw new IllegalArgumentException( + "pipeline with id [" + pipelineId + "] was not parsed successfully. " + pipeline.getDescription()); } return pipeline; } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 962a0d1d963b9..4d811f381c151 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -86,10 +86,12 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) { try { pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); } catch (ElasticsearchParseException e) { - pipelines.put(pipeline.getId(), Pipeline.EMPTY); + pipelines.put(pipeline.getId(), new Pipeline("invalid_" + pipeline.getId(), e.getMessage(), + null, new CompoundProcessor())); exceptions.add(e); } catch (Exception e) { - pipelines.put(pipeline.getId(), Pipeline.EMPTY); + pipelines.put(pipeline.getId(), new Pipeline("invalid_" + pipeline.getId(), e.getMessage(), + null, new CompoundProcessor())); exceptions.add(new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e)); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index ee6cd58542f7d..b15c3876527bd 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -93,7 +93,7 @@ public void testExecuteIndexPipelineDoesNotExist() { } public void testExecuteIndexPipelineExistsButFailedParsing() { - when(store.get("_id")).thenReturn(Pipeline.EMPTY); + when(store.get("_id")).thenReturn(new Pipeline("invalid__id", "error", null, new CompoundProcessor())); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); @SuppressWarnings("unchecked") Consumer failureHandler = mock(Consumer.class); @@ -104,7 +104,7 @@ public void testExecuteIndexPipelineExistsButFailedParsing() { fail("IllegalArgumentException expected"); } catch (IllegalArgumentException e) { assertThat(e.getMessage(), - equalTo("pipeline with id [_id] was not parsed successfully, check logs at start-up for exceptions")); + equalTo("pipeline with id [_id] was not parsed successfully. error")); } verify(failureHandler, never()).accept(any(Exception.class)); verify(completionHandler, never()).accept(anyBoolean()); From 0d3cabaaeb4d34872cf7a38582303f5b4d5409a1 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 21 Feb 2018 13:11:45 -0800 Subject: [PATCH 3/8] fix pipelinestore test on error. pipeline is no longer null --- .../java/org/elasticsearch/ingest/PipelineStoreTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index bb0d57871208c..6112db3079ef7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -165,7 +165,9 @@ public void testPutWithErrorResponse() { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); } pipeline = store.get(id); - assertThat(pipeline, nullValue()); + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("invalid__id")); + assertThat(pipeline.getDescription(), equalTo("[processors] required property is missing")); } public void testDelete() { From e25ea29e4ca559a81250838418d53b80bbbb103d Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 21 Feb 2018 14:04:16 -0800 Subject: [PATCH 4/8] more test fixes --- .../test/java/org/elasticsearch/ingest/IngestClientIT.java | 3 +++ .../ingest/IngestProcessorNotInstalledOnAllNodesIT.java | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index dbbc8e443c076..f68a61b4bc7d9 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -271,5 +271,8 @@ public void testPutWithPipelineFactoryError() throws Exception { assertNotNull(ex); assertThat(ex.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } + + GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id").get(); + assertFalse(response.isFound()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 645933348879c..4aa24fbaddcaf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -104,7 +104,10 @@ public void testFailStartNode() throws Exception { installPlugin = false; String node2 = internalCluster().startNode(); pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id"); - assertThat(pipeline, nullValue()); + + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("invalid__id")); + assertThat(pipeline.getDescription(), equalTo("No processor type exists with name [test]")); } } From 2da875cf87c68bbdb52fb7b4909c2daddfd9704b Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 21 Feb 2018 15:13:20 -0800 Subject: [PATCH 5/8] cleanup state --- .../test/java/org/elasticsearch/ingest/IngestClientIT.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index f68a61b4bc7d9..9bd685cd5e8ca 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -172,6 +172,10 @@ public void testBulkWithIngestFailures() throws Exception { assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); } } + + // cleanup + WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); } public void testBulkWithUpsert() throws Exception { From dfb26798238f30556af3e0113e614ce91a8e307e Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 6 Mar 2018 10:43:07 -0800 Subject: [PATCH 6/8] change mode to insert pipeline with fail processor --- .../ingest/common/IngestRestartIT.java | 20 ++++++++-- .../ingest/PipelineExecutionService.java | 3 -- .../elasticsearch/ingest/PipelineStore.java | 31 +++++++++++++--- .../elasticsearch/ingest/IngestClientIT.java | 4 -- ...gestProcessorNotInstalledOnAllNodesIT.java | 5 +-- .../ingest/PipelineExecutionServiceTests.java | 37 ++++++++++++------- .../ingest/PipelineStoreTests.java | 9 +++-- 7 files changed, 72 insertions(+), 37 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index 0b2c629c3ec0a..a8ca20485c451 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -18,10 +18,14 @@ */ package org.elasticsearch.ingest.common; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; @@ -90,6 +94,7 @@ public void testScriptDisabled() throws Exception { checkPipelineExists.accept(pipelineIdWithScript); checkPipelineExists.accept(pipelineIdWithoutScript); + internalCluster().stopCurrentMasterNode(); internalCluster().startNode(Settings.builder().put("script.allowed_types", "none")); @@ -102,15 +107,22 @@ public void testScriptDisabled() throws Exception { .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get(); - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> client().prepareIndex("index", "doc", "2") .setSource("x", 0) .setPipeline(pipelineIdWithScript) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .get()); - assertThat(exception.getMessage(), - equalTo("pipeline with id [" + pipelineIdWithScript + "] was not parsed successfully." + - " java.lang.IllegalArgumentException: cannot execute [inline] scripts")); + assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type"))); + assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown"))); + assertThat(exception.getRootCause().getMessage(), + equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " + + "[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " + + "nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " + + "nested: IllegalArgumentException[cannot execute [inline] scripts];; " + + "ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " + + "nested: IllegalArgumentException[cannot execute [inline] scripts];; java.lang.IllegalArgumentException: " + + "cannot execute [inline] scripts]")); Map source = client().prepareGet("index", "doc", "1").get().getSource(); assertThat(source.get("x"), equalTo(0)); diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java index ef9104e34bdca..31bedd4ee1777 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java @@ -199,9 +199,6 @@ private Pipeline getPipeline(String pipelineId) { Pipeline pipeline = store.get(pipelineId); if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } else if (pipeline.getId().equals("invalid_" + pipelineId)) { - throw new IllegalArgumentException( - "pipeline with id [" + pipelineId + "] was not parsed successfully. " + pipeline.getDescription()); } return pipeline; } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 4d811f381c151..c6dce0bd45b3c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -81,24 +81,43 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) { } Map pipelines = new HashMap<>(); - ArrayList exceptions = new ArrayList<>(); + List exceptions = new ArrayList<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); } catch (ElasticsearchParseException e) { - pipelines.put(pipeline.getId(), new Pipeline("invalid_" + pipeline.getId(), e.getMessage(), - null, new CompoundProcessor())); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); exceptions.add(e); } catch (Exception e) { - pipelines.put(pipeline.getId(), new Pipeline("invalid_" + pipeline.getId(), e.getMessage(), - null, new CompoundProcessor())); - exceptions.add(new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e)); + ElasticsearchParseException parseException = new ElasticsearchParseException( + "Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); + exceptions.add(parseException); } } this.pipelines = Collections.unmodifiableMap(pipelines); ExceptionsHelper.rethrowAndSuppress(exceptions); } + private Pipeline substitutePipeline(String id, ElasticsearchParseException e) { + String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; + String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; + String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; + Processor failureProcessor = new AbstractProcessor(tag) { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException(errorMessage); + } + + @Override + public String getType() { + return type; + } + }; + String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; + return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); + } + /** * Deletes the pipeline specified by id in the request. */ diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 9bd685cd5e8ca..a448b62651d64 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -36,16 +36,12 @@ import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse; -import org.elasticsearch.action.support.replication.TransportReplicationActionTests; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESIntegTestCase; import java.util.Arrays; diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 4aa24fbaddcaf..846d43e74a636 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -37,7 +37,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase { @@ -106,8 +105,8 @@ public void testFailStartNode() throws Exception { pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id"); assertNotNull(pipeline); - assertThat(pipeline.getId(), equalTo("invalid__id")); - assertThat(pipeline.getDescription(), equalTo("No processor type exists with name [test]")); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with id [_id] could not be loaded")); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index b15c3876527bd..5a3b57a6d7e0b 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -93,21 +94,29 @@ public void testExecuteIndexPipelineDoesNotExist() { } public void testExecuteIndexPipelineExistsButFailedParsing() { - when(store.get("_id")).thenReturn(new Pipeline("invalid__id", "error", null, new CompoundProcessor())); + when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null, + new CompoundProcessor(new AbstractProcessor("mock") { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException("error"); + } + + @Override + public String getType() { + return null; + } + }))); + SetOnce failed = new SetOnce<>(); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - Consumer failureHandler = mock(Consumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - try { - executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); - fail("IllegalArgumentException expected"); - } catch (IllegalArgumentException e) { - assertThat(e.getMessage(), - equalTo("pipeline with id [_id] was not parsed successfully. error")); - } - verify(failureHandler, never()).accept(any(Exception.class)); - verify(completionHandler, never()).accept(anyBoolean()); + Consumer failureHandler = (e) -> { + assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class)); + assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class)); + assertThat(e.getCause().getCause().getMessage(), equalTo("error")); + failed.set(true); + }; + Consumer completionHandler = (e) -> failed.set(false); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + assertTrue(failed.get()); } public void testExecuteBulkPipelineDoesNotExist() { diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index 6112db3079ef7..250bb5059cf58 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -166,8 +165,12 @@ public void testPutWithErrorResponse() { } pipeline = store.get(id); assertNotNull(pipeline); - assertThat(pipeline.getId(), equalTo("invalid__id")); - assertThat(pipeline.getDescription(), equalTo("[processors] required property is missing")); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + + " id [_id] could not be loaded")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertNull(pipeline.getProcessors().get(0).getTag()); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); } public void testDelete() { From 02d3f97d5e815aad1260f577ed3ba23a1216404c Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 8 Mar 2018 07:58:33 -0800 Subject: [PATCH 7/8] fix linelength --- .../ingest/IngestProcessorNotInstalledOnAllNodesIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 846d43e74a636..03777b98ab73e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -106,7 +106,8 @@ public void testFailStartNode() throws Exception { assertNotNull(pipeline); assertThat(pipeline.getId(), equalTo("_id")); - assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with id [_id] could not be loaded")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, " + + "because pipeline with id [_id] could not be loaded")); } } From 534e1556250be88f590323ff6f0c8df6826d2267 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 8 Mar 2018 10:47:19 -0800 Subject: [PATCH 8/8] fix cleanup --- .../test/java/org/elasticsearch/ingest/IngestClientIT.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index a448b62651d64..809a81b687e80 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -126,6 +126,10 @@ public void testSimulate() throws Exception { IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source); assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata())); assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); + + // cleanup + WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); } public void testBulkWithIngestFailures() throws Exception {