diff --git a/trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java b/trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java index af4c568..b9b5fb7 100644 --- a/trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java +++ b/trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java @@ -94,7 +94,7 @@ TrexResult paxos(TrexMessage input) { var number = prepare.number(); if (number.lessThan(progress.highestPromised()) || prepare.logIndex() <= progress.highestCommittedIndex()) { // nack a low nextPrepareMessage else any nextPrepareMessage for a committed slot sending any accepts they are missing - messages.add(nack(prepare, loadCatchup(prepare.logIndex()))); + messages.add(nack(prepare)); } else if (number.greaterThan(progress.highestPromised())) { // ack a higher nextPrepareMessage final var newProgress = progress.withHighestPromised(prepare.number()); @@ -259,9 +259,7 @@ case Commit(final var commitFrom, final var maxSlotCommittable, long leaderMaxAc switch (accept) { case Accept(_, _, _, NoOperation _) -> { } - case Accept(_, _, _, final Command command) -> { - commands.add(command); - } + case Accept(_, _, _, final Command command) -> commands.add(command); } } @@ -352,9 +350,8 @@ PrepareResponse ack(Prepare prepare) { * Send a negative nextPrepareMessage response message to the leader. * * @param prepare The nextPrepareMessage message to reject. - * @param catchup The list of accept messages to send to the leader. */ - PrepareResponse nack(Prepare prepare, List catchup) { + PrepareResponse nack(Prepare prepare) { return new PrepareResponse( new Vote(nodeIdentifier, prepare.number().nodeIdentifier(), prepare.logIndex(), false), highestCommitted(), journal.loadAccept(prepare.logIndex()) @@ -408,7 +405,7 @@ public TrexRole getRole() { /// The heartbeat method is called by the TrexEngine to send messages to the cluster to stop them /// timing out. There may also be dropped messages due to partitions or crashes. So we will also - /// heart beat prepare or accept messages that are pending a response. + /// heartbeat prepare or accept messages that are pending a response. public List heartbeat() { final var result = new ArrayList(); if (isLeader()) { @@ -421,7 +418,7 @@ public List heartbeat() { } private List pendingAcceptMessages() { - final var r = LongStream.range( + return LongStream.range( progress.highestCommittedIndex() + 1, progress.highestAcceptedIndex() + 1 ) @@ -429,7 +426,6 @@ private List pendingAcceptMessages() { .takeWhile(Optional::isPresent) .flatMap(Optional::stream) .toList(); - return r; } private Commit currentCommitMessage() { diff --git a/trex2-lib/src/test/java/com/github/trex_paxos/LossyJournal.java b/trex2-lib/src/test/java/com/github/trex_paxos/LossyJournal.java deleted file mode 100644 index d4893e6..0000000 --- a/trex2-lib/src/test/java/com/github/trex_paxos/LossyJournal.java +++ /dev/null @@ -1,44 +0,0 @@ -package com.github.trex_paxos; - -import com.github.trex_paxos.msg.Accept; -import com.github.trex_paxos.msg.Progress; - -import java.util.HashMap; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Optional; - -public class LossyJournal implements Journal { - - final Map progressMap = new HashMap<>(); - - @Override - public void saveProgress(Progress progress) { - progressMap.put(progress.nodeIdentifier(), progress); - } - - @Override - public Progress loadProgress(byte nodeIdentifier) { - return progressMap.get(nodeIdentifier); - } - - final NavigableMap acceptMap = new java.util.TreeMap<>(); - - @Override - public void journalAccept(Accept accept) { - acceptMap.put(accept.logIndex(), accept); - } - - @Override - public Optional loadAccept(long logIndex) { - return Optional.ofNullable(acceptMap.get(logIndex)); - } - - @Override - public void sync() { - // no-op - } - - final Map committedMap = new HashMap<>(); - -} diff --git a/trex2-lib/src/test/java/com/github/trex_paxos/Simulation.java b/trex2-lib/src/test/java/com/github/trex_paxos/Simulation.java index ce0ed9d..3f8e2ad 100644 --- a/trex2-lib/src/test/java/com/github/trex_paxos/Simulation.java +++ b/trex2-lib/src/test/java/com/github/trex_paxos/Simulation.java @@ -246,7 +246,6 @@ protected Stream networkSimulation(Send send, Long now, BiFunction< .flatMap(engine -> engine.paxos(m).messages().stream()); case DirectMessage m -> engines.get(m.to()).paxos(m).messages().stream(); case AbstractCommand abstractCommand -> throw new AssertionError("Unexpected command message: " + abstractCommand); - default -> throw new IllegalStateException("Unexpected value: " + send.message()); }; private void makeClientDataEvents(int iterations, NavigableMap> eventQueue) {