Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequence number based replica allocation #46959

Merged
merged 14 commits into from
Oct 13, 2019

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
/**
* Id for a peer recovery retention lease for the given node. See {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
*/
static String getPeerRecoveryRetentionLeaseId(String nodeId) {
public static String getPeerRecoveryRetentionLeaseId(String nodeId) {
return "peer_recovery/" + nodeId;
}

Expand All @@ -541,6 +541,15 @@ public static String getPeerRecoveryRetentionLeaseId(ShardRouting shardRouting)
return getPeerRecoveryRetentionLeaseId(shardRouting.currentNodeId());
}

/**
* Returns a list of peer recovery retention leases installed in this replication group
*/
public List<RetentionLease> getPeerRecoveryRetentionLeases() {
return getRetentionLeases().leases().stream()
.filter(lease -> PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(lease.source()))
.collect(Collectors.toUnmodifiableList());
}

/**
* Advance the peer-recovery retention leases for all assigned shard copies to discard history below the corresponding global
* checkpoint, and renew any leases that are approaching expiry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2619,6 +2619,13 @@ public void removePeerRecoveryRetentionLease(String nodeId, ActionListener<Repli
replicationTracker.removePeerRecoveryRetentionLease(nodeId, listener);
}

/**
* Returns a list of retention leases for peer recovery installed in this shard copy.
*/
public List<RetentionLease> getPeerRecoveryRetentionLeases() {
henningandersen marked this conversation as resolved.
Show resolved Hide resolved
return replicationTracker.getPeerRecoveryRetentionLeases();
}

private SafeCommitInfo getSafeCommitInfo() {
final Engine engine = getEngineOrNull();
return engine == null ? SafeCommitInfo.EMPTY : engine.getSafeCommitInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
Expand All @@ -54,6 +56,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -119,15 +122,16 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
if (indexShard != null) {
try {
final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId, indexShard.snapshotStoreMetadata());
final StoreFilesMetaData storeFilesMetaData = new StoreFilesMetaData(shardId,
indexShard.snapshotStoreMetadata(), indexShard.getPeerRecoveryRetentionLeases());
exists = true;
return storeFilesMetaData;
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace(new ParameterizedMessage("[{}] node is missing index, responding with empty", shardId), e);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] can't read metadata from store, responding with empty", shardId), e);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
}
}
Expand All @@ -142,20 +146,20 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
}
if (metaData == null) {
logger.trace("{} node doesn't have meta data for the requests index, responding with empty", shardId);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
final IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() :
new IndexSettings(metaData, settings);
final ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY);
return new StoreFilesMetaData(shardId, Store.MetadataSnapshot.EMPTY, Collections.emptyList());
}
// note that this may fail if it can't get access to the shard lock. Since we check above there is an active shard, this means:
// 1) a shard is being constructed, which means the master will not use a copy of this replica
// 2) A shard is shutting down and has not cleared it's content within lock timeout. In this case the master may not
// reuse local resources.
return new StoreFilesMetaData(shardId, Store.readMetadataSnapshot(shardPath.resolveIndex(), shardId,
nodeEnv::shardLock, logger));
nodeEnv::shardLock, logger), Collections.emptyList());
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
} finally {
TimeValue took = new TimeValue(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
if (exists) {
Expand All @@ -167,17 +171,34 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
}

public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Writeable {
private ShardId shardId;
Store.MetadataSnapshot metadataSnapshot;
private final ShardId shardId;
private final Store.MetadataSnapshot metadataSnapshot;
private final List<RetentionLease> peerRecoveryRetentionLeases;

public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot,
List<RetentionLease> peerRecoveryRetentionLeases) {
this.shardId = shardId;
this.metadataSnapshot = metadataSnapshot;
this.peerRecoveryRetentionLeases = peerRecoveryRetentionLeases;
}

public StoreFilesMetaData(StreamInput in) throws IOException {
this.shardId = new ShardId(in);
this.metadataSnapshot = new Store.MetadataSnapshot(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.peerRecoveryRetentionLeases = in.readList(RetentionLease::new);
} else {
this.peerRecoveryRetentionLeases = Collections.emptyList();
}
}

public StoreFilesMetaData(ShardId shardId, Store.MetadataSnapshot metadataSnapshot) {
this.shardId = shardId;
this.metadataSnapshot = metadataSnapshot;
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
metadataSnapshot.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeList(peerRecoveryRetentionLeases);
}
}

public ShardId shardId() {
Expand All @@ -201,10 +222,8 @@ public StoreFileMetaData file(String name) {
return metadataSnapshot.asMap().get(name);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
metadataSnapshot.writeTo(out);
public List<RetentionLease> peerRecoveryRetentionLeases() {
return peerRecoveryRetentionLeases;
}

/**
Expand Down
Loading