Add simple batching to fake MasterService (elastic#48)
We face failing tests and other obstructions because of out-of-order processing
of tasks submitted to the `MasterService` provided by the test harness. Pending
a more nuanced notion of the batching of these tasks, this change batches all
submitted tasks together so we can make progress on other fronts.
DaveCTurner authored May 18, 2018
1 parent 2e04775 commit eb51407
Showing 2 changed files with 62 additions and 15 deletions.
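For orientation, the batching scheme introduced here can be sketched in isolation roughly as follows. This is not the committed code: FakeMasterService, its generic state parameter S, and the main driver are illustrative stand-ins assumed for this sketch, standing in for the test harness's ClusterNode, PendingTask, and ClusterState machinery shown in the diff below.

import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Illustrative sketch of the batching idea, not the committed test code.
// State is a generic S here; the real harness folds UnaryOperator<ClusterState>
// tasks into a single published ClusterState.
class FakeMasterService<S> {
    private final List<UnaryOperator<S>> pendingTasks = new ArrayList<>();
    private S state;

    FakeMasterService(S initialState) {
        this.state = initialState;
    }

    // Mirrors sendMasterServiceTask: submission only enqueues the task.
    void submitTask(String reason, UnaryOperator<S> task) {
        pendingTasks.add(task);
    }

    // Mirrors runPendingTasks: drain the queue and apply every pending task to the
    // current state in submission order, yielding one batched result.
    S runPendingTasks() {
        List<UnaryOperator<S>> batch = new ArrayList<>(pendingTasks);
        pendingTasks.clear();
        for (UnaryOperator<S> task : batch) {
            state = task.apply(state);
        }
        return state;
    }

    public static void main(String[] args) {
        FakeMasterService<String> service = new FakeMasterService<>("state");
        service.submitTask("add node", s -> s + "+node1");
        service.submitTask("add node", s -> s + "+node2");
        // Both tasks are folded into one batch: prints "state+node1+node2".
        System.out.println(service.runPendingTasks());
    }
}

The committed change additionally retries the whole batch when the Legislator rejects it with a ConsensusMessageRejectedException, and tags each task with an id and description for tracing.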
First changed file:
@@ -70,6 +70,7 @@
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@@ -311,6 +312,9 @@ public void handleClientValue(ClusterState clusterState) {
if (mode != Mode.LEADER) {
throw new ConsensusMessageRejectedException("handleClientValue: not currently leading, so cannot handle client value.");
}

+assert localNode.equals(clusterState.getNodes().get(localNode.getId())) : localNode + " should be in published " + clusterState;

PublishRequest publishRequest = consensusState.handleClientValue(clusterState);
publish(publishRequest);
}
@@ -1421,6 +1425,6 @@ private void onTimeout() {
}

public interface MasterService {
-void submitTask(String reason, Function<ClusterState, ClusterState> runnable);
+void submitTask(String reason, UnaryOperator<ClusterState> runnable);
}
}
Second changed file:
@@ -67,6 +67,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
+import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.elasticsearch.discovery.zen2.Legislator.CONSENSUS_HEARTBEAT_DELAY_SETTING;
@@ -398,6 +399,7 @@ class Cluster {
Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
private static final long DEFAULT_DELAY_VARIABILITY = 100L;
private static final long RANDOM_MODE_DELAY_VARIABILITY = 10000L;
+private long masterServicesTaskId = 0L;

// How long to wait? The worst case is that a leader just committed a value to all the other nodes, and then
// dropped off the network, which would mean that all the other nodes must detect its failure. It takes
@@ -805,6 +807,7 @@ class ClusterNode {
private final int index;
private final MockTransport transport;
private final FutureExecutor futureExecutor;
+private final List<PendingTask> pendingTasks = new ArrayList<>();

PersistedState persistedState;
Legislator legislator;
@@ -822,22 +825,40 @@ private DiscoveryNode createDiscoveryNode() {
return new DiscoveryNode("node" + this.index, buildNewFakeTransportAddress(), Version.CURRENT);
}

-private void sendMasterServiceTask(String reason, Function<ClusterState, ClusterState> runnable) {
-futureExecutor.schedule(TimeValue.timeValueMillis(0L), "sendMasterServiceTask:" + reason, () -> {
-try {
-if (legislator.getMode() == Legislator.Mode.LEADER) {
-ClusterState newState = runnable.apply(legislator.getLastAcceptedState());
-assert Objects.equals(newState.getNodes().getMasterNodeId(), localNode.getId()) : newState;
-assert newState.getNodes().getNodes().get(localNode.getId()) != null;
-legislator.handleClientValue(
-ClusterState.builder(newState).term(legislator.getCurrentTerm()).incrementVersion().build());
+private void runPendingTasks() {
+// TODO this batches tasks more aggressively than the real MasterService does, which weakens the properties we are
+// testing here. Make the batching more realistic.
+List<PendingTask> currentPendingTasks = new ArrayList<>(pendingTasks);
+pendingTasks.clear();
+if (currentPendingTasks.size() == 0) {
+return;
+}
+
+try {
+if (legislator.getMode() == Legislator.Mode.LEADER) {
+ClusterState newState = legislator.getLastAcceptedState();
+for (final PendingTask pendingTask : currentPendingTasks) {
+logger.trace("[{}] running [{}]", localNode.getId(), pendingTask);
+newState = pendingTask.run(newState);
}
-} catch (ConsensusMessageRejectedException e) {
-logger.trace(() -> new ParameterizedMessage("[{}] sendMasterServiceTask: [{}] failed: {}",
-localNode.getName(), reason, e.getMessage()));
-sendMasterServiceTask(reason, runnable);
+assert Objects.equals(newState.getNodes().getMasterNodeId(), localNode.getId()) : newState;
+assert newState.getNodes().getNodes().get(localNode.getId()) != null;
+legislator.handleClientValue(
+ClusterState.builder(newState).term(legislator.getCurrentTerm()).incrementVersion().build());
}
-});
+} catch (ConsensusMessageRejectedException e) {
+logger.trace(() -> new ParameterizedMessage("[{}] runPendingTasks: failed, rescheduling: {}",
+localNode.getId(), e.getMessage()));
+pendingTasks.addAll(currentPendingTasks);
+futureExecutor.schedule(TimeValue.ZERO, "ClusterNode#runPendingTasks: retry" , this::runPendingTasks);
+}
}
+
+private void sendMasterServiceTask(String reason, UnaryOperator<ClusterState> task) {
+PendingTask pendingTask = new PendingTask(task, reason, masterServicesTaskId++);
+logger.trace("[{}] sendMasterServiceTask: enqueueing [{}]", localNode.getId(), pendingTask);
+pendingTasks.add(pendingTask);
+futureExecutor.schedule(TimeValue.ZERO, "sendMasterServiceTask: " + pendingTask, this::runPendingTasks);
+}

ClusterNode initialise(VotingConfiguration initialConfiguration) {
@@ -854,6 +875,7 @@ void reboot() {
logger.trace("reboot: taking down [{}]", localNode);
tasks.removeIf(task -> task.scheduledFor(localNode));
inFlightMessages.removeIf(action -> action.hasDestination(localNode));
+pendingTasks.clear();
localNode = createDiscoveryNode();
logger.trace("reboot: starting up [{}]", localNode);
try {
@@ -901,6 +923,27 @@ public void schedule(TimeValue delay, String description, Runnable task) {
}
}

+private class PendingTask {
+private final UnaryOperator<ClusterState> task;
+private final String description;
+private final long taskId;
+
+PendingTask(UnaryOperator<ClusterState> task, String description, long taskId) {
+this.task = task;
+this.description = description;
+this.taskId = taskId;
+}
+
+@Override
+public String toString() {
+return description + " [" + taskId + "]";
+}
+
+ClusterState run(ClusterState clusterState) {
+return task.apply(clusterState);
+}
+}
+
private class MockTransport implements Transport {

@Override
