Skip to content

Commit

Permalink
Ignore timeouts with single-node discovery (#52159)
Browse files Browse the repository at this point in the history
Today we use `cluster.join.timeout` to prevent nodes from waiting indefinitely
if joining a faulty master that is too slow to respond, and
`cluster.publish.timeout` to allow a faulty master to detect that it is unable
to publish its cluster state updates in a timely fashion. If these timeouts
occur then the node restarts the discovery process in an attempt to find a
healthier master.

In the special case of `discovery.type: single-node` there is no point in
looking for another healthier master since the single node in the cluster is
all we've got. This commit suppresses these timeouts and instead lets the node
wait for joins and publications to succeed no matter how long this might take.
  • Loading branch information
DaveCTurner authored Feb 11, 2020
1 parent 8f7afbd commit a304d9a
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 23 deletions.
13 changes: 7 additions & 6 deletions docs/reference/modules/discovery/discovery-settings.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ Discovery and cluster formation are affected by the following settings:
Specifies whether {es} should form a multiple-node cluster. By default, {es}
discovers other nodes when forming a cluster and allows other nodes to join
the cluster later. If `discovery.type` is set to `single-node`, {es} forms a
single-node cluster. For more information about when you might use this
setting, see <<single-node-discovery>>.
single-node cluster and suppresses the timeouts set by
`cluster.publish.timeout` and `cluster.join.timeout`. For more information
about when you might use this setting, see <<single-node-discovery>>.

`cluster.initial_master_nodes`::

Expand Down Expand Up @@ -167,8 +168,8 @@ or may become unstable or intolerant of certain failures.
`cluster.join.timeout`::

Sets how long a node will wait after sending a request to join a cluster
before it considers the request to have failed and retries. Defaults to
`60s`.
before it considers the request to have failed and retries, unless
`discovery.type` is set to `single-node`. Defaults to `60s`.

`cluster.max_voting_config_exclusions`::

Expand All @@ -185,8 +186,8 @@ or may become unstable or intolerant of certain failures.
`cluster.publish.timeout`::

Sets how long the master node waits for each cluster state update to be
completely published to all nodes. The default value is `30s`. See
<<cluster-state-publishing>>.
completely published to all nodes, unless `discovery.type` is set to
`single-node`. The default value is `30s`. See <<cluster-state-publishing>>.

[[no-master-block]]`cluster.no_master_block`::
Specifies which operations are rejected when there is no active master in a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class ClusterBootstrapService {
public ClusterBootstrapService(Settings settings, TransportService transportService,
Supplier<Iterable<DiscoveryNode>> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier,
Consumer<VotingConfiguration> votingConfigurationConsumer) {
if (DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) {
if (DiscoveryModule.isSingleNodeDiscovery(settings)) {
if (INITIAL_MASTER_NODES_SETTING.exists(settings)) {
throw new IllegalArgumentException("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() +
"] is not allowed when [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] is set to [" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
this.masterService = masterService;
this.allocationService = allocationService;
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings);
this.electionStrategy = electionStrategy;
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
Expand Down Expand Up @@ -1217,6 +1217,8 @@ class CoordinatorPublication extends Publication {
private final AckListener ackListener;
private final ActionListener<Void> publishListener;
private final PublicationTransportHandler.PublicationContext publicationContext;

@Nullable // if using single-node discovery
private final Scheduler.ScheduledCancellable timeoutHandler;
private final Scheduler.Cancellable infoTimeoutHandler;

Expand Down Expand Up @@ -1260,7 +1262,7 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
this.ackListener = ackListener;
this.publishListener = publishListener;

this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() {
this.timeoutHandler = singleNodeDiscovery ? null : transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
Expand Down Expand Up @@ -1328,8 +1330,7 @@ public void onFailure(String source, Exception e) {
synchronized (mutex) {
removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState");
}
timeoutHandler.cancel();
infoTimeoutHandler.cancel();
cancelTimeoutHandlers();
ackListener.onNodeAck(getLocalNode(), e);
publishListener.onFailure(e);
}
Expand Down Expand Up @@ -1376,8 +1377,7 @@ public void onSuccess(String source) {
lagDetector.startLagDetector(publishRequest.getAcceptedState().version());
logIncompleteNodes(Level.WARN);
}
timeoutHandler.cancel();
infoTimeoutHandler.cancel();
cancelTimeoutHandlers();
ackListener.onNodeAck(getLocalNode(), null);
publishListener.onResponse(null);
}
Expand All @@ -1388,8 +1388,7 @@ public void onSuccess(String source) {
public void onFailure(Exception e) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)");
timeoutHandler.cancel();
infoTimeoutHandler.cancel();
cancelTimeoutHandlers();

final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e);
ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master.
Expand All @@ -1398,6 +1397,13 @@ public void onFailure(Exception e) {
}, EsExecutors.newDirectExecutorService(), transportService.getThreadPool().getThreadContext());
}

private void cancelTimeoutHandlers() {
if (timeoutHandler != null) {
timeoutHandler.cancel();
}
infoTimeoutHandler.cancel();
}

private void handleAssociatedJoin(Join join) {
if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) {
logger.trace("handling {}", join);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -83,6 +85,8 @@ public class JoinHelper {
private final MasterService masterService;
private final TransportService transportService;
private final JoinTaskExecutor joinTaskExecutor;

@Nullable // if using single-node discovery
private final TimeValue joinTimeout;

private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
Expand All @@ -95,7 +99,7 @@ public class JoinHelper {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, RerouteService rerouteService) {
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().n
logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames);
}

public static boolean isSingleNodeDiscovery(Settings settings) {
return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings));
}

public Discovery getDiscovery() {
return discovery;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.discovery.DiscoveryModule;
Expand All @@ -56,6 +57,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.DEFAULT_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.EXTREME_DELAY_VARIABILITY;
import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE;
import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING;
Expand All @@ -78,6 +80,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;

public class CoordinatorTests extends AbstractCoordinatorTestCase {
Expand Down Expand Up @@ -1050,7 +1053,7 @@ public void testDiscoveryUsesNodesFromLastClusterState() {
cluster.stabilise();

partitionedNode.heal();
cluster.runRandomly(false);
cluster.runRandomly(false, true, EXTREME_DELAY_VARIABILITY);
cluster.stabilise();
}
}
Expand Down Expand Up @@ -1131,6 +1134,36 @@ public void testSingleNodeDiscoveryWithQuorum() {
}
}

public void testSingleNodeDiscoveryStabilisesEvenWhenDisrupted() {
try (Cluster cluster = new Cluster(1, randomBoolean(), Settings.builder()
.put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build())) {

// A cluster using single-node discovery should not apply any timeouts to joining or cluster state publication. There are no
// other options, so there's no point in failing and retrying from scratch no matter how badly disrupted we are and we may as
// well just wait.

// larger variability is are good for checking that we don't time out, but smaller variability also tightens up the time bound
// within which we expect to converge, so use a mix of both
final long delayVariabilityMillis = randomLongBetween(DEFAULT_DELAY_VARIABILITY, TimeValue.timeValueMinutes(10).millis());
if (randomBoolean()) {
cluster.runRandomly(true, false, delayVariabilityMillis);
} else {
cluster.deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariabilityMillis);
}

final ClusterNode clusterNode = cluster.getAnyNode();

// cf. DEFAULT_STABILISATION_TIME, but stabilisation is quicker when there's a single node - there's no meaningful fault
// detection and ongoing publications do not time out
cluster.runFor(ELECTION_INITIAL_TIMEOUT_SETTING.get(Settings.EMPTY).millis() + delayVariabilityMillis
+ 4 * delayVariabilityMillis // two round trips for pre-voting and voting
+ 7 * delayVariabilityMillis, // see definition of DEFAULT_CLUSTER_STATE_UPDATE_DELAY
"stabilising");

assertThat(cluster.getAnyLeader(), sameInstance(clusterNode));
}
}

private static class BrokenCustom extends AbstractDiffable<ClusterState.Custom> implements ClusterState.Custom {

static final String EXCEPTION_MESSAGE = "simulated";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,16 @@ int size() {
}

void runRandomly() {
runRandomly(true);
runRandomly(true, true, EXTREME_DELAY_VARIABILITY);
}

void runRandomly(boolean allowReboots) {
/**
* @param allowReboots whether to randomly reboot the nodes during the process, losing all transient state. Usually true.
* @param coolDown whether to set the delay variability back to {@link Cluster#DEFAULT_DELAY_VARIABILITY} and drain all
* disrupted events from the queue before returning. Usually true.
* @param delayVariability the delay variability to use while running randomly. Usually {@link Cluster#EXTREME_DELAY_VARIABILITY}.
*/
void runRandomly(boolean allowReboots, boolean coolDown, long delayVariability) {

// TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty());
Expand All @@ -336,9 +342,9 @@ void runRandomly(boolean allowReboots) {

final int randomSteps = scaledRandomIntBetween(10, 10000);
final int keyRange = randomSteps / 50; // for randomized writes and reads
logger.info("--> start of safety phase of at least [{}] steps", randomSteps);
logger.info("--> start of safety phase of at least [{}] steps with delay variability of [{}ms]", randomSteps, delayVariability);

deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY);
deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariability);
disruptStorage = true;
int step = 0;
long finishTime = -1;
Expand All @@ -349,8 +355,13 @@ void runRandomly(boolean allowReboots) {

if (randomSteps <= step && finishTime == -1) {
finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime();
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime);
if (coolDown) {
deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime);
} else {
logger.debug("----> [runRandomly {}] running until [{}ms] with delay variability of [{}ms]", step, finishTime,
deterministicTaskQueue.getExecutionDelayVariabilityMillis());
}
}

try {
Expand Down

0 comments on commit a304d9a

Please sign in to comment.