Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asynchronous ready in 0.6.x #406

Merged
merged 6 commits into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rust:
- stable
- nightly
# Officially the oldest compiler we support.
- 1.39.0
- 1.42.0
matrix:
include:
- os: windows
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ default-logger = ["slog-stdlog", "slog-envlogger", "slog-term"]
fxhash = "0.2.1"
fail = { version = "0.3", optional = true }
getset = "0.0.9"
protobuf = "2"
protobuf = ">= 2.0, <= 2.14"
quick-error = "1.2.2"
raft-proto = { path = "proto", version = "0.6.0-alpha", default-features = false }
rand = "0.7"
Expand Down
18 changes: 12 additions & 6 deletions benches/suites/raw_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
let mut node = quick_raw_node(logger);
node.raft.become_candidate();
node.raft.become_leader();
node.raft.raft_log.stable_to(1, 1);
let unstable = node.raft.raft_log.unstable_entries().to_vec();
node.raft.raft_log.stable_entries();
node.raft.raft_log.store.wl().append(&unstable).expect("");
node.raft.on_persist_entries(1, 1);
node.raft.commit_apply(1);
let mut entries = vec![];
for i in 1..101 {
Expand All @@ -114,11 +117,14 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
e.term = 1;
entries.push(e);
}
let mut unstable_entries = entries.clone();
node.raft.raft_log.store.wl().append(&entries).expect("");
node.raft.raft_log.unstable.offset = 102;
let _ = node.raft.append_entry(&mut entries);
let unstable = node.raft.raft_log.unstable_entries().to_vec();
node.raft.raft_log.stable_entries();
node.raft.raft_log.store.wl().append(&unstable).expect("");
node.raft.raft_log.stable_entries();
// This increases 'committed_index' to `last_index` because there is only one node in quorum.
node.raft.append_entry(&mut unstable_entries);
node.raft
.on_persist_entries(node.raft.raft_log.last_index(), 1);

