From fc1ef2f2d9711f7b9e88fe4399445b135d5f8023 Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Tue, 8 Dec 2020 23:29:23 -0600 Subject: [PATCH] Fix examples and snapshot code of MemStorageCore (#409) Close #408. The commit index of LightReady is not handled in examples. But the test can not pass is mainly due to the bug of generating snapshot in MemStorageCore. The term in snapshot meta should not be the hard_state.term. It should be the log term of meta.index. By the way, the snapshot has an assumption that all entries whose index is less than hard_state.commit has been applied, which is not true when using async ready. So I leave a TODO and I will resolve it soon. Signed-off-by: gengliqi --- examples/five_mem_node/main.rs | 4 ++++ examples/single_mem_node/main.rs | 4 ++++ src/storage.rs | 28 ++++++++++++++++++++-------- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index 6ed6a2730..a3b729ce1 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -325,6 +325,10 @@ fn on_ready( // Call `RawNode::advance` interface to update position flags in the raft. let mut light_rd = raft_group.advance(ready); + // Update commit index. + if let Some(commit) = light_rd.commit_index() { + store.wl().mut_hard_state().set_commit(commit); + } // Send out the messages. handle_messages(light_rd.take_messages()); // Apply all committed entries. diff --git a/examples/single_mem_node/main.rs b/examples/single_mem_node/main.rs index b7c36d481..ba1eb8425 100644 --- a/examples/single_mem_node/main.rs +++ b/examples/single_mem_node/main.rs @@ -162,6 +162,10 @@ fn on_ready(raft_group: &mut RawNode, cbs: &mut HashMap Result<()> { assert!( self.has_entry_at(index), - "commit_to {} but the entry not exists", + "commit_to {} but the entry does not exist", index ); @@ -193,7 +194,6 @@ impl MemStorageCore { /// Panics if the snapshot index is less than the storage's first index. pub fn apply_snapshot(&mut self, mut snapshot: Snapshot) -> Result<()> { let mut meta = snapshot.take_metadata(); - let term = meta.term; let index = meta.index; if self.first_index() > index { @@ -202,7 +202,7 @@ impl MemStorageCore { self.snapshot_metadata = meta.clone(); - self.raft_state.hard_state.term = term; + self.raft_state.hard_state.term = cmp::max(self.raft_state.hard_state.term, meta.term); self.raft_state.hard_state.commit = index; self.entries.clear(); @@ -214,12 +214,24 @@ impl MemStorageCore { fn snapshot(&self) -> Snapshot { let mut snapshot = Snapshot::default(); - // Use the latest applied_idx to construct the snapshot. - let applied_idx = self.raft_state.hard_state.commit; - let term = self.raft_state.hard_state.term; + // We assume all entries whose indexes are less than `hard_state.commit` + // have been applied, so use the latest commit index to construct the snapshot. + // TODO: This is not true for async ready. let meta = snapshot.mut_metadata(); - meta.index = applied_idx; - meta.term = term; + meta.index = self.raft_state.hard_state.commit; + meta.term = match meta.index.cmp(&self.snapshot_metadata.index) { + cmp::Ordering::Equal => self.snapshot_metadata.term, + cmp::Ordering::Greater => { + let offset = self.entries[0].index; + self.entries[(meta.index - offset) as usize].term + } + cmp::Ordering::Less => { + panic!( + "commit {} < snapshot_metadata.index {}", + meta.index, self.snapshot_metadata.index + ); + } + }; meta.set_conf_state(self.raft_state.conf_state.clone()); snapshot