Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid parallel reroutes in DiskThresholdMonitor #43381

Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ public void onMaster() {
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
}

// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
// Submit a job that will reschedule itself after running
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());

try {
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
// Submit an info update job to be run immediately
threadPool.executor(executorName()).execute(() -> maybeRefresh());
threadPool.executor(executorName()).execute(this::maybeRefresh);
}
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
Expand Down Expand Up @@ -173,7 +173,7 @@ public void clusterChanged(ClusterChangedEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("data node was added, retrieving new cluster info");
}
threadPool.executor(executorName()).execute(() -> maybeRefresh());
threadPool.executor(executorName()).execute(this::maybeRefresh);
}

if (this.isMaster && event.nodesRemoved()) {
Expand Down Expand Up @@ -316,7 +316,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
shardSizes = newShardSizes.build();
shardRoutingToDataPath = newShardRoutingToDataPath.build();
}
Expand Down Expand Up @@ -365,7 +365,7 @@ public void onFailure(Exception e) {
}

static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
if (logger.isTraceEnabled()) {
logger.trace("{}, scheduling a reroute", reason);
}
routingService.reroute(reason);
routingService.reroute(reason, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -146,13 +145,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
* production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
* production code this calls
* {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
*/
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
Consumer<String> reroute, ElectionStrategy electionStrategy) {
BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
Expand Down Expand Up @@ -92,7 +91,7 @@ public class JoinHelper {
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.NotMasterException;
Expand All @@ -36,7 +37,6 @@
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand All @@ -45,7 +45,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private final AllocationService allocationService;

private final Logger logger;
private final Consumer<String> reroute;
private final BiConsumer<String, ActionListener<Void>> reroute;

public static class Task {

Expand Down Expand Up @@ -82,7 +82,7 @@ public boolean isFinishElectionTask() {
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}

public JoinTaskExecutor(AllocationService allocationService, Logger logger, Consumer<String> reroute) {
public JoinTaskExecutor(AllocationService allocationService, Logger logger, BiConsumer<String, ActionListener<Void>> reroute) {
this.allocationService = allocationService;
this.logger = logger;
this.reroute = reroute;
Expand Down Expand Up @@ -149,7 +149,10 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
results.success(joinTask);
}
if (nodesChanged) {
reroute.accept("post-join reroute");
reroute.accept("post-join reroute", ActionListener.wrap(
r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e)));

return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
} else {
// we must return a new cluster state instance to force publishing. This is important
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;

/**
* A {@link RoutingService} listens to clusters state. When this service
Expand All @@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";

private final ClusterService clusterService;
private final AllocationService allocationService;
private final BiFunction<ClusterState, String, ClusterState> reroute;

private AtomicBoolean rerouting = new AtomicBoolean();
private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending
private PlainListenableActionFuture<Void> pendingRerouteListeners;

@Inject
public RoutingService(ClusterService clusterService, AllocationService allocationService) {
public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
this.clusterService = clusterService;
this.allocationService = allocationService;
this.reroute = reroute;
}

@Override
Expand All @@ -76,47 +82,78 @@ protected void doClose() {
/**
* Initiates a reroute.
*/
public final void reroute(String reason) {
try {
if (lifecycle.stopped()) {
return;
}
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("already has pending reroute, ignoring {}", reason);
public final void reroute(String reason, ActionListener<Void> listener) {
if (lifecycle.started() == false) {
listener.onFailure(new IllegalStateException(
"rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
return;
}
final PlainListenableActionFuture<Void> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {
logger.trace("already has pending reroute, adding [{}] to batch", reason);
pendingRerouteListeners.addListener(listener);
return;
}
logger.trace("rerouting {}", reason);
currentListeners = PlainListenableActionFuture.newListenableFuture();
currentListeners.addListener(listener);
pendingRerouteListeners = currentListeners;
}
logger.trace("rerouting [{}]", reason);
try {
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) {
rerouting.set(false);
return allocationService.reroute(currentState, reason);
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
}
return reroute.apply(currentState, reason);
}

@Override
public void onNoLongerMaster(String source) {
rerouting.set(false);
// no biggie
synchronized (mutex) {
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
// no big deal, the new master will reroute again
}

@Override
public void onFailure(String source, Exception e) {
rerouting.set(false);
ClusterState state = clusterService.state();
synchronized (mutex) {
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
final ClusterState state = clusterService.state();
if (logger.isTraceEnabled()) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
source, state), e);
} else {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
source, state.version()), e);
}
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
currentListeners.onResponse(null);
}
});
} catch (Exception e) {
rerouting.set(false);
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
}
ClusterState state = clusterService.state();
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
}
}
}
Loading