Wait for all Rec. to Stop on Node Close #46178

Merged · 4 commits · Sep 2, 2019
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java
@@ -24,10 +24,13 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
@@ -50,7 +53,7 @@
* The source recovery accepts recovery requests from other peer shards and starts the recovery process from this
* source shard to the target shard.
*/
- public class PeerRecoverySourceService implements IndexEventListener {
+ public class PeerRecoverySourceService extends AbstractLifecycleComponent implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(PeerRecoverySourceService.class);

@@ -74,6 +77,19 @@ public PeerRecoverySourceService(TransportService transportService, IndicesServi
new StartRecoveryTransportRequestHandler());
}

@Override
protected void doStart() {
}

@Override
protected void doStop() {
ongoingRecoveries.awaitEmpty();
dnhatn (Member) commented:
Does this awaitEmpty block a node from closing? Should we just cancel all ongoing recoveries?

PR author (Member) replied:
@dnhatn it was my understanding that we already cancel all ongoing recoveries when shutting down the indices service (that's what happens in practice at the moment, it seems) => I didn't think I needed to take any action here.

> Does this awaitEmpty block a node from closing?

Yes, but see above. I think this is fine. In the end, if the recoveries aren't properly stopped, we will eventually interrupt this wait in the test cluster code and throw an exception, so this won't silently cause trouble in the future either.

}

@Override
protected void doClose() {
}

@Override
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard,
Settings indexSettings) {
@@ -118,9 +134,14 @@ final int numberOfOngoingRecoveries() {
}

final class OngoingRecoveries {

private final Map<IndexShard, ShardRecoveryContext> ongoingRecoveries = new HashMap<>();

@Nullable
private List<ActionListener<Void>> emptyListeners;

synchronized RecoverySourceHandler addNewRecovery(StartRecoveryRequest request, IndexShard shard) {
assert lifecycle.started();
Contributor commented:
👍

Member commented:
When such an assertion fails, we don't have context about why the assertion failed (only that `lifecycle.started()` is not true). When we write assertions like this, it is nice to provide more context, for example `assert lifecycle.started() : lifecycle.state()`, which would display the `toString()` value of `lifecycle.state()` when the assertion trips, giving us more context about the failure.
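As an illustration of that suggestion (a minimal sketch; the reviewer's exact proposal is the `: lifecycle.state()` form, the explicit message string is my own variation):

```java
// Without a message, a failure only tells us the condition was false:
assert lifecycle.started();

// With a context expression, the AssertionError carries the offending state
// (its toString value), e.g. "java.lang.AssertionError: STOPPED":
assert lifecycle.started() : lifecycle.state();

// Or, with a more explicit message string:
assert lifecycle.started() : "expected lifecycle to be STARTED but was [" + lifecycle.state() + "]";
```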

final ShardRecoveryContext shardContext = ongoingRecoveries.computeIfAbsent(shard, s -> new ShardRecoveryContext());
RecoverySourceHandler handler = shardContext.addNewRecovery(request, shard);
shard.recoveryStats().incCurrentAsSource();
@@ -138,6 +159,13 @@ synchronized void remove(IndexShard shard, RecoverySourceHandler handler) {
if (shardRecoveryContext.recoveryHandlers.isEmpty()) {
ongoingRecoveries.remove(shard);
}
if (ongoingRecoveries.isEmpty()) {
if (emptyListeners != null) {
final List<ActionListener<Void>> onEmptyListeners = emptyListeners;
emptyListeners = null;
ActionListener.onResponse(onEmptyListeners, null);
}
}
}

synchronized void cancel(IndexShard shard, String reason) {
@@ -157,6 +185,22 @@ synchronized void cancel(IndexShard shard, String reason) {
}
}

void awaitEmpty() {
assert lifecycle.stoppedOrClosed();
Contributor commented:
Can we be sure that no new recoveries may start after this method has returned?

PR author (Member) replied:
The indices service closes before this one, so it should be fine. I added an assertion about the state of the peer recovery source service before adding a recovery, though, to make sure :)
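For context, the close ordering this relies on is visible in the Node#close changes further down in this PR: the IndicesService (whose shard-close hooks cancel ongoing recoveries) is registered for close before the PeerRecoverySourceService, roughly:

```java
// paraphrased from the Node#close diff below — order matters here:
// indices (and with them any ongoing recoveries) are shut down first, then the
// recovery source service stops and waits for ongoing recoveries to drain
toClose.add(injector.getInstance(IndicesService.class));
toClose.add(injector.getInstance(IndicesStore.class));   // filter/fielddata caches
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
```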

Member commented:
Similarly to my earlier comment, which is a general comment about code assertions, it would be nice to have more context here in case this assertion ever fails.

final PlainActionFuture<Void> future;
synchronized (this) {
if (ongoingRecoveries.isEmpty()) {
return;
}
future = new PlainActionFuture<>();
if (emptyListeners == null) {
emptyListeners = new ArrayList<>();
}
emptyListeners.add(future);
}
FutureUtils.get(future);
}

private final class ShardRecoveryContext {
final Set<RecoverySourceHandler> recoveryHandlers = new HashSet<>();

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
@@ -402,13 +402,14 @@ private Releasable acquireStore(Store store) {
store.incRef();
return Releasables.releaseOnce(() -> {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
- threadPool.generic().execute(new ActionRunnable<>(future) {
- @Override
- protected void doRun() {
- store.decRef();
- listener.onResponse(null);
- }
- });
+ assert threadPool.generic().isShutdown() == false;
+ // TODO: We shouldn't use the generic thread pool here as we already execute this from the generic pool.
Member commented:
@henningandersen Maybe we need to revisit the original solution (in #45409) to avoid forking here.

henningandersen (Contributor) replied:
Yes, I agree to that. I think the best path is to make closing the resources async (except the one in the outer exception handler) such that we notify the recovery-response listener after completion of closing resources (we sometimes notify first and then close).
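A rough sketch of that idea (illustrative only — `closeThenNotify`, `AsyncCloseSketch`, and their wiring are assumptions, not the actual RecoverySourceHandler API): release the store reference as part of the listener chain, so the recovery response is sent only after the resources are closed, with no extra fork to the generic pool and no blocking get() on a future.

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.indices.recovery.RecoveryResponse;

final class AsyncCloseSketch {

    /**
     * Wraps the outer recovery listener so the given store reference is released
     * before the response (or failure) is propagated onward.
     */
    static ActionListener<RecoveryResponse> closeThenNotify(ActionListener<RecoveryResponse> outer, Releasable storeRef) {
        return ActionListener.wrap(
            response -> {
                storeRef.close();            // release resources first ...
                outer.onResponse(response);  // ... then notify the caller
            },
            e -> {
                storeRef.close();            // release on failure as well
                outer.onFailure(e);
            });
    }
}
```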

+ // While practically unlikely at a min pool size of 128 we could technically block the whole pool by waiting on futures
+ // below and thus make it impossible for the store release to execute which in turn would block the futures forever
+ threadPool.generic().execute(ActionRunnable.wrap(future, l -> {
+ store.decRef();
+ l.onResponse(null);
+ }));
FutureUtils.get(future);
});
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -593,6 +593,7 @@ protected Node(
.filter(p -> p instanceof LifecycleComponent)
.map(p -> (LifecycleComponent) p).collect(Collectors.toList());
resourcesToClose.addAll(pluginLifecycleComponents);
resourcesToClose.add(injector.getInstance(PeerRecoverySourceService.class));
this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
client.initialize(injector.getInstance(new Key<Map<ActionType, TransportAction>>() {}), transportService.getTaskManager(),
() -> clusterService.localNode().getId(), transportService.getRemoteClusterService());
@@ -689,6 +690,7 @@ public Node start() throws NodeValidationException {
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
final MetaData onDiskMetadata;
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
@@ -834,6 +836,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(IndicesService.class));
// close filter/fielddata caches after indices
toClose.add(injector.getInstance(IndicesStore.class));
toClose.add(injector.getInstance(PeerRecoverySourceService.class));
toClose.add(() -> stopWatch.stop().start("cluster"));
toClose.add(injector.getInstance(ClusterService.class));
toClose.add(() -> stopWatch.stop().start("node_connections_service"));
server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoverySourceServiceTests.java
@@ -43,6 +43,7 @@ public void testDuplicateRecoveries() throws IOException {
StartRecoveryRequest startRecoveryRequest = new StartRecoveryRequest(primary.shardId(), randomAlphaOfLength(10),
getFakeDiscoNode("source"), getFakeDiscoNode("target"), Store.MetadataSnapshot.EMPTY, randomBoolean(), randomLong(),
SequenceNumbers.UNASSIGNED_SEQ_NO);
peerRecoverySourceService.start();
RecoverySourceHandler handler = peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary);
DelayRecoveryException delayRecoveryException = expectThrows(DelayRecoveryException.class,
() -> peerRecoverySourceService.ongoingRecoveries.addNewRecovery(startRecoveryRequest, primary));