Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener<Releasable>) #34902

Merged
merged 4 commits into from
Nov 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2286,23 +2286,32 @@ private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final Ch
assert newPrimaryTerm > pendingPrimaryTerm;
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {
assert operationPrimaryTerm <= pendingPrimaryTerm;
termUpdated.await();
// indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted. We need to guard against another term bump
if (operationPrimaryTerm < newPrimaryTerm) {
operationPrimaryTerm = newPrimaryTerm;
onBlocked.run();
}
},
e -> {
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onFailure(final Exception e) {
try {
failShard("exception during primary term transition", e);
} catch (AlreadyClosedException ace) {
// ignore, shard is already closed
}
});
}

@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
assert operationPrimaryTerm <= pendingPrimaryTerm;
termUpdated.await();
// indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted. We need to guard against another term bump
if (operationPrimaryTerm < newPrimaryTerm) {
operationPrimaryTerm = newPrimaryTerm;
onBlocked.run();
}
} catch (final Exception e) {
onFailure(e);
}
}
}, 30, TimeUnit.MINUTES);
pendingPrimaryTerm = newPrimaryTerm;
termUpdated.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -104,42 +103,54 @@ <E extends Exception> void blockOperations(
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
delayOperations();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can fold delayOperations and releaseDelayedOperations into acquireAll, which will simplify the code both here and in asyncBlockOperations.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of asyncBlockOperations(), we want to delay operations immediately. If we fold delayOperations() into acquireAll(), operations would only start to be queued when the runnable is executed by the generic thread pool, not immediately - or am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed that. Fair point.

try {
doBlockOperations(timeout, timeUnit, onBlocked);
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
onBlocked.run();
} finally {
releaseDelayedOperations();
}
}

/**
* Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked}
* under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After
* operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking
* operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked.
* Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all
* permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed
* operations are run once the {@link Releasable} is released, or if a failure occurs while acquiring all permits; in the latter case the
* listener's {@code onFailure} handler is invoked after the delayed operations have been released.
*
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param onBlocked the action to run once the block has been acquired
* @param onFailure the action to run if a failure occurs while blocking operations
* @param <E> the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread)
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
*/
<E extends Exception> void asyncBlockOperations(
final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit) {
delayOperations();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {

final AtomicBoolean released = new AtomicBoolean(false);

@Override
public void onFailure(final Exception e) {
onFailure.accept(e);
try {
s1monw marked this conversation as resolved.
Show resolved Hide resolved
releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible
} finally {
onAcquired.onFailure(e);
}
}

@Override
protected void doRun() throws Exception {
doBlockOperations(timeout, timeUnit, onBlocked);
final Releasable releasable = acquireAll(timeout, timeUnit);
onAcquired.onResponse(() -> {
try {
releasable.close();
} finally {
releaseDelayedOperationsIfNeeded();
}
});
}

@Override
public void onAfter() {
releaseDelayedOperations();
private void releaseDelayedOperationsIfNeeded() {
if (released.compareAndSet(false, true)) {
releaseDelayedOperations();
}
}
});
}
Expand All @@ -154,23 +165,21 @@ private void delayOperations() {
}
}

private <E extends Exception> void doBlockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException {
if (Assertions.ENABLED) {
// since delayed is not volatile, we have to synchronize even here for visibility
synchronized (this) {
assert queuedBlockOperations > 0;
}
}
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
semaphore.release(TOTAL_PERMITS);
}
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
if (closed.compareAndSet(false, true)) {
assert semaphore.availablePermits() == 0;
semaphore.release(TOTAL_PERMITS);
}
};
} else {
throw new TimeoutException("timeout while blocking operations");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
Expand Down Expand Up @@ -199,8 +200,9 @@ public void testBlockIfClosed() {
permits.close();
expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
() -> { throw new IllegalArgumentException("fake error"); }));
expectThrows(IndexShardClosedException.class, () -> permits.asyncBlockOperations(randomInt(10), TimeUnit.MINUTES,
() -> { throw new IllegalArgumentException("fake error"); }, e -> { throw new AssertionError(e); }));
expectThrows(IndexShardClosedException.class,
() -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}),
randomInt(10), TimeUnit.MINUTES));
}

public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
Expand All @@ -220,17 +222,11 @@ public void testGetBlockWhenBlocked() throws ExecutionException, InterruptedExce
try (Releasable ignored = blockAndWait()) {
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");

permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
blocked.set(true);
blockAcquired.countDown();
releaseBlock.await();
},
e -> {
throw new RuntimeException(e);
});
permits.asyncBlockOperations(wrap(() -> {
blocked.set(true);
blockAcquired.countDown();
releaseBlock.await();
}), 30, TimeUnit.MINUTES);
assertFalse(blocked.get());
assertFalse(future.isDone());
}
Expand Down Expand Up @@ -292,7 +288,7 @@ public void onResponse(Releasable releasable) {
future2.get(1, TimeUnit.HOURS).close();
}

