Skip to content

Commit

Permalink
Merge pull request #1705 from tigerbeetle/matklad/sync-view
Browse files Browse the repository at this point in the history
vsr: sync uses correct view to go into recovering_head
  • Loading branch information
matklad authored Mar 15, 2024
2 parents 017448d + 10665e1 commit 341af50
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 23 deletions.
1 change: 1 addition & 0 deletions src/lsm/forest_fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ const Environment = struct {
.commit_max = env.checkpoint_op.? + 1,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.storage_size = vsr.superblock.data_file_size_min +
(env.grid.free_set.highest_address_acquired() orelse 0) * constants.block_size,
.release = 1,
Expand Down
1 change: 1 addition & 0 deletions src/lsm/manifest_log_fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ const Environment = struct {
.commit_max = vsr.Checkpoint.checkpoint_after(vsr_state.commit_max),
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.storage_size = vsr.superblock.data_file_size_min +
(env.grid.free_set.highest_address_acquired() orelse 0) * constants.block_size,
.release = 1,
Expand Down
1 change: 1 addition & 0 deletions src/lsm/tree_fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ fn EnvironmentType(comptime table_usage: TableUsage) type {
.commit_max = checkpoint_op + 1,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.storage_size = vsr.superblock.data_file_size_min +
(env.grid.free_set.highest_address_acquired() orelse 0) * constants.block_size,
.release = 1,
Expand Down
4 changes: 3 additions & 1 deletion src/simulator.zig
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,9 @@ pub const Simulator = struct {
// Even with faults disabled, a replica that was syncing before it crashed
// (or just recently finished syncing before it crashed) may wind up in
// status=recovering_head.
assert(fault or replica.op < replica.op_checkpoint());
assert(fault or
replica.op < replica.op_checkpoint() or
replica.log_view < replica.superblock.working.vsr_state.sync_view);
}

replica_storage.faulty = true;
Expand Down
61 changes: 40 additions & 21 deletions src/vsr/replica.zig
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ pub fn ReplicaType(
self.transition_to_normal_from_recovering_status();
}
} else {
if (self.log_view < self.superblock.working.vsr_state.checkpoint.header.view) {
if (self.log_view < self.superblock.working.vsr_state.sync_view) {
// During state sync, the replica installed CheckpointState from a future view.
self.transition_to_recovering_head();
} else {
Expand Down Expand Up @@ -2702,7 +2702,8 @@ pub fn ReplicaType(
.size = @sizeOf(Header) + @sizeOf(u16) * constants.vsr_releases_max,
.cluster = self.cluster,
.replica = self.replica,
.view = self.view_durable(), // Don't drop pings while the view is being updated.
// Don't drop pings while the view is being updated.
.view = @max(self.view_durable(), self.superblock.working.vsr_state.sync_view),
.release = self.release,
.checkpoint_id = self.superblock.working.checkpoint_id(),
.checkpoint_op = self.op_checkpoint(),
Expand All @@ -2716,7 +2717,13 @@ pub fn ReplicaType(
message.header.set_checksum_body(message.body());
message.header.set_checksum();

assert(message.header.view <= self.view);
// Pings advertise checkpoints, and current checkpoint's view might be greater than
// the replica view.
if (message.header.view > self.view) {
assert(self.status == .recovering_head);
assert(self.superblock.working.vsr_state.sync_view >
self.superblock.working.vsr_state.view);
}

self.send_message_to_other_replicas_and_standbys(message.base());
}
Expand Down Expand Up @@ -3812,16 +3819,17 @@ pub fn ReplicaType(
// Thus, the SuperBlock's `commit_min` is set to 7-2=5.
const vsr_state_commit_min = self.op_checkpoint_next();

const vsr_state_sync_op: struct { min: u64, max: u64 } = sync: {
const vsr_state_sync: struct { op_min: u64, op_max: u64, view: u32 } = sync: {
if (self.sync_content_done()) {
assert(self.sync_tables == null);
assert(self.grid_repair_tables.executing() == 0);

break :sync .{ .min = 0, .max = 0 };
break :sync .{ .op_min = 0, .op_max = 0, .view = 0 };
} else {
break :sync .{
.min = self.superblock.staging.vsr_state.sync_op_min,
.max = self.superblock.staging.vsr_state.sync_op_max,
.op_min = self.superblock.staging.vsr_state.sync_op_min,
.op_max = self.superblock.staging.vsr_state.sync_op_max,
.view = self.superblock.staging.vsr_state.view,
};
}
};
Expand All @@ -3845,8 +3853,9 @@ pub fn ReplicaType(
.{
.header = self.journal.header_with_op(vsr_state_commit_min).?.*,
.commit_max = self.commit_max,
.sync_op_min = vsr_state_sync_op.min,
.sync_op_max = vsr_state_sync_op.max,
.sync_op_min = vsr_state_sync.op_min,
.sync_op_max = vsr_state_sync.op_max,
.sync_view = vsr_state_sync.view,
.manifest_references = self.state_machine.forest.manifest_log.checkpoint_references(),
.free_set_reference = self.grid.free_set_checkpoint.checkpoint_reference(),
.client_sessions_reference = self.client_sessions_checkpoint.checkpoint_reference(),
Expand Down Expand Up @@ -5079,7 +5088,7 @@ pub fn ReplicaType(
return true;
}

if (message.header.view < self.superblock.working.vsr_state.checkpoint.header.view) {
if (message.header.view < self.superblock.working.vsr_state.sync_view) {
assert(self.status == .recovering_head);
log.debug("{}: on_{s}: ignoring (recovering_head, checkpoint from newer view)", .{
self.replica,
Expand Down Expand Up @@ -6473,6 +6482,10 @@ pub fn ReplicaType(
assert(self.journal.has(header));
}

if (header.op == self.op_checkpoint() + 1) {
assert(header.parent == self.superblock.working.vsr_state.checkpoint.header.checksum);
}

if (header.op < self.op_repair_min()) return;

// We must not set an op as dirty if we already have it exactly because:
Expand Down Expand Up @@ -7159,7 +7172,9 @@ pub fn ReplicaType(
// Critical: Do not advertise a view/log_view before it is durable.
// See view_durable()/log_view_durable().
if (message.header.view > self.view_durable() and
message.header.command != .request_start_view)
message.header.command != .request_start_view and
!(message.header.command == .ping and
message.header.view == self.superblock.working.vsr_state.sync_view))
{
// Pings are used for syncing time, so they must not be
// blocked on persisting view.
Expand Down Expand Up @@ -7676,9 +7691,9 @@ pub fn ReplicaType(
assert(self.commit_stage == .idle);
assert(self.journal.header_with_op(self.op) != null);

if (self.log_view < self.superblock.working.vsr_state.checkpoint.header.view) {
if (self.log_view < self.superblock.working.vsr_state.sync_view) {
// Transitioning to recovering head after state sync --- checkpoint is from a
// "future" view, so the replica needs to truncate our log.
// "future" view, so the replica needs to truncate its log.
assert(self.commit_min == self.op_checkpoint());
} else {
if (self.status == .recovering) {
Expand Down Expand Up @@ -8380,6 +8395,7 @@ pub fn ReplicaType(
const stage: *const SyncStage.RequestingCheckpoint =
&self.syncing.requesting_checkpoint;
assert(stage.target.checkpoint_id == vsr.checksum(std.mem.asBytes(checkpoint_state)));
assert(stage.target.view >= checkpoint_state.header.view);

log.debug("{[replica]}: sync_requesting_checkpoint_callback: " ++
"checkpoint_op={[checkpoint_op]} checkpoint_id={[checkpoint_id]x:0>32} " ++
Expand Down Expand Up @@ -8455,6 +8471,7 @@ pub fn ReplicaType(
else
sync_min_new;
};
const sync_view = stage.target.view;

self.sync_message_timeout.stop();
self.superblock.sync(
Expand All @@ -8465,6 +8482,7 @@ pub fn ReplicaType(
.commit_max = self.commit_max,
.sync_op_max = sync_op_max,
.sync_op_min = sync_op_min,
.sync_view = sync_view,
},
);
}
Expand Down Expand Up @@ -8523,12 +8541,16 @@ pub fn ReplicaType(

// The head op must be in the Journal and there should not be a break between the
// checkpoint header and the Journal.
if (self.op < self.op_checkpoint() or
self.log_view < self.superblock.working.vsr_state.checkpoint.header.view)
{
if (self.op < self.op_checkpoint() or self.log_view < stage.target.view) {
self.transition_to_recovering_head();
}

if (self.status == .normal) {
if (self.journal.header_with_op(self.op_checkpoint() + 1)) |header| {
assert(header.parent == self.superblock.working.vsr_state.checkpoint.header.checksum);
}
}

self.grid.open(grid_open_callback);
self.sync_dispatch(.idle);
}
Expand Down Expand Up @@ -8937,7 +8959,7 @@ pub fn ReplicaType(
if (header.replica >= self.replica_count) return; // Ignore messages from standbys.
if (header.replica == self.replica) return; // Ignore messages from self (misdirected).

const candidate = switch (header.into_any()) {
const candidate: SyncTarget = switch (header.into_any()) {
inline .commit, .ping => |h| .{
.checkpoint_id = h.checkpoint_id,
.checkpoint_op = h.checkpoint_op,
Expand Down Expand Up @@ -9001,10 +9023,7 @@ pub fn ReplicaType(
@tagName(self.syncing),
});

self.sync_target_max = .{
.checkpoint_id = candidate.checkpoint_id,
.checkpoint_op = candidate.checkpoint_op,
};
self.sync_target_max = candidate;

if (self.syncing != .idle) {
self.sync_start_from_sync();
Expand Down
17 changes: 16 additions & 1 deletion src/vsr/superblock.zig
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ pub const SuperBlockHeader = extern struct {
/// past them via state sync.)
sync_op_max: u64,

/// The view where it is known that checkpoint.header is committed.
/// It may be greater than view --- during state sync, the replica first accepts a new
/// checkpoint, and then jumps view. Appending checkpoint first is required for appending
/// SV headers to the journal.
sync_view: u32,

/// The last view in which the replica's status was normal.
log_view: u32,

Expand All @@ -140,7 +146,7 @@ pub const SuperBlockHeader = extern struct {
/// Number of replicas (determines sizes of the quorums), part of VSR configuration.
replica_count: u8,

reserved: [783]u8 = [_]u8{0} ** 783,
reserved: [779]u8 = [_]u8{0} ** 779,

comptime {
assert(@sizeOf(VSRState) == 2048);
Expand Down Expand Up @@ -184,6 +190,7 @@ pub const SuperBlockHeader = extern struct {
.commit_max = 0,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = 0,
.view = 0,
};
Expand All @@ -192,6 +199,7 @@ pub const SuperBlockHeader = extern struct {
pub fn assert_internally_consistent(state: VSRState) void {
assert(state.commit_max >= state.checkpoint.header.op);
assert(state.sync_op_max >= state.sync_op_min);
if (state.sync_op_max == 0) assert(state.sync_view == 0);
assert(state.view >= state.log_view);
assert(state.replica_count > 0);
assert(state.replica_count <= constants.replicas_max);
Expand Down Expand Up @@ -751,6 +759,7 @@ pub fn SuperBlockType(comptime Storage: type) type {
.commit_max = 0,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = 0,
.view = 0,
.replica_count = options.replica_count,
Expand Down Expand Up @@ -805,6 +814,7 @@ pub fn SuperBlockType(comptime Storage: type) type {
commit_max: u64,
sync_op_min: u64,
sync_op_max: u64,
sync_view: u32,
manifest_references: ManifestReferences,
free_set_reference: TrailerReference,
client_sessions_reference: TrailerReference,
Expand Down Expand Up @@ -863,7 +873,9 @@ pub fn SuperBlockType(comptime Storage: type) type {
vsr_state.commit_max = update.commit_max;
vsr_state.sync_op_min = update.sync_op_min;
vsr_state.sync_op_max = update.sync_op_max;
vsr_state.sync_view = update.sync_view;
assert(superblock.staging.vsr_state.would_be_updated_by(vsr_state));
assert(vsr_state.sync_view == 0 or vsr_state.sync_view == vsr_state_staging.sync_view);

context.* = .{
.superblock = superblock,
Expand Down Expand Up @@ -930,6 +942,7 @@ pub fn SuperBlockType(comptime Storage: type) type {
commit_max: u64,
sync_op_min: u64,
sync_op_max: u64,
sync_view: u32,
};

pub fn sync(
Expand All @@ -952,12 +965,14 @@ pub fn SuperBlockType(comptime Storage: type) type {
);
assert(update.sync_op_min <= update.sync_op_max);
assert(update.sync_op_max > update.checkpoint.header.op);
assert(update.sync_view >= update.checkpoint.header.view);

var vsr_state = superblock.staging.vsr_state;
vsr_state.checkpoint = update.checkpoint;
vsr_state.commit_max = update.commit_max;
vsr_state.sync_op_min = update.sync_op_min;
vsr_state.sync_op_max = update.sync_op_max;
vsr_state.sync_view = update.sync_view;

assert(superblock.staging.vsr_state.would_be_updated_by(vsr_state));

Expand Down
4 changes: 4 additions & 0 deletions src/vsr/superblock_fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ fn run_fuzz(allocator: std.mem.Allocator, seed: u64, transitions_count_total: us
.commit_max = 0,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = 0,
.view = 0,
.replica_id = members[replica],
Expand Down Expand Up @@ -334,6 +335,7 @@ const Environment = struct {
.commit_max = env.superblock.staging.vsr_state.commit_max + 3,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = env.superblock.staging.vsr_state.log_view + 4,
.view = env.superblock.staging.vsr_state.view + 5,
.replica_id = env.members[replica],
Expand Down Expand Up @@ -415,6 +417,7 @@ const Environment = struct {
.commit_max = vsr_state_old.commit_max + 1,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.log_view = vsr_state_old.log_view,
.view = vsr_state_old.view,
.replica_id = env.members[replica],
Expand Down Expand Up @@ -455,6 +458,7 @@ const Environment = struct {
.commit_max = vsr_state.commit_max,
.sync_op_min = 0,
.sync_op_max = 0,
.sync_view = 0,
.storage_size = data_file_size_min,
.release = 1,
});
Expand Down
3 changes: 3 additions & 0 deletions src/vsr/sync.zig
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,7 @@ pub const Target = struct {
checkpoint_id: u128,
/// The op_checkpoint() that corresponds to the checkpoint id.
checkpoint_op: u64,
/// The view where the target's checkpoint is committed.
/// It might be greater than `checkpoint.header.view`.
view: u32,
};

0 comments on commit 341af50

Please sign in to comment.