Skip to content

Commit

Permalink
Merge branch 'master' into change-snapshot-persisted
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliqi authored Dec 10, 2020
2 parents b4d6adf + fc1ef2f commit 3139b2d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
4 changes: 4 additions & 0 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ fn on_ready(

// Call `RawNode::advance` interface to update position flags in the raft.
let mut light_rd = raft_group.advance(ready);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
store.wl().mut_hard_state().set_commit(commit);
}
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
Expand Down
4 changes: 4 additions & 0 deletions examples/single_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ fn on_ready(raft_group: &mut RawNode<MemStorage>, cbs: &mut HashMap<u8, ProposeC

// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
store.wl().mut_hard_state().set_commit(commit);
}
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
Expand Down
28 changes: 20 additions & 8 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};

use crate::eraftpb::*;
Expand Down Expand Up @@ -152,7 +153,7 @@ impl MemStorageCore {
pub fn commit_to(&mut self, index: u64) -> Result<()> {
assert!(
self.has_entry_at(index),
"commit_to {} but the entry not exists",
"commit_to {} but the entry does not exist",
index
);

Expand Down Expand Up @@ -193,7 +194,6 @@ impl MemStorageCore {
/// Panics if the snapshot index is less than the storage's first index.
pub fn apply_snapshot(&mut self, mut snapshot: Snapshot) -> Result<()> {
let mut meta = snapshot.take_metadata();
let term = meta.term;
let index = meta.index;

if self.first_index() > index {
Expand All @@ -202,7 +202,7 @@ impl MemStorageCore {

self.snapshot_metadata = meta.clone();

self.raft_state.hard_state.term = term;
self.raft_state.hard_state.term = cmp::max(self.raft_state.hard_state.term, meta.term);
self.raft_state.hard_state.commit = index;
self.entries.clear();

Expand All @@ -214,12 +214,24 @@ impl MemStorageCore {
fn snapshot(&self) -> Snapshot {
let mut snapshot = Snapshot::default();

// Use the latest applied_idx to construct the snapshot.
let applied_idx = self.raft_state.hard_state.commit;
let term = self.raft_state.hard_state.term;
// We assume all entries whose indexes are less than `hard_state.commit`
// have been applied, so use the latest commit index to construct the snapshot.
// TODO: This is not true for async ready.
let meta = snapshot.mut_metadata();
meta.index = applied_idx;
meta.term = term;
meta.index = self.raft_state.hard_state.commit;
meta.term = match meta.index.cmp(&self.snapshot_metadata.index) {
cmp::Ordering::Equal => self.snapshot_metadata.term,
cmp::Ordering::Greater => {
let offset = self.entries[0].index;
self.entries[(meta.index - offset) as usize].term
}
cmp::Ordering::Less => {
panic!(
"commit {} < snapshot_metadata.index {}",
meta.index, self.snapshot_metadata.index
);
}
};

meta.set_conf_state(self.raft_state.conf_state.clone());
snapshot
Expand Down

0 comments on commit 3139b2d

Please sign in to comment.