diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index ef47b153f5354..644a8e2eb5cb5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -24,10 +24,13 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ChannelActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; @@ -50,7 +53,7 @@ * The source recovery accepts recovery requests from other peer shards and start the recovery process from this * source shard to the target shard. 
*/ -public class PeerRecoverySourceService implements IndexEventListener { +public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener { private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class); @@ -74,6 +77,19 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi new StartRecoveryTransportRequestHandler()); } + /** Lifecycle start hook with an empty body: nothing is done here on start. */ @Override + protected void doStart() { + } + + /** Lifecycle stop hook: calls ongoingRecoveries.awaitEmpty(), which blocks the stopping thread until no recovery is registered any more. */ @Override + protected void doStop() { + ongoingRecoveries.awaitEmpty(); + } + + /** Lifecycle close hook with an empty body: nothing is done here on close. */ @Override + protected void doClose() { + } + @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { @@ -118,9 +134,14 @@ final int numberOfOngoingRecoveries() { } final class OngoingRecoveries { + private final Map ongoingRecoveries = new HashMap<>(); + /** Listeners completed by remove() once the ongoingRecoveries map becomes empty. Stays null until awaitEmpty() has to wait, and is reset to null when the listeners are notified. */ @Nullable + private List> emptyListeners; + synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) { + /* checked only when JVM assertions are enabled: recoveries are expected to be added solely while the lifecycle is started */ assert lifecycle.started(); final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext()); RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard); shard.recoveryStats().incCurrentAsSource(); @@ -138,6 +159,13 @@ synchronized void remove(IndexShard shard, RecoverySourceHandler handler) { if (shardRecoveryContext.recoveryHandlers.isEmpty()) { ongoingRecoveries.remove(shard); } + /* no recovery is left in the map: detach the waiter list first, then complete every waiter with a null response */ if (ongoingRecoveries.isEmpty()) { + if (emptyListeners != null) { + final List> onEmptyListeners = emptyListeners; + emptyListeners = null; + ActionListener.onResponse(onEmptyListeners, null); + } + } } synchronized void cancel(IndexShard shard, String reason) { @@ -157,6 +185,22 @@ synchronized void cancel(IndexShard shard, String reason) { } } + /** Blocks the calling thread until the ongoingRecoveries map is empty. Returns immediately if it already is; otherwise registers a future in emptyListeners while holding the monitor and waits on it after leaving the synchronized block. Asserts that the lifecycle is already stopped or closed. */ void awaitEmpty() { + assert lifecycle.stoppedOrClosed(); + final PlainActionFuture future; + synchronized (this) { + if (ongoingRecoveries.isEmpty()) { + return; + } + future = 
new PlainActionFuture<>(); + if (emptyListeners == null) { + emptyListeners = new ArrayList<>(); + } + emptyListeners.add(future); + } + /* wait outside the synchronized block: remove() is synchronized on the same instance and is what completes this future */ FutureUtils.get(future); + } + private final class ShardRecoveryContext { final Set recoveryHandlers = new HashSet<>(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index e5ab1d7890eb4..8324dd023b703 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -402,13 +402,14 @@ private Releasable acquireStore(Store store) { store.incRef(); return Releasables.releaseOnce(() -> { final PlainActionFuture future = new PlainActionFuture<>(); - threadPool.generic().execute(new ActionRunnable<>(future) { - @Override - protected void doRun() { - store.decRef(); - listener.onResponse(null); - } - }); + /* the store release below is dispatched to the generic pool, so that pool is expected to still be running here */ assert threadPool.generic().isShutdown() == false; + // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool. 
+ // While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures + // below and thus make it impossible for the store release to execute which in turn would block the futures forever + /* decRef the store on a generic-pool thread and then complete the future awaited below */ threadPool.generic().execute(ActionRunnable.wrap(future, l -> { + store.decRef(); + l.onResponse(null); + })); FutureUtils.get(future); }); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index cb7888eddee96..4f892948066db 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -593,6 +593,7 @@ protected Node( .filter(p -> p instanceof LifecycleComponent) .map(p -> (LifecycleComponent) p).collect(Collectors.toList()); resourcesToClose.addAll(pluginLifecycleComponents); + /* track the recovery source service in resourcesToClose next to the plugin lifecycle components */ resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class)); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); client.initialize(injector.getInstance(new Key>() {}), transportService.getTaskManager(), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService()); @@ -689,6 +690,7 @@ public Node start() throws NodeValidationException { assert localNodeFactory.getNode() != null; assert transportService.getLocalNode().equals(localNodeFactory.getNode()) : "transportService has a different local node than the factory provided"; + /* start the recovery source service as part of node start; OngoingRecoveries.addNewRecovery asserts lifecycle.started() */ injector.getInstance(PeerRecoverySourceService.class).start(); final MetaData onDiskMetadata; // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. 
@@ -834,6 +836,7 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(IndicesService.class)); // close filter/fielddata caches after indices toClose.add(injector.getInstance(IndicesStore.class)); + /* also queue the recovery source service for closing on node close; it is added after IndicesService and IndicesStore and before ClusterService */ toClose.add(injector.getInstance(PeerRecoverySourceService.class)); toClose.add(() -> stopWatch.stop().start("cluster")); toClose.add(injector.getInstance(ClusterService.class)); toClose.add(() -> stopWatch.stop().start("node_connections_service")); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java index 72eb2baeca942..491c3974e5bd3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java @@ -43,6 +43,7 @@ public void testDuplicateRecoveries() throws IOException { StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10), getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(), SequenceNumbers.UNASSIGNED_SEQ_NO); + /* addNewRecovery asserts lifecycle.started(), so the service is started before any recovery is registered */ peerRecoverySourceService.start(); RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary); DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class, () -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));