Advance PRRLs to match GCP of tracked shards #43751

@@ -60,6 +60,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /**
  * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
@@ -457,18 +458,55 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
     }
 
     /**
-     * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
-     * properly. TODO remove this.
+     * Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
+     * checkpoint, and renew any leases that are approaching expiry.
      */
-    public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
+    public synchronized void renewPeerRecoveryRetentionLeases() {
         assert primaryMode;
-        for (ShardRouting shardRouting : routingTable) {
-            if (shardRouting.assignedToNode()) {
-                final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
-                renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), Math.max(0L, checkpointState.globalCheckpoint + 1L),
-                    PEER_RECOVERY_RETENTION_LEASE_SOURCE);
-            }
-        }
+        assert invariant();
+
+        /*
+         * Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
+         * case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be
+         * persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after
+         * half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire.
+         */
+        final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2;
+
+        /*
+         * If any of the peer-recovery retention leases need renewal, it's a good opportunity to renew them all.
+         */
+        final boolean renewalNeeded = StreamSupport.stream(routingTable.spliterator(), false).filter(ShardRouting::assignedToNode)
+            .anyMatch(shardRouting -> {
+                final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
+                if (retentionLease == null) {
+                    /*
+                     * If this shard copy is tracked then we got here via a rolling upgrade from an older version that doesn't
+                     * create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
+                     */
+                    assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
+                        || indexSettings.getIndexVersionCreated().before(Version.V_8_0_0);
+                    return false;
+                }
+                return retentionLease.timestamp() <= renewalTimeMillis
+                    || retentionLease.retainingSequenceNumber() <= checkpoints.get(shardRouting.allocationId().getId()).globalCheckpoint;
+            });
+
+        if (renewalNeeded) {
+            for (ShardRouting shardRouting : routingTable) {
+                if (shardRouting.assignedToNode()) {
+                    final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
+                    if (retentionLease != null) {
+                        final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
+                        renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
+                            Math.max(0L, checkpointState.globalCheckpoint + 1L),
+                            PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+                    }
+                }
+            }
+        }
+
+        assert invariant();
     }
 
     public static class CheckpointState implements Writeable {
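
The renewal rule introduced above is worth reading in isolation: a lease is due for renewal once its timestamp is older than half the expiry interval, or once the sequence number it retains falls behind that shard copy's global checkpoint, and if any lease qualifies then all of them are renewed together. A standalone sketch of that predicate follows; the class, method, and parameter names below are illustrative, not the actual Elasticsearch API:

    // Illustrative sketch only; LeaseRenewalRule and its parameters are made-up names.
    final class LeaseRenewalRule {
        static boolean renewalNeeded(long nowMillis, long retentionLeaseMillis,
                                     long leaseTimestampMillis, long retainingSeqNo, long globalCheckpoint) {
            // Renew once the lease is older than half the expiry time, so that by default
            // the cluster has half the expiry window to recover before leases lapse.
            final long renewalTimeMillis = nowMillis - retentionLeaseMillis / 2;
            return leaseTimestampMillis <= renewalTimeMillis   // approaching expiry
                || retainingSeqNo <= globalCheckpoint;         // history below the global checkpoint can be discarded
        }
    }
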
@@ -2111,6 +2111,7 @@ public void syncRetentionLeases() {
         assert assertPrimaryMode();
         verifyNotClosed();
         ensureSoftDeletesEnabled("retention leases");
+        replicationTracker.renewPeerRecoveryRetentionLeases();
         final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
         if (retentionLeases.v1()) {
             logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
@@ -2502,16 +2503,6 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
         replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
     }
 
-    /**
-     * Test-only method to advance all shards' peer-recovery retention leases to their tracked global checkpoints so that operations
-     * can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
-     */
-    public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
-        assert assertPrimaryMode();
-        replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
-        syncRetentionLeases();
-    }
-
     class ShardEventListener implements Engine.EventListener {
         private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
 
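Call sites migrate from the removed test-only hook to the regular sync path, which, per the syncRetentionLeases() hunk above, now renews peer-recovery retention leases as a side effect. Schematically, with indexShard standing in for any primary shard handle:

    // Before this change (test-only method, now removed):
    //     indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
    // After: the ordinary sync path renews the leases and then runs the expiry check.
    indexShard.syncRetentionLeases();
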
@@ -37,6 +37,7 @@
 import org.elasticsearch.test.IndexSettingsModule;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,6 +51,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
@@ -61,6 +63,9 @@
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
 
 public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
@@ -975,4 +980,158 @@ private static void addPeerRecoveryRetentionLease(final ReplicationTracker track
         addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId));
     }
 
+    public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {
+
+        final int numberOfActiveAllocationsIds = randomIntBetween(1, 8);
+        final int numberOfInitializingIds = randomIntBetween(0, 8);
+        final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds =
+            randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
+        final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
+        final Set<AllocationId> initializingAllocationIds = activeAndInitializingAllocationIds.v2();
+
+        final AllocationId primaryId = activeAllocationIds.iterator().next();
+
+        final long initialClusterStateVersion = randomNonNegativeLong();
+
+        final AtomicLong currentTimeMillis = new AtomicLong(0L);
+        final ReplicationTracker tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);
+
+        final long retentionLeaseExpiryTimeMillis = tracker.indexSettings().getRetentionLeaseMillis();
+        final long peerRecoveryRetentionLeaseRenewalTimeMillis = retentionLeaseExpiryTimeMillis / 2;
+
+        final long maximumTestTimeMillis = 13 * retentionLeaseExpiryTimeMillis;
+        final long testStartTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - maximumTestTimeMillis);
+        currentTimeMillis.set(testStartTimeMillis);
+
+        final Function<AllocationId, RetentionLease> retentionLeaseFromAllocationId = allocationId
+            -> new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)),
+                0L, currentTimeMillis.get(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);
+
+        final List<RetentionLease> initialLeases = new ArrayList<>();
+        if (randomBoolean()) {
+            initialLeases.add(retentionLeaseFromAllocationId.apply(primaryId));
+        }
+        for (final AllocationId replicaId : initializingAllocationIds) {
+            if (randomBoolean()) {
+                initialLeases.add(retentionLeaseFromAllocationId.apply(replicaId));
+            }
+        }
+        for (int i = randomIntBetween(0, 5); i > 0; i--) {
+            initialLeases.add(retentionLeaseFromAllocationId.apply(AllocationId.newInitializing()));
+        }
+        tracker.updateRetentionLeasesOnReplica(new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), initialLeases));
+
+        IndexShardRoutingTable routingTable = routingTable(initializingAllocationIds, primaryId);
+        tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
+        tracker.activatePrimaryMode(NO_OPS_PERFORMED);
+        assertTrue("primary's retention lease should exist",
+            tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())));
+
+        final Consumer<Runnable> assertAsTimePasses = assertion -> {
+            final long startTime = currentTimeMillis.get();
+            while (currentTimeMillis.get() < startTime + retentionLeaseExpiryTimeMillis * 2) {
+                currentTimeMillis.addAndGet(randomLongBetween(0L, retentionLeaseExpiryTimeMillis * 2));
+                tracker.renewPeerRecoveryRetentionLeases();
+                tracker.getRetentionLeases(true);
+                assertion.run();
+            }
+        };
+
+        assertAsTimePasses.accept(() -> {
+            // Leases for assigned replicas do not expire
+            final RetentionLeases retentionLeases = tracker.getRetentionLeases();
+            for (final AllocationId replicaId : initializingAllocationIds) {
+                final String leaseId = retentionLeaseFromAllocationId.apply(replicaId).id();
+                assertTrue("should not have removed lease for " + replicaId + " in " + retentionLeases,
+                    initialLeases.stream().noneMatch(l -> l.id().equals(leaseId)) || retentionLeases.contains(leaseId));
+            }
+        });
+
+        // Leases that don't correspond to assigned replicas, however, are expired by this time.
+        final Set<String> expectedLeaseIds = Stream.concat(Stream.of(primaryId), initializingAllocationIds.stream())
+            .map(allocationId -> retentionLeaseFromAllocationId.apply(allocationId).id()).collect(Collectors.toSet());
+        for (final RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
+            assertThat(expectedLeaseIds, hasItem(retentionLease.id()));
+        }
+
+        for (AllocationId replicaId : initializingAllocationIds) {
+            markAsTrackingAndInSyncQuietly(tracker, replicaId.getId(), NO_OPS_PERFORMED);
+        }
+
+        assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+            equalTo(expectedLeaseIds));
+
+        assertAsTimePasses.accept(() -> {
+            // Leases still don't expire
+            assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+                equalTo(expectedLeaseIds));
+
+            // Also leases are renewed before reaching half the expiry time
+            //noinspection OptionalGetWithoutIsPresent
+            assertThat(tracker.getRetentionLeases() + " renewed before too long",
+                tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(),
+                greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis));
+        });
+
+        IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
+        for (ShardRouting replicaShard : routingTable.replicaShards()) {
+            routingTableBuilder.removeShard(replicaShard);
+            routingTableBuilder.addShard(replicaShard.moveToStarted());
+        }
+        routingTable = routingTableBuilder.build();
+        activeAllocationIds.addAll(initializingAllocationIds);
+
+        tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable);
+
+        assertAsTimePasses.accept(() -> {
+            // Leases still don't expire
+            assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
+                equalTo(expectedLeaseIds));
+            // ... and any extra peer recovery retention leases are expired immediately since the shard is fully active
+            tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {}));
+        });
+
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1());
+
+        final AllocationId advancingAllocationId
+            = initializingAllocationIds.isEmpty() || rarely() ? primaryId : randomFrom(initializingAllocationIds);
+        final String advancingLeaseId = retentionLeaseFromAllocationId.apply(advancingAllocationId).id();
+
+        final long initialGlobalCheckpoint
+            = Math.max(NO_OPS_PERFORMED, tracker.getTrackedLocalCheckpointForShard(advancingAllocationId.getId()).globalCheckpoint);
+        assertThat(tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(initialGlobalCheckpoint + 1));
+        final long newGlobalCheckpoint = initialGlobalCheckpoint + randomLongBetween(1, 1000);
+        tracker.updateGlobalCheckpointForShard(advancingAllocationId.getId(), newGlobalCheckpoint);
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("lease was renewed because the shard advanced its global checkpoint",
+            tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(newGlobalCheckpoint + 1));
+
+        final long initialVersion = tracker.getRetentionLeases().version();
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("immediate renewal is a no-op", tracker.getRetentionLeases().version(), equalTo(initialVersion));
+
+        //noinspection OptionalGetWithoutIsPresent
+        final long millisUntilFirstRenewal
+            = tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong()
+            + peerRecoveryRetentionLeaseRenewalTimeMillis
+            - currentTimeMillis.get();
+
+        if (millisUntilFirstRenewal != 0) {
+            final long shorterThanRenewalTime = randomLongBetween(0L, millisUntilFirstRenewal - 1);
+            currentTimeMillis.addAndGet(shorterThanRenewalTime);
+            tracker.renewPeerRecoveryRetentionLeases();
+            assertThat("renewal is a no-op after a short time", tracker.getRetentionLeases().version(), equalTo(initialVersion));
+            currentTimeMillis.addAndGet(millisUntilFirstRenewal - shorterThanRenewalTime);
+        }
+
+        tracker.renewPeerRecoveryRetentionLeases();
+        assertThat("renewal happens after a sufficiently long time", tracker.getRetentionLeases().version(), greaterThan(initialVersion));
+        assertTrue("all leases were renewed",
+            tracker.getRetentionLeases().leases().stream().allMatch(l -> l.timestamp() == currentTimeMillis.get()));
+
+        assertThat("test ran for too long, potentially leading to overflow",
+            currentTimeMillis.get(), lessThanOrEqualTo(testStartTimeMillis + maximumTestTimeMillis));
+    }
+
 }
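
The test above never sleeps: time is an AtomicLong injected into the tracker as a LongSupplier, so the test can jump the clock forward deterministically and then call renewPeerRecoveryRetentionLeases() by hand. A minimal self-contained sketch of that fake-clock pattern (class and variable names here are illustrative):

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.LongSupplier;

    class FakeClockSketch {
        public static void main(String[] args) {
            final AtomicLong currentTimeMillis = new AtomicLong(0L);
            final LongSupplier clock = currentTimeMillis::get; // handed to the component under test

            currentTimeMillis.addAndGet(30_000L); // "30 seconds pass", instantly and deterministically
            System.out.println("now = " + clock.getAsLong()); // prints: now = 30000
        }
    }
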
@@ -2925,7 +2925,7 @@ public void testDocStats() throws Exception {
                 indexShard.getLocalCheckpoint());
             indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
                 indexShard.getLocalCheckpoint());
-            indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
+            indexShard.syncRetentionLeases();
         } else {
             indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
 
@@ -3524,7 +3524,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
             primary.updateGlobalCheckpointForShard(
                 primary.routingEntry().allocationId().getId(),
                 primary.getLastSyncedGlobalCheckpoint());
-            primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
+            primary.syncRetentionLeases();
             primary.sync();
             flushShard(primary);
         }
@@ -81,7 +81,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.StreamSupport;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -1009,7 +1008,10 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
     }
 
     public void testFilterCacheStats() throws Exception {
-        Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
+        Settings settings = Settings.builder().put(indexSettings())
+            .put("number_of_replicas", 0)
+            .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
+            .build();
         assertAcked(prepareCreate("index").setSettings(settings).get());
         indexRandom(false, true,
             client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
@@ -1053,10 +1055,13 @@ public void testFilterCacheStats() throws Exception {
         // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
         if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
             persistGlobalCheckpoint("index");
-            internalCluster().nodesInclude("index").stream()
-                .flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false))
-                .flatMap(n -> StreamSupport.stream(n.spliterator(), false))
-                .forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints);
+            assertBusy(() -> {
+                for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
+                    final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
+                    assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
+                        .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
+                }
+            });
             flush("index");
         }
         ForceMergeResponse forceMergeResponse =
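
Since lease renewal now happens on the periodic retention lease sync (shortened to 200ms above) rather than synchronously at the call site, the test polls with assertBusy until every peer-recovery retention lease retains maxSeqNo + 1. Roughly, assertBusy retries its block until it stops throwing; a simplified, illustrative model of that behaviour (the real helper in ESTestCase additionally uses a growing backoff and a default timeout):

    // Simplified model of assertBusy(...); not the real implementation.
    final class AssertBusySketch {
        static void assertBusyLike(Runnable assertion, long timeoutMillis) throws InterruptedException {
            final long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                try {
                    assertion.run();
                    return; // the assertion finally held
                } catch (AssertionError e) {
                    if (System.currentTimeMillis() >= deadline) {
                        throw e; // timed out waiting for the background sync
                    }
                    Thread.sleep(50L); // give the periodic retention lease sync a chance to run
                }
            }
        }
    }
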
@@ -50,6 +50,7 @@ public List<Setting<?>> getSettings() {
             PROVIDED_NAME_SETTING,
             TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
             IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
+            IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
             IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
         );
     }