diff --git a/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java b/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java
index bcf01da78ea83..8ca88b4ceecf8 100644
--- a/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java
+++ b/server/src/main/java/org/elasticsearch/discovery/zen2/Legislator.java
@@ -70,6 +70,7 @@
 import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@@ -311,6 +312,9 @@ public void handleClientValue(ClusterState clusterState) {
         if (mode != Mode.LEADER) {
             throw new ConsensusMessageRejectedException("handleClientValue: not currently leading, so cannot handle client value.");
         }
+
+        assert localNode.equals(clusterState.getNodes().get(localNode.getId())) : localNode + " should be in published " + clusterState;
+
         PublishRequest publishRequest = consensusState.handleClientValue(clusterState);
         publish(publishRequest);
     }
@@ -1421,6 +1425,6 @@ private void onTimeout() {
     }
 
     public interface MasterService {
-        void submitTask(String reason, Function<ClusterState, ClusterState> runnable);
+        void submitTask(String reason, UnaryOperator<ClusterState> runnable);
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java b/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java
index 434f53a91163b..8efc98b705c16 100644
--- a/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java
+++ b/server/src/test/java/org/elasticsearch/discovery/zen2/LegislatorTests.java
@@ -67,6 +67,7 @@
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.LongSupplier;
+import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.discovery.zen2.Legislator.CONSENSUS_HEARTBEAT_DELAY_SETTING;
@@ -398,6 +399,7 @@ class Cluster {
         Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
         private static final long DEFAULT_DELAY_VARIABILITY = 100L;
         private static final long RANDOM_MODE_DELAY_VARIABILITY = 10000L;
+        private long masterServicesTaskId = 0L;
 
         // How long to wait? The worst case is that a leader just committed a value to all the other nodes, and then
         // dropped off the network, which would mean that all the other nodes must detect its failure. It takes
@@ -805,6 +807,7 @@ class ClusterNode {
             private final int index;
             private final MockTransport transport;
             private final FutureExecutor futureExecutor;
+            private final List<PendingTask> pendingTasks = new ArrayList<>();
 
             PersistedState persistedState;
             Legislator legislator;
@@ -822,22 +825,40 @@ private DiscoveryNode createDiscoveryNode() {
                 return new DiscoveryNode("node" + this.index, buildNewFakeTransportAddress(), Version.CURRENT);
             }
 
-            private void sendMasterServiceTask(String reason, Function<ClusterState, ClusterState> runnable) {
-                futureExecutor.schedule(TimeValue.timeValueMillis(0L), "sendMasterServiceTask:" + reason, () -> {
-                    try {
-                        if (legislator.getMode() == Legislator.Mode.LEADER) {
-                            ClusterState newState = runnable.apply(legislator.getLastAcceptedState());
-                            assert Objects.equals(newState.getNodes().getMasterNodeId(), localNode.getId()) : newState;
-                            assert newState.getNodes().getNodes().get(localNode.getId()) != null;
-                            legislator.handleClientValue(
-                                ClusterState.builder(newState).term(legislator.getCurrentTerm()).incrementVersion().build());
+            private void runPendingTasks() {
+                // TODO this batches tasks more aggressively than the real MasterService does, which weakens the properties we are
+                // testing here. Make the batching more realistic.
+                List<PendingTask> currentPendingTasks = new ArrayList<>(pendingTasks);
+                pendingTasks.clear();
+                if (currentPendingTasks.size() == 0) {
+                    return;
+                }
+
+                try {
+                    if (legislator.getMode() == Legislator.Mode.LEADER) {
+                        ClusterState newState = legislator.getLastAcceptedState();
+                        for (final PendingTask pendingTask : currentPendingTasks) {
+                            logger.trace("[{}] running [{}]", localNode.getId(), pendingTask);
+                            newState = pendingTask.run(newState);
                         }
-                    } catch (ConsensusMessageRejectedException e) {
-                        logger.trace(() -> new ParameterizedMessage("[{}] sendMasterServiceTask: [{}] failed: {}",
-                            localNode.getName(), reason, e.getMessage()));
-                        sendMasterServiceTask(reason, runnable);
+                        assert Objects.equals(newState.getNodes().getMasterNodeId(), localNode.getId()) : newState;
+                        assert newState.getNodes().getNodes().get(localNode.getId()) != null;
+                        legislator.handleClientValue(
+                            ClusterState.builder(newState).term(legislator.getCurrentTerm()).incrementVersion().build());
                     }
-                });
+                } catch (ConsensusMessageRejectedException e) {
+                    logger.trace(() -> new ParameterizedMessage("[{}] runPendingTasks: failed, rescheduling: {}",
+                        localNode.getId(), e.getMessage()));
+                    pendingTasks.addAll(currentPendingTasks);
+                    futureExecutor.schedule(TimeValue.ZERO, "ClusterNode#runPendingTasks: retry", this::runPendingTasks);
+                }
+            }
+
+            private void sendMasterServiceTask(String reason, UnaryOperator<ClusterState> task) {
+                PendingTask pendingTask = new PendingTask(task, reason, masterServicesTaskId++);
+                logger.trace("[{}] sendMasterServiceTask: enqueueing [{}]", localNode.getId(), pendingTask);
+                pendingTasks.add(pendingTask);
+                futureExecutor.schedule(TimeValue.ZERO, "sendMasterServiceTask: " + pendingTask, this::runPendingTasks);
             }
 
             ClusterNode initialise(VotingConfiguration initialConfiguration) {
@@ -854,6 +875,7 @@ void reboot() {
                 logger.trace("reboot: taking down [{}]", localNode);
                 tasks.removeIf(task -> task.scheduledFor(localNode));
                 inFlightMessages.removeIf(action -> action.hasDestination(localNode));
+                pendingTasks.clear();
                 localNode = createDiscoveryNode();
                 logger.trace("reboot: starting up [{}]", localNode);
                 try {
@@ -901,6 +923,27 @@ public void schedule(TimeValue delay, String description, Runnable task) {
                 }
             }
 
+            private class PendingTask {
+                private final UnaryOperator<ClusterState> task;
+                private final String description;
+                private final long taskId;
+
+                PendingTask(UnaryOperator<ClusterState> task, String description, long taskId) {
+                    this.task = task;
+                    this.description = description;
+                    this.taskId = taskId;
+                }
+
+                @Override
+                public String toString() {
+                    return description + " [" + taskId + "]";
+                }
+
+                ClusterState run(ClusterState clusterState) {
+                    return task.apply(clusterState);
+                }
+            }
+
             private class MockTransport implements Transport {
                 @Override
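Note on the MasterService change above: UnaryOperator<ClusterState> is a subinterface of Function<ClusterState, ClusterState> with both type parameters pinned to the same type, so lambda call sites compile unchanged while the signature now states directly that submitTask maps one ClusterState to another. A minimal sketch of a caller against the revised interface; the MasterServiceCaller class and bumpClusterStateVersion method are hypothetical, for illustration only:

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.discovery.zen2.Legislator;

import java.util.function.UnaryOperator;

// Hypothetical caller of the revised interface. Only the shape of the
// second argument (UnaryOperator<ClusterState>) is fixed by this change.
class MasterServiceCaller {
    private final Legislator.MasterService masterService;

    MasterServiceCaller(Legislator.MasterService masterService) {
        this.masterService = masterService;
    }

    void bumpClusterStateVersion() {
        // This same lambda compiled against the old
        // Function<ClusterState, ClusterState> parameter, since
        // UnaryOperator<ClusterState> is just its specialisation.
        UnaryOperator<ClusterState> task =
            currentState -> ClusterState.builder(currentState).incrementVersion().build();
        masterService.submitTask("bump cluster state version", task);
    }
}

Only a caller that held its task in an explicitly typed Function<ClusterState, ClusterState> variable would need updating, because a Function is not a UnaryOperator; lambdas and method references are unaffected.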
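The test-side rework replaces the old schedule-and-retry lambda with an explicit pending-task queue: sendMasterServiceTask enqueues, runPendingTasks drains the whole queue in one pass, folds every task into a single new ClusterState, and re-enqueues the entire batch if handleClientValue rejects the commit. A self-contained sketch of that queueing discipline, with a String state and IllegalStateException standing in (hypothetically) for ClusterState and ConsensusMessageRejectedException:

import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Self-contained sketch of the drain-and-retry discipline used by
// runPendingTasks() above. The String state and commit() are hypothetical
// stand-ins for ClusterState and legislator.handleClientValue(...).
class PendingTaskQueue {
    private final List<UnaryOperator<String>> pendingTasks = new ArrayList<>();

    void submit(UnaryOperator<String> task) {
        pendingTasks.add(task);
        // The real test defers this call via futureExecutor.schedule(...),
        // so several submissions can land in one batch.
        runPendingTasks();
    }

    private void runPendingTasks() {
        // Drain the queue: everything queued so far becomes one batch.
        List<UnaryOperator<String>> batch = new ArrayList<>(pendingTasks);
        pendingTasks.clear();
        if (batch.isEmpty()) {
            return;
        }
        String state = "initial";
        try {
            // Fold the whole batch into a single value, then commit once.
            for (UnaryOperator<String> task : batch) {
                state = task.apply(state);
            }
            commit(state);
        } catch (IllegalStateException e) {
            // Commit rejected: requeue the entire batch so a later run retries it.
            pendingTasks.addAll(batch);
        }
    }

    private void commit(String state) {
        // Stand-in for the leader publishing the new state; may throw
        // IllegalStateException the way handleClientValue throws
        // ConsensusMessageRejectedException.
    }
}

As the TODO in the diff itself notes, draining everything in a single pass batches more aggressively than the real MasterService does, which is why the batching is flagged for future refinement.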