-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make ingest executing non blocking #43361
Changes from 15 commits
7222f1e
7df6779
72a4690
09bdbd1
58658be
0cc9e5f
d000f85
a77f9ee
cc718a4
2dce37a
5f66516
acd3671
720ee01
5d105a1
494cdbe
9256652
f404843
0103761
17f9a66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
|
||
package org.elasticsearch.action.bulk; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.util.SparseFixedBitSet; | ||
import org.elasticsearch.ElasticsearchParseException; | ||
|
@@ -585,14 +587,13 @@ private long relativeTime() { | |
} | ||
|
||
void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener<BulkResponse> listener) { | ||
long ingestStartTimeInNanos = System.nanoTime(); | ||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); | ||
ingestService.executeBulkRequest(() -> bulkRequestModifier, | ||
(indexRequest, exception) -> { | ||
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", | ||
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); | ||
bulkRequestModifier.markCurrentItemAsFailed(exception); | ||
}, (exception) -> { | ||
final long ingestStartTimeInNanos = System.nanoTime(); | ||
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); | ||
ingestService.executeBulkRequest( | ||
original.numberOfActions(), | ||
() -> bulkRequestModifier, | ||
bulkRequestModifier::markItemAsFailed, | ||
jakelandis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
(originalThread, exception) -> { | ||
if (exception != null) { | ||
logger.error("failed to execute pipeline for a bulk request", exception); | ||
listener.onFailure(exception); | ||
|
@@ -607,21 +608,50 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen | |
// (this will happen if pre-processing all items in the bulk failed) | ||
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); | ||
} else { | ||
doExecute(task, bulkRequest, actionListener); | ||
// If a processor went async and returned a response on a different thread then | ||
// before we continue the bulk request we should fork back on a write thread: | ||
if (originalThread == Thread.currentThread()) { | ||
assert Thread.currentThread().getName().contains(ThreadPool.Names.WRITE); | ||
doExecute(task, bulkRequest, actionListener); | ||
} else { | ||
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { | ||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
|
||
@Override | ||
protected void doRun() throws Exception { | ||
doExecute(task, bulkRequest, actionListener); | ||
} | ||
|
||
@Override | ||
public boolean isForceExecution() { | ||
// If we fork back to a write thread we should **not** fail because the thread pool queue is full. | ||
// (Otherwise the work done during ingest will be lost) | ||
// It is okay to force execution here. Throttling of write requests happens prior to | ||
// ingest when a node receives a bulk request. | ||
return true; | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
}, | ||
indexRequest -> bulkRequestModifier.markCurrentItemAsDropped()); | ||
bulkRequestModifier::markItemAsDropped | ||
); | ||
} | ||
|
||
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> { | ||
|
||
private static final Logger LOGGER = LogManager.getLogger(BulkRequestModifier.class); | ||
|
||
final BulkRequest bulkRequest; | ||
final SparseFixedBitSet failedSlots; | ||
final List<BulkItemResponse> itemResponses; | ||
|
||
int currentSlot = -1; | ||
int[] originalSlots; | ||
volatile int currentSlot = -1; | ||
volatile int[] originalSlots; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm always a bit on the fence when I see a volatile reference to a mutable object, because then […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed: f404843 |
||
|
||
BulkRequestModifier(BulkRequest bulkRequest) { | ||
this.bulkRequest = bulkRequest; | ||
|
@@ -679,11 +709,11 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, | |
} | ||
} | ||
|
||
void markCurrentItemAsDropped() { | ||
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); | ||
failedSlots.set(currentSlot); | ||
synchronized void markItemAsDropped(int slot) { | ||
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); | ||
failedSlots.set(slot); | ||
itemResponses.add( | ||
new BulkItemResponse(currentSlot, indexRequest.opType(), | ||
new BulkItemResponse(slot, indexRequest.opType(), | ||
new UpdateResponse( | ||
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0), | ||
indexRequest.type(), indexRequest.id(), indexRequest.version(), DocWriteResponse.Result.NOOP | ||
|
@@ -692,16 +722,19 @@ void markCurrentItemAsDropped() { | |
); | ||
} | ||
|
||
void markCurrentItemAsFailed(Exception e) { | ||
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot)); | ||
synchronized void markItemAsFailed(int slot, Exception e) { | ||
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot)); | ||
LOGGER.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. :elasticheart: |
||
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e); | ||
|
||
// We hit an error while preprocessing a request, so we: | ||
// 1) Remember the request item slot from the bulk, so that when we're done processing all requests we know what failed | ||
// 2) Add a bulk item failure for this request | ||
// 3) Continue with the next request in the bulk. | ||
failedSlots.set(currentSlot); | ||
failedSlots.set(slot); | ||
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), | ||
indexRequest.id(), e); | ||
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure)); | ||
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure)); | ||
} | ||
|
||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge both into an `else if`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pushed: 0103761