-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Continue registering pipelines after one pipeline parse failure. #28752
Conversation
Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successfull pipelines are loaded 2. replace failed pipelines with a placeholder pipeline called `Pipeline.EMPTY` If the pipeline execution service encounters `Pipeline.EMPTY`, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes elastic#28269.
c930300
to
bcfa96d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good @talevy! I left a suggestion on how to improve handeling these pipeline construction errors.
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { | ||
try { | ||
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); | ||
} catch (ElasticsearchParseException e) { | ||
throw e; | ||
pipelines.put(pipeline.getId(), Pipeline.EMPTY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can create a pipeline with a single fail
processor. This fail processor can then contain the reason why the pipeline in question couldn't be loaded. This way there is no need to check the logs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this, but I will try to do this in such a way that the pipeline isn't actually executed and is only used to store the exception. If I can't find a way to cleanly do that, then I will just let the pipeline run and fail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, also, fail
processor is inside ingest-common
so that makes it slightly more annoying
@martijnvg I've updated to carry the exception's message and pass it on to the user at runtime. good call — way better than telling them to look at logs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@talevy Cool! I left an additional suggestion on how to improve the error handling.
for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { | ||
try { | ||
pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); | ||
} catch (ElasticsearchParseException e) { | ||
throw e; | ||
pipelines.put(pipeline.getId(), new Pipeline("invalid_" + pipeline.getId(), e.getMessage(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if we create a pipeline that throws the exception for us (fail processor is in a ingest-common, but it very simple to have that behaviour here too):
pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e));
private Pipeline substitutePipeline(String id, ElasticsearchParseException e) {
String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null;
String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown";
String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]";
Processor failureProcessor = new AbstractProcessor(tag) {
@Override
public void execute(IngestDocument ingestDocument) throws Exception {
throw new IllegalStateException(errorMessage);
}
@Override
public String getType() {
return type;
}
};
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
}
This way the if statement added PipelineExecutionService.java
is not needed and this failure to load logic is in one place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note, I updated the above comment.
Also I think it is important to throw an IllegalStateException instead of a IllegalArgumentException, because the fact that we failed to load the pipeline at this stage is a real issue and this is the best way we can deal with it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with this approach is that we will be put these substituted pipelines in PUT requests.
this will return true: https://github.com/elastic/elasticsearch/pull/28752/files#diff-fd6036d9f6ce14fecb8751a3adb9fc59R280 (testPutWithPipelineFactoryError
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm. nevermind.. for some reason I saw that test fail. I might have done something wrong or a different test failed
@@ -81,16 +81,22 @@ void innerUpdatePipelines(ClusterState previousState, ClusterState state) { | |||
} | |||
|
|||
Map<String, Pipeline> pipelines = new HashMap<>(); | |||
ArrayList<ElasticsearchParseException> exceptions = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/ArrayList/List ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
@martijnvg I've updated the PR to reflect your suggestion around inserting a placeholder pipeline that fails during execution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
(The build only failed, because of a checkstyle violation)
Also I think we should backport this to the 5.6 branch. wdyt?
@martijnvg Once I merge into 6.x and master, I'll see how the backport looks for 5.6 |
) Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successful pipelines are loaded 2. replace failed pipelines with a placeholder pipeline If the pipeline execution service encounters the stubbed pipeline, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes #28269.
) Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successful pipelines are loaded 2. replace failed pipelines with a placeholder pipeline If the pipeline execution service encounters the stubbed pipeline, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes #28269.
…stic#28752) Ingest has been failing to apply existing pipelines from cluster-state into the in-memory representation that are no longer valid. One example of this is a pipeline with a script processor. If a cluster starts up with scripting disabled, these pipelines will not be loaded. Even though GETing a pipeline worked, indexing operations claimed that this pipeline did not exist. This is because one gets information from cluster-state and the other is from an in-memory data-structure. Now, two things happen 1. suppress the exceptions until after other successful pipelines are loaded 2. replace failed pipelines with a placeholder pipeline If the pipeline execution service encounters the stubbed pipeline, it is known that something went wrong at the time of pipeline creation and an exception was thrown to the user at some point at start-up. closes elastic#28269.
* master: (28 commits) Maybe die before failing engine (elastic#28973) Remove special handling for _all in nodes info Remove Booleans use from XContent and ToXContent (elastic#28768) Update Gradle Testing Docs (elastic#28970) Make primary-replica resync failures less lenient (elastic#28534) Remove temporary file 10_basic.yml~ Use different pipeline id in test. (pipelines do not get removed between tests extending from ESIntegTestCase) Use fixture to test the repository-gcs plugin (elastic#28788) Use String.join() to describe a list of tasks (elastic#28941) Fixed incorrect test try-catch statement Plugins: Consolidate plugin and module loading code (elastic#28815) percolator: Take `matchAllDocs` and `verified` of the sub result into account when analyzing a function_score query. Build: Remove rest tests on archive distribution projects (elastic#28952) Remove FastStringReader in favor of vanilla StringReader (elastic#28944) Remove FastCharArrayReader and FastCharArrayWriter (elastic#28951) Continue registering pipelines after one pipeline parse failure. (elastic#28752) Build: Fix ability to ignore when no tests are run (elastic#28930) [rest-api-spec] update doc link for /_rank_eval Switch XContentBuilder from BytesStreamOutput to ByteArrayOutputStream (elastic#28945) Factor UnknownNamedObjectException into its own class (elastic#28931) ...
Ingest has been failing to apply existing pipelines from cluster-state
into the in-memory representation that are no longer valid. One example of
this is a pipeline with a script processor. If a cluster starts up with scripting
disabled, these pipelines will not be loaded. Even though GETing a pipeline worked,
indexing operations claimed that this pipeline did not exist. This is because one
gets information from cluster-state and the other is from an in-memory data-structure.
Now, two things happen
in its description
If the pipeline execution service encounters a pipeline stub, it is known that
something went wrong at the time of pipeline creation and an exception was thrown to
the user at some point at start-up.
closes #28269.