diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 6d9cba05748ea..72aa81d3c0eda 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -79,11 +79,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata) { - return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metaData, userMetadata); + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData metaData, Map userMetadata, ActionListener listener) { + in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, metaData, userMetadata, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index f83712249f7fc..ff4c991531923 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -135,11 +135,11 @@ default Repository create(RepositoryMetaData metaData, Function indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData clusterMetaData, Map userMetadata); + void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String 
failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData clusterMetaData, Map userMetadata, ActionListener listener); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 47df18146d1ea..92798034678c6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -97,7 +97,6 @@ import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; @@ -665,53 +664,60 @@ private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexI } @Override - public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, - final List indices, - final long startTime, - final String failure, - final int totalShards, - final List shardFailures, - final long repositoryStateId, - final boolean includeGlobalState, - final MetaData clusterMetaData, - final Map userMetadata) { - SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, - indices.stream().map(IndexId::getName).collect(Collectors.toList()), - startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, - includeGlobalState, userMetadata); + public void finalizeSnapshot(final SnapshotId snapshotId, + final List indices, + final long startTime, + final String failure, + final int totalShards, + final List shardFailures, + final long repositoryStateId, + final boolean includeGlobalState, + final MetaData clusterMetaData, + final Map userMetadata, + final ActionListener listener) { + + // Once we're done writing all metadata, we update the index-N blob to finalize the 
snapshot + final ActionListener afterMetaWrites = ActionListener.wrap(snapshotInfo -> { + writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId); + listener.onResponse(snapshotInfo); + }, ex -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", ex))); + + // We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob + final GroupedActionListener allMetaListener = + new GroupedActionListener<>(ActionListener.map(afterMetaWrites, snapshotInfos -> { + assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; + return snapshotInfos.iterator().next(); + }), 2 + indices.size()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - try { - // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will - // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the - // index or global metadata will be compatible with the segments written in this snapshot as well. - // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that - // decrements the generation it points at + // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. 
+ // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way + // that decrements the generation it points at - // Write Global MetaData + // Write Global MetaData + executor.execute(ActionRunnable.wrap(allMetaListener, l -> { globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false); + l.onResponse(null); + })); - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + executor.execute(ActionRunnable.wrap(allMetaListener, l -> { indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); - } - } catch (IOException ex) { - throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex); + l.onResponse(null); + })); } - try { - final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); - snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false); - writeIndexGen(updatedRepositoryData, repositoryStateId); - } catch (FileAlreadyExistsException ex) { - // if another master was elected and took over finalizing the snapshot, it is possible - // that both nodes try to finalize the snapshot and write to the same blobs, so we just - // log a warning here and carry on - throw new RepositoryException(metadata.name(), "Blob already exists while " + - "finalizing snapshot, assume the snapshot has already been saved", ex); - } catch (IOException ex) { - throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex); - } - return blobStoreSnapshot; + executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> { + final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, + 
indices.stream().map(IndexId::getName).collect(Collectors.toList()), + startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, + includeGlobalState, userMetadata); + snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); + afterMetaListener.onResponse(snapshotInfo); + })); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5af6e397a5539..b1823a9b3117a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -561,25 +561,25 @@ public void onNoLongerMaster() { private void cleanupAfterError(Exception exception) { threadPool.generic().execute(() -> { if (snapshotCreated) { - try { - repositoriesService.repository(snapshot.snapshot().getRepository()) - .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), - snapshot.indices(), - snapshot.startTime(), - ExceptionsHelper.detailedMessage(exception), - 0, - Collections.emptyList(), - snapshot.getRepositoryStateId(), - snapshot.includeGlobalState(), - metaDataForSnapshot(snapshot, clusterService.state().metaData()), - snapshot.userMetadata()); - } catch (Exception inner) { - inner.addSuppressed(exception); - logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", - snapshot.snapshot()), inner); - } + repositoriesService.repository(snapshot.snapshot().getRepository()) + .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), + snapshot.indices(), + snapshot.startTime(), + ExceptionsHelper.stackTrace(exception), + 0, + Collections.emptyList(), + snapshot.getRepositoryStateId(), + snapshot.includeGlobalState(), + metaDataForSnapshot(snapshot, clusterService.state().metaData()), + snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> { + }, inner -> { + inner.addSuppressed(exception); + 
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository", + snapshot.snapshot()), inner); + }), () -> userCreateSnapshotListener.onFailure(e))); + } else { + userCreateSnapshotListener.onFailure(e); } - userCreateSnapshotListener.onFailure(e); }); } } @@ -1007,7 +1007,7 @@ protected void doRun() { shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason())); } } - SnapshotInfo snapshotInfo = repository.finalizeSnapshot( + repository.finalizeSnapshot( snapshot.getSnapshotId(), entry.indices(), entry.startTime(), @@ -1017,9 +1017,10 @@ protected void doRun() { entry.getRepositoryStateId(), entry.includeGlobalState(), metaDataForSnapshot(entry, metaData), - entry.userMetadata()); - removeSnapshotFromClusterState(snapshot, snapshotInfo, null); - logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); + entry.userMetadata(), ActionListener.wrap(snapshotInfo -> { + removeSnapshotFromClusterState(snapshot, snapshotInfo, null); + logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); + }, this::onFailure)); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 7a1bcefea9d95..41ea153499a05 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -159,10 +159,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, - int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, MetaData metaData, Map userMetadata) { - return null; + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, 
String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData metaData, Map userMetadata, + ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 14d4a5ba60b97..9d94fc3ed4368 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,15 +18,16 @@ */ package org.elasticsearch.snapshots.mockstore; +import org.apache.lucene.util.SameThreadExecutorService; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -34,32 +35,16 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; -import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; -import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.when; public class MockEventuallyConsistentRepositoryTests extends ESTestCase { - private Environment environment; - - @Override - public void setUp() throws Exception { - super.setUp(); - final Path tempDir = createTempDir(); - final String nodeName = "testNode"; - environment = TestEnvironment.newEnvironment(Settings.builder() - .put(NODE_NAME_SETTING.getKey(), nodeName) - .put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath()) - .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath()) - .build()); - } - public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( @@ -151,27 +136,37 @@ public void testOverwriteShardSnapBlobFails() throws IOException { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), threadPool, blobStoreContext)) { repository.start(); // We create a snap- blob for snapshot "foo" in the first generation + final PlainActionFuture future = PlainActionFuture.newFuture(); final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); + -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future); + 
future.actionGet(); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. final AssertionError assertionError = expectThrows(AssertionError.class, - () -> repository.finalizeSnapshot( - snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap())); + () -> { + final PlainActionFuture fut = PlainActionFuture.newFuture(); + repository.finalizeSnapshot( + snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), fut); + fut.actionGet(); + }); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. + final PlainActionFuture future2 = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future2); + future2.actionGet(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 417e4e98649af..f1703798f31a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -98,10 +98,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, - int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, MetaData metaData, Map userMetadata) { - return null; 
+ public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData metaData, Map userMetadata, + ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 293fc04989f98..e5638a3ba0ed3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -254,9 +254,9 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata) { + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData metaData, Map userMetadata, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 280e4a4344575..711abf781894c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -90,17 +90,17 @@ public void 
initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, - Map userMetadata) { + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData metaData, Map userMetadata, ActionListener listener) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { - return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata); + super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata, listener); } catch (IOException ex) { - throw new UncheckedIOException(ex); + listener.onFailure(ex); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 34acf179c3fbf..4b80bfc5cd268 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -205,10 +205,13 @@ public void testRestoreMinmal() throws IOException { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), 
indexShardSnapshotStatus, future); future.actionGet(); + final PlainActionFuture finFuture = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId), indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), repository.getRepositoryData().getGenId(), true, - MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap()); + MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), + finFuture); + finFuture.actionGet(); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());