diff --git a/docs/reference/modules/discovery/discovery-settings.asciidoc b/docs/reference/modules/discovery/discovery-settings.asciidoc index 7c99c9888606e..87c40a7b37b33 100644 --- a/docs/reference/modules/discovery/discovery-settings.asciidoc +++ b/docs/reference/modules/discovery/discovery-settings.asciidoc @@ -22,8 +22,9 @@ Discovery and cluster formation are affected by the following settings: Specifies whether {es} should form a multiple-node cluster. By default, {es} discovers other nodes when forming a cluster and allows other nodes to join the cluster later. If `discovery.type` is set to `single-node`, {es} forms a - single-node cluster. For more information about when you might use this - setting, see <>. + single-node cluster and suppresses the timeouts set by + `cluster.publish.timeout` and `cluster.join.timeout`. For more information + about when you might use this setting, see <>. `cluster.initial_master_nodes`:: @@ -167,8 +168,8 @@ or may become unstable or intolerant of certain failures. `cluster.join.timeout`:: Sets how long a node will wait after sending a request to join a cluster - before it considers the request to have failed and retries. Defaults to - `60s`. + before it considers the request to have failed and retries, unless + `discovery.type` is set to `single-node`. Defaults to `60s`. `cluster.max_voting_config_exclusions`:: @@ -185,8 +186,8 @@ or may become unstable or intolerant of certain failures. `cluster.publish.timeout`:: Sets how long the master node waits for each cluster state update to be - completely published to all nodes. The default value is `30s`. See - <>. + completely published to all nodes, unless `discovery.type` is set to + `single-node`. The default value is `30s`. See <>. 
[[no-master-block]]`cluster.no_master_block`:: Specifies which operations are rejected when there is no active master in a diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java index 7271013cb363d..8e7ad6f76ee48 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/ClusterBootstrapService.java @@ -78,7 +78,7 @@ public class ClusterBootstrapService { public ClusterBootstrapService(Settings settings, TransportService transportService, Supplier> discoveredNodesSupplier, BooleanSupplier isBootstrappedSupplier, Consumer votingConfigurationConsumer) { - if (DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))) { + if (DiscoveryModule.isSingleNodeDiscovery(settings)) { if (INITIAL_MASTER_NODES_SETTING.exists(settings)) { throw new IllegalArgumentException("setting [" + INITIAL_MASTER_NODES_SETTING.getKey() + "] is not allowed when [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] is set to [" + diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 20b3637680fb9..16e75464caf0f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -164,7 +164,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe this.masterService = masterService; this.allocationService = allocationService; this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); - this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings)); + 
this.singleNodeDiscovery = DiscoveryModule.isSingleNodeDiscovery(settings); this.electionStrategy = electionStrategy; this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService, this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators, @@ -1217,6 +1217,8 @@ class CoordinatorPublication extends Publication { private final AckListener ackListener; private final ActionListener publishListener; private final PublicationTransportHandler.PublicationContext publicationContext; + + @Nullable // if using single-node discovery private final Scheduler.ScheduledCancellable timeoutHandler; private final Scheduler.Cancellable infoTimeoutHandler; @@ -1260,7 +1262,7 @@ public void onNodeAck(DiscoveryNode node, Exception e) { this.ackListener = ackListener; this.publishListener = publishListener; - this.timeoutHandler = transportService.getThreadPool().schedule(new Runnable() { + this.timeoutHandler = singleNodeDiscovery ? 
null : transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { synchronized (mutex) { @@ -1328,8 +1330,7 @@ public void onFailure(String source, Exception e) { synchronized (mutex) { removePublicationAndPossiblyBecomeCandidate("clusterApplier#onNewClusterState"); } - timeoutHandler.cancel(); - infoTimeoutHandler.cancel(); + cancelTimeoutHandlers(); ackListener.onNodeAck(getLocalNode(), e); publishListener.onFailure(e); } @@ -1376,8 +1377,7 @@ public void onSuccess(String source) { lagDetector.startLagDetector(publishRequest.getAcceptedState().version()); logIncompleteNodes(Level.WARN); } - timeoutHandler.cancel(); - infoTimeoutHandler.cancel(); + cancelTimeoutHandlers(); ackListener.onNodeAck(getLocalNode(), null); publishListener.onResponse(null); } @@ -1388,8 +1388,7 @@ public void onSuccess(String source) { public void onFailure(Exception e) { assert Thread.holdsLock(mutex) : "Coordinator mutex not held"; removePublicationAndPossiblyBecomeCandidate("Publication.onCompletion(false)"); - timeoutHandler.cancel(); - infoTimeoutHandler.cancel(); + cancelTimeoutHandlers(); final FailedToCommitClusterStateException exception = new FailedToCommitClusterStateException("publication failed", e); ackListener.onNodeAck(getLocalNode(), exception); // other nodes have acked, but not the master. 
@@ -1398,6 +1397,13 @@ public void onFailure(Exception e) { }, EsExecutors.newDirectExecutorService(), transportService.getThreadPool().getThreadContext()); } + private void cancelTimeoutHandlers() { + if (timeoutHandler != null) { + timeoutHandler.cancel(); + } + infoTimeoutHandler.cancel(); + } + private void handleAssociatedJoin(Join join) { if (join.getTerm() == getCurrentTerm() && missingJoinVoteFrom(join.getSourceNode())) { logger.trace("handling {}", join); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 87e6ce720ed2b..2e47acf07b215 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -34,12 +34,14 @@ import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.TransportChannel; @@ -83,6 +85,8 @@ public class JoinHelper { private final MasterService masterService; private final TransportService transportService; private final JoinTaskExecutor joinTaskExecutor; + + @Nullable // if using single-node discovery private final TimeValue joinTimeout; private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); @@ -95,7 +99,7 @@ public class JoinHelper { 
Collection> joinValidators, RerouteService rerouteService) { this.masterService = masterService; this.transportService = transportService; - this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); + this.joinTimeout = DiscoveryModule.isSingleNodeDiscovery(settings) ? null : JOIN_TIMEOUT_SETTING.get(settings); this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService) { @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 843562ef9e619..f44dfdec32ab7 100644 --- a/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -155,6 +155,10 @@ seedHostsProvider, clusterApplier, joinValidators, new Random(Randomness.get().n logger.info("using discovery type [{}] and seed hosts providers {}", discoveryType, seedProviderNames); } + public static boolean isSingleNodeDiscovery(Settings settings) { + return SINGLE_NODE_DISCOVERY_TYPE.equals(DISCOVERY_TYPE_SETTING.get(settings)); + } + public Discovery getDiscovery() { return discovery; } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 3004dc99ac360..070b1097f04de 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings.Builder; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.discovery.DiscoveryModule; @@ -56,6 +57,7 @@ import 
java.util.stream.Collectors; import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.DEFAULT_DELAY_VARIABILITY; +import static org.elasticsearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.EXTREME_DELAY_VARIABILITY; import static org.elasticsearch.cluster.coordination.Coordinator.Mode.CANDIDATE; import static org.elasticsearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING; import static org.elasticsearch.cluster.coordination.ElectionSchedulerFactory.ELECTION_INITIAL_TIMEOUT_SETTING; @@ -78,6 +80,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.Matchers.startsWith; public class CoordinatorTests extends AbstractCoordinatorTestCase { @@ -1050,7 +1053,7 @@ public void testDiscoveryUsesNodesFromLastClusterState() { cluster.stabilise(); partitionedNode.heal(); - cluster.runRandomly(false); + cluster.runRandomly(false, true, EXTREME_DELAY_VARIABILITY); cluster.stabilise(); } } @@ -1131,6 +1134,36 @@ public void testSingleNodeDiscoveryWithQuorum() { } } + public void testSingleNodeDiscoveryStabilisesEvenWhenDisrupted() { + try (Cluster cluster = new Cluster(1, randomBoolean(), Settings.builder() + .put(DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey(), DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE).build())) { + + // A cluster using single-node discovery should not apply any timeouts to joining or cluster state publication. There are no + // other options, so there's no point in failing and retrying from scratch no matter how badly disrupted we are and we may as + // well just wait. 
+ + // larger variability is good for checking that we don't time out, but smaller variability also tightens up the time bound + // within which we expect to converge, so use a mix of both + final long delayVariabilityMillis = randomLongBetween(DEFAULT_DELAY_VARIABILITY, TimeValue.timeValueMinutes(10).millis()); + if (randomBoolean()) { + cluster.runRandomly(true, false, delayVariabilityMillis); + } else { + cluster.deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariabilityMillis); + } + + final ClusterNode clusterNode = cluster.getAnyNode(); + + // cf. DEFAULT_STABILISATION_TIME, but stabilisation is quicker when there's a single node - there's no meaningful fault + // detection and ongoing publications do not time out + cluster.runFor(ELECTION_INITIAL_TIMEOUT_SETTING.get(Settings.EMPTY).millis() + delayVariabilityMillis + + 4 * delayVariabilityMillis // two round trips for pre-voting and voting + + 7 * delayVariabilityMillis, // see definition of DEFAULT_CLUSTER_STATE_UPDATE_DELAY + "stabilising"); + + assertThat(cluster.getAnyLeader(), sameInstance(clusterNode)); + } + } + private static class BrokenCustom extends AbstractDiffable implements ClusterState.Custom { static final String EXCEPTION_MESSAGE = "simulated"; diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 5f034dea82de0..5239827f2b171 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -320,10 +320,16 @@ int size() { } void runRandomly() { - runRandomly(true); + runRandomly(true, true, EXTREME_DELAY_VARIABILITY); } - void runRandomly(boolean allowReboots) { + /** + * @param allowReboots whether to randomly reboot the nodes during the process, 
losing all transient state. Usually true. + * @param coolDown whether to set the delay variability back to {@link Cluster#DEFAULT_DELAY_VARIABILITY} and drain all + * disrupted events from the queue before returning. Usually true. + * @param delayVariability the delay variability to use while running randomly. Usually {@link Cluster#EXTREME_DELAY_VARIABILITY}. + */ + void runRandomly(boolean allowReboots, boolean coolDown, long delayVariability) { // TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty()); @@ -336,9 +342,9 @@ void runRandomly(boolean allowReboots) { final int randomSteps = scaledRandomIntBetween(10, 10000); final int keyRange = randomSteps / 50; // for randomized writes and reads - logger.info("--> start of safety phase of at least [{}] steps", randomSteps); + logger.info("--> start of safety phase of at least [{}] steps with delay variability of [{}ms]", randomSteps, delayVariability); - deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); + deterministicTaskQueue.setExecutionDelayVariabilityMillis(delayVariability); disruptStorage = true; int step = 0; long finishTime = -1; @@ -349,8 +355,13 @@ void runRandomly(boolean allowReboots) { if (randomSteps <= step && finishTime == -1) { finishTime = deterministicTaskQueue.getLatestDeferredExecutionTime(); - deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); - logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime); + if (coolDown) { + deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); + logger.debug("----> [runRandomly {}] reducing delay variability and running until [{}ms]", step, finishTime); + } else { + logger.debug("----> [runRandomly {}] running until [{}ms] with delay variability of [{}ms]", step, 
finishTime, + deterministicTaskQueue.getExecutionDelayVariabilityMillis()); + } } try {