Skip to content

Commit

Permalink
Create missing PRRLs after primary activation
Browse files Browse the repository at this point in the history
Today peer recovery retention leases (PRRLs) are created when starting a
replication group from scratch and during peer recovery. However, if the
replication group was migrated from nodes running a version which does not
create PRRLs (e.g. 7.3 and earlier) then it's possible that the primary was
relocated or promoted without first establishing all the expected leases.

It's not possible to establish these leases before or during primary
activation, so we must create them as soon as possible afterwards. This gives
weaker guarantees about history retention, since there's a possibility that
history will be discarded before it can be used. In practice such situations
are expected to occur only rarely.

This commit adds the machinery to create missing leases after primary
activation, and strengthens the assertions about the existence of such leases
in order to ensure that once all the leases do exist we never again enter a
state where there's a missing lease.

Relates elastic#41536
  • Loading branch information
DaveCTurner committed Jul 5, 2019
1 parent 5dd6c68 commit d233bb0
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.document.RestIndexAction;
import org.elasticsearch.rest.action.document.RestUpdateAction;
Expand All @@ -40,10 +45,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand All @@ -53,6 +60,7 @@
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isIn;
Expand Down Expand Up @@ -382,6 +390,113 @@ public void testRecoveryWithSoftDeletes() throws Exception {
ensureGreen(index);
}

public void testRetentionLeasesEstablishedWhenPromotingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_promotion";
if (CLUSTER_TYPE == ClusterType.OLD) {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), between(1, 2)) // triggers nontrivial promotion
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
}
ensureGreen(index);
if (CLUSTER_TYPE == ClusterType.UPGRADED) {
assertAllCopiesHaveRetentionLeases(index);
}
}

public void testRetentionLeasesEstablishedWhenRelocatingPrimary() throws Exception {
final String index = "recover_and_create_leases_in_relocation";
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), between(1, 5))
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
createIndex(index, settings.build());
int numDocs = randomInt(10);
indexDocs(index, 0, numDocs);
if (randomBoolean()) {
client().performRequest(new Request("POST", "/" + index + "/_flush"));
}
ensureGreen(index);
break;

case MIXED:
// trigger a primary relocation by excluding the last old node with a shard filter
final Map<?, ?> nodesMap
= ObjectPath.createFromResponse(client().performRequest(new Request("GET", "/_nodes"))).evaluate("nodes");
final List<String> oldNodeNames = new ArrayList<>();
for (Object nodeDetails : nodesMap.values()) {
final Map<?, ?> nodeDetailsMap = (Map<?, ?>) nodeDetails;
final String versionString = (String) nodeDetailsMap.get("version");
if (versionString.equals(Version.CURRENT.toString()) == false) {
oldNodeNames.add((String) nodeDetailsMap.get("name"));
}
}

if (oldNodeNames.size() == 1) {
final String oldNodeName = oldNodeNames.get(0);
logger.info("--> excluding index [{}] from node [{}]", index, oldNodeName);
final Request putSettingsRequest = new Request("PUT", "/" + index + "/_settings");
putSettingsRequest.setJsonEntity("{\"index.routing.allocation.exclude._name\":\"" + oldNodeName + "\"}");
assertOK(client().performRequest(putSettingsRequest));
ensureGreen(index);
assertAllCopiesHaveRetentionLeases(index);
} else {
ensureGreen(index);
}
break;

case UPGRADED:
ensureGreen(index);
assertAllCopiesHaveRetentionLeases(index);
break;
}
}

private void assertAllCopiesHaveRetentionLeases(String index) throws Exception {
assertBusy(() -> {
final Request statsRequest = new Request("GET", "/" + index + "/_stats");
statsRequest.addParameter("level", "shards");
final Map<?, ?> shardsStats = ObjectPath.createFromResponse(client().performRequest(statsRequest))
.evaluate("indices." + index + ".shards");
for (Map.Entry<?, ?> shardCopiesEntry : shardsStats.entrySet()) {
final List<?> shardCopiesList = (List<?>) shardCopiesEntry.getValue();

final Set<String> expectedLeaseIds = new HashSet<>();
for (Object shardCopyStats : shardCopiesList) {
final String nodeId
= Objects.requireNonNull((String) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("routing"))).get("node"));
expectedLeaseIds.add(ReplicationTracker.getPeerRecoveryRetentionLeaseId(
ShardRouting.newUnassigned(new ShardId("_na_", "test", 0), false, RecoverySource.PeerRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "test")).initialize(nodeId, null, 0L)));
}

final Set<String> actualLeaseIds = new HashSet<>();
for (Object shardCopyStats : shardCopiesList) {
final List<?> leases
= (List<?>) ((Map<?, ?>) (((Map<?, ?>) shardCopyStats).get("retention_leases"))).get("leases");
for (Object lease : leases) {
actualLeaseIds.add(Objects.requireNonNull((String) (((Map<?, ?>) lease).get("id"))));
}
}
assertThat("[" + index + "][" + shardCopiesEntry.getKey() + "] has leases " + actualLeaseIds
+ " but expected " + expectedLeaseIds,
actualLeaseIds, hasItems(expectedLeaseIds.toArray(new String[0])));
}
});
}

