From 807c1d15aac5383792de8b6d92e403d552258cc7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 30 Sep 2019 17:54:50 +0200 Subject: [PATCH] Speed up Snapshot Finalization (#47283) As a result of #45689 snapshot finalization started to take significantly longer than before. This may be a little unfortunate since it increases the likelihood of failing to finalize after having written out all the segment blobs. This change parallelizes all the metadata writes that can safely run in parallel in the finalization step to speed the finalization step up again. Also, this will generally speed up the snapshot process overall in case of a large number of indices. This is also a nice to have for #46250 since we add yet another step (deleting of old index- blobs in the shards) to the finalization. --- .../repositories/FilterRepository.java | 10 +-- .../repositories/Repository.java | 8 +- .../blobstore/BlobStoreRepository.java | 88 ++++++++++--------- .../snapshots/SnapshotsService.java | 45 +++++----- .../RepositoriesServiceTests.java | 9 +- ...ckEventuallyConsistentRepositoryTests.java | 45 +++++----- .../index/shard/RestoreOnlyRepository.java | 9 +- .../xpack/ccr/repository/CcrRepository.java | 6 +- .../SourceOnlySnapshotRepository.java | 12 +-- .../SourceOnlySnapshotShardTests.java | 5 +- 10 files changed, 122 insertions(+), 115 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 6d9cba05748ea..72aa81d3c0eda 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -79,11 +79,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, 
boolean includeGlobalState, - MetaData metaData, Map userMetadata) { - return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metaData, userMetadata); + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData metaData, Map userMetadata, ActionListener listener) { + in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, metaData, userMetadata, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index f83712249f7fc..ff4c991531923 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -135,11 +135,11 @@ default Repository create(RepositoryMetaData metaData, Function indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData clusterMetaData, Map userMetadata); + void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData clusterMetaData, Map userMetadata, ActionListener listener); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 47df18146d1ea..92798034678c6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -97,7 +97,6 @@ import 
java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; @@ -665,53 +664,60 @@ private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexI } @Override - public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId, - final List indices, - final long startTime, - final String failure, - final int totalShards, - final List shardFailures, - final long repositoryStateId, - final boolean includeGlobalState, - final MetaData clusterMetaData, - final Map userMetadata) { - SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, - indices.stream().map(IndexId::getName).collect(Collectors.toList()), - startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, - includeGlobalState, userMetadata); + public void finalizeSnapshot(final SnapshotId snapshotId, + final List indices, + final long startTime, + final String failure, + final int totalShards, + final List shardFailures, + final long repositoryStateId, + final boolean includeGlobalState, + final MetaData clusterMetaData, + final Map userMetadata, + final ActionListener listener) { + + // Once we're done writing all metadata, we update the index-N blob to finalize the snapshot + final ActionListener afterMetaWrites = ActionListener.wrap(snapshotInfo -> { + writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId); + listener.onResponse(snapshotInfo); + }, ex -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", ex))); + + // We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob + final GroupedActionListener allMetaListener = + new GroupedActionListener<>(ActionListener.map(afterMetaWrites, snapshotInfos -> { + assert snapshotInfos.size() == 1 : 
"Should have only received a single SnapshotInfo but received " + snapshotInfos; + return snapshotInfos.iterator().next(); + }), 2 + indices.size()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - try { - // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will - // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the - // index or global metadata will be compatible with the segments written in this snapshot as well. - // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that - // decrements the generation it points at + // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. 
+ // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way + // that decrements the generation it points at - // Write Global MetaData + // Write Global MetaData + executor.execute(ActionRunnable.wrap(allMetaListener, l -> { globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false); + l.onResponse(null); + })); - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + executor.execute(ActionRunnable.wrap(allMetaListener, l -> { indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); - } - } catch (IOException ex) { - throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex); + l.onResponse(null); + })); } - try { - final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); - snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false); - writeIndexGen(updatedRepositoryData, repositoryStateId); - } catch (FileAlreadyExistsException ex) { - // if another master was elected and took over finalizing the snapshot, it is possible - // that both nodes try to finalize the snapshot and write to the same blobs, so we just - // log a warning here and carry on - throw new RepositoryException(metadata.name(), "Blob already exists while " + - "finalizing snapshot, assume the snapshot has already been saved", ex); - } catch (IOException ex) { - throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex); - } - return blobStoreSnapshot; + executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> { + final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, + 
indices.stream().map(IndexId::getName).collect(Collectors.toList()), + startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, + includeGlobalState, userMetadata); + snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); + afterMetaListener.onResponse(snapshotInfo); + })); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5af6e397a5539..b1823a9b3117a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -561,25 +561,25 @@ public void onNoLongerMaster() { private void cleanupAfterError(Exception exception) { threadPool.generic().execute(() -> { if (snapshotCreated) { - try { - repositoriesService.repository(snapshot.snapshot().getRepository()) - .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), - snapshot.indices(), - snapshot.startTime(), - ExceptionsHelper.detailedMessage(exception), - 0, - Collections.emptyList(), - snapshot.getRepositoryStateId(), - snapshot.includeGlobalState(), - metaDataForSnapshot(snapshot, clusterService.state().metaData()), - snapshot.userMetadata()); - } catch (Exception inner) { - inner.addSuppressed(exception); - logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", - snapshot.snapshot()), inner); - } + repositoriesService.repository(snapshot.snapshot().getRepository()) + .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), + snapshot.indices(), + snapshot.startTime(), + ExceptionsHelper.stackTrace(exception), + 0, + Collections.emptyList(), + snapshot.getRepositoryStateId(), + snapshot.includeGlobalState(), + metaDataForSnapshot(snapshot, clusterService.state().metaData()), + snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> { + }, inner -> { + inner.addSuppressed(exception); + 
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository", + snapshot.snapshot()), inner); + }), () -> userCreateSnapshotListener.onFailure(e))); + } else { + userCreateSnapshotListener.onFailure(e); } - userCreateSnapshotListener.onFailure(e); }); } } @@ -1007,7 +1007,7 @@ protected void doRun() { shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason())); } } - SnapshotInfo snapshotInfo = repository.finalizeSnapshot( + repository.finalizeSnapshot( snapshot.getSnapshotId(), entry.indices(), entry.startTime(), @@ -1017,9 +1017,10 @@ protected void doRun() { entry.getRepositoryStateId(), entry.includeGlobalState(), metaDataForSnapshot(entry, metaData), - entry.userMetadata()); - removeSnapshotFromClusterState(snapshot, snapshotInfo, null); - logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); + entry.userMetadata(), ActionListener.wrap(snapshotInfo -> { + removeSnapshotFromClusterState(snapshot, snapshotInfo, null); + logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); + }, this::onFailure)); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 7a1bcefea9d95..41ea153499a05 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -159,10 +159,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, - int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, MetaData metaData, Map userMetadata) { - return null; + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, 
String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData metaData, Map userMetadata, + ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 14d4a5ba60b97..9d94fc3ed4368 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,15 +18,16 @@ */ package org.elasticsearch.snapshots.mockstore; +import org.apache.lucene.util.SameThreadExecutorService; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -34,32 +35,16 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; -import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; -import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; import static org.mockito.Mockito.mock; +import static 
org.mockito.Mockito.when; public class MockEventuallyConsistentRepositoryTests extends ESTestCase { - private Environment environment; - - @Override - public void setUp() throws Exception { - super.setUp(); - final Path tempDir = createTempDir(); - final String nodeName = "testNode"; - environment = TestEnvironment.newEnvironment(Settings.builder() - .put(NODE_NAME_SETTING.getKey(), nodeName) - .put(PATH_HOME_SETTING.getKey(), tempDir.resolve(nodeName).toAbsolutePath()) - .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo").toAbsolutePath()) - .build()); - } - public void testReadAfterWriteConsistently() throws IOException { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( @@ -151,27 +136,37 @@ public void testOverwriteShardSnapBlobFails() throws IOException { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); + final ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), mock(ThreadPool.class), blobStoreContext)) { + xContentRegistry(), threadPool, blobStoreContext)) { repository.start(); // We create a snap- blob for snapshot "foo" in the first generation + final PlainActionFuture future = PlainActionFuture.newFuture(); final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); + -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future); + 
future.actionGet(); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. final AssertionError assertionError = expectThrows(AssertionError.class, - () -> repository.finalizeSnapshot( - snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap())); + () -> { + final PlainActionFuture fut = PlainActionFuture.newFuture(); + repository.finalizeSnapshot( + snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), fut); + fut.actionGet(); + }); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. + final PlainActionFuture future2 = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future2); + future2.actionGet(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 417e4e98649af..f1703798f31a2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -98,10 +98,11 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, - int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, MetaData metaData, Map userMetadata) { - return null; 
+ public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData metaData, Map userMetadata, + ActionListener listener) { + listener.onResponse(null); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 293fc04989f98..e5638a3ba0ed3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -254,9 +254,9 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata) { + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData metaData, Map userMetadata, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 280e4a4344575..711abf781894c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -90,17 +90,17 @@ public void 
initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, - Map userMetadata) { + public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId, boolean includeGlobalState, + MetaData metaData, Map userMetadata, ActionListener listener) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { - return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata); + super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata, listener); } catch (IOException ex) { - throw new UncheckedIOException(ex); + listener.onFailure(ex); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 34acf179c3fbf..4b80bfc5cd268 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -205,10 +205,13 @@ public void testRestoreMinmal() throws IOException { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), 
indexShardSnapshotStatus, future); future.actionGet(); + final PlainActionFuture finFuture = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId), indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), repository.getRepositoryData().getGenId(), true, - MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap()); + MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), + finFuture); + finFuture.actionGet(); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());