Skip to content

Commit

Permalink
Track Snapshot Version in RepositoryData (elastic#50930)
Browse files Browse the repository at this point in the history
Add tracking of snapshot versions to RepositoryData to make BwC logic more efficient.
Follow up to elastic#50853
  • Loading branch information
original-brownbear authored and SivagurunathanV committed Jan 21, 2020
1 parent b3501cb commit 94e071d
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) {
* @param <T> The list element type.
* @return A comma-separated string of the first few elements.
*/
static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter, boolean isDebugEnabled) {
public static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter,
boolean isDebugEnabled) {
final int maxNumberOfElements = 10;
if (isDebugEnabled || elements.size() <= maxNumberOfElements) {
return elements.stream().map(formatter).collect(Collectors.joining(", "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class RepositoryData {
* An instance initialized for an empty repository.
*/
public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);

/**
* The generational id of the index file from which the repository data was read.
Expand All @@ -92,26 +92,44 @@ public final class RepositoryData {
*/
private final Map<IndexId, Set<SnapshotId>> indexSnapshots;

private final Map<String, Version> snapshotVersions;

/**
* Shard generations.
*/
private final ShardGenerations shardGenerations;

/**
 * Creates repository data for the given generation. The snapshot id, snapshot state and index-to-snapshots
 * maps are stored behind unmodifiable views, and the index-name lookup is derived from the keys of
 * {@code indexSnapshots}.
 *
 * @throws NullPointerException if {@code shardGenerations} is {@code null}
 */
public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
Map<String, Version> snapshotVersions, Map<IndexId, Set<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations) {
this.genId = genId;
this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
// Index-name -> IndexId lookup built from the keys of indexSnapshots.
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
this.shardGenerations = Objects.requireNonNull(shardGenerations);
// NOTE(review): unlike the maps above, snapshotVersions is stored exactly as passed in (no unmodifiable
// view, no null check) -- confirm this asymmetry is intended.
this.snapshotVersions = snapshotVersions;
// Sanity check: every index that carries shard generations must also be a key of indexSnapshots.
assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+ shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
}

protected RepositoryData copy() {
return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
}

/**
 * Returns repository data in which the supplied snapshot versions have been merged into the versions
 * already tracked by this instance. Versions are keyed by snapshot UUID, so an entry supplied here
 * replaces any version previously recorded under the same UUID.
 *
 * @param versions versions to record, keyed by snapshot id
 * @return this instance if {@code versions} is empty, otherwise a new instance carrying the merged versions
 */
public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
    if (versions.isEmpty() == false) {
        // Start from the versions already known and overlay the supplied ones.
        final Map<String, Version> merged = new HashMap<>(snapshotVersions);
        for (Map.Entry<SnapshotId, Version> entry : versions.entrySet()) {
            merged.put(entry.getKey().getUUID(), entry.getValue());
        }
        return new RepositoryData(genId, snapshotIds, snapshotStates, merged, indexSnapshots, shardGenerations);
    }
    // Nothing to merge: no copy is made.
    return this;
}

public ShardGenerations shardGenerations() {
Expand Down Expand Up @@ -141,6 +159,14 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) {
return snapshotStates.get(snapshotId.getUUID());
}

/**
 * Looks up the {@link Version} recorded for a snapshot, using the snapshot's UUID as the key.
 *
 * @param snapshotId snapshot to look up
 * @return the recorded version, or {@code null} if no version is tracked for this snapshot
 */
@Nullable
public Version getVersion(SnapshotId snapshotId) {
    final String uuid = snapshotId.getUUID();
    return snapshotVersions.get(uuid);
}

/**
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
*/
Expand Down Expand Up @@ -173,6 +199,7 @@ public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId)
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId,
final SnapshotState snapshotState,
final Version version,
final ShardGenerations shardGenerations) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
Expand All @@ -184,11 +211,13 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
snapshots.put(snapshotId.getUUID(), snapshotId);
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.put(snapshotId.getUUID(), version);
Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.indices()) {
allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
}
return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
}

Expand All @@ -202,7 +231,7 @@ public RepositoryData withGenId(long newGeneration) {
if (newGeneration == genId) {
return this;
}
return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
}