/**
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
* is effectively closed and potentially replicated (if the version the index was created on supports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.ObjectLongMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
Expand Down Expand Up @@ -201,6 +202,14 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
private long persistedRetentionLeasesVersion;

/**
* Whether there should be a peer recovery retention lease (PRRL) for every tracked shard copy. Always true on indices created from
* {@link Version#V_7_4_0} onwards, because these versions create PRRLs properly. May be false on indices created in an earlier version
* if we recently did a rolling upgrade and {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)} has not
* yet completed. Is only permitted to change from false to true; can be removed once support for pre-PRRL indices is no longer needed.
*/
private boolean hasAllPeerRecoveryRetentionLeases;

/**
* Get all retention leases tracked on this shard.
*
Expand Down Expand Up @@ -486,10 +495,10 @@ public synchronized void renewPeerRecoveryRetentionLeases() {
if (retentionLease == null) {
/*
* If this shard copy is tracked then we got here here via a rolling upgrade from an older version that doesn't
* create peer recovery retention leases for every shard copy. TODO create leases lazily in that situation.
* create peer recovery retention leases for every shard copy.
*/
assert checkpoints.get(shardRouting.allocationId().getId()).tracked == false
|| indexSettings.getIndexVersionCreated().before(Version.V_7_4_0);
|| hasAllPeerRecoveryRetentionLeases == false;
return false;
}
return retentionLease.timestamp() <= renewalTimeMillis
Expand Down Expand Up @@ -753,7 +762,7 @@ private boolean invariant() {
if (primaryMode
&& indexSettings.isSoftDeleteEnabled()
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
&& hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
Expand Down Expand Up @@ -820,6 +829,7 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
assert invariant();
}
Expand Down Expand Up @@ -914,30 +924,51 @@ public synchronized void activatePrimaryMode(final long localCheckpoint) {
updateGlobalCheckpointOnPrimary();

if (indexSettings.isSoftDeleteEnabled()) {
addPeerRecoveryRetentionLeaseForSolePrimary();
}

assert invariant();
}

/**
* Creates a peer recovery retention lease for this shard, if one does not already exist and this shard is the sole shard copy in the
* replication group. If one does not already exist and yet there are other shard copies in this group then we must have just done
* a rolling upgrade from a version before {@link Version#V_7_4_0}, in which case the missing leases should be created asynchronously
* by the caller using {@link ReplicationTracker#createMissingPeerRecoveryRetentionLeases(ActionListener)}.
*/
private void addPeerRecoveryRetentionLeaseForSolePrimary() {
assert primaryMode;
assert Thread.holdsLock(this);

if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
/*
* We might have got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0)) {
// We are starting up the whole replication group from scratch: if we were not (i.e. this is a replica promotion) then
// this copy must already be in-sync and active and therefore holds a retention lease for itself.
assert routingTable.activeShards().equals(Collections.singletonList(primaryShard)) : routingTable.activeShards();
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.activeShards() + " vs " + shardAllocationId;
assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));

: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
hasAllPeerRecoveryRetentionLeases = true;
}
}

assert invariant();
}

/**
Expand Down Expand Up @@ -1240,9 +1271,54 @@ public synchronized void activateWithPrimaryContext(PrimaryContext primaryContex
// note that if there was no cluster state update between start of the engine of this shard and the call to
// initializeWithPrimaryContext, we might still have missed a cluster state update. This is best effort.
runAfter.run();

if (indexSettings.isSoftDeleteEnabled()) {
addPeerRecoveryRetentionLeaseForSolePrimary();
}

assert invariant();
}

private synchronized void setHasAllPeerRecoveryRetentionLeases() {
hasAllPeerRecoveryRetentionLeases = true;
assert invariant();
}

/**
* Create any required peer-recovery retention leases that do not currently exist because we just did a rolling upgrade from a version
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (hasAllPeerRecoveryRetentionLeases == false) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
listener.onResponse(null);
}, listener::onFailure), shardRoutings.size());
for (ShardRouting shardRouting : shardRoutings) {
if (retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))) {
groupedActionListener.onResponse(null);
} else {
final CheckpointState checkpointState = checkpoints.get(shardRouting.allocationId().getId());
if (checkpointState.tracked == false) {
groupedActionListener.onResponse(null);
} else {
logger.trace("createMissingPeerRecoveryRetentionLeases: adding missing lease for {}", shardRouting);
try {
addPeerRecoveryRetentionLease(shardRouting.currentNodeId(),
Math.max(SequenceNumbers.NO_OPS_PERFORMED, checkpointState.globalCheckpoint), groupedActionListener);
} catch (Exception e) {
groupedActionListener.onFailure(e);
}
}
}
}
} else {
logger.trace("createMissingPeerRecoveryRetentionLeases: nothing to do");
listener.onResponse(null);
}
}

private Runnable getMasterUpdateOperationFromCurrentState() {
assert primaryMode == false;
final long lastAppliedClusterStateVersion = appliedClusterStateVersion;
Expand Down
Loading

0 comments on commit d233bb0

Please sign in to comment.