Skip to content

Commit

Permalink
move lock outside of try block and fix formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed May 10, 2019
1 parent b014871 commit a9e0f3b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private static Scheduler buildScheduler(ScheduledThreadPoolExecutor scheduledThr

private final AtomicLong executionIdGen = new AtomicLong();

private volatile BulkRequest bulkRequest;
private BulkRequest bulkRequest;
private final Supplier<BulkRequest> bulkRequestSupplier;
private final BulkRequestHandler bulkRequestHandler;
private final Runnable onClose;
Expand Down Expand Up @@ -268,8 +268,8 @@ public void close() {
* @throws InterruptedException If the current thread is interrupted
*/
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
lock.lock();
if (closed) {
return true;
}
Expand All @@ -285,7 +285,7 @@ public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedExcepti
} finally {
onClose.run();
}
}finally {
} finally {
lock.unlock();
}
}
Expand Down Expand Up @@ -327,8 +327,8 @@ private void internalAdd(DocWriteRequest<?> request) {
//bulkRequest and instance swapping is not threadsafe, so execute the mutations under a lock.
//once the bulk request is ready to be shipped swap the instance reference unlock and send the local reference to the handler.
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
lock.lock();
ensureOpen();
bulkRequest.add(request);
bulkRequestToExecute = newBulkRequestIfNeeded();
Expand Down Expand Up @@ -356,8 +356,8 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
@Nullable String defaultPipeline,
XContentType xContentType) throws Exception {
Tuple<BulkRequest, Long> bulkRequestToExecute = null;
lock.lock();
try {
lock.lock();
ensureOpen();
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline,
true, xContentType);
Expand Down Expand Up @@ -430,22 +430,22 @@ private boolean isOverTheLimit() {
* Flush pending delete or index requests.
*/
public void flush() {
lock.lock();
try {
lock.lock();
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
}finally {
} finally {
lock.unlock();
}
}

class Flush implements Runnable {
@Override
public void run() {
lock.lock();
try {
lock.lock();
if (closed) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE), null
for (Future f : futures) {
try {
f.get(10, TimeUnit.SECONDS);
}catch (Exception e){
} catch (Exception e){
failureCount.incrementAndGet();
logger.error("failure while getting future", e);
}
Expand Down Expand Up @@ -252,7 +252,7 @@ concurrentBulkRequests, maxBatchSize, new ByteSizeValue(Integer.MAX_VALUE),
for (Future f : futures) {
try {
f.get(10, TimeUnit.SECONDS);
}catch (Exception e){
} catch (Exception e){
failureCount.incrementAndGet();
logger.error("failure while getting future", e);
}
Expand Down

0 comments on commit a9e0f3b

Please sign in to comment.