diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 6d8c3ea37fcbe..6b87dc3562294 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints). @@ -457,18 +458,55 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting) } /** - * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done - * properly. TODO remove this. + * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global + * checkpoint, and renew any leases that are approaching expiry. */ - public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() { + public synchronized void renewPeerRecoveryRetentionLeases() { assert primaryMode; - for (ShardRouting shardRouting : routingTable) { - if (shardRouting.assignedToNode()) { - final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); - renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), Math.max(0L, checkpointState.globalCheckpoint + 1L), - PEER_RECOVERY_RETENTION_LEASE_SOURCE); + assert invariant(); + + /* + * Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in + * case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be + * persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after + * half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire. + */ + final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2; + + /* + * If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all. + */ + final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false).filter(ShardRouting::assignedToNode) + .anyMatch(shardRouting -> { + final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); + if (retentionLease == null) { + /* + * If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't + * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation. + */ + assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false + || indexSettings.getIndexVersionCreated().before(Version.V_8_0_0); + return false; + } + return retentionLease.timestamp() <= renewalTimeMillis + || retentionLease.retainingSequenceNumber() <= checkpoints.get(shardRouting.allocationId().getId()).globalCheckpoint; + }); + + if (renewalNeeded) { + for (ShardRouting shardRouting : routingTable) { + if (shardRouting.assignedToNode()) { + final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting)); + if (retentionLease != null) { + final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId()); + renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), + Math.max(0L, checkpointState.globalCheckpoint + 1L), + PEER_RECOVERY_RETENTION_LEASE_SOURCE); + } + } } } + + assert invariant(); } public static class CheckpointState implements Writeable { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c9e08dd6a101c..35217d349da92 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2111,6 +2111,7 @@ public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); + replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); @@ -2502,16 +2503,6 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener); } - /** - * Test-only method to advance the all shards' peer-recovery retention leases to their tracked global checkpoints so that operations - * can be discarded. TODO Remove this when retention leases are advanced by other mechanisms. - */ - public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() { - assert assertPrimaryMode(); - replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints(); - syncRetentionLeases(); - } - class ShardEventListener implements Engine.EventListener { private final CopyOnWriteArrayList> delegates = new CopyOnWriteArrayList<>(); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index f01e726eb7c8d..f571193cec405 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -50,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.stream.Collectors; @@ -61,6 +63,9 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; public class ReplicationTrackerTests extends ReplicationTrackerTestCase { @@ -975,4 +980,158 @@ private static void addPeerRecoveryRetentionLease(final ReplicationTracker track addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId)); } + public void testPeerRecoveryRetentionLeaseCreationAndRenewal() { + + final int numberOfActiveAllocationsIds = randomIntBetween(1, 8); + final int numberOfInitializingIds = randomIntBetween(0, 8); + final Tuple, Set> activeAndInitializingAllocationIds = + randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds); + final Set activeAllocationIds = activeAndInitializingAllocationIds.v1(); + final Set initializingAllocationIds = activeAndInitializingAllocationIds.v2(); + + final AllocationId primaryId = activeAllocationIds.iterator().next(); + + final long initialClusterStateVersion = randomNonNegativeLong(); + + final AtomicLong currentTimeMillis = new AtomicLong(0L); + final ReplicationTracker tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get); + + final long retentionLeaseExpiryTimeMillis = tracker.indexSettings().getRetentionLeaseMillis(); + final long peerRecoveryRetentionLeaseRenewalTimeMillis = retentionLeaseExpiryTimeMillis / 2; + + final long maximumTestTimeMillis = 13 * retentionLeaseExpiryTimeMillis; + final long testStartTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - maximumTestTimeMillis); + currentTimeMillis.set(testStartTimeMillis); + + final Function retentionLeaseFromAllocationId = allocationId + -> new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)), + 0L, currentTimeMillis.get(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); + + final List initialLeases = new ArrayList<>(); + if (randomBoolean()) { + initialLeases.add(retentionLeaseFromAllocationId.apply(primaryId)); + } + for (final AllocationId replicaId : initializingAllocationIds) { + if (randomBoolean()) { + initialLeases.add(retentionLeaseFromAllocationId.apply(replicaId)); + } + } + for (int i = randomIntBetween(0, 5); i > 0; i--) { + initialLeases.add(retentionLeaseFromAllocationId.apply(AllocationId.newInitializing())); + } + tracker.updateRetentionLeasesOnReplica(new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), initialLeases)); + + IndexShardRoutingTable routingTable = routingTable(initializingAllocationIds, primaryId); + tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable); + tracker.activatePrimaryMode(NO_OPS_PERFORMED); + assertTrue("primary's retention lease should exist", + tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard()))); + + final Consumer assertAsTimePasses = assertion -> { + final long startTime = currentTimeMillis.get(); + while (currentTimeMillis.get() < startTime + retentionLeaseExpiryTimeMillis * 2) { + currentTimeMillis.addAndGet(randomLongBetween(0L, retentionLeaseExpiryTimeMillis * 2)); + tracker.renewPeerRecoveryRetentionLeases(); + tracker.getRetentionLeases(true); + assertion.run(); + } + }; + + assertAsTimePasses.accept(() -> { + // Leases for assigned replicas do not expire + final RetentionLeases retentionLeases = tracker.getRetentionLeases(); + for (final AllocationId replicaId : initializingAllocationIds) { + final String leaseId = retentionLeaseFromAllocationId.apply(replicaId).id(); + assertTrue("should not have removed lease for " + replicaId + " in " + retentionLeases, + initialLeases.stream().noneMatch(l -> l.id().equals(leaseId)) || retentionLeases.contains(leaseId)); + } + }); + + // Leases that don't correspond to assigned replicas, however, are expired by this time. + final Set expectedLeaseIds = Stream.concat(Stream.of(primaryId), initializingAllocationIds.stream()) + .map(allocationId -> retentionLeaseFromAllocationId.apply(allocationId).id()).collect(Collectors.toSet()); + for (final RetentionLease retentionLease : tracker.getRetentionLeases().leases()) { + assertThat(expectedLeaseIds, hasItem(retentionLease.id())); + } + + for (AllocationId replicaId : initializingAllocationIds) { + markAsTrackingAndInSyncQuietly(tracker, replicaId.getId(), NO_OPS_PERFORMED); + } + + assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()), + equalTo(expectedLeaseIds)); + + assertAsTimePasses.accept(() -> { + // Leases still don't expire + assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()), + equalTo(expectedLeaseIds)); + + // Also leases are renewed before reaching half the expiry time + //noinspection OptionalGetWithoutIsPresent + assertThat(tracker.getRetentionLeases() + " renewed before too long", + tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(), + greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis)); + }); + + IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable); + for (ShardRouting replicaShard : routingTable.replicaShards()) { + routingTableBuilder.removeShard(replicaShard); + routingTableBuilder.addShard(replicaShard.moveToStarted()); + } + routingTable = routingTableBuilder.build(); + activeAllocationIds.addAll(initializingAllocationIds); + + tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable); + + assertAsTimePasses.accept(() -> { + // Leases still don't expire + assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()), + equalTo(expectedLeaseIds)); + // ... and any extra peer recovery retention leases are expired immediately since the shard is fully active + tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {})); + }); + + tracker.renewPeerRecoveryRetentionLeases(); + assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1()); + + final AllocationId advancingAllocationId + = initializingAllocationIds.isEmpty() || rarely() ? primaryId : randomFrom(initializingAllocationIds); + final String advancingLeaseId = retentionLeaseFromAllocationId.apply(advancingAllocationId).id(); + + final long initialGlobalCheckpoint + = Math.max(NO_OPS_PERFORMED, tracker.getTrackedLocalCheckpointForShard(advancingAllocationId.getId()).globalCheckpoint); + assertThat(tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(initialGlobalCheckpoint + 1)); + final long newGlobalCheckpoint = initialGlobalCheckpoint + randomLongBetween(1, 1000); + tracker.updateGlobalCheckpointForShard(advancingAllocationId.getId(), newGlobalCheckpoint); + tracker.renewPeerRecoveryRetentionLeases(); + assertThat("lease was renewed because the shard advanced its global checkpoint", + tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(newGlobalCheckpoint + 1)); + + final long initialVersion = tracker.getRetentionLeases().version(); + tracker.renewPeerRecoveryRetentionLeases(); + assertThat("immediate renewal is a no-op", tracker.getRetentionLeases().version(), equalTo(initialVersion)); + + //noinspection OptionalGetWithoutIsPresent + final long millisUntilFirstRenewal + = tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong() + + peerRecoveryRetentionLeaseRenewalTimeMillis + - currentTimeMillis.get(); + + if (millisUntilFirstRenewal != 0) { + final long shorterThanRenewalTime = randomLongBetween(0L, millisUntilFirstRenewal - 1); + currentTimeMillis.addAndGet(shorterThanRenewalTime); + tracker.renewPeerRecoveryRetentionLeases(); + assertThat("renewal is a no-op after a short time", tracker.getRetentionLeases().version(), equalTo(initialVersion)); + currentTimeMillis.addAndGet(millisUntilFirstRenewal - shorterThanRenewalTime); + } + + tracker.renewPeerRecoveryRetentionLeases(); + assertThat("renewal happens after a sufficiently long time", tracker.getRetentionLeases().version(), greaterThan(initialVersion)); + assertTrue("all leases were renewed", + tracker.getRetentionLeases().leases().stream().allMatch(l -> l.timestamp() == currentTimeMillis.get())); + + assertThat("test ran for too long, potentially leading to overflow", + currentTimeMillis.get(), lessThanOrEqualTo(testStartTimeMillis + maximumTestTimeMillis)); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 5b183f59dc02e..d6e5a44588a32 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2925,7 +2925,7 @@ public void testDocStats() throws Exception { indexShard.getLocalCheckpoint()); indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint()); - indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints(); + indexShard.syncRetentionLeases(); } else { indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test"); @@ -3524,7 +3524,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception { primary.updateGlobalCheckpointForShard( primary.routingEntry().allocationId().getId(), primary.getLastSyncedGlobalCheckpoint()); - primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints(); + primary.syncRetentionLeases(); primary.sync(); flushShard(primary); } diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index cc94b9a5ea4a3..eb0e564e92d33 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -81,7 +81,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.StreamSupport; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -1009,7 +1008,10 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) { } public void testFilterCacheStats() throws Exception { - Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build(); + Settings settings = Settings.builder().put(indexSettings()) + .put("number_of_replicas", 0) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms") + .build(); assertAcked(prepareCreate("index").setSettings(settings).get()); indexRandom(false, true, client().prepareIndex("index", "type", "1").setSource("foo", "bar"), @@ -1053,10 +1055,13 @@ public void testFilterCacheStats() throws Exception { // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything. if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { persistGlobalCheckpoint("index"); - internalCluster().nodesInclude("index").stream() - .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false)) - .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) - .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints); + assertBusy(() -> { + for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) { + final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream() + .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1)); + } + }); flush("index"); } ForceMergeResponse forceMergeResponse = diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java index 1f4a35a29c28e..fdb623d1d1e91 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalSettingsPlugin.java @@ -50,6 +50,7 @@ public List> getSettings() { PROVIDED_NAME_SETTING, TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING, + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING, IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 6b4cfe20a099f..2f6af23f7ad92 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -97,20 +97,11 @@ public List> getSettings() { } - public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { - - @Override - public List> getSettings() { - return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); - } - - } - @Override protected Collection> nodePlugins() { return Stream.concat( super.nodePlugins().stream(), - Stream.of(RetentionLeaseRenewIntervalSettingPlugin.class, RetentionLeaseSyncIntervalSettingPlugin.class)) + Stream.of(RetentionLeaseRenewIntervalSettingPlugin.class)) .collect(Collectors.toList()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 4325517e43092..f8c55b93e053e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -64,11 +64,11 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLeaseActions; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; @@ -108,7 +108,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static java.util.Collections.singletonMap; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -1147,8 +1146,10 @@ private void runFallBehindTest( final CheckedRunnable afterPausingFollower, final Consumer> exceptionConsumer) throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); - final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + final Map extraSettingsMap = new HashMap<>(2); + extraSettingsMap.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"); + extraSettingsMap.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms"); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), extraSettingsMap); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderYellow("index1"); @@ -1179,15 +1180,17 @@ private void runFallBehindTest( leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } leaderClient().prepareDelete("index1", "doc", "1").get(); - getLeaderCluster().nodesInclude("index1").stream() - .flatMap(n -> StreamSupport.stream(getLeaderCluster().getInstance(IndicesService.class, n).spliterator(), false)) - .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) - .filter(indexShard -> indexShard.shardId().getIndexName().equals("index1")) - .filter(indexShard -> indexShard.routingEntry().primary()) - .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints); - leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet(); leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet(); + assertBusy(() -> { + final ShardStats[] shardsStats = leaderClient().admin().indices().prepareStats("index1").get().getIndex("index1").getShards(); + for (final ShardStats shardStats : shardsStats) { + final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream() + .filter(retentionLease -> ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(retentionLease.source())) + .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1)); + } + }); ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index1"); forceMergeRequest.maxNumSegments(1); leaderClient().admin().indices().forceMerge(forceMergeRequest).actionGet(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java index b85c00a635922..16e7a90c3c679 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java @@ -15,20 +15,20 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.LocalStateCcr; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; @@ -38,7 +38,7 @@ public class ShardChangesTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return Collections.singleton(LocalStateCcr.class); + return Stream.of(LocalStateCcr.class, InternalSettingsPlugin.class).collect(Collectors.toList()); } // this emulates what the CCR persistent task will do for pulling @@ -105,7 +105,8 @@ public void testMissingOperations() throws Exception { .put("index.soft_deletes.enabled", true) .put("index.soft_deletes.retention.operations", 0) .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 0)) + .put("index.number_of_replicas", 0) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")) .get(); for (int i = 0; i < 32; i++) { @@ -114,9 +115,15 @@ public void testMissingOperations() throws Exception { client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet(); } client().admin().indices().refresh(new RefreshRequest("index")).actionGet(); - StreamSupport.stream(getInstanceFromNode(IndicesService.class).spliterator(), false) - .flatMap(n -> StreamSupport.stream(n.spliterator(), false)) - .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints); + assertBusy(() -> { + final ShardStats[] shardsStats = client().admin().indices().prepareStats("index").get().getIndex("index").getShards(); + for (final ShardStats shardStats : shardsStats) { + final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream() + .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1)); + } + }); + ForceMergeRequest forceMergeRequest = new ForceMergeRequest("index"); forceMergeRequest.maxNumSegments(1); client().admin().indices().forceMerge(forceMergeRequest).actionGet();