From 1f875b03a2c39974241b3bdd505a6220bfb572d9 Mon Sep 17 00:00:00 2001
From: Tanguy Leroux
Date: Mon, 26 Nov 2018 16:31:29 +0100
Subject: [PATCH] Always try to bump the prim term + add a test

---
Notes (review; text in this section is ignored when the patch is applied):

  What the diff does
  * IndexShard#bumpPrimaryTerm: the entry assertion is relaxed from
    `newPrimaryTerm > pendingPrimaryTerm` to `>=`, so the method may now be
    entered when the requested term equals the pending term.
  * IndexShard#requirePrimaryTermUpdate (new, private): returns true when the
    operation term is greater than pendingPrimaryTerm, or when `allPermits`
    is true and the operation term is greater than operationPrimaryTerm.
    Both the unsynchronized pre-check and the re-check under `mutex` now call
    it, passing `allowCombineOperationWithPrimaryTermUpdate` as `allPermits`.
  * IndexShardTests: new test that starts `operations` threads, releases them
    together through a CyclicBarrier, has each call
    acquireAllReplicaOperationsPermits with `getOperationPrimaryTerm() + 1`,
    records the pending/operation terms seen inside onResponse, and asserts
    that every recorded pair equals that future term.

  About this copy of the patch
  * The patch had been flattened onto three physical lines; the one-record-
    per-line layout below was reconstructed so that every hunk matches its
    `@@` header and the diffstat (64 insertions, 3 deletions). Indentation is
    inferred from block nesting - TODO confirm against the target files, or
    apply with `git apply --ignore-whitespace`.
  * Generic type arguments had been stripped (e.g. `AtomicArray> opsTerms`).
    `AtomicArray<Tuple<Long, Long>>`, `Tuple<Long, Long>` and the test's
    `ActionListener<Releasable>` are restored from the surrounding code.
    The `ActionListener<Releasable>` on the bumpPrimaryTerm context line is
    an assumption - confirm against IndexShard.java.
  * NOTE(review): the author e-mail address appears to be missing from the
    `From:` header; restore it before using `git am`.

 .../elasticsearch/index/shard/IndexShard.java | 10 +++-
 .../index/shard/IndexShardTests.java          | 57 +++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index d13b2e7acadd9..ad06d8449a06b 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2333,7 +2333,7 @@ default void onFailure(final Exception e) {
 
     private void bumpPrimaryTerm(final long newPrimaryTerm, final PrimaryTermUpdateListener listener) {
         assert Thread.holdsLock(mutex);
-        assert newPrimaryTerm > pendingPrimaryTerm;
+        assert newPrimaryTerm >= pendingPrimaryTerm;
         assert operationPrimaryTerm <= pendingPrimaryTerm;
         final CountDownLatch termUpdated = new CountDownLatch(1);
         indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
@@ -2467,9 +2467,9 @@ public void onFailure(final Exception e) {
             }
         };
 
-        if (opPrimaryTerm > pendingPrimaryTerm) {
+        if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
             synchronized (mutex) {
-                if (opPrimaryTerm > pendingPrimaryTerm) {
+                if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
                     final IndexShardState shardState = state();
                     // only roll translog and update primary term if shard has made it past recovery
                     // Having a new primary term here means that the old primary failed and that there is a new primary, which again
@@ -2524,6 +2524,10 @@ public void onFailure(Exception e) {
         operationExecutor.accept(operationListener);
     }
 
+    private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
+        return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
+    }
+
     public int getActiveOperationsCount() {
         // refCount is incremented on successful acquire and decremented on close
         return indexShardOperationPermits.getActiveOperationsCount();
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index cccc51b8017d3..179ddbcf1b258 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -3596,6 +3596,63 @@ public void testResetEngine() throws Exception {
         closeShard(shard, false);
     }
 
+    public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
+        final IndexShard replica = newStartedShard(false);
+        indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
+
+        final int operations = scaledRandomIntBetween(10, 64);
+        final Thread[] threads = new Thread[operations];
+        final CyclicBarrier startBarrier = new CyclicBarrier(1 + operations);
+        final CountDownLatch latch = new CountDownLatch(operations);
+
+        final AtomicArray<Tuple<Long, Long>> opsTerms = new AtomicArray<>(operations);
+        final long futurePrimaryTerm = replica.getOperationPrimaryTerm() + 1;
+
+        for (int i = 0; i < operations; i++) {
+            final int threadId = i;
+            threads[threadId] = new Thread(() -> {
+                try {
+                    startBarrier.await();
+                } catch (final BrokenBarrierException | InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                replica.acquireAllReplicaOperationsPermits(
+                    futurePrimaryTerm,
+                    replica.getGlobalCheckpoint(),
+                    replica.getMaxSeqNoOfUpdatesOrDeletes(),
+                    new ActionListener<Releasable>() {
+                        @Override
+                        public void onResponse(final Releasable releasable) {
+                            try (Releasable ignored = releasable) {
+                                opsTerms.set(threadId, Tuple.tuple(replica.getPendingPrimaryTerm(), replica.getOperationPrimaryTerm()));
+                                latch.countDown();
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(final Exception e) {
+                            latch.countDown();
+                            throw new RuntimeException(e);
+                        }
+                    }, TimeValue.timeValueMinutes(30L));
+            });
+            threads[threadId].start();
+        }
+
+        startBarrier.await();
+        latch.await();
+
+        for (Tuple<Long, Long> opTerms : opsTerms.asList()) {
+            assertNotNull(opTerms);
+            assertEquals("Expected primary term and pending primary term captured during execution must match",
+                futurePrimaryTerm, opTerms.v1().longValue());
+            assertEquals("Expected primary term and operation primary term captured during execution must match",
+                futurePrimaryTerm, opTerms.v2().longValue());
+        }
+        closeShard(replica, false);
+    }
+
+
     @Override
     public Settings threadPoolSettings() {
         return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();