diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index b040bb886..32e17c45f 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -20,7 +20,6 @@ use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; use openraft::SnapshotMeta; -use openraft::StateMachineChanges; use openraft::StorageError; use openraft::StorageIOError; use openraft::Vote; @@ -305,7 +304,7 @@ impl RaftStorage for Arc { &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError> { + ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -333,10 +332,7 @@ impl RaftStorage for Arc { // Update current snapshot. let mut current_snapshot = self.current_snapshot.write().await; *current_snapshot = Some(new_snapshot); - Ok(StateMachineChanges { - last_applied: meta.last_log_id, - is_snapshot: true, - }) + Ok(()) } #[tracing::instrument(level = "trace", skip(self))] diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index ab8510332..0a34e1360 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -24,7 +24,6 @@ use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; use openraft::SnapshotMeta; -use openraft::StateMachineChanges; use openraft::StorageError; use openraft::StorageIOError; use openraft::Vote; @@ -527,7 +526,7 @@ impl RaftStorage for Arc { &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError> { + ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -553,10 +552,7 @@ impl RaftStorage for Arc { } self.set_current_snapshot_(new_snapshot)?; - Ok(StateMachineChanges { - last_applied: meta.last_log_id, - is_snapshot: true, - }) + Ok(()) } #[tracing::instrument(level = "trace", skip(self))] diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index ff0092cc5..3f3f6dd83 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -23,7 +23,6 @@ use openraft::LogId; use openraft::RaftStorage; use openraft::RaftStorageDebug; use openraft::SnapshotMeta; -use openraft::StateMachineChanges; use openraft::StorageError; use openraft::StorageIOError; use openraft::Vote; @@ -358,7 +357,7 @@ impl RaftStorage for Arc { &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError> { + ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -392,10 +391,7 @@ impl RaftStorage for Arc { // Update current snapshot. let mut current_snapshot = self.current_snapshot.write().await; *current_snapshot = Some(new_snapshot); - Ok(StateMachineChanges { - last_applied: meta.last_log_id, - is_snapshot: true, - }) + Ok(()) } #[tracing::instrument(level = "trace", skip(self))] diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 17647899f..9f618b688 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1685,9 +1685,8 @@ impl, S: RaftStorage> RaftRuntime let snapshot_data = self.received_snapshot.remove(&snapshot_meta.snapshot_id); if let Some(data) = snapshot_data { - // TODO: `changes` is not used - let changes = self.storage.install_snapshot(snapshot_meta, data).await?; - tracing::debug!("update after install-snapshot: {:?}", changes); + self.storage.install_snapshot(snapshot_meta, data).await?; + tracing::debug!("Done install_snapshot, meta: {:?}", snapshot_meta); } else { unreachable!("buffered snapshot not found: snapshot meta: {:?}", snapshot_meta) } diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 8f8d0582b..a2bbbae82 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -81,7 +81,6 @@ pub use crate::raft_types::LogIdOptionExt; pub(crate) use crate::raft_types::MetricsChangeFlags; pub use crate::raft_types::SnapshotId; pub use crate::raft_types::SnapshotSegmentId; -pub use crate::raft_types::StateMachineChanges; pub use crate::raft_types::Update; pub use crate::storage::RaftLogReader; pub use crate::storage::RaftSnapshotBuilder; diff --git a/openraft/src/raft_types.rs b/openraft/src/raft_types.rs index 243fc581c..be06db9bd 100644 --- a/openraft/src/raft_types.rs +++ b/openraft/src/raft_types.rs @@ -4,7 +4,6 @@ use std::fmt::Formatter; use crate::LeaderId; use crate::MessageSummary; use crate::NodeId; -use crate::RaftTypeConfig; /// The identity of a raft log. /// A term, node_id and an index identifies an log globally. @@ -197,11 +196,3 @@ impl MetricsChangeFlags { self.cluster = true } } - -/// The changes of a state machine. -/// E.g. when applying a log to state machine, or installing a state machine from snapshot. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct StateMachineChanges { - pub last_applied: Option>, - pub is_snapshot: bool, -} diff --git a/openraft/src/storage/mod.rs b/openraft/src/storage/mod.rs index 9ec0cf626..2bfc2105c 100644 --- a/openraft/src/storage/mod.rs +++ b/openraft/src/storage/mod.rs @@ -16,7 +16,6 @@ use crate::defensive::check_range_matches_entries; use crate::membership::EffectiveMembership; use crate::node::Node; use crate::raft_types::SnapshotId; -use crate::raft_types::StateMachineChanges; use crate::Entry; use crate::LogId; use crate::MessageSummary; @@ -283,7 +282,7 @@ where C: RaftTypeConfig /// for details on log compaction / snapshotting. async fn begin_receiving_snapshot(&mut self) -> Result, StorageError>; - /// Install a snapshot which has finished streaming from the cluster leader. + /// Install a snapshot which has finished streaming from the leader. /// /// All other snapshots should be deleted at this point. /// @@ -293,7 +292,7 @@ where C: RaftTypeConfig &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError>; + ) -> Result<(), StorageError>; /// Get a readable handle to the current snapshot, along with its metadata. /// diff --git a/openraft/src/store_ext.rs b/openraft/src/store_ext.rs index 10198c3d9..982bc1ca6 100644 --- a/openraft/src/store_ext.rs +++ b/openraft/src/store_ext.rs @@ -21,7 +21,6 @@ use crate::RaftStorage; use crate::RaftStorageDebug; use crate::RaftTypeConfig; use crate::SnapshotMeta; -use crate::StateMachineChanges; use crate::StorageError; use crate::Vote; use crate::Wrapper; @@ -178,7 +177,7 @@ where &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError> { + ) -> Result<(), StorageError> { self.inner().install_snapshot(meta, snapshot).await } diff --git a/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs b/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs index d4b4fe556..82db8efb1 100644 --- a/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs +++ b/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs @@ -7,6 +7,7 @@ use openraft::Config; use openraft::LeaderId; use openraft::LogId; use openraft::RaftLogReader; +use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -46,7 +47,7 @@ async fn purge_in_snapshot_logs() -> Result<()> { assert_eq!(max_keep as usize, logs.len()); } - // Leader: .......15..20 + // Leader: -------15..20 // Learner: 0..10 tracing::info!("--- block replication, build another snapshot"); { @@ -62,6 +63,10 @@ async fn purge_in_snapshot_logs() -> Result<()> { .await?; } + // There may be a cached append-entries request that already loads log 10..15 from the store, just before building + // snapshot. + sleep(Duration::from_millis(500)).await; + tracing::info!("--- restore replication, install the 2nd snapshot on learner"); { router.restore_node(1); diff --git a/rocksstore/src/lib.rs b/rocksstore/src/lib.rs index 02d8f390f..0080ffade 100644 --- a/rocksstore/src/lib.rs +++ b/rocksstore/src/lib.rs @@ -27,7 +27,6 @@ use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; use openraft::SnapshotMeta; -use openraft::StateMachineChanges; use openraft::StorageError; use openraft::StorageIOError; use openraft::Vote; @@ -534,7 +533,7 @@ impl RaftStorage for Arc { &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError> { + ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -561,10 +560,7 @@ impl RaftStorage for Arc { self.put_meta::(&new_snapshot)?; - Ok(StateMachineChanges { - last_applied: meta.last_log_id, - is_snapshot: true, - }) + Ok(()) } #[tracing::instrument(level = "trace", skip(self))] diff --git a/sledstore/src/lib.rs b/sledstore/src/lib.rs index 09c5a486c..01a33bf4d 100644 --- a/sledstore/src/lib.rs +++ b/sledstore/src/lib.rs @@ -26,7 +26,6 @@ use openraft::RaftLogReader; use openraft::RaftSnapshotBuilder; use openraft::RaftStorage; use openraft::SnapshotMeta; -use openraft::StateMachineChanges; use openraft::StorageError; use openraft::StorageIOError; use openraft::Vote; @@ -627,7 +626,7 @@ impl RaftStorage for Arc { &mut self, meta: &SnapshotMeta, snapshot: Box, - ) -> Result, StorageError> { + ) -> Result<(), StorageError> { tracing::info!( { snapshot_size = snapshot.get_ref().len() }, "decoding snapshot for installation" @@ -653,10 +652,7 @@ impl RaftStorage for Arc { } self.set_current_snapshot_(new_snapshot).await?; - Ok(StateMachineChanges { - last_applied: meta.last_log_id, - is_snapshot: true, - }) + Ok(()) } #[tracing::instrument(level = "trace", skip(self))]