diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 6192394897bc7..3562f90786029 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -187,6 +187,11 @@ public void cloneShardSnapshot( in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener); } + @Override + public void awaitIdle() { + in.awaitIdle(); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 737154cf3f9ea..35234e4a3035b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -771,5 +771,8 @@ protected void doClose() throws IOException { repos.addAll(internalRepositories.values()); repos.addAll(repositories.values()); IOUtils.close(repos); + for (Repository repo : repos) { + repo.awaitIdle(); + } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index ef3c70e19e24d..58cdab9767b78 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -335,6 +335,15 @@ default Map adaptUserMetadata(Map userMetadata) return userMetadata; } + /** + * Block until all in-flight operations for this repository have completed. Must only be called after this instance has been closed + * by a call to stop {@link #close()}. + * Waiting for ongoing operations should be implemented here instead of in {@link #stop()} or {@link #close()} hooks of this interface + * as these are expected to be called on the cluster state applier thread (which must not block) if a repository is removed from the + * cluster. This method is intended to be called on node shutdown instead as a means to ensure no repository operations are leaked. + */ + void awaitIdle(); + static boolean assertSnapshotMetaThread() { final String threadName = Thread.currentThread().getName(); assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-") diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 01775d98dfaf0..a3ab4bd255ee8 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.ListenableActionFuture; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -66,6 +67,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -428,6 +430,30 @@ protected void doClose() { } } + // listeners to invoke when a restore completes and there are no more restores running + @Nullable + private List> emptyListeners; + + // Set of shard ids that this repository is currently restoring + private final Set ongoingRestores = new HashSet<>(); + + @Override + public void awaitIdle() { + assert lifecycle.stoppedOrClosed(); + final PlainActionFuture future; + synchronized (ongoingRestores) { + if (ongoingRestores.isEmpty()) { + return; + } + future = new PlainActionFuture<>(); + if (emptyListeners == null) { + emptyListeners = new ArrayList<>(); + } + emptyListeners.add(future); + } + FutureUtils.get(future); + } + @Override public void executeConsistentStateUpdate( Function createUpdateTask, @@ -2907,7 +2933,30 @@ public void restoreShard( ); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); final BlobContainer container = shardContainer(indexId, snapshotShardId); - executor.execute(ActionRunnable.wrap(restoreListener, l -> { + synchronized (ongoingRestores) { + if (store.isClosing()) { + restoreListener.onFailure(new AlreadyClosedException("store is closing")); + return; + } + if (lifecycle.started() == false) { + restoreListener.onFailure(new AlreadyClosedException("repository [" + metadata.name() + "] closed")); + return; + } + final boolean added = ongoingRestores.add(shardId); + assert added : "add restore for [" + shardId + "] that already has an existing restore"; + } + executor.execute(ActionRunnable.wrap(ActionListener.runAfter(restoreListener, () -> { + final List> onEmptyListeners; + synchronized (ongoingRestores) { + if (ongoingRestores.remove(shardId) && ongoingRestores.isEmpty() && emptyListeners != null) { + onEmptyListeners = emptyListeners; + emptyListeners = null; + } else { + return; + } + } + ActionListener.onResponse(onEmptyListeners, null); + }), l -> { final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null); new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @@ -3013,6 +3062,9 @@ void ensureNotClosing(final Store store) throws AlreadyClosedException { if (store.isClosing()) { throw new AlreadyClosedException("store is closing"); } + if (lifecycle.started() == false) { + throw new AlreadyClosedException("repository [" + metadata.name() + "] closed"); + } } }.restore(snapshotFiles, store, l); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index cf4533dd4a527..6a22e7a00354d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -322,6 +322,9 @@ public void cloneShardSnapshot( } + @Override + public void awaitIdle() {} + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index c9711b64db94f..55f6ca347dae8 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -153,6 +153,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) { public void updateState(final ClusterState state) { } + @Override + public void awaitIdle() { + } + @Override public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index dae5e1e2e90f8..0dac80cafbcdf 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -506,6 +506,10 @@ public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryS throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + @Override + public void awaitIdle() { + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>();