Skip to content

Commit

Permalink
PrepareResponsePropertyTests and a pass over error handling and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
simbo1905 committed Dec 7, 2024
1 parent ee5a9cb commit e5b4bfe
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 91 deletions.
141 changes: 89 additions & 52 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,42 @@
import static com.github.trex_paxos.TrexRole.*;

/// A TrexNode is a single node in a Paxos cluster. It runs the part-time parliament algorithm. It requires
/// collaborating classes.
/// - A [Journal] which must be crash durable storage. The wrapping [TrexEngine] must flush the journal to durable state (fsync) before sending out any messages.
/// the following collaborating classes. This class logs to JUL logging as severe. You can configure JUL logging to
/// bridge to your chosen logging framework. This class is not thread safe. The [TrexEngine] will wrap this class and
/// use a virtual thread friendly mutex to ensure that only one thread is calling the algorithm method at a time.
/// - A [Journal] which must be crash durable storage. The wrapping [TrexEngine] will call {@link Journal#sync()} unless
/// it has be constructed with `hostManagedTransactions=true`.
/// - A [QuorumStrategy] which may be a simple majority, in the future FPaxos or UPaxos.
/// This class will mark itself as crashed if it has exceptions due to journal IO errors or if it reads corrupt data.
/// After it has logged to JUL and stderr it will throw the original Exception if any. After that it will always throw an
/// AssertionError see {@link #isCrashed()}.
public class TrexNode {
/// We are using JUL logging to reduce dependencies. You can configure JUL logging to bridge to your chosen logging framework.
static final Logger LOGGER = Logger.getLogger("");

/// We log when we win
private final Level logAtLevel;

private boolean crashed = false;

/// A node is marked as crashed if:
/// 1. The journal experiences an exception writing to the journal.
/// 2. Data is read from the journal that violates the protocol invariants.
/// 3. Explicitly checks of the protocol invariants fail.
/// The original exception if any is logged to both JUL and stderr and thrown. If more messages are attempted
/// an IllegalStateException is thrown.
/// It is expected that the operator must reboot the node. If the journal is corrupt then the
/// operator must restore the journal from a backup possibly and allow it to catch up else clone another node and
/// change the `nodeIdentifier` in the journal to recreate the node. You might use a kubernetes health check or similar
/// to monitor is method and restart the processes if it becomes crashed.
@SuppressWarnings("unused")
public boolean isCrashed() {
return crashed;
}

/// Create a new TrexNode that will load the current progress from the journal. The journal must have been pre-initialised.
///
/// @param logAtLevel The level to log when values are known to be chosen which is logged as "WIN" and when are know to be sequentially fixed with is logged as "FIXED".
/// @param nodeIdentifier The unique node identifier. This must be unique across the cluster and across enough time for prior messages to have been forgotten.
/// @param quorumStrategy The quorum strategy that may be a simple majority, else things like FPaxos or UPaxos
/// @param journal The durable storage and durable log. This must be pre-initialised.
Expand All @@ -53,20 +77,20 @@ public TrexNode(Level logAtLevel, byte nodeIdentifier, QuorumStrategy quorumStra
this.logAtLevel = logAtLevel;
}

/// The current node identifier. This must be globally unique in the cluster. You can manage that using Paxos itself.
/// The current node identifier. This must be globally unique in the cluster.
final byte nodeIdentifier;

/// The durable storage and durable log.
final Journal journal;

// The quorum strategy that may be trivial or may be cluster membership aware to implement UPaxos. You can manage that using Paxos itself.
/// The quorum strategy that may be trivial or may be cluster membership aware to implement UPaxos. You can manage that using Paxos itself.
final QuorumStrategy quorumStrategy;

/// If we have rebooted then we start off as a follower.
/// This is only package private to allow unit tests to set the role.
TrexRole role = FOLLOW;

/// The initial progress is loaded from the Journal at startup. It is the last known state of the node prior to a crash.
/// The initial progress is loaded from the Journal at startup.
Progress progress;

/// During a recovery we will track all the slots that we are probing to find the highest accepted operationBytes.
Expand All @@ -79,70 +103,81 @@ public TrexNode(Level logAtLevel, byte nodeIdentifier, QuorumStrategy quorumStra
/// It is only used by the leader and recoverer. It will be null for a follower.
BallotNumber term = null;

/// This is the main Paxos Algorithm. It is not public as a TrexEngine will wrap this to handle specifics of resetting
/// timeouts. This method will recurse without returning when we need to send a message to ourselves. Possible side
/// effects:
///
/// * The progress record may be updated in memory and saved into the journal.
/// * Accept messages may be written into the journal.
///
/// VERY IMPORTANT: The journal *must* be flushed to durable storage before sending out any messages returned from
/// this method. That ultimately inhibits latency yet batching can be used to maintain throughput. Yet it cannot be
/// skipped without breaking the algorithm.
///
/// This method suppresses warnings about finally clauses that do not terminate normally so that it can check for
/// protocol violations and throw in the finally block. We will use jul logging to log any exception thrown in the
/// main code. The most likely reason to catch an error is that the journal has a problem such as an IOError. Yet
/// we will log then suppress that exception in order to run the finally block to check for protocol violations.
/// This method wraps the main algorithm method with guards to ensure safety. The node will mark itself as crashed
/// if the main algorithm threw an error trying to use the journal else was given corrupted data. It will also mark
/// itself as crashed if it detects the protocol invariants have been violated. See {@link #isCrashed()} which can
/// be monitored by something like a kubernetes health checks to restart the
/// node automatically if it is crashed. See {@link #algorithm(TrexMessage, List, TreeMap)} for the main logic.
/// this method is not thread safe. When this method returns the journal must
/// be made crash durable before sending out any messages.The [TrexEngine] will wrap this class and use a virtual thread friendly mutex to
/// and flush the journal if it is not using host managed transactions. This method will throw an IllegalStateException
/// if the node is crashed for all future calls. The operator must reboot the node. If the journal is corrupt then the
/// operator must restore the journal from a backup possibly or clone another node by change the `nodeIdentifier` in the
/// [Journal].
///
/// @param input The message to process.
/// @return A possibly empty list of messages to send out to the cluster plus a possibly empty list of chosen commands to up-call to the host
/// application.
/// @throws AssertionError if the protocol invariants are violated. This is a none recoverable error. The process should be rebooted so that
/// only what is in the durable journal is used as the state of this node.
@SuppressWarnings("Finally")
/// application. The journal state must be made crash durable before sending out any messages.
/// @throws IllegalStateException If the node has been marked as crashed it will always throw an exception and will
/// need rebooting. See {@link #isCrashed()}.
TrexResult paxos(TrexMessage input) {
if (crashed) {
/// We are in an undefined or corrupted state. See {@link #isCrashed()}
LOGGER.severe(CRASHED);
throw new AssertionError(CRASHED);
/// Just in case the host application has not setup JUL logging property we log to stderr as a last resort.
System.err.println(CRASHED);
throw new IllegalStateException(CRASHED);
}
/// This will hold any outbound message that must only be sent after the journal has been flushed to durable storage.
List<TrexMessage> messages = new ArrayList<>();
/// This will hold any fixed commands. These may be written to the data store under the same translation as the journal.stat.
TreeMap<Long, AbstractCommand> commands = new TreeMap<>();
/// This tracks what our old state was so that we can crash if we change the state for the wrong message types.
final var priorProgress = progress;
try {
/// Run the actual algorithm. This method is void as we the command and message are out parameters.
algorithm(input, messages, commands);
} catch (Throwable e) {
// The most probable reason to throw is an IOError from the journal. So we must kill ourselves!
/// The most probable reason to throw is an IOError from the journal else it returned corrupt data we cannot process. .
crashed = true;
// Log that we are crashing and log the reason.
LOGGER.severe(CRASHING + e);
// We will most likely throw a protocol violation error in the finally block. So here we log the exception to stderr as a last resort.
/// Log that we are crashing and log the reason.
LOGGER.log(Level.SEVERE, CRASHING + e, e);
/// In case the application developer has not correctly configured logging JUL logging we log to stderr.
System.err.println(CRASHING + e);
//noinspection CallToPrintStackTrace
e.printStackTrace();
/// We throw yet the finally block will also run and may also log errors about invariants being violated before
/// the thrown issue is sent up to the host application.
throw e;
} finally {
/// Here we always check the invariants in finally block see {@link #isCrashed()}
if (priorProgress != progress && !priorProgress.equals(progress)) {
// The general advice is not to throw. In this case the general advice is wrong.
// We must throw as we have violated the protocol and that should be seen as fatal.
validateProtocolInvariantElseThrowError(input, priorProgress);
validateProtocolInvariants(input, priorProgress);
}
if (!commands.isEmpty()) {
// The general advice is not to throw. In this case the general advice is wrong.
// We must throw if the journal gives us weird commands as that is a fatal error.
validateCommandIndexesElseThrowError(input, commands, priorProgress);
validateCommandIndexes(input, commands, priorProgress);
}
}
return new TrexResult(messages, commands);
}

private void validateCommandIndexesElseThrowError(TrexMessage input, TreeMap<Long, AbstractCommand> commands, Progress priorProgress) {
private void validateCommandIndexes(TrexMessage input, TreeMap<Long, AbstractCommand> commands, Progress priorProgress) {
// TODO validate that the commands are contiguous using the reduce method
if (commands.lastKey() != progress.highestFixedIndex()) {
final var message = COMMAND_INDEXES + " input=" + input + " priorProgress=" + priorProgress + " progress=" + progress;
LOGGER.severe(message);
throw new AssertionError(message);
}
}

/// This is the main Paxos Algorithm. It is not public as it is wrapped in guards that check the invariants and
/// ensure that the node is stopped if it is in unknown state of if the invariants were violated.
///
/// @param input The message to process.
/// @param messages This is an out version list of messages to send out to the cluster.
private void algorithm(TrexMessage input, List<TrexMessage> messages, TreeMap<Long, AbstractCommand> commands) {
switch (input) {
case Accept accept -> {
Expand Down Expand Up @@ -320,48 +355,50 @@ private boolean fixedSlot(Accept accept) {
return accept.logIndex() <= progress.highestFixedIndex();
}

static final String PROTOCOL_VIOLATION_PROMISES = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR Paxos Protocol Violation the promise has been changed when the message is not a PaxosMessage type.";
static final String PROTOCOL_VIOLATION_NUMBER = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR Paxos Protocol Violation the promise has decreased.";
static final String PROTOCOL_VIOLATION_INDEX = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR Paxos Protocol Violation the fixed slot index has decreased.";
static final String PROTOCOL_VIOLATION_SLOT_FIXING = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR Paxos Protocol Violation the promise has been changed when the message is not a SlotFixingMessage type.";
static final String CRASHED = TrexNode.class.getCanonicalName() + "FATAL SEVERE ERROR This node has crashed and must be rebooted. The durable journal state is now the only source of truth.";
static final String CRASHING = TrexNode.class.getCanonicalName() + "FATAL SEVERE ERROR This node has crashed and must be rebooted. The durable journal state is now the only source of truth: ";
static final String COMMAND_INDEXES = TrexNode.class.getCanonicalName() + "FATAL SEVERE ERROR This node has issued commands that do not align to its committed slot index: ";
static final String PROTOCOL_VIOLATION_PROMISES = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR CRASHED Paxos Protocol Violation the promise has been changed when the message is not a PaxosMessage type.";
static final String PROTOCOL_VIOLATION_NUMBER = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR CRASHED Paxos Protocol Violation the promise has decreased.";
static final String PROTOCOL_VIOLATION_INDEX = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR CRASHED Paxos Protocol Violation the fixed slot index has decreased.";
static final String PROTOCOL_VIOLATION_SLOT_FIXING = TrexNode.class.getCanonicalName() + " FATAL SEVERE ERROR CRASHED Paxos Protocol Violation the promise has been changed when the message is not a SlotFixingMessage type.";
static final String CRASHED = TrexNode.class.getCanonicalName() + "FATAL SEVERE ERROR CRASHED This node has crashed and must be rebooted. The durable journal state (if not corrupted) is now the only source of truth.";
static final String CRASHING = TrexNode.class.getCanonicalName() + "FATAL SEVERE ERROR CRASHED This node has crashed and must be rebooted. The durable journal state (if not corrupted) is now the only source of truth to to throwable: ";
static final String COMMAND_INDEXES = TrexNode.class.getCanonicalName() + "FATAL SEVERE ERROR CRASHED This node has issued commands that do not align to its committed slot index: ";

/// Here we check that we have not violated the Paxos algorithm invariants. If we have then we throw an error.
/// We also need to check what is described in the wiki page [Cluster Replication With Paxos for the Java Virtual Machine](https://github.com/trex-paxos/trex-paxos-jvm/wiki)
private void validateProtocolInvariantElseThrowError(TrexMessage input, Progress priorProgress) {
private void validateProtocolInvariants(TrexMessage input, Progress priorProgress) {
final var priorPromise = priorProgress.highestPromised();
final var latestPromise = progress.highestPromised();
final var protocolMessage = input instanceof PaxosMessage;

// only prepare and accept messages can change the promise
if (!priorPromise.equals(latestPromise) && !protocolMessage) {
logSevereAndThrow(PROTOCOL_VIOLATION_PROMISES, input, priorProgress);
this.crashed = true;
final var message = PROTOCOL_VIOLATION_PROMISES + " input=" + input + " priorProgress=" + priorProgress + " progress=" + progress;
LOGGER.severe(message);
}

// promises cannot go backwards the ballot number must only ever increase
if (latestPromise.lessThan(priorPromise)) {
logSevereAndThrow(PROTOCOL_VIOLATION_NUMBER, input, priorProgress);
this.crashed = true;
final var message = PROTOCOL_VIOLATION_NUMBER + " input=" + input + " priorProgress=" + priorProgress + " progress=" + progress;
LOGGER.severe(message);
}

// the fixed slot index must only ever increase
if (priorProgress.highestFixedIndex() > progress.highestFixedIndex()) {
logSevereAndThrow(PROTOCOL_VIOLATION_INDEX, input, priorProgress);
this.crashed = true;
final var message = PROTOCOL_VIOLATION_INDEX + " input=" + input + " priorProgress=" + priorProgress + " progress=" + progress;
LOGGER.severe(message);
} else if (priorProgress.highestFixedIndex() != progress.highestFixedIndex()) {
final var slotFixingMessage = input instanceof SlotFixingMessage;
if (!slotFixingMessage) {
logSevereAndThrow(PROTOCOL_VIOLATION_SLOT_FIXING, input, priorProgress);
this.crashed = true;
final var message = PROTOCOL_VIOLATION_SLOT_FIXING + " input=" + input + " priorProgress=" + priorProgress + " progress=" + progress;
LOGGER.severe(message);
}
}
}

private void logSevereAndThrow(String error, TrexMessage input, Progress priorProgress) {
final var message = error + " input=" + input + " priorProgress=" + priorProgress + " progress=" + progress;
LOGGER.severe(message);
throw new AssertionError(message);
}

private void abdicate(List<TrexMessage> messages) {
messages.clear();
abdicate();
Expand Down Expand Up @@ -627,7 +664,7 @@ private void processPrepareResponse(PrepareResponse prepareResponse, List<TrexMe
// at the next timeout it will increment the counter and be able to lead.
abdicate(messages);
case WIN -> {
// only if we learn that other nodes have prepared higher slots we must nextPrepareMessage them
// only if we learn that other nodes have prepared higher slots we must issue send message for them
votes.values().stream()
.map(PrepareResponse::highestAcceptedIndex)
.max(Long::compareTo)
Expand Down
6 changes: 3 additions & 3 deletions trex2-lib/src/main/java/com/github/trex_paxos/TrexResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
///
/// @param messages A possibly empty list of messages that were generated during the paxos run.
/// @param commands A possibly empty list of chosen aka fixed commands.
public record TrexResult(List<TrexMessage> messages, Map<Long, AbstractCommand> commands) {
public record TrexResult(List<TrexMessage> messages, TreeMap<Long, AbstractCommand> commands) {
public TrexResult {
messages = List.copyOf(messages);
commands = Map.copyOf(commands);
commands = new TreeMap<>(commands);
}
static TrexResult noResult() {
return new TrexResult(List.of(), Map.of());
return new TrexResult(List.of(), new TreeMap<>());
}

static TrexResult merge(List<TrexResult> results) {
Expand Down
14 changes: 8 additions & 6 deletions trex2-lib/src/main/java/com/github/trex_paxos/msg/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.github.trex_paxos.msg;

import java.util.Arrays;
import java.util.zip.CRC32;

/// A client command which is the value we are trying to fixe. As this library is neutral
/// to the application, the command is completely opaque to the library. The
Expand Down Expand Up @@ -62,11 +62,13 @@ public boolean equals(Object arg0) {
return java.util.Arrays.equals(operationBytes, other.operationBytes);
}

@Override
public String toString() {
return "Command[" +
"clientMsgUuid='" + clientMsgUuid + '\'' +
", operationBytes=" + Arrays.toString(operationBytes) +
']';
CRC32 crc32 = new CRC32();
crc32.update(operationBytes);

return String.format("Command[clientMsgUuid='%s', operationBytes=byte[%d]:CRC32=%d]",
clientMsgUuid,
operationBytes.length,
crc32.getValue());
}
}
Loading

0 comments on commit e5b4bfe

Please sign in to comment.