/**
Expand All @@ -222,6 +251,8 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
}
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.remove(snapshotId.getUUID());
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.remove(snapshotId.getUUID());
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
for (final IndexId indexId : indices.values()) {
Set<SnapshotId> set;
Expand All @@ -241,7 +272,7 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
indexSnapshots.put(indexId, set);
}

return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
.retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
);
Expand Down Expand Up @@ -269,14 +300,15 @@ public boolean equals(Object obj) {
RepositoryData that = (RepositoryData) obj;
return snapshotIds.equals(that.snapshotIds)
&& snapshotStates.equals(that.snapshotStates)
&& snapshotVersions.equals(that.snapshotVersions)
&& indices.equals(that.indices)
&& indexSnapshots.equals(that.indexSnapshots)
&& shardGenerations.equals(that.shardGenerations);
}

@Override
public int hashCode() {
return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
}

/**
Expand Down Expand Up @@ -323,6 +355,7 @@ public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
private static final String NAME = "name";
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String VERSION = "version";
private static final String MIN_VERSION = "min_version";

/**
Expand All @@ -339,6 +372,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
if (snapshotStates.containsKey(snapshot.getUUID())) {
builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
}
if (snapshotVersions.containsKey(snapshot.getUUID())) {
builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
}
builder.endObject();
}
builder.endArray();
Expand Down Expand Up @@ -378,6 +414,7 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
final Map<String, SnapshotId> snapshots = new HashMap<>();
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
final Map<String, Version> snapshotVersions = new HashMap<>();
final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();

Expand All @@ -390,6 +427,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
String name = null;
String uuid = null;
SnapshotState state = null;
Version version = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
Expand All @@ -399,12 +437,17 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
uuid = parser.text();
} else if (STATE.equals(currentFieldName)) {
state = SnapshotState.fromValue(parser.numberValue().byteValue());
} else if (VERSION.equals(currentFieldName)) {
version = Version.fromString(parser.text());
}
}
final SnapshotId snapshotId = new SnapshotId(name, uuid);
if (state != null) {
snapshotStates.put(uuid, state);
}
if (version != null) {
snapshotVersions.put(uuid, version);
}
snapshots.put(snapshotId.getUUID(), snapshotId);
}
} else {
Expand Down Expand Up @@ -488,7 +531,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
} else {
throw new ElasticsearchParseException("start object expected");
}
return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
Expand All @@ -46,6 +47,7 @@
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Numbers;
Expand Down Expand Up @@ -121,6 +123,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -873,7 +876,7 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
Expand Down Expand Up @@ -1252,8 +1255,42 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});

final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();

// Step 2: Write new index-N blob to repository and update index.latest
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
// BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
// RepositoryData contains a version for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds().stream().filter(
snapshotId -> repositoryData.getVersion(snapshotId) == null).collect(Collectors.toList());
if (snapshotIdsWithoutVersion.isEmpty() == false) {
final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
ActionListener.runAfter(
new ActionListener<>() {
@Override
public void onResponse(Collection<Void> voids) {
logger.info("Successfully loaded all snapshot's version information for {} from snapshot metadata",
AllocationService.firstListElementsToCommaDelimitedString(
snapshotIdsWithoutVersion, SnapshotId::toString, logger.isDebugEnabled()));
}

@Override
public void onFailure(Exception e) {
logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
}
}, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
snapshotIdsWithoutVersion.size());
for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadAllVersionsListener, () ->
updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())));
}
} else {
filterRepositoryDataStep.onResponse(repositoryData);
}
})), listener::onFailure);
filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
final long newGen = setPendingStep.result();
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
Expand All @@ -1263,7 +1300,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down Expand Up @@ -1297,13 +1334,13 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
l.onFailure(
listener.onFailure(
new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
// Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
Expand All @@ -1320,7 +1357,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}));
}
});
})), listener::onFailure);
}, listener::onFailure);
}

private RepositoryMetaData getRepoMetaData(ClusterState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,12 @@ public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repo
} else {
try {
final Repository repository = repositoriesService.repository(repositoryName);
hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
&& snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
snapshotId -> {
final Version known = repositoryData.getVersion(snapshotId);
return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
.before(SHARD_GEN_IN_REPO_DATA_VERSION);
});
} catch (SnapshotMissingException e) {
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
return true;
Expand Down
Loading

0 comments on commit 94e071d

Please sign in to comment.