Skip to content

Commit

Permalink
catchup working for rotating partition
Browse files Browse the repository at this point in the history
  • Loading branch information
simbo1905 committed Oct 19, 2024
1 parent 5ebb659 commit 1d70d87
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 54 deletions.
14 changes: 5 additions & 9 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ TrexResult paxos(TrexMessage input) {
var number = prepare.number();
if (number.lessThan(progress.highestPromised()) || prepare.logIndex() <= progress.highestCommittedIndex()) {
// nack a low nextPrepareMessage else any nextPrepareMessage for a committed slot sending any accepts they are missing
messages.add(nack(prepare, loadCatchup(prepare.logIndex())));
messages.add(nack(prepare));
} else if (number.greaterThan(progress.highestPromised())) {
// ack a higher nextPrepareMessage
final var newProgress = progress.withHighestPromised(prepare.number());
Expand Down Expand Up @@ -259,9 +259,7 @@ case Commit(final var commitFrom, final var maxSlotCommittable, long leaderMaxAc
switch (accept) {
case Accept(_, _, _, NoOperation _) -> {
}
case Accept(_, _, _, final Command command) -> {
commands.add(command);
}
case Accept(_, _, _, final Command command) -> commands.add(command);
}
}

Expand Down Expand Up @@ -352,9 +350,8 @@ PrepareResponse ack(Prepare prepare) {
* Send a negative nextPrepareMessage response message to the leader.
*
* @param prepare The nextPrepareMessage message to reject.
* @param catchup The list of accept messages to send to the leader.
*/
PrepareResponse nack(Prepare prepare, List<Accept> catchup) {
PrepareResponse nack(Prepare prepare) {
return new PrepareResponse(
new Vote(nodeIdentifier, prepare.number().nodeIdentifier(), prepare.logIndex(), false),
highestCommitted(), journal.loadAccept(prepare.logIndex())
Expand Down Expand Up @@ -408,7 +405,7 @@ public TrexRole getRole() {

/// The heartbeat method is called by the TrexEngine to send messages to the cluster to stop them
/// timing out. There may also be dropped messages due to partitions or crashes. So we will also
/// heart beat prepare or accept messages that are pending a response.
/// heartbeat prepare or accept messages that are pending a response.
public List<TrexMessage> heartbeat() {
final var result = new ArrayList<TrexMessage>();
if (isLeader()) {
Expand All @@ -421,15 +418,14 @@ public List<TrexMessage> heartbeat() {
}

private List<Accept> pendingAcceptMessages() {
final var r = LongStream.range(
return LongStream.range(
progress.highestCommittedIndex() + 1,
progress.highestAcceptedIndex() + 1
)
.mapToObj(journal::loadAccept)
.takeWhile(Optional::isPresent)
.flatMap(Optional::stream)
.toList();
return r;
}

private Commit currentCommitMessage() {
Expand Down
44 changes: 0 additions & 44 deletions trex2-lib/src/test/java/com/github/trex_paxos/LossyJournal.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ protected Stream<TrexMessage> networkSimulation(Send send, Long now, BiFunction<
.flatMap(engine -> engine.paxos(m).messages().stream());
case DirectMessage m -> engines.get(m.to()).paxos(m).messages().stream();
case AbstractCommand abstractCommand -> throw new AssertionError("Unexpected command message: " + abstractCommand);
default -> throw new IllegalStateException("Unexpected value: " + send.message());
};

private void makeClientDataEvents(int iterations, NavigableMap<Long, List<Event>> eventQueue) {
Expand Down

0 comments on commit 1d70d87

Please sign in to comment.