Skip to content

Commit

Permalink
Fix more tests
Browse files Browse the repository at this point in the history
- Track blackholed requests for timely completion
- Deliver blackholed requests at end of runRandomly
- Release memory from rebooted nodes
- Complete messages delivered to rebooted nodes
- Heal cluster from disruptions before end of tests
  • Loading branch information
DaveCTurner committed Sep 7, 2021
1 parent 624f6ed commit 7b4961d
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,8 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {
cluster.stabilise(defaultMillis(PUBLISH_TIMEOUT_SETTING));
assertTrue("expected eventual ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
assertFalse("expected no ack from " + follower0, ackCollector.hasAcked(follower0));

follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED);
}
}

Expand Down Expand Up @@ -1388,6 +1390,10 @@ public void testClusterCannotFormWithFailingJoinValidation() {
cluster.bootstrapIfNecessary();
cluster.runFor(10000, "failing join validation");
assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0));

for (ClusterNode clusterNode : cluster.clusterNodes) {
clusterNode.extraJoinValidators.clear();
}
}
}

Expand Down Expand Up @@ -1565,6 +1571,8 @@ public void testSingleNodeDiscoveryStabilisesEvenWhenDisrupted() {
+ 7 * delayVariabilityMillis, "stabilising");

assertThat(cluster.getAnyLeader(), sameInstance(clusterNode));

cluster.deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
}
}

Expand Down Expand Up @@ -1705,6 +1713,10 @@ public void assertMatched() {
mockLogAppender.stop();
}
}

