Skip to content

Commit

Permalink
timeouts fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
simbo1905 committed Oct 12, 2024
1 parent 6b0422c commit 5ccf1cc
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 18 deletions.
7 changes: 2 additions & 5 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,11 @@ TrexResult paxosNotThreadSafe(TrexMessage input) {

if (trexNode.isLeader()) {
// this line says we must always see our own heartbeat to set a new heartbeat.
resetTimeout();
setHeartbeat();
// TODO what if we are a recover we should heartbeat prepares until the network is stable.
} else {
// here we reset the timeout if we see real work by a leader
// FIXME this says that if we saw a message from the leader we should reset the timeout.
// yet it will lead to interrupt. We should only reset the timeout if we see a message from the leader
resetTimeout();
}

return result;
}

Expand Down
76 changes: 63 additions & 13 deletions trex2-lib/src/test/java/com/github/trex_paxos/SimulationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.random.RandomGenerator;
import java.util.random.RandomGeneratorFactory;
import java.util.stream.Collectors;
Expand All @@ -15,6 +18,20 @@
import static org.assertj.core.api.Assertions.assertThat;

class Simulation {
static {
Logger rootLogger = Logger.getLogger("");
Handler[] handlers = rootLogger.getHandlers();
for (Handler handler : handlers) {
handler.setFormatter(new SimpleFormatter() {
@Override
public String format(LogRecord record) {
return String.format("[%s] %s%n",
record.getLevel().getName(),
record.getMessage());
}
});
}
}
static final Logger LOGGER = Logger.getLogger(Simulation.class.getName());

private final RandomGenerator rng;
Expand Down Expand Up @@ -58,11 +75,14 @@ record ClientCommand() implements Event {

private void resetTimeout(byte nodeIdentifier) {
Optional.ofNullable(nodeTimeouts.get(nodeIdentifier)).ifPresent(timeout -> Optional.ofNullable(eventQueue.get(timeout)).ifPresent(events -> {
events.remove(new Timeout(nodeIdentifier));
if (events.isEmpty()) {
eventQueue.remove(timeout);
}
}));
events.remove(new Timeout(nodeIdentifier));
if (events.isEmpty()) {
eventQueue.remove(timeout);
}
})
);
nodeTimeouts.remove(nodeIdentifier);
assert eventQueue.keySet().containsAll(nodeTimeouts.values()) : "Not all node timeouts are present in the event queue";
}

long now = 0;
Expand All @@ -80,9 +100,13 @@ private long tick(long now) {
private void setRandomTimeout(byte nodeIdentifier) {
final var timeout = rng.nextInt((int) maxTimeout);
final var when = now() + timeout;
nodeTimeouts.put(nodeIdentifier, when);
if (nodeTimeouts.containsKey(nodeIdentifier)) {
resetTimeout(nodeIdentifier);
}
final var events = eventQueue.computeIfAbsent(when, _ -> new ArrayList<>());
nodeTimeouts.put(nodeIdentifier, when);
events.add(new Timeout(nodeIdentifier));
assert eventQueue.keySet().containsAll(nodeTimeouts.values()) : "Not all node timeouts are present in the event queue";
}

private void setHeartbeat(byte nodeIdentifier) {
Expand Down Expand Up @@ -332,10 +356,18 @@ public void testLeaderElection(RandomGenerator rng) {
assertThat(lastCommits).hasSizeGreaterThan(2);
}


@Test
public void testClientWork() {
RandomGenerator rng = Simulation.repeatableRandomGenerator(456789L);
public void testClientWork1000() {
RandomGenerator rng = Simulation.repeatableRandomGenerator(1234);
IntStream.range(0, 1000).forEach(i -> {
LOGGER.info("\n --------------- \nstarting iteration: " + i);
testClientWork(rng);
}
);
}

public void testClientWork(RandomGenerator rng) {
// given a repeatable test setup
final var simulation = new Simulation(rng, 30);

Expand All @@ -356,16 +388,32 @@ public void testClientWork() {
assertThat(roles.stream().filter(r -> r == TrexRole.LEAD).count()).isEqualTo(1);

// and we should have the same commit logs
assertThat(simulation.trexEngine1.journal.fakeJournal)
.isEqualTo(simulation.trexEngine2.journal.fakeJournal)
.isEqualTo(simulation.trexEngine3.journal.fakeJournal);
assertThat(consistentJournals(
simulation.trexEngine1.journal.fakeJournal,
simulation.trexEngine2.journal.fakeJournal,
simulation.trexEngine3.journal.fakeJournal

)).isTrue();
}

private void makeLeader(Simulation simulation) {
/**
* This logic will iteration over the journals and ensure that they are not inconsistent.
*/
boolean consistentJournals(NavigableMap<Long, Accept> fakeJournal, NavigableMap<Long, Accept> fakeJournal1, NavigableMap<Long, Accept> fakeJournal2) {
final NavigableMap<Long, Accept> longestJournal = fakeJournal.size() > fakeJournal1.size() ? fakeJournal : fakeJournal1.size() > fakeJournal2.size() ? fakeJournal1 : fakeJournal2;
return longestJournal.entrySet().stream().allMatch(e -> {
final var logIndex = e.getKey();
final var accept = e.getValue();
return Optional.ofNullable(fakeJournal1.get(logIndex)).map(a -> a.equals(accept)).orElse(true)
&& Optional.ofNullable(fakeJournal2.get(logIndex)).map(a -> a.equals(accept)).orElse(true);
});
}

void makeLeader(Simulation simulation) {

final var leader = simulation.trexEngine1;

// timing out the leader will set its timeout so no need to call start here
simulation.trexEngine1.start();
simulation.trexEngine2.start();
simulation.trexEngine3.start();

Expand All @@ -382,6 +430,8 @@ private void makeLeader(Simulation simulation) {
simulation.trexEngine2.paxos(r3.messages().getFirst());
simulation.trexEngine3.paxos(r3.messages().getFirst());
});

LOGGER.info("Leader: " + leader.trexNode.nodeIdentifier());
}

}

0 comments on commit 5ccf1cc

Please sign in to comment.