From d10692725550f140886421e05537cba18ac5aa13 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 8 Mar 2018 15:22:59 -0800 Subject: [PATCH] Continue registering pipelines after one pipeline parse failure. (#28752) Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successful pipelines are loaded 2. replace failed pipelines with a placeholder pipeline If the pipeline execution service encounters the stubbed pipeline, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes #28269. --- .../ingest/common/IngestRestartIT.java | 65 +++++++++++++++++++ .../elasticsearch/ingest/IngestService.java | 4 -- .../elasticsearch/ingest/PipelineStore.java | 29 ++++++++- .../elasticsearch/ingest/IngestClientIT.java | 15 +++-- ...gestProcessorNotInstalledOnAllNodesIT.java | 7 +- .../ingest/PipelineExecutionServiceTests.java | 27 ++++++++ .../ingest/PipelineStoreTests.java | 9 ++- 7 files changed, 142 insertions(+), 14 deletions(-) diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index c62a8fd237148..a8ca20485c451 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -18,10 +18,14 @@ */ package org.elasticsearch.ingest.common; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; @@ -33,6 +37,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +69,66 @@ protected Map, Object>> pluginScripts() { } } + public void testScriptDisabled() throws Exception { + String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10); + String pipelineIdWithScript = pipelineIdWithoutScript + "_script"; + internalCluster().startNode(); + + BytesReference pipelineWithScript = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" + + " ]\n" + + "}"); + BytesReference pipelineWithoutScript = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" + + " ]\n" + + "}"); + + Consumer checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id) + .get().pipelines().get(0).getId(), equalTo(id)); + + client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get(); + client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get(); + + checkPipelineExists.accept(pipelineIdWithScript); + checkPipelineExists.accept(pipelineIdWithoutScript); + + + internalCluster().stopCurrentMasterNode(); + internalCluster().startNode(Settings.builder().put("script.allowed_types", "none")); + + checkPipelineExists.accept(pipelineIdWithoutScript); + checkPipelineExists.accept(pipelineIdWithScript); + + client().prepareIndex("index", "doc", "1") + .setSource("x", 0) + .setPipeline(pipelineIdWithoutScript) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> client().prepareIndex("index", "doc", "2") + .setSource("x", 0) + .setPipeline(pipelineIdWithScript) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get()); + assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type"))); + assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown"))); + assertThat(exception.getRootCause().getMessage(), + equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " + + "[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " + + "nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " + + "nested: IllegalArgumentException[cannot execute [inline] scripts];; " + + "ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " + + "nested: IllegalArgumentException[cannot execute [inline] scripts];; java.lang.IllegalArgumentException: " + + "cannot execute [inline] scripts]")); + + Map source = client().prepareGet("index", "doc", "1").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + } + public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception { internalCluster().startNode(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4a018ca025896..ad2b8643f7ae3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -34,8 +32,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; -import static org.elasticsearch.common.settings.Setting.Property; - /** * Holder class for several ingest related services. */ diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 21372e46e5f3d..c6dce0bd45b3c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -81,16 +81,41 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) { } Map pipelines = new HashMap<>(); + List exceptions = new ArrayList<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); } catch (ElasticsearchParseException e) { - throw e; + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); + exceptions.add(e); } catch (Exception e) { - throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e); + ElasticsearchParseException parseException = new ElasticsearchParseException( + "Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); + exceptions.add(parseException); } } this.pipelines = Collections.unmodifiableMap(pipelines); + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + private Pipeline substitutePipeline(String id, ElasticsearchParseException e) { + String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; + String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; + String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; + Processor failureProcessor = new AbstractProcessor(tag) { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException(errorMessage); + } + + @Override + public String getType() { + return type; + } + }; + String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; + return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); } /** diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index 654927b19f2fb..64a9999084877 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -36,16 +36,12 @@ import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse; -import org.elasticsearch.action.support.replication.TransportReplicationActionTests; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESIntegTestCase; import java.util.Arrays; @@ -130,6 +126,10 @@ public void testSimulate() throws Exception { IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, source); assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata())); assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); + + // cleanup + WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); } public void testBulkWithIngestFailures() throws Exception { @@ -172,6 +172,10 @@ public void testBulkWithIngestFailures() throws Exception { assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); } } + + // cleanup + WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); } public void testBulkWithUpsert() throws Exception { @@ -271,5 +275,8 @@ public void testPutWithPipelineFactoryError() throws Exception { assertNotNull(ex); assertThat(ex.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } + + GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id").get(); + assertFalse(response.isFound()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 645933348879c..03777b98ab73e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -37,7 +37,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase { @@ -104,7 +103,11 @@ public void testFailStartNode() throws Exception { installPlugin = false; String node2 = internalCluster().startNode(); pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id"); - assertThat(pipeline, nullValue()); + + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, " + + "because pipeline with id [_id] could not be loaded")); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 44c8e78bef703..927b366794d8f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -91,6 +92,32 @@ public void testExecuteIndexPipelineDoesNotExist() { verify(completionHandler, never()).accept(anyBoolean()); } + public void testExecuteIndexPipelineExistsButFailedParsing() { + when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null, + new CompoundProcessor(new AbstractProcessor("mock") { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException("error"); + } + + @Override + public String getType() { + return null; + } + }))); + SetOnce failed = new SetOnce<>(); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + Consumer failureHandler = (e) -> { + assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class)); + assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class)); + assertThat(e.getCause().getCause().getMessage(), equalTo("error")); + failed.set(true); + }; + Consumer completionHandler = (e) -> failed.set(false); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + assertTrue(failed.get()); + } + public void testExecuteBulkPipelineDoesNotExist() { CompoundProcessor processor = mock(CompoundProcessor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index bb0d57871208c..250bb5059cf58 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -165,7 +164,13 @@ public void testPutWithErrorResponse() { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); } pipeline = store.get(id); - assertThat(pipeline, nullValue()); + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + + " id [_id] could not be loaded")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertNull(pipeline.getProcessors().get(0).getTag()); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); } public void testDelete() {