Skip to content

Commit

Permalink
refactor to remove holding Accept outside of journal while awaiting r…
Browse files Browse the repository at this point in the history
…esponses
  • Loading branch information
simbo1905 committed Dec 5, 2024
1 parent f3c615b commit c01f914
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
26 changes: 13 additions & 13 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private void algorithm(TrexMessage input, List<TrexMessage> messages, TreeMap<Lo
switch (input) {
case Accept accept -> {
if (lowerAccept(accept) || fixedSlot(accept)) {
messages.add(nack(accept));
messages.add(nack(accept.slotTerm()));
} else if (equalOrHigherAccept(accept)) {
// always journal first
journal.writeAccept(accept);
Expand Down Expand Up @@ -374,7 +374,6 @@ private void processAcceptResponse(AcceptResponse acceptResponse, Map<Long, Abst
"WIN logIndex==" + logIndex +
" nodeIdentifier==" + nodeIdentifier() +
" number==" + acceptVotes.accept().number() +
" value==" + acceptVotes.accept().command() +
" vs==" + vs);

acceptVotesByLogIndex.put(logIndex, AcceptVotes.chosen(acceptVotes.accept()));
Expand All @@ -388,10 +387,11 @@ private void processAcceptResponse(AcceptResponse acceptResponse, Map<Long, Abst

if (!fixed.isEmpty()) {
// run the callback
for (var accept : fixed) {
for (var slotTerm : fixed) {
final var accept = journal.readAccept(slotTerm.logIndex()).orElseThrow();
fixed(accept, commands);
// free the memory and stop heartbeating out the accepts
acceptVotesByLogIndex.remove(accept.logIndex());
acceptVotesByLogIndex.remove(slotTerm.logIndex());
}

// we have fixed slots
Expand Down Expand Up @@ -433,7 +433,7 @@ void backdown() {
/**
* Send a positive vote message to the leader.
*
* @param accept The accept message to acknowledge.
* @param accept The `accept` to positively acknowledge.
*/
final AcceptResponse ack(Accept accept) {
return new AcceptResponse(
Expand All @@ -447,15 +447,15 @@ final AcceptResponse ack(Accept accept) {
/**
* Send a negative vote message to the leader.
*
* @param accept The accept message to reject.
* @param slotTerm The `accept(S,V,_)` to negatively acknowledge.
*/
final AcceptResponse nack(Accept accept) {
final AcceptResponse nack(Accept.SlotTerm slotTerm) {
return new AcceptResponse(
nodeIdentifier,
accept.number().nodeIdentifier(),
slotTerm.number().nodeIdentifier(),
new AcceptResponse.Vote(nodeIdentifier,
accept.number().nodeIdentifier(),
accept.logIndex(),
slotTerm.number().nodeIdentifier(),
slotTerm.logIndex(),
false)
, progress.highestFixedIndex());
}
Expand Down Expand Up @@ -664,12 +664,12 @@ private void processPrepareResponse(PrepareResponse prepareResponse, List<TrexMe
* A record of the votes received by a node from other cluster members.
*/
// FIXME do not hold the Accept as we need to load it from the journal just hold the number+slot
public record AcceptVotes(Accept accept, Map<Byte, AcceptResponse> responses, boolean chosen) {
public record AcceptVotes(Accept.SlotTerm accept, Map<Byte, AcceptResponse> responses, boolean chosen) {
public AcceptVotes(Accept accept) {
this(accept, new HashMap<>(), false);
this(accept.slotTerm(), new HashMap<>(), false);
}

public static AcceptVotes chosen(Accept accept) {
public static AcceptVotes chosen(Accept.SlotTerm accept) {
return new AcceptVotes(accept, Collections.emptyMap(), true);
}
}
Expand Down
7 changes: 7 additions & 0 deletions trex2-lib/src/main/java/com/github/trex_paxos/msg/Accept.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ public record Accept(byte from,
public int compareNumbers(Accept accept) {
return number.compareTo(accept.number);
}

public record SlotTerm(long logIndex, BallotNumber number) {
}

public SlotTerm slotTerm() {
return new SlotTerm(logIndex, number);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Optional<Accept> readAccept(long logIndex) {
final var s = slot.getAndIncrement();
final var v = createAcceptVotes(s);
// Setup gap scenario we first add chosen `accept` before gap
acceptVotesByLogIndex.put(s, v);
acceptVotesByLogIndex.put(s, v.votes());
// we need to put it into the journal also
journaledAccepts.get().put(s, v.accept());
// Then we create a gap
Expand All @@ -97,17 +97,21 @@ public Optional<Accept> readAccept(long logIndex) {
final var s = slot.get();
final var v = createAcceptVotes(s);
// Setup gap scenario we first add chosen `accept` before gap
acceptVotesByLogIndex.put(s, v);
acceptVotesByLogIndex.put(s, v.votes());
// we need to put it into the journal also
journaledAccepts.get().put(s, v.accept());
}

private AcceptVotes createAcceptVotes(long s) {
record CreatedData(Accept accept, AcceptVotes votes) {
}

private CreatedData createAcceptVotes(long s) {
final var a = new Accept(thisNodeId, s, thisPromise, NoOperation.NOOP);
final Map<Byte, AcceptResponse> responses = new TreeMap<>();
responses.put(thisNodeId, new AcceptResponse(thisNodeId, thisNodeId,
new AcceptResponse.Vote(thisNodeId, thisNodeId, s, outcomeVote), s));
return new AcceptVotes(a, responses, false);
AcceptVotes votes = new AcceptVotes(a.slotTerm(), responses, false);
return new CreatedData(a, votes);
}
};

Expand Down

0 comments on commit c01f914

Please sign in to comment.