diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 571d7e208624b..2938e5edb950b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -335,7 +335,7 @@ class AsyncPrimaryAction extends AbstractRunnable { @Override protected void doRun() throws Exception { final ShardId shardId = request.shardId(); - final IndexShard indexShard = getIndexShard(shardId, targetAllocationID); + final IndexShard indexShard = getIndexShard(shardId); final ShardRouting shardRouting = indexShard.routingEntry(); // we may end up here if the cluster state used to route the primary is so stale that the underlying // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails @@ -609,7 +609,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; final ShardId shardId = request.shardId(); assert shardId != null : "request shardId must be set"; - this.replica = getIndexShard(shardId, targetAllocationID); + this.replica = getIndexShard(shardId); } @Override @@ -719,7 +719,7 @@ public void onFailure(Exception e) { } } - protected IndexShard getIndexShard(final ShardId shardId, final String targetAllocationID) { + protected IndexShard getIndexShard(final ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); return indexService.getShard(shardId.id()); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 11e001c3fdaf9..05ead45cd128d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2348,7 +2348,58 @@ public void onResponse(final Releasable releasable) { termUpdated.countDown(); } - private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long globalCheckpoint) { + /** + * Acquire a replica operation permit whenever the shard is ready for indexing (see + * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified + * name. + * + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed + * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are + * enabled the tracing will capture the supplied object's {@link Object#toString()} value. + * Otherwise the object isn't used + */ + public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, final String executorOnDelay, + final Object debugInfo) { + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + (listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo)); + } + + /** + * Acquire all replica operation permits whenever the shard is ready for indexing (see + * {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in + * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an + * {@link IllegalStateException}. + * + * @param opPrimaryTerm the operation primary term + * @param globalCheckpoint the global checkpoint associated with the request + * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary + * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} + * @param onPermitAcquired the listener for permit acquisition + * @param timeout the maximum time to wait for the in-flight operations block + */ + public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, + final TimeValue timeout) { + innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, + (listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())); + } + + private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm, + final long globalCheckpoint, + final long maxSeqNoOfUpdatesOrDeletes, + final ActionListener onPermitAcquired, + final Consumer> consumer) { + verifyNotClosed(); if (opPrimaryTerm > pendingPrimaryTerm) { synchronized (mutex) { if (opPrimaryTerm > pendingPrimaryTerm) { @@ -2381,19 +2432,7 @@ private void updatePrimaryTermIfNeeded(final long opPrimaryTerm, final long glob } assert opPrimaryTerm <= pendingPrimaryTerm : "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]"; - } - - /** - * Creates a new action listener which verifies that the operation primary term is not too old. If the given primary - * term is lower than the current one, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with - * an {@link IllegalStateException}. Otherwise the global checkpoint and the max_seq_no_of_updates marker of the replica are updated - * before the invocation of the {@link ActionListener#onResponse(Object)}} method of the provided listener. - */ - private ActionListener createListener(final ActionListener listener, - final long opPrimaryTerm, - final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes) { - return new ActionListener() { + consumer.accept(new ActionListener() { @Override public void onResponse(final Releasable releasable) { if (opPrimaryTerm < operationPrimaryTerm) { @@ -2404,81 +2443,26 @@ public void onResponse(final Releasable releasable) { shardId, opPrimaryTerm, operationPrimaryTerm); - listener.onFailure(new IllegalStateException(message)); + onPermitAcquired.onFailure(new IllegalStateException(message)); } else { assert assertReplicationTarget(); try { updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); - } catch (final Exception e) { + } catch (Exception e) { releasable.close(); - listener.onFailure(e); + onPermitAcquired.onFailure(e); return; } - listener.onResponse(releasable); + onPermitAcquired.onResponse(releasable); } } @Override public void onFailure(final Exception e) { - listener.onFailure(e); + onPermitAcquired.onFailure(e); } - }; - } - - /** - * Acquire a replica operation permit whenever the shard is ready for indexing (see - * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in - * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an - * {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified - * name. - * - * @param opPrimaryTerm the operation primary term - * @param globalCheckpoint the global checkpoint associated with the request - * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary - * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} - * @param onPermitAcquired the listener for permit acquisition - * @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed - * @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are - * enabled the tracing will capture the supplied object's {@link Object#toString()} value. - * Otherwise the object isn't used - */ - public void acquireReplicaOperationPermit(final long opPrimaryTerm, - final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes, - final ActionListener onPermitAcquired, - final String executorOnDelay, - final Object debugInfo) { - verifyNotClosed(); - updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint); - - ActionListener listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); - indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo); - } - - /** - * Acquire all replica operation permits whenever the shard is ready for indexing (see - * {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in - * {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an - * {@link IllegalStateException}. - * - * @param opPrimaryTerm the operation primary term - * @param globalCheckpoint the global checkpoint associated with the request - * @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary - * after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()} - * @param onPermitAcquired the listener for permit acquisition - * @param timeout the maximum time to wait for the in-flight operations block - */ - public void acquireReplicaAllOperationsPermits(final long opPrimaryTerm, - final long globalCheckpoint, - final long maxSeqNoOfUpdatesOrDeletes, - final ActionListener onPermitAcquired, - final TimeValue timeout) { - verifyNotClosed(); - updatePrimaryTermIfNeeded(opPrimaryTerm, globalCheckpoint); - - ActionListener listener = createListener(onPermitAcquired, opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); - indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()); + }); } public int getActiveOperationsCount() { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 0b07b8bba338f..c1991a8f3a17a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -784,7 +784,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception { new TestAction(Settings.EMPTY, "internal:testSeqNoIsSetOnPrimary", transportService, clusterService, shardStateAction, threadPool) { @Override - protected IndexShard getIndexShard(ShardId shardId, String targetAllocationId) { + protected IndexShard getIndexShard(ShardId shardId) { return shard; } }; diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 4f03f0bc6813e..8cad76bcdfe5e 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; @@ -64,7 +63,6 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; @@ -88,6 +86,16 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; + +/** + * This test tests the concurrent execution of several transport replication actions. All of these actions (except one) acquire a single + * permit during their execution on shards and are expected to fail if a global level or index level block is present in the cluster state. + * These actions are all started at the same time, but some are delayed until one last action. + * + * This last action is special because it acquires all the permits on shards, adds the block to the cluster state and then "releases" the + * previously delayed single permit actions. This way, there is a clear transition between the single permit actions executed before the + * all permit action that sets the block and those executed afterwards that are doomed to fail because of the block. + */ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTestCase { private ClusterService clusterService; @@ -190,9 +198,8 @@ public void testTransportReplicationActionWithAllPermits() throws Exception { final PlainActionFuture listener = new PlainActionFuture<>(); futures[threadId] = listener; - // An action with blocks which acquires a single operation permit during execution - final TestAction singlePermitAction = new TestAction(Settings.EMPTY, "internal:singlePermitWithBlocks[" + threadId + "]", - transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, false, Optional.of(globalBlock)); + final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, "internalSinglePermit[" + threadId + "]", + transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, globalBlock); actions[threadId] = singlePermitAction; Thread thread = new Thread(() -> { @@ -242,8 +249,8 @@ private void assertBlockIsPresentForDelayedOp() { logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state"); // An action which acquires all operation permits during execution and set a block - final TestAction allPermitsAction = new TestAction(Settings.EMPTY, "internal:allPermits", transportService, clusterService, - shardStateAction, threadPool, shardId, primary, replica, true, Optional.empty()); + final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "internalAllPermits", transportService, + clusterService, shardStateAction, threadPool, shardId, primary, replica); final PlainActionFuture allPermitFuture = new PlainActionFuture<>(); Thread thread = new Thread(() -> { @@ -337,30 +344,32 @@ private Request request() { return new Request().setShardId(primary.shardId()); } + /** + * A type of {@link TransportReplicationAction} that allows to use the primary and replica shards passed to the constructor for the + * execution of the replication action. Also records if the operation is executed on the primary and the replica. + */ + private abstract class TestAction extends TransportReplicationAction { - private class TestAction extends TransportReplicationAction { - - private final ShardId shardId; - private final IndexShard primary; - private final IndexShard replica; - private final boolean acquireAllPermits; - private final Optional globalBlock; - private final TimeValue timeout = TimeValue.timeValueSeconds(30L); - - private final SetOnce executedOnPrimary = new SetOnce<>(); - private final SetOnce executedOnReplica = new SetOnce<>(); + protected final ShardId shardId; + protected final IndexShard primary; + protected final IndexShard replica; + protected final SetOnce executedOnPrimary = new SetOnce<>(); + protected final SetOnce executedOnReplica = new SetOnce<>(); TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, - ShardStateAction shardStateAction, ThreadPool threadPool, - ShardId shardId, IndexShard primary, IndexShard replica, - boolean acquireAllPermits, Optional globalBlock) { + ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) { super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME); this.shardId = Objects.requireNonNull(shardId); this.primary = Objects.requireNonNull(primary); + assertEquals(shardId, primary.shardId()); this.replica = Objects.requireNonNull(replica); - this.acquireAllPermits = acquireAllPermits; - this.globalBlock = globalBlock; + assertEquals(shardId, replica.shardId()); + } + + @Override + protected Response newResponseInstance() { + return new Response(); } public String getActionName() { @@ -368,40 +377,37 @@ public String getActionName() { } @Override - protected ClusterBlockLevel globalBlockLevel() { - if (globalBlock.isPresent()) { - return globalBlock.get() ? ClusterBlockLevel.WRITE : super.globalBlockLevel(); - } - return null; + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + executedOnPrimary.set(true); + // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here + // that the permit has been acquired on the primary shard + assertSame(primary, shard); + return new PrimaryResult<>(shardRequest, new Response()); } @Override - protected ClusterBlockLevel indexBlockLevel() { - if (globalBlock.isPresent()) { - return globalBlock.get() == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); - } - return null; + protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + executedOnReplica.set(true); + // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here + // that the permit has been acquired on the replica shard + assertSame(replica, shard); + return new ReplicaResult(); } @Override - protected IndexShard getIndexShard(final ShardId indexShardId, final String targetAllocationId) { - if (shardId.equals(indexShardId) == false) { + protected IndexShard getIndexShard(final ShardId shardId) { + if (this.shardId.equals(shardId) == false) { throw new AssertionError("shard id differs from " + shardId); } - if (Objects.equals(primary.routingEntry().allocationId().getId(), targetAllocationId)) { - return primary; - } else if (Objects.equals(replica.routingEntry().allocationId().getId(), targetAllocationId)) { - return replica; - } - throw new ShardNotFoundException(shardId, "something went wrong"); + return (executedOnPrimary.get() == null) ? primary : replica; } @Override protected void sendReplicaRequest(final ConcreteReplicaRequest replicaRequest, final DiscoveryNode node, final ActionListener listener) { - assertEquals(clusterService.state().nodes().get("_node2"), node); - ReplicaOperationTransportHandler replicaOperationTransportHandler = this.new ReplicaOperationTransportHandler(); + assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node); + ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler(); try { replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() { @Override @@ -428,69 +434,90 @@ public void sendResponse(Exception exception) throws IOException { listener.onFailure(e); } } + } - @Override - protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, ActionListener onAcquired) { - assertTrue(shard.routingEntry().primary()); - assertSame(primary, shard); - if (acquireAllPermits) { - shard.acquireAllPrimaryOperationsPermits(onAcquired, timeout); - } else { - super.acquirePrimaryOperationPermit(shard, request, onAcquired); - } + /** + * A type of {@link TransportReplicationAction} that acquires a single permit during execution and that blocks + * on {@link ClusterBlockLevel#WRITE}. The block can be a global level or an index level block depending of the + * value of the {@code globalBlock} parameter in the constructor. When the operation is executed on shards it + * verifies that at least 1 permit is acquired and that there is no blocks in the cluster state. + */ + private class SinglePermitWithBlocksAction extends TestAction { + + private final boolean globalBlock; + + SinglePermitWithBlocksAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool, + ShardId shardId, IndexShard primary, IndexShard replica, boolean globalBlock) { + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + this.globalBlock = globalBlock; } @Override - protected void acquireReplicaOperationPermit(IndexShard shard, Request request, ActionListener onAcquired, - long primaryTerm, long globalCheckpoint, long maxSeqNo) { - assertFalse(shard.routingEntry().primary()); - assertSame(replica, shard); - if (acquireAllPermits) { - shard.acquireReplicaAllOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNo, onAcquired, timeout); - } else { - super.acquireReplicaOperationPermit(shard, request, onAcquired, primaryTerm, globalCheckpoint, maxSeqNo); - } + protected ClusterBlockLevel globalBlockLevel() { + return globalBlock ? ClusterBlockLevel.WRITE : super.globalBlockLevel(); } @Override - protected Response newResponseInstance() { - return new Response(); + protected ClusterBlockLevel indexBlockLevel() { + return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel(); } @Override protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { - assertSame(primary, shard); - if (acquireAllPermits) { - assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - } else { - assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - } - assertNoBlockOnSinglePermitOps(); - executedOnPrimary.set(true); - return new PrimaryResult<>(shardRequest, new Response()); + assertNoBlocks("block must not exist when executing the operation on primary shard: it should have been blocked before"); + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + return super.shardOperationOnPrimary(shardRequest, shard); } @Override protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { - assertSame(replica, shard); - if (acquireAllPermits) { - assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - } else { - assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - } - assertNoBlockOnSinglePermitOps(); - executedOnReplica.set(true); - return new ReplicaResult(); + assertNoBlocks("block must not exist when executing the operation on replica shard: it should have been blocked before"); + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + return super.shardOperationOnReplica(shardRequest, shard); } - private void assertNoBlockOnSinglePermitOps() { - // When a single permit operation is executed on primary/replica shard we must be sure that the block is not here, - // otherwise something went wrong. - if (acquireAllPermits == false) { - final ClusterState clusterState = clusterService.state(); - assertFalse("Global block must not exist", clusterState.blocks().hasGlobalBlock(block)); - assertFalse("Index block must not exist", clusterState.blocks().hasIndexBlock(shardId.getIndexName(), block)); - } + private void assertNoBlocks(final String error) { + final ClusterState clusterState = clusterService.state(); + assertFalse("Global level " + error, clusterState.blocks().hasGlobalBlock(block)); + assertFalse("Index level " + error, clusterState.blocks().hasIndexBlock(shardId.getIndexName(), block)); + } + } + + /** + * A type of {@link TransportReplicationAction} that acquires all permits during execution. + */ + private class AllPermitsThenBlockAction extends TestAction { + + private final TimeValue timeout = TimeValue.timeValueSeconds(30L); + + AllPermitsThenBlockAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ShardStateAction shardStateAction, ThreadPool threadPool, + ShardId shardId, IndexShard primary, IndexShard replica) { + super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica); + } + + @Override + protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, ActionListener onAcquired) { + shard.acquireAllPrimaryOperationsPermits(onAcquired, timeout); + } + + @Override + protected void acquireReplicaOperationPermit(IndexShard shard, Request request, ActionListener onAcquired, + long primaryTerm, long globalCheckpoint, long maxSeqNo) { + shard.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNo, onAcquired, timeout); + } + + @Override + protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + return super.shardOperationOnPrimary(shardRequest, shard); + } + + @Override + protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { + assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + return super.shardOperationOnReplica(shardRequest, shard); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 3c8f7c9eab712..1baa61e144b73 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -318,7 +318,7 @@ public void testClosesPreventsNewOperations() throws Exception { () -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "")); expectThrows(IndexShardClosedException.class, - () -> indexShard.acquireReplicaAllOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, + () -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L))); } @@ -782,7 +782,7 @@ public void testOperationPermitOnReplicaShards() throws Exception { e = expectThrows(AssertionError.class, () -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L))); - assertThat(e, hasToString(containsString("acquirePrimaryAllOperationsPermits should only be called on primary shard"))); + assertThat(e, hasToString(containsString("acquireAllPrimaryOperationsPermits should only be called on primary shard"))); } final long primaryTerm = indexShard.getPendingPrimaryTerm(); @@ -3604,7 +3604,7 @@ public Settings threadPoolSettings() { /** * Randomizes the usage of {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)} and - * {@link IndexShard#acquireReplicaAllOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit. + * {@link IndexShard#acquireAllReplicaOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit. */ private void randomReplicaOperationPermitAcquisition(final IndexShard indexShard, final long opPrimaryTerm, @@ -3617,7 +3617,7 @@ private void randomReplicaOperationPermitAcquisition(final IndexShard indexShard indexShard.acquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, executor, info); } else { final TimeValue timeout = TimeValue.timeValueSeconds(30L); - indexShard.acquireReplicaAllOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout); + indexShard.acquireAllReplicaOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout); } } }