Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk processor concurrent requests #41451

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 78 additions & 30 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -225,6 +227,7 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr
private final Runnable onClose;

private volatile boolean closed = false;
private final ReentrantLock lock = new ReentrantLock();

BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Expand Down Expand Up @@ -264,21 +267,26 @@ public void close() {
* completed
* @throws InterruptedException If the current thread is interrupted
*/
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
}
closed = true;
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
if (closed) {
return true;
}
closed = true;

this.cancellableFlushTask.cancel();
this.cancellableFlushTask.cancel();

if (bulkRequest.numberOfActions() > 0) {
execute();
}
try {
return this.bulkRequestHandler.awaitClose(timeout, unit);
if (bulkRequest.numberOfActions() > 0) {
execute();
}
try {
return this.bulkRequestHandler.awaitClose(timeout, unit);
} finally {
onClose.run();
}
} finally {
onClose.run();
lock.unlock();
}
}

Expand Down Expand Up @@ -315,10 +323,22 @@ protected void ensureOpen() {
}
}

private synchronized void internalAdd(DocWriteRequest<?> request) {
ensureOpen();
bulkRequest.add(request);
executeIfNeeded();
private void internalAdd(DocWriteRequest<?> request) {
//bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
//once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(request);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}
//execute sending the local reference outside the lock to allow handler to control the concurrency via it's configuration.
if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
}

/**
Expand All @@ -332,11 +352,23 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline,
XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType);
executeIfNeeded();
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline,
true, xContentType);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}

if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
return this;
}

Expand All @@ -358,23 +390,32 @@ public boolean isCancelled() {
return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}

private void executeIfNeeded() {
// needs to be executed under a lock
private Tuple<BulkRequest,Long> newBulkRequestIfNeeded(){
ensureOpen();
if (!isOverTheLimit()) {
return;
return null;
}
execute();
final BulkRequest bulkRequest = this.bulkRequest;
this.bulkRequest = bulkRequestSupplier.get();
return new Tuple<>(bulkRequest,executionIdGen.incrementAndGet()) ;
}

// may be executed without a lock
private void execute(BulkRequest bulkRequest, long executionId ){
this.bulkRequestHandler.execute(bulkRequest, executionId);
}

// (currently) needs to be executed under a lock
// needs to be executed under a lock
private void execute() {
final BulkRequest bulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();

this.bulkRequest = bulkRequestSupplier.get();
this.bulkRequestHandler.execute(bulkRequest, executionId);
execute(bulkRequest, executionId);
}

// needs to be executed under a lock
private boolean isOverTheLimit() {
if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
return true;
Expand All @@ -388,25 +429,32 @@ private boolean isOverTheLimit() {
/**
* Flush pending delete or index requests.
*/
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
public void flush() {
lock.lock();
try {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
} finally {
lock.unlock();
}
}

class Flush implements Runnable {

@Override
public void run() {
synchronized (BulkProcessor.this) {
lock.lock();
try {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute();
} finally {
lock.unlock();
}
}
}
Expand Down
Loading