Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recycle pages used by outgoing publications #77317

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix more tests
- Track blackholed requests for timely completion
- Deliver blackholed requests at end of runRandomly
- Release memory from rebooted nodes
- Complete messages delivered to rebooted nodes
- Heal cluster from disruptions before end of tests
  • Loading branch information
DaveCTurner committed Sep 7, 2021
commit 7b4961dfed467e3856c656639f2b194a452ee891
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,8 @@ public void testAckListenerReceivesNoAckFromHangingFollower() {
cluster.stabilise(defaultMillis(PUBLISH_TIMEOUT_SETTING));
assertTrue("expected eventual ack from " + leader, ackCollector.hasAckedSuccessfully(leader));
assertFalse("expected no ack from " + follower0, ackCollector.hasAcked(follower0));

follower0.setClusterStateApplyResponse(ClusterStateApplyResponse.SUCCEED);
}
}

Expand Down Expand Up @@ -1388,6 +1390,10 @@ public void testClusterCannotFormWithFailingJoinValidation() {
cluster.bootstrapIfNecessary();
cluster.runFor(10000, "failing join validation");
assertTrue(cluster.clusterNodes.stream().allMatch(cn -> cn.getLastAppliedClusterState().version() == 0));

for (ClusterNode clusterNode : cluster.clusterNodes) {
clusterNode.extraJoinValidators.clear();
}
}
}

Expand Down Expand Up @@ -1565,6 +1571,8 @@ public void testSingleNodeDiscoveryStabilisesEvenWhenDisrupted() {
+ 7 * delayVariabilityMillis, "stabilising");

assertThat(cluster.getAnyLeader(), sameInstance(clusterNode));

cluster.deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
}
}

Expand Down Expand Up @@ -1705,6 +1713,10 @@ public void assertMatched() {
mockLogAppender.stop();
}
}

