Skip to content

Commit

Permalink
Always try to bump the prim term + add a test
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed Dec 3, 2018
1 parent d23d993 commit 1f875b0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,7 @@ default void onFailure(final Exception e) {

private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert newPrimaryTerm >= pendingPrimaryTerm;
assert operationPrimaryTerm <= pendingPrimaryTerm;
final CountDownLatch termUpdated = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
Expand Down Expand Up @@ -2467,9 +2467,9 @@ public void onFailure(final Exception e) {
}
};

if (opPrimaryTerm > pendingPrimaryTerm) {
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
Expand Down Expand Up @@ -2524,6 +2524,10 @@ public void onFailure(Exception e) {
operationExecutor.accept(operationListener);
}

private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
}

public int getActiveOperationsCount() {
// refCount is incremented on successful acquire and decremented on close
return indexShardOperationPermits.getActiveOperationsCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3596,6 +3596,63 @@ public void testResetEngine() throws Exception {
closeShard(shard, false);
}

public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
final IndexShard replica = newStartedShard(false);
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));

final int operations = scaledRandomIntBetween(10, 64);
final Thread[] threads = new Thread[operations];
final CyclicBarrier startBarrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);

final AtomicArray<Tuple<Long, Long>> opsTerms = new AtomicArray<>(operations);
final long futurePrimaryTerm = replica.getOperationPrimaryTerm() + 1;

for (int i = 0; i < operations; i++) {
final int threadId = i;
threads[threadId] = new Thread(() -> {
try {
startBarrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
replica.acquireAllReplicaOperationsPermits(
futurePrimaryTerm,
replica.getGlobalCheckpoint(),
replica.getMaxSeqNoOfUpdatesOrDeletes(),
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignored = releasable) {
opsTerms.set(threadId, Tuple.tuple(replica.getPendingPrimaryTerm(), replica.getOperationPrimaryTerm()));
latch.countDown();
}
}

@Override
public void onFailure(final Exception e) {
latch.countDown();
throw new RuntimeException(e);
}
}, TimeValue.timeValueMinutes(30L));
});
threads[threadId].start();
}

startBarrier.await();
latch.await();

for (Tuple<Long, Long> opTerms : opsTerms.asList()) {
assertNotNull(opTerms);
assertEquals("Expected primary term and pending primary term captured during execution must match",
futurePrimaryTerm, opTerms.v1().longValue());
assertEquals("Expected primary term and operation primary term captured during execution must match",
futurePrimaryTerm, opTerms.v2().longValue());
}
closeShard(replica, false);
}


@Override
public Settings threadPoolSettings() {
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
Expand Down

0 comments on commit 1f875b0

Please sign in to comment.