-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make ingest executing non blocking #43361
Make ingest executing non blocking #43361
Conversation
Added an additional method to the Processor interface to allow a processor implementation to make a non blocking call.
…tead of recursively invoking a method in order to avoid SO
failed or dropped out of order.
Pinging @elastic/es-core-features |
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
Outdated
Show resolved
Hide resolved
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); | ||
synchronized void markItemAsFailed(int slot, Exception e) { | ||
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); | ||
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:elasticheart:
server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/ingest/PipelineProcessor.java
Outdated
Show resolved
Hide resolved
x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/ExactMatchProcessor.java
Show resolved
Hide resolved
Looking really good. a couple questions above. |
only if the current thread is not the same as original thread then fork into write thread
@elasticmachine run elasticsearch-ci/default-distro |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, nice work!
search requests originating from the match processor. This is a temporary workaround.
@jakelandis I was benchmarking this PR and then ran into an issue; the bulk request with ingest was failing, because the enrich processor failed to perform search requests due to the fact that search tp is full and search requests were being rejected. This is an expected failure; a search request is performed for each bulk request item. We discussed this before and then we talked about a queue with workers mechanism to throttle the number of search requests (or other api calls) that the enrich processor is allowed to perform concurrently. In the benchmark setup, even though the search is local, the search api still executes on the search tp. Which makes sense from a general search use case perspective, but from the enrich perspective, there is no need to fork into another thread if the search can be performed locally. The internal transport action that we will be building to do a highly specialized lookup will take this into account. Additionally if this internal api needs to make a remote call then that should be throttled like is mentioned above. In order to keep the enrich branch benchmarkable, I made the following workaround change: 9256652 This allows to execute searches concurrently, but blocks when there are more than 100 concurrent searches. With this change the benchmark is able to complete successfully and the measured ingest time in bulk responses is between 173 ms and 588 ms. Compared ingest times from 930 ms to 1152 ms without this PR. |
newValues.add(document.getIngestMetadata().put("_value", previousValue)); | ||
handler.accept(null, e); | ||
} else { | ||
if (result == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge both into an else if
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pushed: 0103761
int currentSlot = -1; | ||
int[] originalSlots; | ||
volatile int currentSlot = -1; | ||
volatile int[] originalSlots; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm always a bit on the fence when I see a volatile reference to a mutable object, because then volatile
suggests that changes would be visible across threads, but this only applies to the int[] reference, not its content. I had a quick look and think the logic is correct right now, but an AtomicIntegerArray
instead of an int[]
would make it look less tricky in my opinion. Or alternatively we should set the content of the array before setting the reference a couple lines below and have comments about why concurrency is correct so that this doesn't get broken by an apparently harmless refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed: f404843
Still LGTM |
Added an additional method to the Processor interface to allow a processor implementation to make a non blocking call. Also added semaphore in order to avoid search thread pools from rejecting search requests originating from the match processor. This is a temporary workaround.
This PR changes the ingest executing to be non blocking by adding an additional method to the Processor interface that accepts a BiConsumer as handler and changing IngestService#executeBulkRequest(...) to ingest document in a non blocking fashion iff a processor executes in a non blocking fashion. This is the second PR that merges changes made to server module from the enrich branch (see #32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation. This change originates from the enrich branch and was introduced there in #43361.
…#46241) This PR changes the ingest executing to be non blocking by adding an additional method to the Processor interface that accepts a BiConsumer as handler and changing IngestService#executeBulkRequest(...) to ingest document in a non blocking fashion iff a processor executes in a non blocking fashion. This is the second PR that merges changes made to server module from the enrich branch (see elastic#32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation. This change originates from the enrich branch and was introduced there in elastic#43361.
Backport of #46241 This PR changes the ingest executing to be non blocking by adding an additional method to the Processor interface that accepts a BiConsumer as handler and changing IngestService#executeBulkRequest(...) to ingest document in a non blocking fashion iff a processor executes in a non blocking fashion. This is the second PR that merges changes made to server module from the enrich branch (see #32789) into the master branch. The plan is to merge changes made to the server module separately from the pr that will merge enrich into master, so that these changes can be reviewed in isolation. This change originates from the enrich branch and was introduced there in #43361.
This PR changes the ingest executing to be non blocking by adding an additional method to the
Processor
interface and changingIngestService#executeBulkRequest(...)
to ingest document in a non blocking fashion iff a processor executes in a non blocking fashion.This PR is against the enrich feature branch.
This is a draft PR until #43311 gets merged and this PR changes theExactMatchProcessor
to use the non blockingProcessor#execute(...)
method.