Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track Snapshot Version in RepositoryData #50930

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class RepositoryData {
* An instance initialized for an empty repository.
*/
public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);

/**
* The generational id of the index file from which the repository data was read.
Expand All @@ -92,26 +92,44 @@ public final class RepositoryData {
*/
private final Map<IndexId, Set<SnapshotId>> indexSnapshots;

private final Map<String, Version> snapshotVersions;

/**
* Shard generations.
*/
private final ShardGenerations shardGenerations;

public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
Map<String, Version> snapshotVersions, Map<IndexId, Set<SnapshotId>> indexSnapshots,
ShardGenerations shardGenerations) {
this.genId = genId;
this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream()
.collect(Collectors.toMap(IndexId::getName, Function.identity())));
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
this.shardGenerations = Objects.requireNonNull(shardGenerations);
this.snapshotVersions = snapshotVersions;
assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
+ shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
}

protected RepositoryData copy() {
return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations);
return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
}

/**
* Creates a copy of this instance that contains updated version data.
* @param versions map of snapshot versions
* @return copy with updated version data
*/
public RepositoryData withVersions(Map<SnapshotId, Version> versions) {
if (versions.isEmpty()) {
return this;
}
final Map<String, Version> newVersions = new HashMap<>(snapshotVersions);
versions.forEach((id, version) -> newVersions.put(id.getUUID(), version));
return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations);
}

public ShardGenerations shardGenerations() {
Expand Down Expand Up @@ -141,6 +159,14 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) {
return snapshotStates.get(snapshotId.getUUID());
}

/**
* Returns the {@link Version} for the given snapshot or {@code null} if unknown.
*/
@Nullable
public Version getVersion(SnapshotId snapshotId) {
return snapshotVersions.get(snapshotId.getUUID());
}

/**
* Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
*/
Expand Down Expand Up @@ -173,6 +199,7 @@ public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId)
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId,
final SnapshotState snapshotState,
final Version version,
final ShardGenerations shardGenerations) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
Expand All @@ -184,11 +211,13 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
snapshots.put(snapshotId.getUUID(), snapshotId);
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.put(snapshotId.getUUID(), version);
Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.indices()) {
allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
}
return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
}

Expand All @@ -202,7 +231,7 @@ public RepositoryData withGenId(long newGeneration) {
if (newGeneration == genId) {
return this;
}
return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
}