for (ClusterNode clusterNode : cluster.clusterNodes) {
clusterNode.heal();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
Expand Down Expand Up @@ -41,6 +42,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
Expand Down Expand Up @@ -454,6 +456,10 @@ public String toString() {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] applying initial configuration on {}", step, clusterNode.getId());
clusterNode.applyInitialConfiguration();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] completing blackholed requests sent by {}", step, clusterNode.getId());
clusterNode.deliverBlackholedRequests();
} else {
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
deterministicTaskQueue.advanceTime();
Expand All @@ -468,6 +474,8 @@ public String toString() {
assertConsistentStates();
}

logger.debug("delivering pending blackholed requests");
clusterNodes.forEach(ClusterNode::deliverBlackholedRequests);
logger.debug("running {} cleanup actions", cleanupActions.size());
cleanupActions.forEach(Runnable::run);
logger.debug("finished running cleanup actions");
Expand All @@ -487,8 +495,9 @@ private void updateCommittedStates() {
if (storedState == null) {
committedStatesByVersion.put(applierState.getVersion(), applierState);
} else {
assertEquals("expected " + applierState + " but got " + storedState,
value(applierState), value(storedState));
if (value(applierState) != value(storedState)) {
fail("expected " + applierState + " but got " + storedState);
}
}
}
}
Expand Down Expand Up @@ -728,6 +737,12 @@ void clearBlackholedConnections() {

@Override
public void close() {
    // Before shutting the nodes down, flush any requests that are still blackholed; each delivery
    // may trigger fresh cluster activity, so keep re-stabilising until a full pass over the nodes
    // delivers nothing.
    boolean deliveredSomething = true;
    while (deliveredSomething) {
        deliveredSomething = false;
        for (ClusterNode clusterNode : clusterNodes) {
            // no short-circuit here: every node must get a chance to deliver on every pass
            deliveredSomething |= clusterNode.deliverBlackholedRequests();
        }
        if (deliveredSomething) {
            logger.debug("--> stabilising again after delivering blackholed requests");
            stabilise(DEFAULT_STABILISATION_TIME);
        }
    }

    clusterNodes.forEach(ClusterNode::close);
}

Expand Down Expand Up @@ -908,7 +923,7 @@ public class ClusterNode {
private DisruptableMockTransport mockTransport;
private NodeHealthService nodeHealthService;
List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();

private DelegatingBigArrays delegatingBigArrays;

ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings,
Expand Down Expand Up @@ -970,11 +985,12 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
delegatingBigArrays = new DelegatingBigArrays(bigArrays);
coordinator = new Coordinator(
"test_node",
settings,
clusterSettings,
bigArrays,
delegatingBigArrays,
transportService,
getNamedWriteableRegistry(),
allocationService,
Expand Down Expand Up @@ -1029,9 +1045,16 @@ ClusterNode restartedNode(Function<Metadata, Metadata> adaptGlobalMetadata, Func
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
localNode.isMasterNode() && DiscoveryNode.isMasterNode(nodeSettings)
? allExceptVotingOnlyRole : emptySet(), Version.CURRENT);
return new ClusterNode(nodeIndex, newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings,
nodeHealthService);
try {
return new ClusterNode(
nodeIndex,
newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm),
nodeSettings,
nodeHealthService);
} finally {
delegatingBigArrays.releaseAll();
}
}

private CoordinationState.PersistedState getPersistedState() {
Expand Down Expand Up @@ -1073,11 +1096,17 @@ Runnable onNode(Runnable runnable) {
return new Runnable() {
@Override
public void run() {
if (clusterNodes.contains(ClusterNode.this) == false) {
if (clusterNodes.contains(ClusterNode.this)) {
wrapped.run();
} else if (runnable instanceof DisruptableMockTransport.RebootSensitiveRunnable) {
logger.trace(
"completing reboot-sensitive runnable {} from node {} as node has been removed from cluster",
runnable,
localNode);
((DisruptableMockTransport.RebootSensitiveRunnable) runnable).ifRebooted();
} else {
logger.trace("ignoring runnable {} from node {} as node has been removed from cluster", runnable, localNode);
return;
}
wrapped.run();
}

@Override
Expand Down Expand Up @@ -1249,6 +1278,10 @@ private boolean isNotUsefullyBootstrapped() {
// Permits cluster-state application on this node to fail without tripping the test harness's
// assertions; simply forwards to the node's cluster applier service.
void allowClusterStateApplicationFailure() {
    clusterApplierService.allowClusterStateApplicationFailure();
}

/**
 * Delivers any requests sent by this node that were blackholed by a simulated network disruption,
 * delegating to the node's mock transport which tracks them.
 *
 * @return whether anything was delivered (per the transport's tracking) — callers use a {@code true}
 *         result as a signal that the cluster may need to stabilise again
 */
boolean deliverBlackholedRequests() {
    return mockTransport.deliverBlackholedRequests();
}
}

private List<TransportAddress> provideSeedHosts(SeedHostsProvider.HostsResolver ignored) {
Expand Down Expand Up @@ -1503,4 +1536,108 @@ public void testRegisterSpecConsistency() {
assertThat(spec.nextState(7, null, 42), equalTo(Optional.empty()));
}

/**
* A wrapper around a {@link BigArrays} which tracks the arrays it allocates so that they can be released if the node reboots. Only
* works for {@link ByteArray} allocations since that's all the {@link Coordinator} needs.
*/
/**
 * A wrapper around a {@link BigArrays} which tracks the arrays it allocates so that they can be released if the node reboots. Only
 * works for {@link ByteArray} allocations since that's all the {@link Coordinator} needs.
 */
static class DelegatingBigArrays extends BigArrays {

    private final BigArrays inner;

    // every wrapper handed out and not yet closed; membership is by reference identity
    private final Set<DelegatingByteArray> liveArrays = new HashSet<>();

    public DelegatingBigArrays(BigArrays delegate) {
        super(null, null, null);
        this.inner = delegate;
    }

    @Override
    public ByteArray newByteArray(long size, boolean clearOnResize) {
        return track(inner.newByteArray(size, clearOnResize));
    }

    @Override
    public ByteArray resize(ByteArray array, long size) {
        assert array instanceof DelegatingByteArray;
        // the old wrapper is dead after a resize: stop tracking it and track its replacement instead
        final DelegatingByteArray oldWrapper = (DelegatingByteArray) array;
        liveArrays.remove(oldWrapper);
        return track(inner.resize(oldWrapper.getDelegate(), size));
    }

    // wraps a freshly-allocated array and records it for later forced release
    private ByteArray track(ByteArray byteArray) {
        final DelegatingByteArray trackedArray = new DelegatingByteArray(byteArray);
        liveArrays.add(trackedArray);
        return trackedArray;
    }

    /**
     * Closes every still-live array, e.g. to simulate a node reboot dropping its heap.
     */
    public void releaseAll() {
        // close() removes entries from liveArrays, so iterate over a snapshot
        for (DelegatingByteArray liveArray : new ArrayList<>(liveArrays)) {
            liveArray.close();
        }
        assert liveArrays.isEmpty() : liveArrays;
    }

    /**
     * A {@link ByteArray} that forwards every operation to an underlying array and deregisters
     * itself from the enclosing tracker when closed.
     */
    private class DelegatingByteArray implements ByteArray {

        private final ByteArray wrapped;

        public DelegatingByteArray(ByteArray delegate) {
            this.wrapped = delegate;
        }

        ByteArray getDelegate() {
            return wrapped;
        }

        @Override
        public void close() {
            // release the underlying memory, then drop this wrapper from the tracker
            wrapped.close();
            liveArrays.remove(this);
        }

        @Override
        public long size() {
            return wrapped.size();
        }

        @Override
        public byte get(long index) {
            return wrapped.get(index);
        }

        @Override
        public byte set(long index, byte value) {
            return wrapped.set(index, value);
        }

        @Override
        public boolean get(long index, int len, BytesRef ref) {
            return wrapped.get(index, len, ref);
        }

        @Override
        public void set(long index, byte[] buf, int offset, int len) {
            wrapped.set(index, buf, offset, len);
        }

        @Override
        public void fill(long fromIndex, long toIndex, byte value) {
            wrapped.fill(fromIndex, toIndex, value);
        }

        @Override
        public boolean hasArray() {
            return wrapped.hasArray();
        }

        @Override
        public byte[] array() {
            return wrapped.array();
        }

        @Override
        public long ramBytesUsed() {
            return wrapped.ramBytesUsed();
        }
    }
}
}
Loading

0 comments on commit 7b4961d

Please sign in to comment.