protected Releasable blockAndWait() throws InterruptedException {
private Releasable blockAndWait() throws InterruptedException {
CountDownLatch blockAcquired = new CountDownLatch(1);
CountDownLatch releaseBlock = new CountDownLatch(1);
CountDownLatch blockReleased = new CountDownLatch(1);
Expand Down Expand Up @@ -334,17 +330,11 @@ public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedEx
final CountDownLatch blockAcquired = new CountDownLatch(1);
final CountDownLatch releaseBlock = new CountDownLatch(1);
final AtomicBoolean blocked = new AtomicBoolean();
permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
blocked.set(true);
blockAcquired.countDown();
releaseBlock.await();
},
e -> {
throw new RuntimeException(e);
});
permits.asyncBlockOperations(wrap(() -> {
blocked.set(true);
blockAcquired.countDown();
releaseBlock.await();
}), 30, TimeUnit.MINUTES);
blockAcquired.await();
assertTrue(blocked.get());

Expand Down Expand Up @@ -392,16 +382,10 @@ public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedE
// now we will delay operations while the first operation is still executing (because it is latched)
final CountDownLatch blockedLatch = new CountDownLatch(1);
final AtomicBoolean onBlocked = new AtomicBoolean();
permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
onBlocked.set(true);
blockedLatch.countDown();
}, e -> {
throw new RuntimeException(e);
});

permits.asyncBlockOperations(wrap(() -> {
onBlocked.set(true);
blockedLatch.countDown();
}), 30, TimeUnit.MINUTES);
assertFalse(onBlocked.get());

// if we submit another operation, it should be delayed
Expand Down Expand Up @@ -486,15 +470,10 @@ public void onFailure(Exception e) {
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
values.add(operations);
operationLatch.countDown();
}, e -> {
throw new RuntimeException(e);
});
permits.asyncBlockOperations(wrap(() -> {
values.add(operations);
operationLatch.countDown();
}), 30, TimeUnit.MINUTES);
});
blockingThread.start();

Expand Down Expand Up @@ -559,16 +538,20 @@ public void testActiveOperationsCount() throws ExecutionException, InterruptedEx
public void testAsyncBlockOperationsOnFailure() throws InterruptedException {
final AtomicReference<Exception> reference = new AtomicReference<>();
final CountDownLatch onFailureLatch = new CountDownLatch(1);
permits.asyncBlockOperations(
10,
TimeUnit.MINUTES,
() -> {
permits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
try (Releasable ignored = releasable) {
throw new RuntimeException("simulated");
},
e -> {
reference.set(e);
onFailureLatch.countDown();
});
}
}

@Override
public void onFailure(final Exception e) {
reference.set(e);
onFailureLatch.countDown();
}
}, 10, TimeUnit.MINUTES);
onFailureLatch.await();
assertThat(reference.get(), instanceOf(RuntimeException.class));
assertThat(reference.get(), hasToString(containsString("simulated")));
Expand Down Expand Up @@ -596,14 +579,18 @@ public void testTimeout() throws BrokenBarrierException, InterruptedException {
{
final AtomicReference<Exception> reference = new AtomicReference<>();
final CountDownLatch onFailureLatch = new CountDownLatch(1);
permits.asyncBlockOperations(
1,
TimeUnit.MILLISECONDS,
() -> {},
e -> {
reference.set(e);
onFailureLatch.countDown();
});
permits.asyncBlockOperations(new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
releasable.close();
}

@Override
public void onFailure(final Exception e) {
reference.set(e);
onFailureLatch.countDown();
}
}, 1, TimeUnit.MILLISECONDS);
onFailureLatch.await();
assertThat(reference.get(), hasToString(containsString("timeout while blocking operations")));
}
Expand Down Expand Up @@ -716,4 +703,22 @@ public void testPermitTraceCapturing() throws ExecutionException, InterruptedExc
assertThat(permits.getActiveOperationsCount(), equalTo(0));
assertThat(permits.getActiveOperations(), emptyIterable());
}

/**
 * Adapts a {@link CheckedRunnable} into an {@link ActionListener} suitable for {@code asyncBlockOperations}.
 * <p>
 * When the listener receives the {@link Releasable}, it runs {@code onResponse} and then closes the releasable
 * (via try-with-resources, so the close happens whether or not the runnable throws). Any exception raised by the
 * runnable or by closing the releasable is routed to {@code onFailure}, which rethrows it wrapped in an
 * {@link AssertionError}.
 *
 * @param onResponse the action to run once the listener is handed the releasable
 * @return a listener that runs {@code onResponse} and turns every failure into an {@link AssertionError}
 */
private static ActionListener<Releasable> wrap(final CheckedRunnable<Exception> onResponse) {
    final ActionListener<Releasable> listener = new ActionListener<Releasable>() {
        @Override
        public void onFailure(final Exception failure) {
            // any failure reaching this listener is surfaced as an assertion error
            throw new AssertionError(failure);
        }

        @Override
        public void onResponse(final Releasable acquired) {
            // the resource is closed before the catch clause runs, so onFailure sees an already-released block
            try (Releasable ignored = acquired) {
                onResponse.run();
            } catch (final Exception failure) {
                onFailure(failure);
            }
        }
    };
    return listener;
}
}