Skip to content

Commit

Permalink
Paxe (#9)
Browse files Browse the repository at this point in the history
main changes
  • Loading branch information
simbo1905 authored Jan 19, 2025
1 parent 6c76a24 commit c02b219
Show file tree
Hide file tree
Showing 45 changed files with 1,116 additions and 935 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ target/
trex2-lib/target/
.idea/
*/.jqwik-database
all.java
File renamed without changes.
2 changes: 1 addition & 1 deletion trex-lib/src/main/java/com/github/trex_paxos/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.UUID;
import java.util.zip.CRC32;

/// A client command which is the value we are trying to fix. As this library is neutral
/// A client command which is the id we are trying to fix. As this library is neutral
/// to the application, the command is completely opaque to the library. The
/// application is responsible for encoding and decoding the commands from and to byte array.
///
Expand Down
8 changes: 4 additions & 4 deletions trex-lib/src/main/java/com/github/trex_paxos/Journal.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
/// It is important to note you should not delete `accept` messages the moment they are up-called into the application.
/// They should be kept around so that other nodes can request retransmission of missed messages. To
/// keep the database size under control you can run a cronjob that reads the {@link Progress#highestFixedIndex()}
/// from all databases and take the min value. You can then delete all `accept` messages from all nodes that are at a
/// from all databases and take the min id. You can then delete all `accept` messages from all nodes that are at a
/// lower log slot index.
///
/// VERY IMPORTANT: If you get errors where you don't know what the state of the underlying journal has become you should call
Expand All @@ -68,12 +68,12 @@ public interface Journal {
/// history when moving nodes between servers we require the node identifier. This is only a safety feature.
Progress readProgress(short nodeIdentifier);

/// Save a value into the log.
/// Save a value wrapped in an `accept` into the log.
/// Logically this method is storing `accept(S,N,V)` so it needs to store the values `{N,V}` at log slot `S`
/// The `N` is the term number of the leader that generated the message.
/// Typically, values are written in sequential `S` order.
///
/// @param accept An accept message that is a log index, command value and term number.
/// @param accept An accept message that is a log index, command id and term number.
void writeAccept(Accept accept);

/// Load any accept record from the log. There may be no accept record for the given log index.
Expand All @@ -82,7 +82,7 @@ public interface Journal {
/// You should not delete any `accept` messages until you know all nodes have a higher [Progress#highestFixedIndex()]
/// than the log index of the accept message.
///
/// When a slot is learned to be fixed by a `fixed(S,N')` the value is read from the log and if `N' != N` then
/// When a slot is learned to be fixed by a `fixed(S,N')` the id is read from the log and if `N' != N` then
/// Retransmission of the correct `accept` will be requested from the leader.
///
/// @param logIndex The log slot to load the accept record for.
Expand Down
269 changes: 143 additions & 126 deletions trex-lib/src/main/java/com/github/trex_paxos/Pickle.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,136 +15,153 @@
*/
package com.github.trex_paxos;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.UUID;

import com.github.trex_paxos.msg.Accept;

import java.io.*;
import java.util.UUID;

/// Pickle is a utility class for serializing and deserializing the record types that the [Journal] uses.
/// Java serialization is famously broken but the Java Platform team are working on it.
/// This class does things the boilerplate way.
public class Pickle {

public static byte[] writeProgress(Progress progress) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(progress, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static void write(Progress progress, DataOutputStream dos) throws IOException {
dos.writeShort(progress.nodeIdentifier());
write(progress.highestPromised(), dos);
dos.writeLong(progress.highestFixedIndex());
}

public static Progress readProgress(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readProgress(dis);
}
}

private static Progress readProgress(DataInputStream dis) throws IOException {
return new Progress(dis.readShort(), readBallotNumber(dis), dis.readLong());
}

public static byte[] write(BallotNumber n) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(n, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static void write(BallotNumber n, DataOutputStream dataOutputStream) throws IOException {
dataOutputStream.writeInt(n.counter());
dataOutputStream.writeShort(n.nodeIdentifier());
}

public static BallotNumber readBallotNumber(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readBallotNumber(dis);
}
}

public static BallotNumber readBallotNumber(DataInputStream dataInputStream) throws IOException {
return new BallotNumber(dataInputStream.readInt(), dataInputStream.readShort());
}

public static void write(Accept m, DataOutputStream dataStream) throws IOException {
dataStream.writeShort(m.from());
dataStream.writeLong(m.slot());
write(m.number(), dataStream);
write(m.command(), dataStream);
}

public static Accept readAccept(DataInputStream dataInputStream) throws IOException {
final short from = dataInputStream.readShort();
final long logIndex = dataInputStream.readLong();
final BallotNumber number = readBallotNumber(dataInputStream);
final var command = readCommand(dataInputStream);
return new Accept(from, logIndex, number, command);
}

public static byte[] write(AbstractCommand c) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(c, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static void write(AbstractCommand c, DataOutputStream dataStream) throws IOException {
switch (c) {
case NoOperation _ ->
// Here we use zero bytes as a sentinel to represent the NOOP command.
dataStream.writeInt(0);
case Command command -> {
dataStream.writeInt(command.operationBytes().length);
dataStream.write(command.operationBytes());
final var uuid = command.uuid();
dataStream.writeLong(uuid.getMostSignificantBits());
dataStream.writeLong(uuid.getLeastSignificantBits());
}
}
}

public static AbstractCommand readCommand(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readCommand(dis);
}
}

public static AbstractCommand readCommand(DataInputStream dataInputStream) throws IOException {
final var byteLength = dataInputStream.readInt();
if (byteLength == 0) {
return NoOperation.NOOP;
}
byte[] bytes = new byte[byteLength];
dataInputStream.readFully(bytes);
return new Command(new UUID(dataInputStream.readLong(), dataInputStream.readLong()), bytes);
}

public static byte[] write(Accept a) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(a, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static Accept readAccept(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readAccept(dis);
}
}
}
public static Pickler<Command> instance = new Pickler<>() {

@Override
public byte[] serialize(Command cmd) {
try {
return Pickle.write(cmd);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public Command deserialize(byte[] bytes) {
try {
return (Command) Pickle.readCommand(bytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
};

public static byte[] writeProgress(Progress progress) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(progress, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static void write(Progress progress, DataOutputStream dos) throws IOException {
dos.writeShort(progress.nodeIdentifier());
write(progress.highestPromised(), dos);
dos.writeLong(progress.highestFixedIndex());
}

public static Progress readProgress(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readProgress(dis);
}
}

private static Progress readProgress(DataInputStream dis) throws IOException {
return new Progress(dis.readShort(), readBallotNumber(dis), dis.readLong());
}

public static byte[] write(BallotNumber n) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(n, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static void write(BallotNumber n, DataOutputStream dataOutputStream) throws IOException {
dataOutputStream.writeInt(n.counter());
dataOutputStream.writeShort(n.nodeIdentifier());
}

public static BallotNumber readBallotNumber(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readBallotNumber(dis);
}
}

public static BallotNumber readBallotNumber(DataInputStream dataInputStream) throws IOException {
return new BallotNumber(dataInputStream.readInt(), dataInputStream.readShort());
}

public static void write(Accept m, DataOutputStream dataStream) throws IOException {
dataStream.writeShort(m.from());
dataStream.writeLong(m.slot());
write(m.number(), dataStream);
write(m.command(), dataStream);
}

public static Accept readAccept(DataInputStream dataInputStream) throws IOException {
final short from = dataInputStream.readShort();
final long logIndex = dataInputStream.readLong();
final BallotNumber number = readBallotNumber(dataInputStream);
final var command = readCommand(dataInputStream);
return new Accept(from, logIndex, number, command);
}

public static byte[] write(AbstractCommand c) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(c, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static void write(AbstractCommand c, DataOutputStream dataStream) throws IOException {
switch (c) {
case NoOperation _ ->
// Here we use zero bytes as a sentinel to represent the NOOP command.
dataStream.writeInt(0);
case Command command -> {
dataStream.writeInt(command.operationBytes().length);
dataStream.write(command.operationBytes());
final var uuid = command.uuid();
dataStream.writeLong(uuid.getMostSignificantBits());
dataStream.writeLong(uuid.getLeastSignificantBits());
}
}
}

public static AbstractCommand readCommand(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readCommand(dis);
}
}

public static AbstractCommand readCommand(DataInputStream dataInputStream) throws IOException {
final var byteLength = dataInputStream.readInt();
if (byteLength == 0) {
return NoOperation.NOOP;
}
byte[] bytes = new byte[byteLength];
dataInputStream.readFully(bytes);
return new Command(new UUID(dataInputStream.readLong(), dataInputStream.readLong()), bytes);
}

public static byte[] write(Accept a) throws IOException {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(byteArrayOutputStream)) {
write(a, dos);
return byteArrayOutputStream.toByteArray();
}
}

public static Accept readAccept(byte[] pickled) throws IOException {
try (ByteArrayInputStream bis = new ByteArrayInputStream(pickled);
DataInputStream dis = new DataInputStream(bis)) {
return readAccept(dis);
}
}
}
2 changes: 1 addition & 1 deletion trex-lib/src/main/java/com/github/trex_paxos/Progress.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public record Progress(
) {

/**
* When an application initializes an empty journal it has to have a NIL value.
* When an application initializes an empty journal it has to have a NIL id.
*
* @param nodeIdentifier The current node identifier.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/// - In the primary region set one node to have a voting weight of 2, the other to have a voting weight of 1
/// - In the secondary region set both nodes to have a voting weight of 1
/// - In the `accept` phase use the weighted quorum strategy. This will mean that when the primary contacts the other
/// node in the primary region the value is accepted without contacting the secondary region.
/// node in the primary region the id is accepted without contacting the secondary region.
/// - In the `prepare` phase do not use weights use a simple majority. That requires one vote in the opposite region to
/// force any node in the secondary region to promote to being the leader.
/// - When you want to actually fail-over between regions the host application as a distributed system may need some things
Expand Down
Loading

0 comments on commit c02b219

Please sign in to comment.