let mut snap = Snapshot::default();
snap.set_data(vec![0; 8 * 1024 * 1024]);
Expand All @@ -128,6 +134,6 @@ fn test_ready_raft_node(logger: &slog::Logger) -> RawNode<MemStorage> {
node.raft.msgs.push(Message::default());
}
// Force reverting committed index to provide us some entries to be stored from next `Ready`
node.raft.raft_log.committed = 101;
node.raft.raft_log.committed = 1;
node
}
125 changes: 67 additions & 58 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,22 @@ fn on_ready(
// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

// Persist the raft logs. This is necessary because `RawNode::advance` stabilizes
// the raft logs to the latest position.
if let Err(e) = store.wl().append(ready.entries()) {
error!(
logger,
"persist raft log fail: {:?}, need to retry or panic", e
);
return;
}
let handle_messages = |msgs: Vec<Vec<Message>>| {
for vec_msg in msgs {
for msg in vec_msg {
let to = msg.to;
if mailboxes[&to].send(msg).is_err() {
error!(
logger,
"send raft message to {} fail, let Raft retry it", to
);
}
}
}
};

// Send out the messages that come from the node.
handle_messages(ready.take_messages());

// Apply the snapshot. It's necessary because in `RawNode::advance` we stabilize the snapshot.
if *ready.snapshot() != Snapshot::default() {
Expand All @@ -268,60 +275,62 @@ fn on_ready(
}
}

// Send out the messages that come from the node.
for msg in ready.messages.drain(..) {
let to = msg.to;
if mailboxes[&to].send(msg).is_err() {
error!(
logger,
"send raft message to {} fail, let Raft retry it", to
);
}
}

// Apply all committed proposals.
if let Some(committed_entries) = ready.committed_entries.take() {
for entry in &committed_entries {
if entry.data.is_empty() {
// From new elected leaders.
continue;
}
if let EntryType::EntryConfChange = entry.get_entry_type() {
// For conf change messages, make them effective.
let mut cc = ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
let node_id = cc.node_id;
match cc.get_change_type() {
ConfChangeType::AddNode => raft_group.raft.add_node(node_id).unwrap(),
ConfChangeType::RemoveNode => raft_group.raft.remove_node(node_id).unwrap(),
ConfChangeType::AddLearnerNode => raft_group.raft.add_learner(node_id).unwrap(),
let mut handle_committed_entries =
|rn: &mut RawNode<MemStorage>, committed_entries: Vec<Entry>| {
for entry in committed_entries {
if entry.data.is_empty() {
// From new elected leaders.
continue;
}
let cs = raft_group.raft.prs().configuration().to_conf_state();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
// insert them into the kv engine.
let data = str::from_utf8(&entry.data).unwrap();
let reg = Regex::new("put ([0-9]+) (.+)").unwrap();
if let Some(caps) = reg.captures(&data) {
kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());
if let EntryType::EntryConfChange = entry.get_entry_type() {
// For conf change messages, make them effective.
let mut cc = ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
let cs = rn.apply_conf_change(&cc).unwrap();
store.wl().set_conf_state(cs);
} else {
// For normal proposals, extract the key-value pair and then
// insert them into the kv engine.
let data = str::from_utf8(&entry.data).unwrap();
let reg = Regex::new("put ([0-9]+) (.+)").unwrap();
if let Some(caps) = reg.captures(&data) {
kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());
}
}
if rn.raft.state == StateRole::Leader {
// The leader should respond to the clients, telling them whether their proposals
// succeeded or not.
let proposal = proposals.lock().unwrap().pop_front().unwrap();
proposal.propose_success.send(true).unwrap();
}
}
if raft_group.raft.state == StateRole::Leader {
// The leader should respond to the clients, telling them whether their proposals
// succeeded or not.
let proposal = proposals.lock().unwrap().pop_front().unwrap();
proposal.propose_success.send(true).unwrap();
}
}
if let Some(last_committed) = committed_entries.last() {
let mut s = store.wl();
s.mut_hard_state().commit = last_committed.index;
s.mut_hard_state().term = last_committed.term;
}
};
// Apply all committed entries.
handle_committed_entries(raft_group, ready.take_committed_entries());

// Persist the raft logs. This is necessary because `RawNode::advance` stabilizes
// the raft logs to the latest position.
if let Err(e) = store.wl().append(ready.entries()) {
error!(
logger,
"persist raft log fail: {:?}, need to retry or panic", e
);
return;
}

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
}

// Call `RawNode::advance` interface to update position flags in the raft.
raft_group.advance(ready);
let mut light_rd = raft_group.advance(ready);
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
handle_committed_entries(raft_group, light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
}

fn example_config() -> Config {
Expand Down
74 changes: 36 additions & 38 deletions examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,52 +102,33 @@ fn main() {
}
}

fn on_ready(r: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
if !r.has_ready() {
fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>) {
if !raft_group.has_ready() {
return;
}
let store = raft_group.raft.raft_log.store.clone();

// The Raft is ready, we can do something now.
let mut ready = r.ready();
// Get the `Ready` with `RawNode::ready` interface.
let mut ready = raft_group.ready();

let is_leader = r.raft.leader_id == r.raft.id;
if is_leader {
// If the peer is leader, the leader can send messages to other followers ASAP.
let msgs = ready.messages.drain(..);
for _msg in msgs {
// Here we only have one peer, so can ignore this.
let handle_messages = |msgs: Vec<Vec<Message>>| {
for vec_msg in msgs {
for _msg in vec_msg {
// Send messages to other peers.
}
}
}
};

// Send out the messages that come from the node.
handle_messages(ready.take_messages());

if !raft::is_empty_snap(ready.snapshot()) {
// This is a snapshot, we need to apply the snapshot at first.
r.mut_store()
.wl()
.apply_snapshot(ready.snapshot().clone())
.unwrap();
}

if !ready.entries().is_empty() {
// Append entries to the Raft log
r.mut_store().wl().append(ready.entries()).unwrap();
}

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
r.mut_store().wl().set_hardstate(hs.clone());
}

if !is_leader {
// If not leader, the follower needs to reply the messages to
// the leader after appending Raft entries.
let msgs = ready.messages.drain(..);
for _msg in msgs {
// Send messages to other peers.
}
store.wl().apply_snapshot(ready.snapshot().clone()).unwrap();
}

if let Some(committed_entries) = ready.committed_entries.take() {
let mut _last_apply_index = 0;
let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
Expand All @@ -166,10 +147,27 @@ fn on_ready(r: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeCallback>)

// TODO: handle EntryConfChange
}
};
handle_committed_entries(ready.take_committed_entries());

if !ready.entries().is_empty() {
// Append entries to the Raft log.
store.wl().append(&ready.entries()).unwrap();
}

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
}

// Advance the Raft
r.advance(ready);
// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
handle_committed_entries(light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
}

fn send_propose(logger: Logger, sender: mpsc::Sender<Msg>) {
Expand Down
20 changes: 20 additions & 0 deletions harness/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,26 @@ impl Interface {
None => vec![],
}
}

/// Writes the raft log's unstable snapshot and unstable entries into the store.
///
/// Returns immediately when this interface holds no raft instance. Otherwise:
///
/// 1. If the log reports an unstable snapshot, the snapshot is marked stable in
///    the log, applied to the store, and `commit_apply` is called with the
///    snapshot's metadata index.
/// 2. If the log reports any unstable entries, they are marked stable in the
///    log, appended to the store, and `on_persist_entries` is called with the
///    index and term of the last of those entries.
///
/// # Panics
///
/// Panics if the store rejects the snapshot or the entries.
pub fn persist(&mut self) {
    // Nothing to flush without an underlying raft instance.
    if self.raft.is_none() {
        return;
    }

    if let Some(pending) = self.raft_log.unstable_snapshot() {
        // Take an owned copy so the borrow on the log ends before it is mutated.
        let snap = pending.clone();
        self.raft_log.stable_snap();
        // Read the index up front: `apply_snapshot` consumes the snapshot.
        let snap_index = snap.get_metadata().index;
        self.mut_store().wl().apply_snapshot(snap).expect("");
        self.commit_apply(snap_index);
    }

    // Copy the entries out for the same reason as the snapshot above.
    let pending_entries = self.raft_log.unstable_entries().to_vec();
    if let Some(tail) = pending_entries.last() {
        let (tail_index, tail_term) = (tail.index, tail.term);
        self.raft_log.stable_entries();
        self.mut_store().wl().append(&pending_entries).expect("");
        self.on_persist_entries(tail_index, tail_term);
    }
}
}

impl From<Option<Raft<MemStorage>>> for Interface {
Expand Down
2 changes: 2 additions & 0 deletions harness/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ impl Network {
let resp = {
let p = self.peers.get_mut(&m.to).unwrap();
let _ = p.step(m);
// The unstable data should be persisted before sending msg.
p.persist();
p.read_messages()
};
new_msgs.append(&mut self.filter(resp));
Expand Down
Loading