Skip to content

Commit

Permalink
processes client messages
Browse files Browse the repository at this point in the history
  • Loading branch information
simbo1905 committed Oct 6, 2024
1 parent ebfeade commit db6a080
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 117 deletions.
32 changes: 27 additions & 5 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexEngine.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.github.trex_paxos;

import com.github.trex_paxos.msg.Commit;
import com.github.trex_paxos.msg.Prepare;
import com.github.trex_paxos.msg.TrexMessage;
import com.github.trex_paxos.msg.*;

import java.util.Optional;
import java.util.concurrent.Semaphore;
Expand All @@ -26,12 +24,35 @@ public TrexEngine(TrexNode trexNode) {
this.trexNode = trexNode;
}

/**
* Set a random timeout for the current node. This method is called when the node is not the leader.
*/
abstract void setRandomTimeout();

/**
* Reset the timeout for the current node. This method is called when the node is not the leader when it receives a
* message from the leader.
*/
abstract void resetTimeout();

/**
* Set the heartbeat for the current node. This method is called when the node is the leader or a recoverer.
*/
abstract void setHeartbeat();

/**
* Process an application command sent from a client.
*/
Optional<Accept> command(Command command) {
if (trexNode.isLeader()) {
final var nextExcept = trexNode.nextAccept(command);
trexNode.paxos(nextExcept);
return Optional.of(nextExcept);
} else {
return Optional.empty();
}
}

Semaphore mutex = new Semaphore(1);

