
Continue registering pipelines after one pipeline parse failure. #28752

Merged (11 commits) on Mar 8, 2018
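
In short, as implemented in the diff below: when the ingest metadata from the cluster state is applied, a pipeline whose configuration fails to parse no longer aborts registration of the remaining pipelines. The broken pipeline is kept under its id as a Pipeline.EMPTY placeholder, the parse exceptions are collected and rethrown together via ExceptionsHelper.rethrowAndSuppress, and any index request that targets a placeholder pipeline is rejected with "pipeline with id [...] was not parsed successfully, check logs at start-up for exceptions".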
@@ -33,6 +33,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.equalTo;
@@ -64,6 +65,57 @@ protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
}
}

public void testScriptDisabled() throws Exception {
String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10);
String pipelineIdWithScript = pipelineIdWithoutScript + "_script";
internalCluster().startNode();

BytesReference pipelineWithScript = new BytesArray("{\n" +
" \"processors\" : [\n" +
" {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" +
" ]\n" +
"}");
BytesReference pipelineWithoutScript = new BytesArray("{\n" +
" \"processors\" : [\n" +
" {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" +
" ]\n" +
"}");

Consumer<String> checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id)
.get().pipelines().get(0).getId(), equalTo(id));

client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get();
client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get();

checkPipelineExists.accept(pipelineIdWithScript);
checkPipelineExists.accept(pipelineIdWithoutScript);

internalCluster().stopCurrentMasterNode();
internalCluster().startNode(Settings.builder().put("script.allowed_types", "none"));
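// With script.allowed_types set to "none", the script processor can no longer be created,
// so the pipeline that uses it fails to parse when the restarted node reloads the ingest metadata.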

checkPipelineExists.accept(pipelineIdWithoutScript);
checkPipelineExists.accept(pipelineIdWithScript);

client().prepareIndex("index", "doc", "1")
.setSource("x", 0)
.setPipeline(pipelineIdWithoutScript)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();

IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> client().prepareIndex("index", "doc", "2")
.setSource("x", 0)
.setPipeline(pipelineIdWithScript)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get());
assertThat(exception.getMessage(),
equalTo("pipeline with id [" + pipelineIdWithScript + "] was not parsed successfully, check logs at start-up for exceptions"));

Map<String, Object> source = client().prepareGet("index", "doc", "1").get().getSource();
assertThat(source.get("x"), equalTo(0));
assertThat(source.get("y"), equalTo(0));
}

public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception {
internalCluster().startNode();

@@ -25,17 +25,13 @@
import java.util.List;
import java.util.Map;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.common.settings.Setting.Property;

/**
* Holder class for several ingest related services.
*/
@@ -32,6 +32,7 @@
*/
public final class Pipeline {

static final Pipeline EMPTY = new Pipeline("_empty", null, null, new CompoundProcessor());
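// EMPTY is registered in place of any pipeline whose configuration failed to parse;
// index requests that reference it are rejected (see the getPipeline change below).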
static final String DESCRIPTION_KEY = "description";
static final String PROCESSORS_KEY = "processors";
static final String VERSION_KEY = "version";
@@ -192,6 +192,9 @@ private Pipeline getPipeline(String pipelineId) {
Pipeline pipeline = store.get(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
} else if (pipeline == Pipeline.EMPTY) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] was not parsed successfully," +
" check logs at start-up for exceptions");
}
return pipeline;
}
@@ -81,16 +81,20 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) {
}

Map<String, Pipeline> pipelines = new HashMap<>();
ArrayList<ElasticsearchParseException> exceptions = new ArrayList<>();
Member: s/ArrayList/List ?

Contributor Author: updated

for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) {
try {
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories));
} catch (ElasticsearchParseException e) {
throw e;
pipelines.put(pipeline.getId(), Pipeline.EMPTY);
Member: I wonder if we can create a pipeline with a single fail processor. This fail processor can then contain the reason why the pipeline in question couldn't be loaded. This way there is no need to check the logs.

Contributor Author: I like this, but I will try to do it in such a way that the pipeline isn't actually executed and is only used to store the exception. If I can't find a way to do that cleanly, then I will just let the pipeline run and fail.

Contributor Author: Ah, also, the fail processor is inside ingest-common, so that makes it slightly more annoying.
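
For reference, a rough sketch of what the reviewer's suggestion might look like; it is not part of this PR, and the type name, helper method, and API details are illustrative only (Processor, IngestDocument, CompoundProcessor, and Pipeline are the org.elasticsearch.ingest types used elsewhere in this diff):

```java
// Hypothetical sketch: instead of Pipeline.EMPTY, register a pipeline whose single
// processor rethrows the original parse failure, so the reason surfaces at index time
// rather than only in the start-up logs. Names and signatures are simplified.
static Pipeline failurePlaceholder(String pipelineId, Exception parseFailure) {
    Processor failOnUse = new Processor() {
        @Override
        public void execute(IngestDocument ingestDocument) {
            // Any document routed through the placeholder fails with the original cause attached.
            throw new IllegalStateException(
                "pipeline with id [" + pipelineId + "] could not be parsed", parseFailure);
        }

        @Override
        public String getType() {
            return "parse_failure"; // illustrative processor type
        }

        @Override
        public String getTag() {
            return null;
        }
    };
    return new Pipeline(pipelineId, "placeholder for unparseable pipeline", null,
        new CompoundProcessor(failOnUse));
}
```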

exceptions.add(e);
} catch (Exception e) {
throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e);
pipelines.put(pipeline.getId(), Pipeline.EMPTY);
exceptions.add(new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e));
}
}
this.pipelines = Collections.unmodifiableMap(pipelines);
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
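
The collect-then-rethrow step above is what lets every valid pipeline stay registered while the cluster-state update still fails loudly. A minimal sketch of that behavior, assuming a helper that keeps the first exception and attaches the rest as suppressed; this is an assumption about ExceptionsHelper.rethrowAndSuppress, not its actual source:

```java
// Keep the first exception, attach the rest as suppressed, and rethrow; a no-op
// when the list is empty (i.e. every pipeline parsed cleanly).
static <T extends Exception> void rethrowAndSuppress(List<T> exceptions) throws T {
    T main = null;
    for (T exception : exceptions) {
        if (main == null) {
            main = exception;
        } else {
            main.addSuppressed(exception);
        }
    }
    if (main != null) {
        throw main;
    }
}
```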

/**
@@ -91,6 +91,24 @@ public void testExecuteIndexPipelineDoesNotExist() {
verify(completionHandler, never()).accept(anyBoolean());
}

public void testExecuteIndexPipelineExistsButFailedParsing() {
when(store.get("_id")).thenReturn(Pipeline.EMPTY);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id");
@SuppressWarnings("unchecked")
Consumer<Exception> failureHandler = mock(Consumer.class);
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
try {
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
fail("IllegalArgumentException expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(),
equalTo("pipeline with id [_id] was not parsed successfully, check logs at start-up for exceptions"));
}
verify(failureHandler, never()).accept(any(Exception.class));
verify(completionHandler, never()).accept(anyBoolean());
}

public void testExecuteBulkPipelineDoesNotExist() {
CompoundProcessor processor = mock(CompoundProcessor.class);
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor));