diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index cf693f0c60a0e..d7e0c2e4d816c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -332,7 +332,8 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) {
      * @param <T> The list element type.
      * @return A comma-separated string of the first few elements.
      */
-    static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter, boolean isDebugEnabled) {
+    public static <T> String firstListElementsToCommaDelimitedString(List<T> elements, Function<T, String> formatter,
+                                                                     boolean isDebugEnabled) {
         final int maxNumberOfElements = 10;
         if (isDebugEnabled || elements.size() <= maxNumberOfElements) {
             return elements.stream().map(formatter).collect(Collectors.joining(", "));
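The visibility change above exists so that `BlobStoreRepository` can reuse this logging helper when reporting which snapshots had missing version information backfilled (see the repository change further down). Below is a minimal, standalone sketch of the truncation behaviour implied by the lines shown here; the class name, the `limit(10)` fallback branch and the suffix wording are illustrative assumptions, not the actual Elasticsearch implementation.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CommaDelimitedPreview {

    /**
     * Joins the first few elements for log output. When debug logging is enabled,
     * or the list is short, every element is printed; otherwise the output is truncated.
     */
    static <T> String firstElements(List<T> elements, Function<T, String> formatter, boolean isDebugEnabled) {
        final int maxNumberOfElements = 10;
        if (isDebugEnabled || elements.size() <= maxNumberOfElements) {
            return elements.stream().map(formatter).collect(Collectors.joining(", "));
        }
        // Truncated form; the real helper appends a summary of how many elements were omitted.
        return elements.stream().limit(maxNumberOfElements).map(formatter).collect(Collectors.joining(", "))
            + ", ... (" + elements.size() + " in total)";
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        System.out.println(firstElements(ids, String::valueOf, false)); // truncated
        System.out.println(firstElements(ids, String::valueOf, true));  // full list
    }
}
```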
diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
index 82c41251c586a..618eb246a14b4 100644
--- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
+++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java
@@ -69,7 +69,7 @@ public final class RepositoryData {
      * An instance initialized for an empty repository.
      */
     public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
-        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);

     /**
      * The generational id of the index file from which the repository data was read.
@@ -92,13 +92,16 @@ public final class RepositoryData {
      */
     private final Map<IndexId, Set<SnapshotId>> indexSnapshots;

+    private final Map<String, Version> snapshotVersions;
+
     /**
      * Shard generations.
      */
     private final ShardGenerations shardGenerations;

     public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
-                          Map<IndexId, Set<SnapshotId>> indexSnapshots, ShardGenerations shardGenerations) {
+                          Map<String, Version> snapshotVersions, Map<IndexId, Set<SnapshotId>> indexSnapshots,
+                          ShardGenerations shardGenerations) {
         this.genId = genId;
         this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
         this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
@@ -106,12 +109,27 @@ public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<Stri
         this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
         this.shardGenerations = Objects.requireNonNull(shardGenerations);
+        this.snapshotVersions = snapshotVersions;
     }

+    /**
+     * Creates a copy of this instance that contains updated version data for the given snapshots.
+     */
+    public RepositoryData withVersions(final Map<SnapshotId, Version> versions) {
+        if (versions.isEmpty()) {
+            return this;
+        }
+        final Map<String, Version> newVersions = new HashMap<>(snapshotVersions);
+        versions.forEach((id, version) -> newVersions.put(id.getUUID(), version));
+        return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations);
+    }
+
     public ShardGenerations shardGenerations() {
@@ -141,6 +159,14 @@ public SnapshotState getSnapshotState(final SnapshotId snapshotId) {
         return snapshotStates.get(snapshotId.getUUID());
     }

+    /**
+     * Returns the {@link Version} for the given snapshot or {@code null} if unknown.
+     */
+    @Nullable
+    public Version getVersion(SnapshotId snapshotId) {
+        return snapshotVersions.get(snapshotId.getUUID());
+    }
+
     /**
      * Returns an unmodifiable map of the index names to {@link IndexId} in the repository.
      */
@@ -173,6 +199,7 @@ public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId)
      */
     public RepositoryData addSnapshot(final SnapshotId snapshotId,
                                       final SnapshotState snapshotState,
+                                      final Version version,
                                       final ShardGenerations shardGenerations) {
         if (snapshotIds.containsKey(snapshotId.getUUID())) {
             // if the snapshot id already exists in the repository data, it means an old master
@@ -184,11 +211,13 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
         snapshots.put(snapshotId.getUUID(), snapshotId);
         Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
         newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
+        Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
+        newSnapshotVersions.put(snapshotId.getUUID(), version);
         Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
         for (final IndexId indexId : shardGenerations.indices()) {
             allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
         }
-        return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
+        return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
             ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
     }

@@ -202,7 +231,7 @@ public RepositoryData withGenId(long newGeneration) {
         if (newGeneration == genId) {
             return this;
         }
-        return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations);
+        return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
     }

     /**
@@ -222,6 +251,8 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
         }
         Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
         newSnapshotStates.remove(snapshotId.getUUID());
+        final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
+        newSnapshotVersions.remove(snapshotId.getUUID());
         Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
         for (final IndexId indexId : indices.values()) {
             Set<SnapshotId> set;
@@ -241,7 +272,7 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGen
             indexSnapshots.put(indexId, set);
         }

-        return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots,
+        return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
             ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
                 .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
         );
@@ -269,6 +300,7 @@ public boolean equals(Object obj) {
         RepositoryData that = (RepositoryData) obj;
         return snapshotIds.equals(that.snapshotIds)
                    && snapshotStates.equals(that.snapshotStates)
+                   && snapshotVersions.equals(that.snapshotVersions)
                    && indices.equals(that.indices)
                    && indexSnapshots.equals(that.indexSnapshots)
                    && shardGenerations.equals(that.shardGenerations);
@@ -276,7 +308,7 @@ public boolean equals(Object obj) {

     @Override
     public int hashCode() {
-        return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations);
+        return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
     }

     /**
@@ -323,6 +355,7 @@ public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
     private static final String NAME = "name";
     private static final String UUID = "uuid";
     private static final String STATE = "state";
+    private static final String VERSION = "version";
     private static final String MIN_VERSION = "min_version";

     /**
@@ -339,6 +372,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
             if (snapshotStates.containsKey(snapshot.getUUID())) {
                 builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
             }
+            if (snapshotVersions.containsKey(snapshot.getUUID())) {
+                builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
+            }
             builder.endObject();
         }
         builder.endArray();
@@ -378,6 +414,7 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
     public static RepositoryData snapshotsFromXContent(final XContentParser parser, long genId) throws IOException {
         final Map<String, SnapshotId> snapshots = new HashMap<>();
         final Map<String, SnapshotState> snapshotStates = new HashMap<>();
+        final Map<String, Version> snapshotVersions = new HashMap<>();
         final Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
         final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();

@@ -390,6 +427,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
                     String name = null;
                     String uuid = null;
                     SnapshotState state = null;
+                    Version version = null;
                     while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                         String currentFieldName = parser.currentName();
                         parser.nextToken();
@@ -399,12 +437,17 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
                             uuid = parser.text();
                         } else if (STATE.equals(currentFieldName)) {
                             state = SnapshotState.fromValue(parser.numberValue().byteValue());
+                        } else if (VERSION.equals(currentFieldName)) {
+                            version = Version.fromString(parser.text());
                         }
                     }
                     final SnapshotId snapshotId = new SnapshotId(name, uuid);
                     if (state != null) {
                         snapshotStates.put(uuid, state);
                     }
+                    if (version != null) {
+                        snapshotVersions.put(uuid, version);
+                    }
                     snapshots.put(snapshotId.getUUID(), snapshotId);
                 }
             } else {
@@ -488,7 +531,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser,
         } else {
             throw new ElasticsearchParseException("start object expected");
         }
-        return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build());
+        return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
     }
 }
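The `RepositoryData` changes above follow a copy-on-write pattern: every mutator (`addSnapshot`, `removeSnapshot`, `withVersions`) copies the UUID-keyed version map, applies its change, and returns a fresh immutable instance, so readers never observe a half-updated map and an unknown version is simply a `null` lookup. A simplified, dependency-free sketch of that idea follows; plain `String`s stand in for Elasticsearch's `SnapshotId` and `Version` types and all names are illustrative.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for the per-snapshot version bookkeeping shown in the diff. */
final class VersionedSnapshots {
    private final Map<String, String> versionsByUuid; // snapshot UUID -> version string

    VersionedSnapshots(Map<String, String> versionsByUuid) {
        this.versionsByUuid = Collections.unmodifiableMap(new HashMap<>(versionsByUuid));
    }

    /** Returns the known version or null, mirroring RepositoryData#getVersion. */
    String getVersion(String snapshotUuid) {
        return versionsByUuid.get(snapshotUuid);
    }

    /** Copy-on-write update, mirroring RepositoryData#withVersions. */
    VersionedSnapshots withVersions(Map<String, String> updates) {
        if (updates.isEmpty()) {
            return this;
        }
        Map<String, String> copy = new HashMap<>(versionsByUuid);
        copy.putAll(updates);
        return new VersionedSnapshots(copy);
    }

    public static void main(String[] args) {
        VersionedSnapshots initial = new VersionedSnapshots(Map.of("uuid-1", "7.5.0"));
        VersionedSnapshots updated = initial.withVersions(Map.of("uuid-2", "7.6.0"));
        System.out.println(initial.getVersion("uuid-2"));  // null - old instance untouched
        System.out.println(updated.getVersion("uuid-2"));  // 7.6.0
    }
}
```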
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index b9016394ca514..87fbf9c44d0c3 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -32,6 +32,7 @@
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.StepListener;
@@ -46,6 +47,7 @@
 import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.metadata.RepositoryMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Numbers;
@@ -121,6 +123,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -873,7 +876,7 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
             final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
             getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
                 final RepositoryData updatedRepositoryData =
-                    existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations);
+                    existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
                 writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
                     if (writeShardGens) {
                         cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
@@ -1252,8 +1255,42 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
             }
         });

+        final StepListener<RepositoryData> filterRepositoryDataStep = new StepListener<>();
+
         // Step 2: Write new index-N blob to repository and update index.latest
         setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+            // BwC logic: Load snapshot version information if any snapshot is missing a version in RepositoryData so that the new
+            // RepositoryData contains a version for every snapshot
+            final List<SnapshotId> snapshotIdsWithoutVersion = repositoryData.getSnapshotIds().stream().filter(
+                snapshotId -> repositoryData.getVersion(snapshotId) == null).collect(Collectors.toList());
+            if (snapshotIdsWithoutVersion.isEmpty() == false) {
+                final Map<SnapshotId, Version> updatedVersionMap = new ConcurrentHashMap<>();
+                final GroupedActionListener<Void> loadAllVersionsListener = new GroupedActionListener<>(
+                    ActionListener.runAfter(
+                        new ActionListener<>() {
+                            @Override
+                            public void onResponse(Collection<Void> voids) {
+                                logger.info("Successfully loaded all snapshot's version information for {} from snapshot metadata",
+                                    AllocationService.firstListElementsToCommaDelimitedString(
+                                        snapshotIdsWithoutVersion, SnapshotId::toString, logger.isDebugEnabled()));
+                            }

+                            @Override
+                            public void onFailure(Exception e) {
+                                logger.warn("Failure when trying to load missing version information from snapshot metadata", e);
+                            }
+                        }, () -> filterRepositoryDataStep.onResponse(repositoryData.withVersions(updatedVersionMap))),
+                    snapshotIdsWithoutVersion.size());
+                for (SnapshotId snapshotId : snapshotIdsWithoutVersion) {
+                    threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadAllVersionsListener, () ->
+                        updatedVersionMap.put(snapshotId, getSnapshotInfo(snapshotId).version())));
+                }
+            } else {
+                filterRepositoryDataStep.onResponse(repositoryData);
+            }
+        })), listener::onFailure);
+        filterRepositoryDataStep.whenComplete(filteredRepositoryData -> {
+            final long newGen = setPendingStep.result();
             if (latestKnownRepoGen.get() >= newGen) {
                 throw new IllegalArgumentException(
                     "Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
@@ -1263,7 +1300,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
             writeAtomic(indexBlob,
-                BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+                BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
             // write the current generation to the index-latest file
             final BytesReference genBytes;
             try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1297,13 +1334,13 @@ public ClusterState execute(ClusterState currentState) {

                 @Override
                 public void onFailure(String source, Exception e) {
-                    l.onFailure(
+                    listener.onFailure(
                         new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
                 }

                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
+                    threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
                         // Delete all now outdated index files up to 1000 blobs back from the new generation.
                         // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
                         // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
@@ -1320,7 +1357,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
                     }));
                 }
             });
-        })), listener::onFailure);
+        }, listener::onFailure);
     }

     private RepositoryMetaData getRepoMetaData(ClusterState state) {
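The new block in `writeIndexGen` is the backwards-compatibility path: if any snapshot listed in `RepositoryData` has no cached version, its `SnapshotInfo` is loaded on the snapshot thread pool, the results are gathered in a `ConcurrentHashMap`, and only once every load has completed (successfully or not) does the write of the new `index-N` blob proceed with `repositoryData.withVersions(...)`. The sketch below models that fan-out/join with plain `CompletableFuture`s standing in for Elasticsearch's `StepListener`/`GroupedActionListener`; the loader method and pool sizing are placeholder assumptions.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of the "load missing versions, then continue" fan-out shown in the diff. */
public class VersionBackfillSketch {

    public static void main(String[] args) {
        List<String> snapshotsWithoutVersion = List.of("snap-1", "snap-2", "snap-3");
        Map<String, String> loadedVersions = new ConcurrentHashMap<>();
        ExecutorService snapshotPool = Executors.newFixedThreadPool(2); // stands in for ThreadPool.Names.SNAPSHOT

        // One task per snapshot that is missing a version in the repository metadata.
        CompletableFuture<?>[] loads = snapshotsWithoutVersion.stream()
            .map(id -> CompletableFuture.runAsync(
                () -> loadedVersions.put(id, loadVersionFromSnapshotMetadata(id)), snapshotPool))
            .toArray(CompletableFuture[]::new);

        // Equivalent of the GroupedActionListener: proceed only after every load has finished.
        CompletableFuture.allOf(loads)
            .thenRun(() -> System.out.println("writing new index-N with versions " + loadedVersions))
            .join();

        snapshotPool.shutdown();
    }

    // Placeholder for reading SnapshotInfo from the repository and returning its version.
    private static String loadVersionFromSnapshotMetadata(String snapshotId) {
        return "7.5.2";
    }
}
```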
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 9340e1508999c..45967235e8086 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -365,9 +365,12 @@ public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repo
         } else {
             try {
                 final Repository repository = repositoriesService.repository(repositoryName);
-                hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
-                    snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
-                        && snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
+                hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
+                    snapshotId -> {
+                        final Version known = repositoryData.getVersion(snapshotId);
+                        return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
+                            .before(SHARD_GEN_IN_REPO_DATA_VERSION);
+                    });
             } catch (SnapshotMissingException e) {
                 logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
                 return true;
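`hasOldVersionSnapshots` now consults the cached per-snapshot version first and only falls back to reading the snapshot metadata blob when the version is unknown, which avoids one metadata read per snapshot on repositories written by current versions. A standalone sketch of that "cached value or expensive fallback" check, using illustrative integer version ids instead of Elasticsearch's `Version` type:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Sketch of "use the cached version if present, otherwise load it" from the diff. */
public class OldFormatCheckSketch {

    static boolean hasSnapshotOlderThan(List<String> snapshotIds,
                                        Map<String, Integer> cachedVersions,   // id -> version id, may be incomplete
                                        Function<String, Integer> loadVersion, // expensive metadata read
                                        int minimumVersionId,
                                        String excludedId) {
        return snapshotIds.stream()
            .filter(id -> id.equals(excludedId) == false)
            .anyMatch(id -> {
                Integer known = cachedVersions.get(id);
                int version = known != null ? known : loadVersion.apply(id); // fallback only when unknown
                return version < minimumVersionId;
            });
    }

    public static void main(String[] args) {
        Map<String, Integer> cached = Map.of("a", 7_06_00_99, "b", 7_04_00_99);
        boolean result = hasSnapshotOlderThan(
            List.of("a", "b", "c"), cached, id -> 7_05_02_99, 7_06_00_99, null);
        System.out.println(result); // true: "b" (cached) and "c" (loaded) predate the threshold
    }
}
```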
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
index 12adfb49120a7..0d15856745793 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
@@ -20,6 +20,7 @@
 package org.elasticsearch.repositories;

 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.XContent;
@@ -105,7 +106,8 @@ public void testAddSnapshots() {
             builder.put(indexId, 0, "2");
         }
         RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot,
-            randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build());
+            randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED),
+            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
         // verify that the new repository data has the new snapshot and its indices
         assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
         for (IndexId indexId : indices) {
@@ -122,17 +124,19 @@ public void testInitIndices() {
         final int numSnapshots = randomIntBetween(1, 30);
         final Map<String, SnapshotId> snapshotIds = new HashMap<>(numSnapshots);
         final Map<String, SnapshotState> snapshotStates = new HashMap<>(numSnapshots);
+        final Map<String, Version> snapshotVersions = new HashMap<>(numSnapshots);
         for (int i = 0; i < numSnapshots; i++) {
             final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
             snapshotIds.put(snapshotId.getUUID(), snapshotId);
             snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values()));
+            snapshotVersions.put(snapshotId.getUUID(), randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
         }
         RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds,
-            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
         // test that initializing indices works
         Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
         RepositoryData newRepoData =
-            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, indices, ShardGenerations.EMPTY);
+            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, ShardGenerations.EMPTY);
         List<SnapshotId> expected = new ArrayList<>(repositoryData.getSnapshotIds());
         Collections.sort(expected);
         List<SnapshotId> actual = new ArrayList<>(newRepoData.getSnapshotIds());
@@ -168,7 +172,8 @@ public void testResolveIndexId() {
     public void testGetSnapshotState() {
         final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
         final SnapshotState state = randomFrom(SnapshotState.values());
-        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, ShardGenerations.EMPTY);
+        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state,
+            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), ShardGenerations.EMPTY);
         assertEquals(state, repositoryData.getSnapshotState(snapshotId));
         assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())));
     }
@@ -187,9 +192,11 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException {

         Map<String, SnapshotId> snapshotIds = new HashMap<>();
         Map<String, SnapshotState> snapshotStates = new HashMap<>();
+        Map<String, Version> snapshotVersions = new HashMap<>();
         for (SnapshotId snapshotId : parsedRepositoryData.getSnapshotIds()) {
             snapshotIds.put(snapshotId.getUUID(), snapshotId);
             snapshotStates.put(snapshotId.getUUID(), parsedRepositoryData.getSnapshotState(snapshotId));
+            snapshotVersions.put(snapshotId.getUUID(), parsedRepositoryData.getVersion(snapshotId));
         }

         final IndexId corruptedIndexId = randomFrom(parsedRepositoryData.getIndices().values());
@@ -211,7 +218,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException {
         assertNotNull(corruptedIndexId);

         RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates,
-            indexSnapshots, shardGenBuilder.build());
+            snapshotVersions, indexSnapshots, shardGenBuilder.build());
         final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
         corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true);

@@ -280,7 +287,8 @@ public static RepositoryData generateRandomRepoData() {
                     builder.put(someIndex, j, uuid);
                 }
             }
-            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), builder.build());
+            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()),
+                randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
         }
         return repositoryData;
     }
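The tests above round-trip `RepositoryData` through its XContent form with and without per-snapshot versions. The format rule they depend on is that the per-snapshot `version` field is written only when known and comes back as `null` when absent, which is what later triggers the lazy backfill. A schematic, dependency-free sketch of that rule follows; plain maps stand in for the real XContent builder and parser.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of the "write the version field only when known, treat a missing field as null" format rule. */
public class VersionFieldRoundTrip {

    // Serialize one snapshot entry; the "version" key is omitted when unknown, like in the index-N blob.
    static Map<String, String> toEntry(String name, String uuid, String versionOrNull) {
        Map<String, String> entry = new LinkedHashMap<>();
        entry.put("name", name);
        entry.put("uuid", uuid);
        if (versionOrNull != null) {
            entry.put("version", versionOrNull);
        }
        return entry;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> blob = new HashMap<>();
        blob.put("uuid-new", toEntry("snap-new", "uuid-new", "7.6.0"));
        blob.put("uuid-old", toEntry("snap-old", "uuid-old", null)); // written by an older node

        // "Parsing": a missing field comes back as null, which later triggers the lazy backfill.
        for (Map.Entry<String, Map<String, String>> e : blob.entrySet()) {
            System.out.println(e.getKey() + " -> version " + e.getValue().get("version"));
        }
    }
}
```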
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index 13102182cd7b0..3f0e80a93d263 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -19,6 +19,7 @@

 package org.elasticsearch.repositories.blobstore;

+import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -259,7 +260,7 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo
                 builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1");
             }
             repoData = repoData.addSnapshot(snapshotId,
-                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build());
+                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, builder.build());
         }
         return repoData;
     }
diff --git a/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java
index 9abb107521042..8e08d0700ca2a 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.snapshots;

+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
@@ -27,18 +28,25 @@
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
+import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
 import java.util.Locale;
+import java.util.function.Function;
+import java.util.stream.Collectors;

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
@@ -263,11 +271,24 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
         logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt);
         Files.delete(repo.resolve(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotToCorrupt.getUUID())));

+        logger.info("--> strip version information from index-N blob");
+        final RepositoryData withoutVersions = new RepositoryData(repositoryData.getGenId(),
+            repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
+                SnapshotId::getUUID, Function.identity())),
+            repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
+                SnapshotId::getUUID, repositoryData::getSnapshotState)),
+            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+
+        Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()),
+            BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(),
+                true))), StandardOpenOption.TRUNCATE_EXISTING);
+
         logger.info("--> verify that repo is assumed in old metadata format");
         final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
         final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
         assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
-            ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, repositoryData, null)))), is(true));
+            ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
+            is(true));

         logger.info("--> verify that snapshot with missing root level metadata can be deleted");
         assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
@@ -276,6 +297,10 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
         assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
             ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
             is(false));
+        final RepositoryData finalRepositoryData = getRepositoryData(repository);
+        for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {
+            assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT));
+        }
     }

     private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
index f0118d3c0b699..ecce14dfd460c 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
@@ -88,7 +88,7 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
     @Override
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         final IndexId indexId = new IndexId(indexName, "blah");
-        listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
+        listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
             Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY));
     }

diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
index 20c098ee7f82a..19ee64290133c 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
@@ -13,6 +13,7 @@
 import org.apache.lucene.index.IndexCommit;
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -231,6 +232,7 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {

             Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>();
             Map<String, SnapshotState> snapshotStates = new HashMap<>(copiedSnapshotIds.size());
+            Map<String, Version> snapshotVersions = new HashMap<>(copiedSnapshotIds.size());
             Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>(copiedSnapshotIds.size());

             ImmutableOpenMap<String, IndexMetaData> remoteIndices = remoteMetaData.getIndices();
@@ -239,10 +241,11 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
                 SnapshotId snapshotId = new SnapshotId(LATEST, LATEST);
                 copiedSnapshotIds.put(indexName, snapshotId);
                 snapshotStates.put(indexName, SnapshotState.SUCCESS);
+                snapshotVersions.put(indexName, Version.CURRENT);
                 Index index = remoteIndices.get(indexName).getIndex();
                 indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId));
             }
-            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY);
+            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, ShardGenerations.EMPTY);
         });
     }