/**
Expand Down Expand Up @@ -97,7 +118,7 @@ TrexResult paxosNotThreadSafe(TrexMessage input) {
} else {
// here we reset the timeout if we see real work by a leader
// FIXME this says that if we saw a message from the leader we should reset the timeout.
// yet it will lead to interupt. We should only reset the timeout if we see a message from the leader
// yet it will lead to interrupt. We should only reset the timeout if we see a message from the leader
resetTimeout();
}
return result;
Expand Down Expand Up @@ -126,10 +147,11 @@ public TrexResult receive(TrexMessage p) {
return result;
}

public Optional<Commit> hearbeat() {
public Optional<Commit> heartbeat() {
var result = trexNode.heartbeat();
LOGGER.info("heartbeat: " + trexNode.nodeIdentifier() + " " + trexNode.getRole());
setHeartbeat();
return result;
}

}
65 changes: 30 additions & 35 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,23 @@ TrexResult paxos(TrexMessage input) {
} else if (equalOrHigherAccept(progress, accept)) {
// always journal first
journal.journalAccept(accept);
// record that we have accepted a higher slot
if (accept.logIndex() > progress.highestAcceptedIndex()) {
progress = progress.withHighestAccepted(accept.logIndex());
}
if (higherAccept(progress, accept)) {
// we must update promise on a higher accept http://stackoverflow.com/q/29880949/329496
Progress updatedProgress = progress.withHighestPromised(accept.number());
journal.saveProgress(updatedProgress);
this.progress = updatedProgress;
this.progress = progress.withHighestPromised(accept.number());
}
journal.saveProgress(this.progress);
messages.add(ack(accept));
} else {
throw new AssertionError("unreachable progress={" + progress + "}, accept={" + accept + "}");
}
}
case Prepare prepare -> {
var number = prepare.number();
if (number.lessThan(progress.highestPromised()) || prepare.logIndex() <= progress.highestCommitted()) {
if (number.lessThan(progress.highestPromised()) || prepare.logIndex() <= progress.highestCommittedIndex()) {
// nack a low prepare else any prepare for a committed slot sending any accepts they are missing
messages.add(nack(prepare, loadCatchup(prepare.logIndex())));
} else if (number.greaterThan(progress.highestPromised())) {
Expand Down Expand Up @@ -159,7 +162,10 @@ TrexResult paxos(TrexMessage input) {
case Accept(_, _, _, NoOperation _) -> {
// NOOP
}
case Accept(_, _, _, final Command command) -> commands.add(command);
case Accept(_, _, _, final Command command) -> {
commands.add(command);
messages.add(new Commit(nodeIdentifier, slot));
}
}
}
// free the memory
Expand All @@ -186,8 +192,8 @@ case Accept(_, _, _, NoOperation _) -> {
case PrepareResponse prepareResponse -> {
if (RECOVER == role) {
if (prepareResponse.highestUncommitted().isPresent() && prepareResponse.highestCommittedIndex().isPresent()) {
final long highestCommittedOther = prepareResponse.highestCommittedIndex().get();
final long highestCommitted = progress.highestCommitted();
final long highestCommittedOther = prepareResponse.highestCommittedIndex().get();
final long highestCommitted = progress.highestCommittedIndex();
if (highestCommitted < highestCommittedOther) {
// we are behind so now try to catch up
prepareResponse.catchupResponse().ifPresent(catchupResponse -> {
Expand All @@ -213,7 +219,7 @@ case Accept(_, _, _, NoOperation _) -> {
// we are unable to achieve a quorum, so we must back down
backdown();
case WIN -> {
// first issue new prepare messages for higher slots
// only if we learn that other nodes have prepared higher slots we must prepare them
votes.values().stream()
.filter(p -> p.highestCommittedIndex().isPresent())
.map(p -> p.highestCommittedIndex().get())
Expand Down Expand Up @@ -246,9 +252,9 @@ case Accept(_, _, _, NoOperation _) -> {
messages.add(accept);
// create the empty map to track the responses
acceptVotesByLogIndex.put(logIndex, new AcceptVotes(accept));
// self vote for the Accept
selfVoteOnAccept(accept);
// we are no long awaiting the prepare for the current slot
// send the Accept to ourselves and process the response
paxos(paxos(accept).messages().getFirst());
// we are no longer awaiting the prepare for the current slot
prepareResponsesByLogIndex.remove(logIndex);
// if we have had no evidence of higher accepted operationBytes we can promote
if (prepareResponsesByLogIndex.isEmpty()) {
Expand All @@ -261,7 +267,7 @@ case Accept(_, _, _, NoOperation _) -> {
}
}
case Commit(final var from, final var maxSlotCommittable) -> {
final var lastCommittedIndex = progress.highestCommitted();
final var lastCommittedIndex = progress.highestCommittedIndex();
if (maxSlotCommittable > lastCommittedIndex) {
// we may have gaps, so we must find the ones that we have in the log
final var commitableAccepts =
Expand Down Expand Up @@ -295,7 +301,7 @@ case Accept(final var logIndex, _, _, final Command command) -> {
}
case Catchup(final var replyTo, final var to, final var highestCommittedOther) -> {
if (to == nodeIdentifier)
messages.add(new CatchupResponse(nodeIdentifier, replyTo, progress.highestCommitted(), loadCatchup(highestCommittedOther)));
messages.add(new CatchupResponse(nodeIdentifier, replyTo, progress.highestCommittedIndex(), loadCatchup(highestCommittedOther)));
}
case CatchupResponse(_, final var to, _, _) -> {
if (to == nodeIdentifier)
Expand All @@ -305,21 +311,6 @@ case CatchupResponse(_, final var to, _, _) -> {
return new TrexResult(messages, commands);
}

private void selfVoteOnAccept(Accept accept) {
if (lowerAccept(progress, accept) || higherAcceptForCommittedSlot(accept, progress)) {
acceptVotesByLogIndex.get(accept.logIndex()).responses().put(nodeIdentifier, nack(accept));
} else if (equalOrHigherAccept(progress, accept)) {
// always journal first
journal.journalAccept(accept);
if (higherAccept(progress, accept)) {
// we must update promise on a higher accept http://stackoverflow.com/q/29880949/329496
Progress updatedProgress = progress.withHighestPromised(accept.number());
journal.saveProgress(updatedProgress);
this.progress = updatedProgress;
}
}
}

private void saveCatchup(CatchupResponse catchupResponse) {
for (Accept accept : catchupResponse.catchup()) {
final var slot = accept.logIndex();
Expand All @@ -341,7 +332,7 @@ private void saveCatchup(CatchupResponse catchupResponse) {
}

private List<Accept> loadCatchup(long highestCommittedOther) {
final long highestCommitted = progress.highestCommitted();
final long highestCommitted = progress.highestCommittedIndex();
List<Accept> catchup = new ArrayList<>();
for (long slot = highestCommitted + 1; slot < highestCommittedOther; slot++) {
journal.loadAccept(slot).ifPresent(catchup::add);
Expand Down Expand Up @@ -401,7 +392,7 @@ PrepareResponse nack(Prepare prepare, List<Accept> catchup) {
return new PrepareResponse(
new Vote(nodeIdentifier, prepare.number().nodeIdentifier(), prepare.logIndex(), false),
journal.loadAccept(prepare.logIndex()),
Optional.of(new CatchupResponse(nodeIdentifier, prepare.from(), progress.highestCommitted(), catchup)));
Optional.of(new CatchupResponse(nodeIdentifier, prepare.from(), progress.highestCommittedIndex(), catchup)));
}

/**
Expand All @@ -413,7 +404,7 @@ PrepareResponse nack(Prepare prepare, List<Accept> catchup) {
Optional<Accept> startAppendToLog(Command command) {
assert role == LEAD : "role={" + role + "}";
if (term != null) {
final long slot = progress.highestAccepted() + 1;
final long slot = progress.highestAcceptedIndex() + 1;
final var accept = new Accept(nodeIdentifier, slot, term, command);
// this could self accept else self reject
final var actOrNack = this.paxos(accept);
Expand All @@ -434,7 +425,7 @@ static boolean equalOrHigherAccept(Progress progress, Accept accept) {

static boolean higherAcceptForCommittedSlot(Accept accept, Progress progress) {
return accept.number().greaterThan(progress.highestPromised()) &&
accept.logIndex() <= progress.highestCommitted();
accept.logIndex() <= progress.highestCommittedIndex();
}

static Boolean lowerAccept(Progress progress, Accept accept) {
Expand All @@ -447,7 +438,7 @@ static Boolean higherAccept(Progress progress, Accept accept) {

@SuppressWarnings("unused")
public long highestCommitted() {
return progress.highestCommitted();
return progress.highestCommittedIndex();
}

@SuppressWarnings("unused")
Expand All @@ -459,7 +450,7 @@ Optional<Prepare> timeout() {
if (role == FOLLOW) {
role = RECOVER;
term = new BallotNumber(progress.highestPromised().counter() + 1, nodeIdentifier);
final var prepare = new Prepare(nodeIdentifier, progress.highestCommitted() + 1, term);
final var prepare = new Prepare(nodeIdentifier, progress.highestCommittedIndex() + 1, term);
final var selfPrepareResponse = paxos(prepare);
assert selfPrepareResponse.messages().size() == 1 : "selfPrepare={" + selfPrepareResponse + "}";
return Optional.of(prepare);
Expand All @@ -476,7 +467,11 @@ public TrexRole getRole() {
}

public Optional<Commit> heartbeat() {
final var commit = role == LEAD ? new Commit(nodeIdentifier, progress.highestCommitted()) : null;
final var commit = role == LEAD ? new Commit(nodeIdentifier, progress.highestCommittedIndex()) : null;
return Optional.ofNullable(commit);
}

public Accept nextAccept(Command command) {
return new Accept(nodeIdentifier, progress.highestAcceptedIndex() + 1, progress.highestPromised(), command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
import java.io.DataOutputStream;
import java.io.IOException;

public sealed interface AbstractCommand permits NoOperation, Command {
public sealed interface AbstractCommand extends Message permits NoOperation, Command {
void writeTo(DataOutputStream dataStream) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.github.trex_paxos.msg;

public sealed interface Message permits TrexMessage, AbstractCommand {
}
28 changes: 16 additions & 12 deletions trex2-lib/src/main/java/com/github/trex_paxos/msg/Progress.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
import java.io.IOException;

/**
* Progress is a record of the highest ballot number promised which must be forced to disk for Paxos to be correct.
* It is also the highest accepted command and the highest committed command which is used to speed up node recovery.
* Progress is a record of the highest ballot number promised or seen on an accepted message which must be crash durable
* (e.g. forced to disk) for Paxos to be correct. We also store the highest committed index and the highest accepted index.
*
* @param nodeIdentifier The current node identifier.
* @param highestPromised
* @param highestCommitted
* @param highestAccepted
* @param highestPromised The highest ballot number promised or seen on an accepted message.
* @param highestCommittedIndex The highest log index that has been learnt to have been fixed and so committed.
* @param highestAcceptedIndex The highest log index that has been accepted.
*/
public record Progress(byte nodeIdentifier, BallotNumber highestPromised, long highestCommitted,
long highestAccepted) implements JournalRecord {
public record Progress(byte nodeIdentifier, BallotNumber highestPromised, long highestCommittedIndex,
long highestAcceptedIndex) implements JournalRecord {

/**
* When an application initializes an empty journal it has to have a NIL value.
Expand All @@ -27,19 +27,19 @@ public Progress(byte nodeIdentifier) {
}

public Progress withHighestCommitted(long committedLogIndex) {
return new Progress(nodeIdentifier, highestPromised, committedLogIndex, highestAccepted);
return new Progress(nodeIdentifier, highestPromised, committedLogIndex, highestAcceptedIndex);
}

// Java may get `with` so that we can retire this method.
public Progress withHighestPromised(BallotNumber p) {
return new Progress(nodeIdentifier, p, highestCommitted, highestAccepted);
return new Progress(nodeIdentifier, p, highestCommittedIndex, highestAcceptedIndex);
}

public void writeTo(DataOutputStream dos) throws IOException {
dos.writeByte(nodeIdentifier);
highestPromised.writeTo(dos);
dos.writeLong(highestCommitted);
dos.writeLong(highestAccepted);
dos.writeLong(highestCommittedIndex);
dos.writeLong(highestAcceptedIndex);
}

public static Progress readFrom(DataInputStream dis) throws IOException {
Expand All @@ -48,6 +48,10 @@ public static Progress readFrom(DataInputStream dis) throws IOException {

@Override
public String toString() {
return "P(p={" + highestPromised + "},c={" + highestCommitted + "},a={" + highestAccepted + "})";
return "P(p={" + highestPromised + "},c={" + highestCommittedIndex + "},a={" + highestAcceptedIndex + "})";
}

public Progress withHighestAccepted(long highestAcceptedIndex) {
return new Progress(nodeIdentifier, highestPromised, highestCommittedIndex, highestAcceptedIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
import java.io.DataOutputStream;
import java.io.IOException;

public sealed interface TrexMessage permits Accept, AcceptResponse, BroadcastMessage, Catchup, CatchupResponse, Commit, DirectMessage, Prepare, PrepareResponse {
public sealed interface TrexMessage extends Message permits Accept,
AcceptResponse,
BroadcastMessage,
Catchup,
CatchupResponse,
Commit,
DirectMessage,
Prepare,
PrepareResponse {

void writeTo(DataOutputStream dos) throws IOException;

Expand Down
27 changes: 0 additions & 27 deletions trex2-lib/src/test/java/com/github/trex_paxos/AlgorithmTests.java

This file was deleted.

Loading

0 comments on commit db6a080

Please sign in to comment.