Advance PRRLs to match GCP of tracked shards #43751

@@ -457,16 +457,40 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
}

/**
* Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
* properly. TODO remove this.
* Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
* checkpoint, and renew any leases that are approaching expiry.
*/
public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
public synchronized void renewPeerRecoveryRetentionLeases() {
assert primaryMode;

/*
* Peer-recovery retention leases never expire while the associated shard is assigned, but we must still renew them occasionally in
* case the associated shard is temporarily unassigned. However we must not renew them too often, since each renewal must be
* persisted and the resulting IO can be expensive on nodes with large numbers of shards (see #42299). We choose to renew them after
* half the expiry time, so that by default the cluster has at least 6 hours to recover before these leases start to expire.
*/
final long renewalTimeMillis = currentTimeMillisSupplier.getAsLong() - indexSettings.getRetentionLeaseMillis() / 2;

for (ShardRouting shardRouting : routingTable) {
if (shardRouting.assignedToNode()) {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting), Math.max(0L, checkpointState.globalCheckpoint + 1L),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
final RetentionLease retentionLease = retentionLeases.get(getPeerRecoveryRetentionLeaseId(shardRouting));
if (retentionLease == null) {
if (checkpointState.tracked) {
/*
* BWC: We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy. TODO create leases lazily
*/
assert indexSettings.getIndexVersionCreated().before(Version.V_8_0_0) : indexSettings.getIndexVersionCreated();
}
} else {
if (retentionLease.retainingSequenceNumber() <= checkpointState.globalCheckpoint
Contributor:
should we only renew if the shard is active? A non-completed recovery attempt should not qualify for a renewal

Contributor Author:

We could, but this would mean that when the recovery completes it would be an active shard with a very old lease until the next sync a few minutes later, and a failure in those few minutes could result in its lease expiring. I see no harm in renewing leases for recovering shards too.

Contributor:

I hadn't thought about that situation. Yes, that would be bad. Perhaps we could delay it until we have checkpointState.tracked though (as we are only able to communicate to the shard itself in that case).

Contributor Author (@DaveCTurner, Jul 1, 2019):

It feels a little unnatural to restrict this even to tracked shards. We establish the lease during peer recovery before initiating tracking, so I think it makes sense to maintain it without regard to the tracking status of the shard.

However I have pushed 400794e to renew all the leases at once - if we're going to push out a new set of leases we may as well advance them all at the same time. This means that assigning a shard could cost one extra retention lease sync, to bring it up to date, and then the renewals will carry on with the usual schedule.

Contributor:

👍

|| retentionLease.timestamp() <= renewalTimeMillis) {
renewRetentionLease(getPeerRecoveryRetentionLeaseId(shardRouting),
Math.max(0L, checkpointState.globalCheckpoint + 1L),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
}
}
}
}
}
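For concreteness, the time-based half of the renewal condition in renewPeerRecoveryRetentionLeases can be illustrated with a small standalone snippet. This is a sketch rather than Elasticsearch code: the 12-hour lease expiry is an assumption inferred from the "6 hours" remark in the code comment above, and the variable names mirror the method only loosely.

import java.util.concurrent.TimeUnit;

public class RenewalThresholdExample {
    public static void main(String[] args) {
        // Assumed default lease expiry of 12 hours, implied by the "at least 6 hours" remark.
        final long retentionLeaseMillis = TimeUnit.HOURS.toMillis(12);
        final long nowMillis = System.currentTimeMillis();

        // Same shape as the renewalTimeMillis computation: renew anything older than half the expiry.
        final long renewalTimeMillis = nowMillis - retentionLeaseMillis / 2;

        // A lease last renewed 7 hours ago falls below the threshold and is renewed,
        // leaving at least 6 hours of validity even if its shard becomes temporarily unassigned.
        final long staleLeaseTimestamp = nowMillis - TimeUnit.HOURS.toMillis(7);
        System.out.println("needs renewal: " + (staleLeaseTimestamp <= renewalTimeMillis)); // true

        // A lease renewed 2 hours ago is left alone, avoiding an unnecessary persisted sync.
        final long freshLeaseTimestamp = nowMillis - TimeUnit.HOURS.toMillis(2);
        System.out.println("needs renewal: " + (freshLeaseTimestamp <= renewalTimeMillis)); // false
    }
}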
@@ -2038,6 +2038,7 @@ public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
ensureSoftDeletesEnabled("retention leases");
replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
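For context, syncRetentionLeases appears to be driven by the periodic retention lease sync task, so the new call above folds peer-recovery lease renewal into that existing background cycle (which is presumably why the test further down shortens RETENTION_LEASE_SYNC_INTERVAL_SETTING to 200ms). The following is a minimal, self-contained model of that loop, not the actual Elasticsearch scheduler; the stub class and method names are illustrative assumptions.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class RetentionLeaseSyncLoopSketch {

    // Stand-in for the ReplicationTracker; it only models the two calls made below.
    static class TrackerStub {
        final AtomicLong renewals = new AtomicLong();

        void renewPeerRecoveryRetentionLeases() {   // the new step added by this change
            renewals.incrementAndGet();
        }

        boolean expireLeases() {                    // stands in for getRetentionLeases(true).v1()
            return false;                           // pretend nothing expired this round
        }
    }

    static void syncRetentionLeases(TrackerStub tracker) {
        tracker.renewPeerRecoveryRetentionLeases(); // renew/advance PRRLs before the expiry check
        if (tracker.expireLeases()) {
            System.out.println("leases changed, syncing eagerly");
        } else {
            System.out.println("background sync of unchanged leases");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TrackerStub tracker = new TrackerStub();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // 200ms mirrors the interval the updated testFilterCacheStats configures below.
        scheduler.scheduleWithFixedDelay(() -> syncRetentionLeases(tracker), 0, 200, TimeUnit.MILLISECONDS);
        Thread.sleep(1_000);
        scheduler.shutdownNow();
        System.out.println("renewal attempts in ~1s: " + tracker.renewals.get());
    }
}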
@@ -2429,16 +2430,6 @@ public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint,
replicationTracker.addPeerRecoveryRetentionLease(nodeId, globalCheckpoint, listener);
}

/**
* Test-only method to advance the all shards' peer-recovery retention leases to their tracked global checkpoints so that operations
* can be discarded. TODO Remove this when retention leases are advanced by other mechanisms.
*/
public void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
assert assertPrimaryMode();
replicationTracker.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
syncRetentionLeases();
}

class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Consumer<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

@@ -37,6 +37,7 @@
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -50,6 +51,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
@@ -61,6 +63,9 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
@@ -975,4 +980,154 @@ private static void addPeerRecoveryRetentionLease(final ReplicationTracker track
addPeerRecoveryRetentionLease(tracker, AllocationId.newInitializing(allocationId));
}

public void testPeerRecoveryRetentionLeaseCreationAndRenewal() {

final int numberOfActiveAllocationsIds = randomIntBetween(1, 8);
final int numberOfInitializingIds = randomIntBetween(0, 8);
final Tuple<Set<AllocationId>, Set<AllocationId>> activeAndInitializingAllocationIds =
randomActiveAndInitializingAllocationIds(numberOfActiveAllocationsIds, numberOfInitializingIds);
final Set<AllocationId> activeAllocationIds = activeAndInitializingAllocationIds.v1();
final Set<AllocationId> initializingAllocationIds = activeAndInitializingAllocationIds.v2();

final AllocationId primaryId = activeAllocationIds.iterator().next();

final long initialClusterStateVersion = randomNonNegativeLong();

final AtomicLong currentTimeMillis = new AtomicLong(0L);
final ReplicationTracker tracker = newTracker(primaryId, updatedGlobalCheckpoint::set, currentTimeMillis::get);

final long retentionLeaseExpiryTimeMillis = tracker.indexSettings().getRetentionLeaseMillis();
final long peerRecoveryRetentionLeaseRenewalTimeMillis = retentionLeaseExpiryTimeMillis / 2;

final long maximumTestTimeMillis = 13 * retentionLeaseExpiryTimeMillis;
final long testStartTimeMillis = randomLongBetween(0L, Long.MAX_VALUE - maximumTestTimeMillis);
currentTimeMillis.set(testStartTimeMillis);

final Function<AllocationId, RetentionLease> retentionLeaseFromAllocationId = allocationId
-> new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(nodeIdFromAllocationId(allocationId)),
0L, currentTimeMillis.get(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE);

final List<RetentionLease> initialLeases = new ArrayList<>();
if (randomBoolean()) {
initialLeases.add(retentionLeaseFromAllocationId.apply(primaryId));
}
for (final AllocationId replicaId : initializingAllocationIds) {
if (randomBoolean()) {
initialLeases.add(retentionLeaseFromAllocationId.apply(replicaId));
}
}
for (int i = randomIntBetween(0, 5); i > 0; i--) {
initialLeases.add(retentionLeaseFromAllocationId.apply(AllocationId.newInitializing()));
}
tracker.updateRetentionLeasesOnReplica(new RetentionLeases(randomNonNegativeLong(), randomNonNegativeLong(), initialLeases));

IndexShardRoutingTable routingTable = routingTable(initializingAllocationIds, primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertTrue("primary's retention lease should exist",
tracker.getRetentionLeases().contains(ReplicationTracker.getPeerRecoveryRetentionLeaseId(routingTable.primaryShard())));

final Consumer<Runnable> assertAsTimePasses = assertion -> {
final long startTime = currentTimeMillis.get();
while (currentTimeMillis.get() < startTime + retentionLeaseExpiryTimeMillis * 2) {
currentTimeMillis.addAndGet(randomLongBetween(0L, retentionLeaseExpiryTimeMillis * 2));
tracker.renewPeerRecoveryRetentionLeases();
tracker.getRetentionLeases(true);
assertion.run();
}
};

assertAsTimePasses.accept(() -> {
// Leases for assigned replicas do not expire
final RetentionLeases retentionLeases = tracker.getRetentionLeases();
for (final AllocationId replicaId : initializingAllocationIds) {
final String leaseId = retentionLeaseFromAllocationId.apply(replicaId).id();
assertTrue("should not have removed lease for " + replicaId + " in " + retentionLeases,
initialLeases.stream().noneMatch(l -> l.id().equals(leaseId)) || retentionLeases.contains(leaseId));
}
});

// Leases that don't correspond to assigned replicas, however, are expired by this time.
final Set<String> expectedLeaseIds = Stream.concat(Stream.of(primaryId), initializingAllocationIds.stream())
.map(allocationId -> retentionLeaseFromAllocationId.apply(allocationId).id()).collect(Collectors.toSet());
for (final RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
assertThat(expectedLeaseIds, hasItem(retentionLease.id()));
}

for (AllocationId replicaId : initializingAllocationIds) {
markAsTrackingAndInSyncQuietly(tracker, replicaId.getId(), NO_OPS_PERFORMED);
}

assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
equalTo(expectedLeaseIds));

assertAsTimePasses.accept(() -> {
// Leases still don't expire
assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
equalTo(expectedLeaseIds));

// Also leases are renewed before reaching half the expiry time
//noinspection OptionalGetWithoutIsPresent
assertThat(tracker.getRetentionLeases() + " renewed before too long",
tracker.getRetentionLeases().leases().stream().mapToLong(RetentionLease::timestamp).min().getAsLong(),
greaterThanOrEqualTo(currentTimeMillis.get() - peerRecoveryRetentionLeaseRenewalTimeMillis));
});

IndexShardRoutingTable.Builder routingTableBuilder = new IndexShardRoutingTable.Builder(routingTable);
for (ShardRouting replicaShard : routingTable.replicaShards()) {
routingTableBuilder.removeShard(replicaShard);
routingTableBuilder.addShard(replicaShard.moveToStarted());
}
routingTable = routingTableBuilder.build();
activeAllocationIds.addAll(initializingAllocationIds);

tracker.updateFromMaster(initialClusterStateVersion + randomLongBetween(1, 10), ids(activeAllocationIds), routingTable);

assertAsTimePasses.accept(() -> {
// Leases still don't expire
assertThat(tracker.getRetentionLeases().leases().stream().map(RetentionLease::id).collect(Collectors.toSet()),
equalTo(expectedLeaseIds));
// ... and any extra peer recovery retention leases are expired immediately since the shard is fully active
tracker.addPeerRecoveryRetentionLease(randomAlphaOfLength(10), randomNonNegativeLong(), ActionListener.wrap(() -> {}));
});

tracker.renewPeerRecoveryRetentionLeases();
assertTrue("expired extra lease", tracker.getRetentionLeases(true).v1());


for (RetentionLease retentionLease : tracker.getRetentionLeases().leases()) {
// update all leases' timestamps so they don't need a time-based renewal for a while
tracker.renewRetentionLease(retentionLease.id(), retentionLease.retainingSequenceNumber(), retentionLease.source());
}

final AllocationId advancingAllocationId
= initializingAllocationIds.isEmpty() || rarely() ? primaryId : randomFrom(initializingAllocationIds);
final String advancingLeaseId = retentionLeaseFromAllocationId.apply(advancingAllocationId).id();

final long initialGlobalCheckpoint
= Math.max(NO_OPS_PERFORMED, tracker.getTrackedLocalCheckpointForShard(advancingAllocationId.getId()).globalCheckpoint);
assertThat(tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(initialGlobalCheckpoint + 1));
final long newGlobalCheckpoint = initialGlobalCheckpoint + randomLongBetween(1, 1000);
tracker.updateGlobalCheckpointForShard(advancingAllocationId.getId(), newGlobalCheckpoint);
tracker.renewPeerRecoveryRetentionLeases();
assertThat("lease was renewed because the shard advanced its global checkpoint",
tracker.getRetentionLeases().get(advancingLeaseId).retainingSequenceNumber(), equalTo(newGlobalCheckpoint + 1));

final long initialVersion = tracker.getRetentionLeases().version();
tracker.renewPeerRecoveryRetentionLeases();
assertThat("immediate renewal is a no-op", tracker.getRetentionLeases().version(), equalTo(initialVersion));

final long shorterThanRenewalTime = randomLongBetween(0L, peerRecoveryRetentionLeaseRenewalTimeMillis - 1);
currentTimeMillis.addAndGet(shorterThanRenewalTime);
tracker.renewPeerRecoveryRetentionLeases();
assertThat("renewal is a no-op after a short time", tracker.getRetentionLeases().version(), equalTo(initialVersion));

currentTimeMillis.addAndGet(peerRecoveryRetentionLeaseRenewalTimeMillis - shorterThanRenewalTime);
tracker.renewPeerRecoveryRetentionLeases();
assertThat("renewal happens after a sufficiently long time", tracker.getRetentionLeases().version(), greaterThan(initialVersion));

assertThat("test ran for too long, potentially leading to overflow",
currentTimeMillis.get(), lessThanOrEqualTo(testStartTimeMillis + maximumTestTimeMillis));
}

}
@@ -2954,7 +2954,7 @@ public void testDocStats() throws Exception {
indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
indexShard.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
indexShard.syncRetentionLeases();
} else {
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");

@@ -3553,7 +3553,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {
primary.updateGlobalCheckpointForShard(
primary.routingEntry().allocationId().getId(),
primary.getLastSyncedGlobalCheckpoint());
primary.advancePeerRecoveryRetentionLeasesToGlobalCheckpoints();
primary.syncRetentionLeases();
primary.sync();
flushShard(primary);
}
@@ -81,7 +81,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.StreamSupport;

import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -1009,7 +1008,10 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
}

public void testFilterCacheStats() throws Exception {
Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
Settings settings = Settings.builder().put(indexSettings())
.put("number_of_replicas", 0)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "200ms")
.build();
assertAcked(prepareCreate("index").setSettings(settings).get());
indexRandom(false, true,
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
@@ -1053,10 +1055,13 @@ public void testFilterCacheStats() throws Exception {
// we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index");
internalCluster().nodesInclude("index").stream()
.flatMap(n -> StreamSupport.stream(internalCluster().getInstance(IndicesService.class, n).spliterator(), false))
.flatMap(n -> StreamSupport.stream(n.spliterator(), false))
.forEach(IndexShard::advancePeerRecoveryRetentionLeasesToGlobalCheckpoints);
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(shardStats.getRetentionLeaseStats().retentionLeases().leases().stream()
.allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1));
}
});
flush("index");
}
ForceMergeResponse forceMergeResponse =
@@ -50,6 +50,7 @@ public List<Setting<?>> getSettings() {
PROVIDED_NAME_SETTING,
TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING,
IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING,
IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING,
IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING
);
}
@@ -97,20 +97,11 @@ public List<Setting<?>> getSettings() {

}

public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING);
}

}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(RetentionLeaseRenewIntervalSettingPlugin.class, RetentionLeaseSyncIntervalSettingPlugin.class))
Stream.of(RetentionLeaseRenewIntervalSettingPlugin.class))
.collect(Collectors.toList());
}
