Skip to content

Commit

Permalink
Bulk processor concurrent requests (elastic#41451)
Browse files Browse the repository at this point in the history
`org.elasticsearch.action.bulk.BulkProcessor` is a threadsafe class that
allows for simple semantics to deal with sending bulk requests. Once a
bulk reaches its pre-defined size, document count, or flush interval, it will
send the bulk request. One configurable option is the number of concurrent
outstanding bulk requests. That concurrency is implemented in
`org.elasticsearch.action.bulk.BulkRequestHandler` via a semaphore. However,
the only code that currently calls into this code is blocked by `synchronized`
methods. This results in the inability of the BulkProcessor to behave concurrently
despite supporting a configurable number of concurrent requests.

This change removes the `synchronized` method in favor of an explicit
lock around the non-thread-safe parts of the method. The call into
`org.elasticsearch.action.bulk.BulkRequestHandler` is no longer blocking, which
allows `org.elasticsearch.action.bulk.BulkRequestHandler` to handle its own concurrency.
  • Loading branch information
jakelandis authored and Gurkan Kaymak committed May 27, 2019
1 parent d35c443 commit 8bb66f4
Show file tree
Hide file tree
Showing 2 changed files with 328 additions and 31 deletions.
108 changes: 78 additions & 30 deletions server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -225,6 +227,7 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr
private final Runnable onClose;

private volatile boolean closed = false;
private final ReentrantLock lock = new ReentrantLock();

BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Expand Down Expand Up @@ -264,21 +267,26 @@ public void close() {
* completed
* @throws InterruptedException If the current thread is interrupted
*/
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (closed) {
return true;
}
closed = true;
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
if (closed) {
return true;
}
closed = true;

this.cancellableFlushTask.cancel();
this.cancellableFlushTask.cancel();

if (bulkRequest.numberOfActions() > 0) {
execute();
}
try {
return this.bulkRequestHandler.awaitClose(timeout, unit);
if (bulkRequest.numberOfActions() > 0) {
execute();
}
try {
return this.bulkRequestHandler.awaitClose(timeout, unit);
} finally {
onClose.run();
}
} finally {
onClose.run();
lock.unlock();
}
}

Expand Down Expand Up @@ -315,10 +323,22 @@ protected void ensureOpen() {
}
}

private synchronized void internalAdd(DocWriteRequest<?> request) {
ensureOpen();
bulkRequest.add(request);
executeIfNeeded();
private void internalAdd(DocWriteRequest<?> request) {
//Mutating bulkRequest and swapping its instance are not thread-safe, so perform those mutations under a lock.
//Once the bulk request is ready to be shipped, swap the instance reference, unlock, and send the local reference to the handler.
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(request);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}
//Send the local reference outside the lock to allow the handler to control the concurrency via its configuration.
if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
}

/**
Expand All @@ -332,11 +352,23 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline,
XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, true, xContentType);
executeIfNeeded();
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
ensureOpen();
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline,
true, xContentType);
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
}

if (bulkRequestToExecute != null) {
execute(bulkRequestToExecute.v1(), bulkRequestToExecute.v2());
}
return this;
}

Expand All @@ -358,23 +390,32 @@ public boolean isCancelled() {
return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}

private void executeIfNeeded() {
// needs to be executed under a lock
private Tuple<BulkRequest,Long> newBulkRequestIfNeeded(){
ensureOpen();
if (!isOverTheLimit()) {
return;
return null;
}
execute();
final BulkRequest bulkRequest = this.bulkRequest;
this.bulkRequest = bulkRequestSupplier.get();
return new Tuple<>(bulkRequest,executionIdGen.incrementAndGet()) ;
}

// Hands the given bulk request and its execution id to the bulk request handler.
// May be executed without a lock: this method only forwards its arguments (the
// bulkRequest parameter shadows the field) and does not read or write this.bulkRequest.
private void execute(BulkRequest bulkRequest, long executionId ){
this.bulkRequestHandler.execute(bulkRequest, executionId);
}

// (currently) needs to be executed under a lock
// needs to be executed under a lock
private void execute() {
final BulkRequest bulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();

this.bulkRequest = bulkRequestSupplier.get();
this.bulkRequestHandler.execute(bulkRequest, executionId);
execute(bulkRequest, executionId);
}

// needs to be executed under a lock
private boolean isOverTheLimit() {
if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
return true;
Expand All @@ -388,25 +429,32 @@ private boolean isOverTheLimit() {
/**
* Flush pending delete or index requests.
*/
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
public void flush() {
lock.lock();
try {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
} finally {
lock.unlock();
}
}

class Flush implements Runnable {

@Override
public void run() {
synchronized (BulkProcessor.this) {
lock.lock();
try {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute();
} finally {
lock.unlock();
}
}
}
Expand Down
Loading

0 comments on commit 8bb66f4

Please sign in to comment.