From 4d792afdaf5a254820a5bf854625b6defc9212f8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 26 Oct 2018 10:32:00 +0200 Subject: [PATCH 1/4] [RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener) The current implementation of asyncBlockOperations() can be used to execute some code once all indexing operations permits have been acquired, then releases all permits immediately after the code execution. This immediate release is not suitable for treatments that need to keep all permits over multiple execution steps. This commit adds a new asyncBlockOperations() that exposes a Releasable, making it possible to acquire all permits and only release them all when needed by closing the Releasable. This method is aimed to be used in a TransportReplicationAction that will acquire all permits on the primary shard. The existing blockOperations() and asyncBlockOperations() methods have been modified to delegate permit acquisition/releasing to this new method. Relates to #33888 --- .../shard/IndexShardOperationPermits.java | 79 ++++++++++++++----- .../IndexShardOperationPermitsTests.java | 42 ++++++++-- 2 files changed, 96 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index d4c3833b13a58..eec190bd03b80 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -104,8 +104,8 @@ void blockOperations( final TimeUnit timeUnit, final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { delayOperations(); - try { - doBlockOperations(timeout, timeUnit, onBlocked); + try (Releasable ignored = acquireAll(timeout, timeUnit)) { + onBlocked.run(); } finally { releaseDelayedOperations(); } @@ -123,23 +123,66 @@ void blockOperations( * @param onFailure the action to run if a failure occurs while blocking operations * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) */ - void asyncBlockOperations( - final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked, final Consumer onFailure) { + void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, + final CheckedRunnable onBlocked, final Consumer onFailure) { + asyncBlockOperations(new ActionListener() { + @Override + public void onFailure(final Exception e) { + onFailure.accept(e); + } + + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + onBlocked.run(); + } catch (final Exception e) { + onFailure.accept(e); + } + } + }, timeout, timeUnit); + } + + /** + * Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all + * permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed + * operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in this case the + * {@code onFailure} handler will be invoked before running delayed operations. + * + * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument + */ + public void asyncBlockOperations(final ActionListener onAcquired, final long timeout, final TimeUnit timeUnit) { delayOperations(); threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + + final AtomicBoolean released = new AtomicBoolean(false); + @Override public void onFailure(final Exception e) { - onFailure.accept(e); + try { + onAcquired.onFailure(e); + } finally { + releaseDelayedOperationsIfNeeded(); + } } @Override protected void doRun() throws Exception { - doBlockOperations(timeout, timeUnit, onBlocked); + final Releasable releasable = acquireAll(timeout, timeUnit); + onAcquired.onResponse(() -> { + try { + releasable.close(); + } finally { + releaseDelayedOperationsIfNeeded(); + } + }); } - @Override - public void onAfter() { - releaseDelayedOperations(); + private void releaseDelayedOperationsIfNeeded() { + if (released.compareAndSet(false, true)) { + releaseDelayedOperations(); + } } }); } @@ -154,10 +197,7 @@ private void delayOperations() { } } - private void doBlockOperations( - final long timeout, - final TimeUnit timeUnit, - final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { + private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException { if (Assertions.ENABLED) { // since delayed is not volatile, we have to synchronize even here for visibility synchronized (this) { @@ -165,12 +205,13 @@ private void doBlockOperations( } } if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - assert semaphore.availablePermits() == 0; - try { - onBlocked.run(); - } finally { - semaphore.release(TOTAL_PERMITS); - } + final AtomicBoolean closed = new AtomicBoolean(); + return () -> { + if (closed.compareAndSet(false, true)) { + assert semaphore.availablePermits() == 0; + semaphore.release(TOTAL_PERMITS); + } + }; } else { throw new TimeoutException("timeout while blocking operations"); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 23337def2ae1b..0d274032e76d8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -48,6 +49,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -220,7 +222,7 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); - permits.asyncBlockOperations( + randomAsyncBlockOperations(permits, 30, TimeUnit.MINUTES, () -> { @@ -334,7 +336,7 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx final CountDownLatch blockAcquired = new CountDownLatch(1); final CountDownLatch releaseBlock = new CountDownLatch(1); final AtomicBoolean blocked = new AtomicBoolean(); - permits.asyncBlockOperations( + randomAsyncBlockOperations(permits, 30, TimeUnit.MINUTES, () -> { @@ -392,7 +394,7 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE // now we will delay operations while the first operation is still executing (because it is latched) final CountDownLatch blockedLatch = new CountDownLatch(1); final AtomicBoolean onBlocked = new AtomicBoolean(); - permits.asyncBlockOperations( + randomAsyncBlockOperations(permits, 30, TimeUnit.MINUTES, () -> { @@ -486,7 +488,7 @@ public void onFailure(Exception e) { } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } - permits.asyncBlockOperations( + randomAsyncBlockOperations(permits, 30, TimeUnit.MINUTES, () -> { @@ -559,7 +561,7 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx public void testAsyncBlockOperationsOnFailure() throws InterruptedException { final AtomicReference reference = new AtomicReference<>(); final CountDownLatch onFailureLatch = new CountDownLatch(1); - permits.asyncBlockOperations( + randomAsyncBlockOperations(permits, 10, TimeUnit.MINUTES, () -> { @@ -596,7 +598,7 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException { { final AtomicReference reference = new AtomicReference<>(); final CountDownLatch onFailureLatch = new CountDownLatch(1); - permits.asyncBlockOperations( + randomAsyncBlockOperations(permits, 1, TimeUnit.MILLISECONDS, () -> {}, @@ -716,4 +718,32 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc assertThat(permits.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperations(), emptyIterable()); } + + /** + * Randomizes the usage of {@link IndexShardOperationPermits#asyncBlockOperations(ActionListener, long, TimeUnit)} and + * {@link IndexShardOperationPermits#asyncBlockOperations(long, TimeUnit, CheckedRunnable, Consumer)} + */ + private void randomAsyncBlockOperations(final IndexShardOperationPermits permits, + final long timeout, final TimeUnit timeUnit, + final CheckedRunnable onBlocked, final Consumer onFailure) { + if (randomBoolean()) { + permits.asyncBlockOperations(timeout, timeUnit, onBlocked, onFailure); + } else { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + onBlocked.run(); + } catch (final Exception e) { + onFailure.accept(e); + } + } + + @Override + public void onFailure(final Exception e) { + onFailure.accept(e); + } + }, timeout, timeUnit); + } + } } From b15647bac545db2a3459a8e677fa3cec15e0a8ef Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 5 Nov 2018 13:18:37 +0100 Subject: [PATCH 2/4] Apply feedback --- .../elasticsearch/index/shard/IndexShard.java | 33 ++-- .../shard/IndexShardOperationPermits.java | 32 ---- .../IndexShardOperationPermitsTests.java | 172 +++++++++--------- 3 files changed, 112 insertions(+), 125 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 17756630517d2..921eeb319f1a0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2286,23 +2286,32 @@ private void bumpPrimaryTerm(long newPrimaryTerm, final Ch assert newPrimaryTerm > pendingPrimaryTerm; assert operationPrimaryTerm <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); - indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> { - assert operationPrimaryTerm <= pendingPrimaryTerm; - termUpdated.await(); - // indexShardOperationPermits doesn't guarantee that async submissions are executed - // in the order submitted. We need to guard against another term bump - if (operationPrimaryTerm < newPrimaryTerm) { - operationPrimaryTerm = newPrimaryTerm; - onBlocked.run(); - } - }, - e -> { + indexShardOperationPermits.asyncBlockOperations(new ActionListener() { + @Override + public void onFailure(final Exception e) { try { failShard("exception during primary term transition", e); } catch (AlreadyClosedException ace) { // ignore, shard is already closed } - }); + } + + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + assert operationPrimaryTerm <= pendingPrimaryTerm; + termUpdated.await(); + // indexShardOperationPermits doesn't guarantee that async submissions are executed + // in the order submitted. We need to guard against another term bump + if (operationPrimaryTerm < newPrimaryTerm) { + operationPrimaryTerm = newPrimaryTerm; + onBlocked.run(); + } + } catch (final Exception e) { + onFailure(e); + } + } + }, 30, TimeUnit.MINUTES); pendingPrimaryTerm = newPrimaryTerm; termUpdated.countDown(); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index eec190bd03b80..93572b31260d1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -111,37 +110,6 @@ void blockOperations( } } - /** - * Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked} - * under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After - * operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking - * operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked. - * - * @param timeout the maximum time to wait for the in-flight operations block - * @param timeUnit the time unit of the {@code timeout} argument - * @param onBlocked the action to run once the block has been acquired - * @param onFailure the action to run if a failure occurs while blocking operations - * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) - */ - void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, - final CheckedRunnable onBlocked, final Consumer onFailure) { - asyncBlockOperations(new ActionListener() { - @Override - public void onFailure(final Exception e) { - onFailure.accept(e); - } - - @Override - public void onResponse(final Releasable releasable) { - try (Releasable ignored = releasable) { - onBlocked.run(); - } catch (final Exception e) { - onFailure.accept(e); - } - } - }, timeout, timeUnit); - } - /** * Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all * permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 0d274032e76d8..a72a295039645 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -49,7 +48,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -201,8 +199,19 @@ public void testBlockIfClosed() { permits.close(); expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); - expectThrows(IndexShardClosedException.class, () -> permits.asyncBlockOperations(randomInt(10), TimeUnit.MINUTES, - () -> { throw new IllegalArgumentException("fake error"); }, e -> { throw new AssertionError(e); })); + expectThrows(IndexShardClosedException.class, + () -> permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + releasable.close(); + throw new IllegalArgumentException("fake error"); + } + + @Override + public void onFailure(final Exception e) { + throw new AssertionError(e); + } + }, randomInt(10), TimeUnit.MINUTES)); } public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { @@ -222,17 +231,23 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); - randomAsyncBlockOperations(permits, - 30, - TimeUnit.MINUTES, - () -> { - blocked.set(true); - blockAcquired.countDown(); - releaseBlock.await(); - }, - e -> { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void onFailure(final Exception e) { throw new RuntimeException(e); - }); + } + }, 30, TimeUnit.MINUTES); assertFalse(blocked.get()); assertFalse(future.isDone()); } @@ -294,7 +309,7 @@ public void onResponse(Releasable releasable) { future2.get(1, TimeUnit.HOURS).close(); } - protected Releasable blockAndWait() throws InterruptedException { + private Releasable blockAndWait() throws InterruptedException { CountDownLatch blockAcquired = new CountDownLatch(1); CountDownLatch releaseBlock = new CountDownLatch(1); CountDownLatch blockReleased = new CountDownLatch(1); @@ -336,17 +351,23 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx final CountDownLatch blockAcquired = new CountDownLatch(1); final CountDownLatch releaseBlock = new CountDownLatch(1); final AtomicBoolean blocked = new AtomicBoolean(); - randomAsyncBlockOperations(permits, - 30, - TimeUnit.MINUTES, - () -> { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try (Releasable ignored = releasable) { blocked.set(true); blockAcquired.countDown(); releaseBlock.await(); - }, - e -> { + } catch (final InterruptedException e) { throw new RuntimeException(e); - }); + } + } + + @Override + public void onFailure(final Exception e) { + throw new RuntimeException(e); + } + }, 30, TimeUnit.MINUTES); blockAcquired.await(); assertTrue(blocked.get()); @@ -394,16 +415,20 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE // now we will delay operations while the first operation is still executing (because it is latched) final CountDownLatch blockedLatch = new CountDownLatch(1); final AtomicBoolean onBlocked = new AtomicBoolean(); - randomAsyncBlockOperations(permits, - 30, - TimeUnit.MINUTES, - () -> { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try (Releasable ignored = releasable) { onBlocked.set(true); blockedLatch.countDown(); - }, e -> { - throw new RuntimeException(e); - }); + } + } + @Override + public void onFailure(final Exception e) { + throw new RuntimeException(e); + } + }, 30, TimeUnit.MINUTES); assertFalse(onBlocked.get()); // if we submit another operation, it should be delayed @@ -488,15 +513,20 @@ public void onFailure(Exception e) { } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } - randomAsyncBlockOperations(permits, - 30, - TimeUnit.MINUTES, - () -> { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try (Releasable ignored = releasable) { values.add(operations); operationLatch.countDown(); - }, e -> { - throw new RuntimeException(e); - }); + } + } + + @Override + public void onFailure(final Exception e) { + throw new RuntimeException(e); + } + }, 30, TimeUnit.MINUTES); }); blockingThread.start(); @@ -561,16 +591,20 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx public void testAsyncBlockOperationsOnFailure() throws InterruptedException { final AtomicReference reference = new AtomicReference<>(); final CountDownLatch onFailureLatch = new CountDownLatch(1); - randomAsyncBlockOperations(permits, - 10, - TimeUnit.MINUTES, - () -> { + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + try (Releasable ignored = releasable) { throw new RuntimeException("simulated"); - }, - e -> { - reference.set(e); - onFailureLatch.countDown(); - }); + } + } + + @Override + public void onFailure(final Exception e) { + reference.set(e); + onFailureLatch.countDown(); + } + }, 10, TimeUnit.MINUTES); onFailureLatch.await(); assertThat(reference.get(), instanceOf(RuntimeException.class)); assertThat(reference.get(), hasToString(containsString("simulated"))); @@ -598,14 +632,18 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException { { final AtomicReference reference = new AtomicReference<>(); final CountDownLatch onFailureLatch = new CountDownLatch(1); - randomAsyncBlockOperations(permits, - 1, - TimeUnit.MILLISECONDS, - () -> {}, - e -> { - reference.set(e); - onFailureLatch.countDown(); - }); + permits.asyncBlockOperations(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + } + + @Override + public void onFailure(final Exception e) { + reference.set(e); + onFailureLatch.countDown(); + } + }, 1, TimeUnit.MILLISECONDS); onFailureLatch.await(); assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); } @@ -718,32 +756,4 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc assertThat(permits.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperations(), emptyIterable()); } - - /** - * Randomizes the usage of {@link IndexShardOperationPermits#asyncBlockOperations(ActionListener, long, TimeUnit)} and - * {@link IndexShardOperationPermits#asyncBlockOperations(long, TimeUnit, CheckedRunnable, Consumer)} - */ - private void randomAsyncBlockOperations(final IndexShardOperationPermits permits, - final long timeout, final TimeUnit timeUnit, - final CheckedRunnable onBlocked, final Consumer onFailure) { - if (randomBoolean()) { - permits.asyncBlockOperations(timeout, timeUnit, onBlocked, onFailure); - } else { - permits.asyncBlockOperations(new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { - try (Releasable ignored = releasable) { - onBlocked.run(); - } catch (final Exception e) { - onFailure.accept(e); - } - } - - @Override - public void onFailure(final Exception e) { - onFailure.accept(e); - } - }, timeout, timeUnit); - } - } } From ef1a506f9f5f4e0a39e78a3e319e760e295d834c Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 7 Nov 2018 11:25:29 +0100 Subject: [PATCH 3/4] Apply feedback --- .../shard/IndexShardOperationPermits.java | 6 +- .../IndexShardOperationPermitsTests.java | 113 ++++++------------ 2 files changed, 42 insertions(+), 77 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 93572b31260d1..67c48c38791f0 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -114,7 +114,7 @@ void blockOperations( * Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all * permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed * operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in this case the - * {@code onFailure} handler will be invoked before running delayed operations. + * {@code onFailure} handler will be invoked after delayed operations are released. * * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param timeout the maximum time to wait for the in-flight operations block @@ -129,9 +129,9 @@ public void asyncBlockOperations(final ActionListener onAcquired, fi @Override public void onFailure(final Exception e) { try { - onAcquired.onFailure(e); + releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible } finally { - releaseDelayedOperationsIfNeeded(); + onAcquired.onFailure(e); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index a72a295039645..14f52b0cf6e6f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -200,18 +201,8 @@ public void testBlockIfClosed() { expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES, () -> { throw new IllegalArgumentException("fake error"); })); expectThrows(IndexShardClosedException.class, - () -> permits.asyncBlockOperations(new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { - releasable.close(); - throw new IllegalArgumentException("fake error"); - } - - @Override - public void onFailure(final Exception e) { - throw new AssertionError(e); - } - }, randomInt(10), TimeUnit.MINUTES)); + () -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}), + randomInt(10), TimeUnit.MINUTES)); } public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { @@ -231,23 +222,11 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true, ""); - permits.asyncBlockOperations(new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { - try (Releasable ignored = releasable) { - blocked.set(true); - blockAcquired.countDown(); - releaseBlock.await(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - - @Override - public void onFailure(final Exception e) { - throw new RuntimeException(e); - } - }, 30, TimeUnit.MINUTES); + permits.asyncBlockOperations(wrap(() -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }), 30, TimeUnit.MINUTES); assertFalse(blocked.get()); assertFalse(future.isDone()); } @@ -351,23 +330,11 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx final CountDownLatch blockAcquired = new CountDownLatch(1); final CountDownLatch releaseBlock = new CountDownLatch(1); final AtomicBoolean blocked = new AtomicBoolean(); - permits.asyncBlockOperations(new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - try (Releasable ignored = releasable) { - blocked.set(true); - blockAcquired.countDown(); - releaseBlock.await(); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onFailure(final Exception e) { - throw new RuntimeException(e); - } - }, 30, TimeUnit.MINUTES); + permits.asyncBlockOperations(wrap(() -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }), 30, TimeUnit.MINUTES); blockAcquired.await(); assertTrue(blocked.get()); @@ -415,20 +382,10 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE // now we will delay operations while the first operation is still executing (because it is latched) final CountDownLatch blockedLatch = new CountDownLatch(1); final AtomicBoolean onBlocked = new AtomicBoolean(); - permits.asyncBlockOperations(new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - try (Releasable ignored = releasable) { - onBlocked.set(true); - blockedLatch.countDown(); - } - } - - @Override - public void onFailure(final Exception e) { - throw new RuntimeException(e); - } - }, 30, TimeUnit.MINUTES); + permits.asyncBlockOperations(wrap(() -> { + onBlocked.set(true); + blockedLatch.countDown(); + }), 30, TimeUnit.MINUTES); assertFalse(onBlocked.get()); // if we submit another operation, it should be delayed @@ -513,20 +470,10 @@ public void onFailure(Exception e) { } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); } - permits.asyncBlockOperations(new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - try (Releasable ignored = releasable) { - values.add(operations); - operationLatch.countDown(); - } - } - - @Override - public void onFailure(final Exception e) { - throw new RuntimeException(e); - } - }, 30, TimeUnit.MINUTES); + permits.asyncBlockOperations(wrap(() -> { + values.add(operations); + operationLatch.countDown(); + }), 30, TimeUnit.MINUTES); }); blockingThread.start(); @@ -756,4 +703,22 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc assertThat(permits.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperations(), emptyIterable()); } + + private static ActionListener wrap(final CheckedRunnable onResponse) { + return new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + onResponse.run(); + } catch (final Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + throw new RuntimeException(e); + } + }; + } } From 338b6ae459fbeb2c83873e4086026ae4d570fc2a Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 7 Nov 2018 14:05:11 +0100 Subject: [PATCH 4/4] throw new AssertionError(e); --- .../index/shard/IndexShardOperationPermitsTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 14f52b0cf6e6f..a785c2c4d8224 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -717,7 +717,7 @@ public void onResponse(final Releasable releasable) { @Override public void onFailure(final Exception e) { - throw new RuntimeException(e); + throw new AssertionError(e); } }; }