/**
Expand All @@ -222,6 +251,8 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
}
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.remove(snapshotId.getUUID());
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
newSnapshotVersions.remove(snapshotId.getUUID());
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
for (final IndexId indexId : indices.values()) {
Set<SnapshotId> set;
Expand All @@ -241,7 +272,7 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
indexSnapshots.put(indexId, set);
}

return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
.retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
);
Expand Down Expand Up @@ -269,14 +300,15 @@ public boolean equals(Object obj) {
RepositoryData that = (RepositoryData) obj;
return snapshotIds.equals(that.snapshotIds)
&& snapshotStates.equals(that.snapshotStates)
&& snapshotVersions.equals(that.snapshotVersions)
&& indices.equals(that.indices)
&& indexSnapshots.equals(that.indexSnapshots)
&& shardGenerations.equals(that.shardGenerations);
}

@Override
public int hashCode() {
return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
}

/**
Expand Down Expand Up @@ -323,6 +355,7 @@ public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
private static final String NAME = "name";
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String VERSION = "version";
private static final String MIN_VERSION = "min_version";

/**
Expand All @@ -339,6 +372,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
if (snapshotStates.containsKey(snapshot.getUUID())) {
builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
}
if (snapshotVersions.containsKey(snapshot.getUUID())) {
builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
}
builder.endObject();
}
builder.endArray();
Expand Down Expand Up @@ -378,6 +414,7 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
final Map<String, SnapshotId> snapshots = new HashMap<>();
final Map<String, SnapshotState> snapshotStates = new HashMap<>();
final Map<String, Version> snapshotVersions = new HashMap<>();
final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();

Expand All @@ -390,6 +427,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
String name = null;
String uuid = null;
SnapshotState state = null;
Version version = null;
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String currentFieldName = parser.currentName();
parser.nextToken();
Expand All @@ -399,12 +437,17 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
uuid = parser.text();
} else if (STATE.equals(currentFieldName)) {
state = SnapshotState.fromValue(parser.numberValue().byteValue());
} else if (VERSION.equals(currentFieldName)) {
version = Version.fromString(parser.text());
}
}
final SnapshotId snapshotId = new SnapshotId(name, uuid);
if (state != null) {
snapshotStates.put(uuid, state);
}
if (version != null) {
snapshotVersions.put(uuid, version);
}
snapshots.put(snapshotId.getUUID(), snapshotId);
}
} else {
Expand Down Expand Up @@ -488,7 +531,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
} else {
throw new ElasticsearchParseException("start object expected");
}
return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
Expand Down Expand Up @@ -121,6 +122,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -873,7 +875,7 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
Expand Down Expand Up @@ -1252,8 +1254,41 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
});

final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();

// Step 2: Write new index-N blob to repository and update index.latest
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
// BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
// RepositoryData contains a version for every snapshot
final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds().stream().filter(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is somewhat annoying that we have to load all SnapshotInfo here one-off but I think it's still the best solution available.
Ideally, I was hoping to be able to just work from the assumption that if no version is set for a snapshot then it must be from before 8.0/7.6. But I gave up on that idea since it breaks as soon as some older version cluster (for whatever reason), takes a snapshot and removes all the version fields when writing out a new index-N (maybe that's the wrong trade off though, glad to hear opinions on this :) ... obviously you could argue that loading all the SnapshotInfo is too high a price to pay just so the repo can move to the new metadata version earlier, but IMO even on S3 etc. loading 100 snapshots or so won't take all that long or cost any meaningful amount of $$$).

snapshotId -> repositoryData.getVersion(snapshotId) == null).collect(Collectors.toList());
if (snapshotIdsWithoutVersion.isEmpty() == false) {
final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
ActionListener.runAfter(
new ActionListener<>() {
@Override
public void onResponse(Collection<Void> voids) {
logger.info("Successfully loaded all snapshot's version information for {} from snapshot metadata",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

info IMO since it's a one time thing and it would be nice to have some marker of the "upgrade" in the logs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that snapshotIdsWithoutVersion can be very long. I was wondering if we should just display the size of snapshotIdsWithoutVersion here in the info logging.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to have the concrete ids of the snapshots to help debug situations where users might be writing to the repo from various ES versions (that's my main motivation to have this). I figured the list isn't going to be so long that it would create real issues due to the log line length so worst case it's a bit of an annoyance right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to have the concrete ids of the snapshots to help debug situations where users might be writing to the repo from various ES versions (that's my main motivation to have this)

Why do the exact names and versions matter in that case? Isn't it sufficient to know that some snapshots got version-tagged that previously were not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Versions I don't care about much and the exact names aren't important either, but it would be nice to see if the list of snapshots changed if this gets logged repeatedly.
Especially on Cloud just having just the size which might be effectively constant over time seems like it could create an annoying situation when debugging :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps use firstListElementsToCommaDelimitedString then (see AllocationService)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah nice thanks => reused in 02e8c4e (can probably move that method to a more appropriate place in a follow up)

snapshotIdsWithoutVersion);
}

@Override
public void onFailure(Exception e) {
logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
}
}, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not great but just like in last week's discussion of SnapshotsService not introducing any new breakage here so we take whatever version information we can get and run with it even on exceptions.

snapshotIdsWithoutVersion.size());
for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadAllVersionsListener, () ->
updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())));
}
} else {
filterRepositoryDataStep.onResponse(repositoryData);
}
})), listener::onFailure);
filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
final long newGen = setPendingStep.result();
if (latestKnownRepoGen.get() >= newGen) {
throw new IllegalArgumentException(
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
Expand All @@ -1263,7 +1298,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
writeAtomic(indexBlob,
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
Expand Down Expand Up @@ -1297,13 +1332,13 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Exception e) {
l.onFailure(
listener.onFailure(
new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
// Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
Expand All @@ -1320,7 +1355,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}));
}
});
})), listener::onFailure);
}, listener::onFailure);
}

private RepositoryMetaData getRepoMetaData(ClusterState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,12 @@ public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repo
} else {
try {
final Repository repository = repositoriesService.repository(repositoryName);
hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
&& snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
snapshotId -> {
final Version known = repositoryData.getVersion(snapshotId);
return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
.before(SHARD_GEN_IN_REPO_DATA_VERSION);
});
} catch (SnapshotMissingException e) {
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
return true;
Expand Down
Loading