for (ClusterNode clusterNode : cluster.clusterNodes) {
clusterNode.heal();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
Expand Down Expand Up @@ -41,6 +42,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
Expand Down Expand Up @@ -454,6 +456,10 @@ public String toString() {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] applying initial configuration on {}", step, clusterNode.getId());
clusterNode.applyInitialConfiguration();
} else if (rarely()) {
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] completing blackholed requests sent by {}", step, clusterNode.getId());
clusterNode.deliverBlackholedRequests();
} else {
if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) {
deterministicTaskQueue.advanceTime();
Expand All @@ -468,6 +474,8 @@ public String toString() {
assertConsistentStates();
}

logger.debug("delivering pending blackholed requests");
clusterNodes.forEach(ClusterNode::deliverBlackholedRequests);
logger.debug("running {} cleanup actions", cleanupActions.size());
cleanupActions.forEach(Runnable::run);
logger.debug("finished running cleanup actions");
Expand All @@ -487,8 +495,9 @@ private void updateCommittedStates() {
if (storedState == null) {
committedStatesByVersion.put(applierState.getVersion(), applierState);
} else {
assertEquals("expected " + applierState + " but got " + storedState,
value(applierState), value(storedState));
if (value(applierState) != value(storedState)) {
fail("expected " + applierState + " but got " + storedState);
}
}
}
}
Expand Down Expand Up @@ -728,6 +737,12 @@ void clearBlackholedConnections() {

@Override
public void close() {
// noinspection ReplaceInefficientStreamCount using .count() to run the filter on every node
while (clusterNodes.stream().filter(ClusterNode::deliverBlackholedRequests).count() != 0L) {
logger.debug("--> stabilising again after delivering blackholed requests");
stabilise(DEFAULT_STABILISATION_TIME);
}

clusterNodes.forEach(ClusterNode::close);
}

Expand Down Expand Up @@ -908,7 +923,7 @@ public class ClusterNode {
private DisruptableMockTransport mockTransport;
private NodeHealthService nodeHealthService;
List<BiConsumer<DiscoveryNode, ClusterState>> extraJoinValidators = new ArrayList<>();

private DelegatingBigArrays delegatingBigArrays;

ClusterNode(int nodeIndex, boolean masterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier, nodeSettings,
Expand Down Expand Up @@ -970,11 +985,12 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
delegatingBigArrays = new DelegatingBigArrays(bigArrays);
coordinator = new Coordinator(
"test_node",
settings,
clusterSettings,
bigArrays,
delegatingBigArrays,
transportService,
getNamedWriteableRegistry(),
allocationService,
Expand Down Expand Up @@ -1029,9 +1045,16 @@ ClusterNode restartedNode(Function<Metadata, Metadata> adaptGlobalMetadata, Func
address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(),
localNode.isMasterNode() && DiscoveryNode.isMasterNode(nodeSettings)
? allExceptVotingOnlyRole : emptySet(), Version.CURRENT);
return new ClusterNode(nodeIndex, newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm), nodeSettings,
nodeHealthService);
try {
return new ClusterNode(
nodeIndex,
newLocalNode,
node -> new MockPersistedState(newLocalNode, persistedState, adaptGlobalMetadata, adaptCurrentTerm),
nodeSettings,
nodeHealthService);
} finally {
delegatingBigArrays.releaseAll();
}
}

private CoordinationState.PersistedState getPersistedState() {
Expand Down Expand Up @@ -1073,11 +1096,17 @@ Runnable onNode(Runnable runnable) {
return new Runnable() {
@Override
public void run() {
if (clusterNodes.contains(ClusterNode.this) == false) {
if (clusterNodes.contains(ClusterNode.this)) {
wrapped.run();
} else if (runnable instanceof DisruptableMockTransport.RebootSensitiveRunnable) {
logger.trace(
"completing reboot-sensitive runnable {} from node {} as node has been removed from cluster",
runnable,
localNode);
((DisruptableMockTransport.RebootSensitiveRunnable) runnable).ifRebooted();
} else {
logger.trace("ignoring runnable {} from node {} as node has been removed from cluster", runnable, localNode);
return;
}
wrapped.run();
}

@Override
Expand Down Expand Up @@ -1249,6 +1278,10 @@ private boolean isNotUsefullyBootstrapped() {
void allowClusterStateApplicationFailure() {
clusterApplierService.allowClusterStateApplicationFailure();
}

boolean deliverBlackholedRequests() {
return mockTransport.deliverBlackholedRequests();
}
}

private List<TransportAddress> provideSeedHosts(SeedHostsProvider.HostsResolver ignored) {
Expand Down Expand Up @@ -1503,4 +1536,108 @@ public void testRegisterSpecConsistency() {
assertThat(spec.nextState(7, null, 42), equalTo(Optional.empty()));
}

/**
* A wrapper around a {@link BigArrays} which tracks the arrays it allocates so that they can be released if the node reboots. Only
* works for {@link ByteArray} allocations since that's all the {@link Coordinator} needs.
*/
static class DelegatingBigArrays extends BigArrays {

private final BigArrays delegate;

private final Set<DelegatingByteArray> trackedArrays = new HashSet<>();

public DelegatingBigArrays(BigArrays delegate) {
super(null, null, null);
this.delegate = delegate;
}

@Override
public ByteArray newByteArray(long size, boolean clearOnResize) {
return track(delegate.newByteArray(size, clearOnResize));
}

@Override
public ByteArray resize(ByteArray array, long size) {
assert array instanceof DelegatingByteArray;
trackedArrays.remove(array);
return track(delegate.resize(((DelegatingByteArray) array).getDelegate(), size));
}

private ByteArray track(ByteArray byteArray) {
final DelegatingByteArray wrapped = new DelegatingByteArray(byteArray);
trackedArrays.add(wrapped);
return wrapped;
}

public void releaseAll() {
for (DelegatingByteArray trackedArray : List.copyOf(trackedArrays)) {
trackedArray.close();
}
assert trackedArrays.isEmpty() : trackedArrays;
}

private class DelegatingByteArray implements ByteArray {

private final ByteArray delegate;

public DelegatingByteArray(ByteArray delegate) {
this.delegate = delegate;
}

ByteArray getDelegate() {
return delegate;
}

@Override
public void close() {
delegate.close();
trackedArrays.remove(this);
}

@Override
public long size() {
return delegate.size();
}

@Override
public byte get(long index) {
return delegate.get(index);
}

@Override
public byte set(long index, byte value) {
return delegate.set(index, value);
}

@Override
public boolean get(long index, int len, BytesRef ref) {
return delegate.get(index, len, ref);
}

@Override
public void set(long index, byte[] buf, int offset, int len) {
delegate.set(index, buf, offset, len);
}

@Override
public void fill(long fromIndex, long toIndex, byte value) {
delegate.fill(fromIndex, toIndex, value);
}

@Override
public boolean hasArray() {
return delegate.hasArray();
}

@Override
public byte[] array() {
return delegate.array();
}

@Override
public long ramBytesUsed() {
return delegate.ramBytesUsed();
}
}
}
}
Loading