From d23d9930b5c9df0ae06da5c5397531c5d95eaf49 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 1/5] Combine primary term update with all permit operation execution --- .../elasticsearch/index/shard/IndexShard.java | 136 ++++++++++++------ .../index/shard/IndexShardTests.java | 3 - 2 files changed, 96 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ed4a744d0a025..d13b2e7acadd9 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -48,7 +48,6 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; -import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -63,6 +62,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AsyncIOProcessor; +import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; @@ -2316,7 +2316,22 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); } - private void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable onBlocked) { + @FunctionalInterface + private interface PrimaryTermUpdateListener extends ActionListener { + + void onPrimaryTermUpdate() throws Exception; + + @Override + default void onResponse(final Releasable releasable) { + Releasables.close(releasable); + } + + @Override + default void onFailure(final Exception e) { + } + } + + private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) { assert Thread.holdsLock(mutex); assert newPrimaryTerm > pendingPrimaryTerm; assert operationPrimaryTerm <= pendingPrimaryTerm; @@ -2328,22 +2343,32 @@ public void onFailure(final Exception e) { failShard("exception during primary term transition", e); } catch (AlreadyClosedException ace) { // ignore, shard is already closed + } finally { + listener.onFailure(e); } } @Override public void onResponse(final Releasable releasable) { - try (Releasable ignored = releasable) { + final RunOnce releaseOnce = new RunOnce(releasable::close); + boolean success = false; + try { assert operationPrimaryTerm <= pendingPrimaryTerm; termUpdated.await(); // indexShardOperationPermits doesn't guarantee that async submissions are executed // in the order submitted. We need to guard against another term bump if (operationPrimaryTerm < newPrimaryTerm) { operationPrimaryTerm = newPrimaryTerm; - onBlocked.run(); + listener.onPrimaryTermUpdate(); } + listener.onResponse(releaseOnce::run); + success = true; } catch (final Exception e) { onFailure(e); + } finally { + if (success == false) { + releaseOnce.run(); + } } } }, 30, TimeUnit.MINUTES); @@ -2371,7 +2396,7 @@ public void onResponse(final Releasable releasable) { public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, final String executorOnDelay, final Object debugInfo) { - innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false, (listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo)); } @@ -2393,7 +2418,7 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, final TimeValue timeout) { - innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true, (listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())); } @@ -2401,41 +2426,16 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, final ActionListener onPermitAcquired, - final Consumer> consumer) { + final boolean allowCombineOperationWithPrimaryTermUpdate, + final Consumer> operationExecutor) { verifyNotClosed(); - if (opPrimaryTerm > pendingPrimaryTerm) { - synchronized (mutex) { - if (opPrimaryTerm > pendingPrimaryTerm) { - final IndexShardState shardState = state(); - // only roll translog and update primary term if shard has made it past recovery - // Having a new primary term here means that the old primary failed and that there is a new primary, which again - // means that the master will fail this shard as all initializing shards are failed when a primary is selected - // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint - if (shardState != IndexShardState.POST_RECOVERY && - shardState != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, shardState); - } - if (opPrimaryTerm > pendingPrimaryTerm) { - bumpPrimaryTerm(opPrimaryTerm, () -> { - updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); - final long currentGlobalCheckpoint = getGlobalCheckpoint(); - final long maxSeqNo = seqNoStats().getMaxSeqNo(); - logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", - opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); - if (currentGlobalCheckpoint < maxSeqNo) { - resetEngineToGlobalCheckpoint(); - } else { - getEngine().rollTranslogGeneration(); - } - }); - } - } - } - } - assert opPrimaryTerm <= pendingPrimaryTerm - : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; - consumer.accept(new ActionListener() { + // This listener is used for the execution of the operation. If the operation requires all the permits for its + // execution and the primary term must be updated first, we can combine the operation execution with the + // primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed + // in the order submitted, combining both operations ensure that the term is updated before the operation is + // executed. It also has the side effect of acquiring all the permits one time instead of two. + final ActionListener operationListener = new ActionListener() { @Override public void onResponse(final Releasable releasable) { if (opPrimaryTerm < operationPrimaryTerm) { @@ -2465,7 +2465,63 @@ public void onResponse(final Releasable releasable) { public void onFailure(final Exception e) { onPermitAcquired.onFailure(e); } - }); + }; + + if (opPrimaryTerm > pendingPrimaryTerm) { + synchronized (mutex) { + if (opPrimaryTerm > pendingPrimaryTerm) { + final IndexShardState shardState = state(); + // only roll translog and update primary term if shard has made it past recovery + // Having a new primary term here means that the old primary failed and that there is a new primary, which again + // means that the master will fail this shard as all initializing shards are failed when a primary is selected + // We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint + if (shardState != IndexShardState.POST_RECOVERY && + shardState != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, shardState); + } + + bumpPrimaryTerm(opPrimaryTerm, new PrimaryTermUpdateListener() { + @Override + public void onPrimaryTermUpdate() throws Exception { + updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long maxSeqNo = seqNoStats().getMaxSeqNo(); + logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", + opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); + if (currentGlobalCheckpoint < maxSeqNo) { + resetEngineToGlobalCheckpoint(); + } else { + getEngine().rollTranslogGeneration(); + } + } + + @Override + public void onResponse(final Releasable releasable) { + if (allowCombineOperationWithPrimaryTermUpdate) { + operationListener.onResponse(releasable); + } else { + Releasables.close(releasable); + } + } + + @Override + public void onFailure(Exception e) { + if (allowCombineOperationWithPrimaryTermUpdate) { + operationListener.onFailure(e); + } + } + }); + + if (allowCombineOperationWithPrimaryTermUpdate) { + logger.debug("operation execution has been combined with primary term update"); + return; + } + } + } + } + assert opPrimaryTerm <= pendingPrimaryTerm + : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; + operationExecutor.accept(operationListener); } public int getActiveOperationsCount() { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 970e8ed5c9fab..cccc51b8017d3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -732,7 +732,6 @@ private Releasable acquireReplicaOperationPermitBlockingly(IndexShard indexShard return fut.get(); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testOperationPermitOnReplicaShards() throws Exception { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard; @@ -1023,7 +1022,6 @@ public void testGlobalCheckpointSync() throws IOException { closeShards(replicaShard, primaryShard); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); @@ -1088,7 +1086,6 @@ public void onFailure(Exception e) { closeShard(indexShard, false); } - @AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850") public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); From 1f875b03a2c39974241b3bdd505a6220bfb572d9 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 2/5] Always try to bump the prim term + add a test --- .../elasticsearch/index/shard/IndexShard.java | 10 +++- .../index/shard/IndexShardTests.java | 57 +++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d13b2e7acadd9..ad06d8449a06b 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2333,7 +2333,7 @@ default void onFailure(final Exception e) { private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) { assert Thread.holdsLock(mutex); - assert newPrimaryTerm > pendingPrimaryTerm; + assert newPrimaryTerm >= pendingPrimaryTerm; assert operationPrimaryTerm <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); indexShardOperationPermits.asyncBlockOperations(new ActionListener() { @@ -2467,9 +2467,9 @@ public void onFailure(final Exception e) { } }; - if (opPrimaryTerm > pendingPrimaryTerm) { + if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { synchronized (mutex) { - if (opPrimaryTerm > pendingPrimaryTerm) { + if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) { final IndexShardState shardState = state(); // only roll translog and update primary term if shard has made it past recovery // Having a new primary term here means that the old primary failed and that there is a new primary, which again @@ -2524,6 +2524,10 @@ public void onFailure(Exception e) { operationExecutor.accept(operationListener); } + private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) { + return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm); + } + public int getActiveOperationsCount() { // refCount is incremented on successful acquire and decremented on close return indexShardOperationPermits.getActiveOperationsCount(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cccc51b8017d3..179ddbcf1b258 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3596,6 +3596,63 @@ public void testResetEngine() throws Exception { closeShard(shard, false); } + public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception { + final IndexShard replica = newStartedShard(false); + indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint())); + + final int operations = scaledRandomIntBetween(10, 64); + final Thread[] threads = new Thread[operations]; + final CyclicBarrier startBarrier = new CyclicBarrier(1 + operations); + final CountDownLatch latch = new CountDownLatch(operations); + + final AtomicArray> opsTerms = new AtomicArray<>(operations); + final long futurePrimaryTerm = replica.getOperationPrimaryTerm() + 1; + + for (int i = 0; i < operations; i++) { + final int threadId = i; + threads[threadId] = new Thread(() -> { + try { + startBarrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + replica.acquireAllReplicaOperationsPermits( + futurePrimaryTerm, + replica.getGlobalCheckpoint(), + replica.getMaxSeqNoOfUpdatesOrDeletes(), + new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + opsTerms.set(threadId, Tuple.tuple(replica.getPendingPrimaryTerm(), replica.getOperationPrimaryTerm())); + latch.countDown(); + } + } + + @Override + public void onFailure(final Exception e) { + latch.countDown(); + throw new RuntimeException(e); + } + }, TimeValue.timeValueMinutes(30L)); + }); + threads[threadId].start(); + } + + startBarrier.await(); + latch.await(); + + for (Tuple opTerms : opsTerms.asList()) { + assertNotNull(opTerms); + assertEquals("Expected primary term and pending primary term captured during execution must match", + futurePrimaryTerm, opTerms.v1().longValue()); + assertEquals("Expected primary term and operation primary term captured during execution must match", + futurePrimaryTerm, opTerms.v2().longValue()); + } + closeShard(replica, false); + } + + @Override public Settings threadPoolSettings() { return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build(); From 587e0745e83ef476b747bfd4093e94a5449156f8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 3/5] Adapt Test --- .../index/shard/IndexShardTests.java | 92 ++++++++++--------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 179ddbcf1b258..becc2a1ae5c7c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3600,59 +3600,63 @@ public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdat final IndexShard replica = newStartedShard(false); indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint())); - final int operations = scaledRandomIntBetween(10, 64); - final Thread[] threads = new Thread[operations]; - final CyclicBarrier startBarrier = new CyclicBarrier(1 + operations); - final CountDownLatch latch = new CountDownLatch(operations); + final int nbTermUpdates = randomIntBetween(1, 5); - final AtomicArray> opsTerms = new AtomicArray<>(operations); - final long futurePrimaryTerm = replica.getOperationPrimaryTerm() + 1; + for (int i = 0; i < nbTermUpdates; i++) { + long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1; + final long globalCheckpoint = replica.getGlobalCheckpoint(); + final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes(); - for (int i = 0; i < operations; i++) { - final int threadId = i; - threads[threadId] = new Thread(() -> { - try { - startBarrier.await(); - } catch (final BrokenBarrierException | InterruptedException e) { - throw new RuntimeException(e); - } - replica.acquireAllReplicaOperationsPermits( - futurePrimaryTerm, - replica.getGlobalCheckpoint(), - replica.getMaxSeqNoOfUpdatesOrDeletes(), - new ActionListener() { - @Override - public void onResponse(final Releasable releasable) { - try (Releasable ignored = releasable) { - opsTerms.set(threadId, Tuple.tuple(replica.getPendingPrimaryTerm(), replica.getOperationPrimaryTerm())); - latch.countDown(); - } - } + final int operations = scaledRandomIntBetween(10, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + operations); + final CountDownLatch latch = new CountDownLatch(operations); - @Override - public void onFailure(final Exception e) { - latch.countDown(); - throw new RuntimeException(e); - } - }, TimeValue.timeValueMinutes(30L)); - }); - threads[threadId].start(); - } + final Thread[] threads = new Thread[operations]; + for (int j = 0; j < operations; j++) { + threads[j] = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + replica.acquireAllReplicaOperationsPermits( + opPrimaryTerm, + globalCheckpoint, + maxSeqNoOfUpdatesOrDeletes, + new ActionListener() { + @Override + public void onResponse(final Releasable releasable) { + try (Releasable ignored = releasable) { + assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm)); + assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm)); + } finally { + latch.countDown(); + } + } - startBarrier.await(); - latch.await(); + @Override + public void onFailure(final Exception e) { + try { + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + } + }, TimeValue.timeValueMinutes(30L)); + }); + threads[j].start(); + } + barrier.await(); + latch.await(); - for (Tuple opTerms : opsTerms.asList()) { - assertNotNull(opTerms); - assertEquals("Expected primary term and pending primary term captured during execution must match", - futurePrimaryTerm, opTerms.v1().longValue()); - assertEquals("Expected primary term and operation primary term captured during execution must match", - futurePrimaryTerm, opTerms.v2().longValue()); + for (Thread thread : threads) { + thread.join(); + } } + closeShard(replica, false); } - @Override public Settings threadPoolSettings() { return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build(); From 943d6811820cb36fcc7569d0d0a1f4cd7bf7cc95 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 4/5] Apply feedback --- .../elasticsearch/index/shard/IndexShard.java | 90 ++++++++----------- .../index/shard/IndexShardTests.java | 2 +- 2 files changed, 36 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ad06d8449a06b..118c00bc7cfae 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -48,6 +48,7 @@ import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Booleans; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -548,7 +549,7 @@ public void onFailure(Exception e) { } catch (final AlreadyClosedException e) { // okay, the index was deleted } - }); + }, null); } } // set this last, once we finished updating all internal state. @@ -2316,22 +2317,9 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit()); } - @FunctionalInterface - private interface PrimaryTermUpdateListener extends ActionListener { - - void onPrimaryTermUpdate() throws Exception; - - @Override - default void onResponse(final Releasable releasable) { - Releasables.close(releasable); - } - - @Override - default void onFailure(final Exception e) { - } - } - - private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) { + private void bumpPrimaryTerm(final long newPrimaryTerm, + final CheckedRunnable onBlocked, + @Nullable ActionListener combineWithAction) { assert Thread.holdsLock(mutex); assert newPrimaryTerm >= pendingPrimaryTerm; assert operationPrimaryTerm <= pendingPrimaryTerm; @@ -2339,19 +2327,26 @@ private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateL indexShardOperationPermits.asyncBlockOperations(new ActionListener() { @Override public void onFailure(final Exception e) { + try { + innerFail(e); + } finally { + if (combineWithAction != null) { + combineWithAction.onFailure(e); + } + } + } + + private void innerFail(final Exception e) { try { failShard("exception during primary term transition", e); } catch (AlreadyClosedException ace) { // ignore, shard is already closed - } finally { - listener.onFailure(e); } } @Override public void onResponse(final Releasable releasable) { final RunOnce releaseOnce = new RunOnce(releasable::close); - boolean success = false; try { assert operationPrimaryTerm <= pendingPrimaryTerm; termUpdated.await(); @@ -2359,14 +2354,18 @@ public void onResponse(final Releasable releasable) { // in the order submitted. We need to guard against another term bump if (operationPrimaryTerm < newPrimaryTerm) { operationPrimaryTerm = newPrimaryTerm; - listener.onPrimaryTermUpdate(); + onBlocked.run(); } - listener.onResponse(releaseOnce::run); - success = true; } catch (final Exception e) { - onFailure(e); + if (combineWithAction == null) { + // otherwise leave it to combineWithAction to release the permit + releaseOnce.run(); + } + innerFail(e); } finally { - if (success == false) { + if (combineWithAction != null) { + combineWithAction.onResponse(releasable); + } else { releaseOnce.run(); } } @@ -2480,37 +2479,18 @@ public void onFailure(final Exception e) { throw new IndexShardNotStartedException(shardId, shardState); } - bumpPrimaryTerm(opPrimaryTerm, new PrimaryTermUpdateListener() { - @Override - public void onPrimaryTermUpdate() throws Exception { - updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); - final long currentGlobalCheckpoint = getGlobalCheckpoint(); - final long maxSeqNo = seqNoStats().getMaxSeqNo(); - logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", - opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); - if (currentGlobalCheckpoint < maxSeqNo) { - resetEngineToGlobalCheckpoint(); - } else { - getEngine().rollTranslogGeneration(); - } + bumpPrimaryTerm(opPrimaryTerm, () -> { + updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition"); + final long currentGlobalCheckpoint = getGlobalCheckpoint(); + final long maxSeqNo = seqNoStats().getMaxSeqNo(); + logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]", + opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo); + if (currentGlobalCheckpoint < maxSeqNo) { + resetEngineToGlobalCheckpoint(); + } else { + getEngine().rollTranslogGeneration(); } - - @Override - public void onResponse(final Releasable releasable) { - if (allowCombineOperationWithPrimaryTermUpdate) { - operationListener.onResponse(releasable); - } else { - Releasables.close(releasable); - } - } - - @Override - public void onFailure(Exception e) { - if (allowCombineOperationWithPrimaryTermUpdate) { - operationListener.onFailure(e); - } - } - }); + }, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null); if (allowCombineOperationWithPrimaryTermUpdate) { logger.debug("operation execution has been combined with primary term update"); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index becc2a1ae5c7c..e843650ab5e24 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -3607,7 +3607,7 @@ public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdat final long globalCheckpoint = replica.getGlobalCheckpoint(); final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes(); - final int operations = scaledRandomIntBetween(10, 64); + final int operations = scaledRandomIntBetween(5, 32); final CyclicBarrier barrier = new CyclicBarrier(1 + operations); final CountDownLatch latch = new CountDownLatch(operations); From ee152f0d88736a2da36b8a25ed2dc54413ba374f Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 26 Nov 2018 16:31:29 +0100 Subject: [PATCH 5/5] Change assertion --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 118c00bc7cfae..fc4875db09a93 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2321,7 +2321,7 @@ private void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable onBlocked, @Nullable ActionListener combineWithAction) { assert Thread.holdsLock(mutex); - assert newPrimaryTerm >= pendingPrimaryTerm; + assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null); assert operationPrimaryTerm <= pendingPrimaryTerm; final CountDownLatch termUpdated = new CountDownLatch(1); indexShardOperationPermits.asyncBlockOperations(new ActionListener() {