From d0d04b28fedc10b05a41d818ff7e3de77f246cb8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= <drdr.xp@gmail.com>
Date: Sun, 28 Aug 2022 20:46:45 +0800
Subject: [PATCH] Change: only purge logs that are in snapshot

Let `snapshot+logs` be a complete state of a raft node.

The assumption before was that `state_machine+logs` is a complete state of a
raft node. This requires the state machine to persist its state every time
a log is applied, which would be an unnecessary overhead.

- Change: remove ENV config entries. Do not let a lib be affected by
  environment variables.

- Change: remove `Config.keep_unsnapshoted_log`: now by default, logs
  not included in snapshot won't be deleted.

  Rename `Config.max_applied_log_to_keep` to `max_in_snapshot_log_to_keep`.
---
 openraft/src/config/config.rs                 | 45 +++-------
 openraft/src/config/config_test.rs            | 17 +---
 openraft/src/core/raft_core.rs                |  7 +-
 openraft/src/core/replication.rs              |  6 ++
 openraft/src/engine/calc_purge_upto_test.rs   | 61 ++------------
 openraft/src/engine/engine_impl.rs            | 47 +++++------
 .../engine/follower_commit_entries_test.rs    | 53 ------------
 openraft/src/engine/update_progress_test.rs   | 63 --------------
 openraft/src/raft.rs                          |  2 +-
 .../t90_issue_216_stale_last_log_id.rs        |  2 +-
 .../tests/log_compaction/t10_compaction.rs    |  2 +-
 openraft/tests/membership/t10_add_learner.rs  |  2 +-
 openraft/tests/snapshot/main.rs               |  3 +-
 ...ld.rs => t24_snapshot_when_lacking_log.rs} | 24 +++---
 ..._snapshot_add_learner_and_request_a_log.rs |  2 +-
 .../snapshot/t40_purge_in_snapshot_logs.rs    | 84 +++++++++++++++++++
 .../t41_snapshot_overrides_membership.rs      | 15 +++-
 .../t42_snapshot_uses_prev_snap_membership.rs | 23 +----
 .../t43_snapshot_delete_conflict_logs.rs      |  2 +-
 openraft/tests/state_machine/main.rs          |  1 -
 .../state_machine/t40_clean_applied_logs.rs   | 64 --------------
 21 files changed, 173 insertions(+), 352 deletions(-)
 rename openraft/tests/snapshot/{t24_snapshot_ge_half_threshold.rs => t24_snapshot_when_lacking_log.rs} (79%)
 create mode 100644 openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs
 delete mode 100644 openraft/tests/state_machine/t40_clean_applied_logs.rs

diff --git a/openraft/src/config/config.rs b/openraft/src/config/config.rs
index 1672b3d22..b15e474d6 100644
--- a/openraft/src/config/config.rs
+++ b/openraft/src/config/config.rs
@@ -83,75 +83,56 @@ fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
 #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
 pub struct Config {
     /// The application specific name of this Raft cluster
-    #[clap(long, env = "RAFT_CLUSTER_NAME", default_value = "foo")]
+    #[clap(long, default_value = "foo")]
     pub cluster_name: String,
 
     /// The minimum election timeout in milliseconds
-    #[clap(long, env = "RAFT_ELECTION_TIMEOUT_MIN", default_value = "150")]
+    #[clap(long, default_value = "150")]
     pub election_timeout_min: u64,
 
     /// The maximum election timeout in milliseconds
-    #[clap(long, env = "RAFT_ELECTION_TIMEOUT_MAX", default_value = "300")]
+    #[clap(long, default_value = "300")]
     pub election_timeout_max: u64,
 
     /// The heartbeat interval in milliseconds at which leaders will send heartbeats to followers
-    #[clap(long, env = "RAFT_HEARTBEAT_INTERVAL", default_value = "50")]
+    #[clap(long, default_value = "50")]
     pub heartbeat_interval: u64,
 
     /// The timeout for sending a snapshot segment, in millisecond
-    #[clap(long, env = "RAFT_INSTALL_SNAPSHOT_TIMEOUT", default_value = "200")]
+    #[clap(long, default_value = "200")]
     pub install_snapshot_timeout: u64,
 
     /// The maximum number of entries per payload allowed to be transmitted during replication
     ///
     /// If this is too low, it will take longer for the nodes to be brought up to
     /// consistency with the rest of the cluster.
-    #[clap(long, env = "RAFT_MAX_PAYLOAD_ENTRIES", default_value = "300")]
+    #[clap(long, default_value = "300")]
     pub max_payload_entries: u64,
 
     /// The distance behind in log replication a follower must fall before it is considered lagging
     ///
     /// Once a replication stream transition into line-rate state, the target node will be considered safe to join a
     /// cluster.
-    #[clap(long, env = "RAFT_REPLICATION_LAG_THRESHOLD", default_value = "1000")]
+    #[clap(long, default_value = "1000")]
     pub replication_lag_threshold: u64,
 
     /// The snapshot policy to use for a Raft node.
     #[clap(
         long,
-        env = "RAFT_SNAPSHOT_POLICY",
         default_value = "since_last:5000",
         parse(try_from_str=parse_snapshot_policy)
     )]
     pub snapshot_policy: SnapshotPolicy,
 
-    /// Whether to keep `applied_log`s that are not included by snapshots.
-    ///
-    /// If your application may rebuild it's state machine from snapshots,
-    /// please set this to true.
-    ///
-    /// By default, `OpenRaft` purges `applied_log`s from time to time regardless of snapshots, because it assumes once
-    /// logs are `applied` to the state machine, logs are persisted on disk.
-    ///
-    /// If an implementation does not persist data when `RaftStorage::apply_to_state_machine()` returns, and just
-    /// relies on `snapshot` to rebuild the state machine when the next time it restarts, the application must always
-    /// set `keep_unsnapshoted_log` to `true`, so that only logs that are already included in a snapshot will be
-    /// purged.
-    //
-    // This is another way to implement:
-    // #[clap(long, env = "RAFT_KEEP_UNSNAPSHOTED_LOG",
-    //        default_value_t = false,
-    //        value_parser=clap::value_parser!(bool))]
-    #[clap(long, env = "RAFT_KEEP_UNSNAPSHOTED_LOG")]
-    pub keep_unsnapshoted_log: bool,
-
     /// The maximum snapshot chunk size allowed when transmitting snapshots (in bytes)
-    #[clap(long, env = "RAFT_SNAPSHOT_MAX_CHUNK_SIZE", default_value = "3MiB", parse(try_from_str=parse_bytes_with_unit))]
+    #[clap(long, default_value = "3MiB", parse(try_from_str=parse_bytes_with_unit))]
     pub snapshot_max_chunk_size: u64,
 
-    /// The maximum number of applied logs to keep before purging
-    #[clap(long, env = "RAFT_MAX_APPLIED_LOG_TO_KEEP", default_value = "1000")]
-    pub max_applied_log_to_keep: u64,
+    /// The maximum number of logs to keep that are already included in **snapshot**.
+    ///
+    /// Logs that are not in snapshot will never be purged.
+    #[clap(long, default_value = "1000")]
+    pub max_in_snapshot_log_to_keep: u64,
 
     /// The minimal number of applied logs to purge in a batch.
     #[clap(long, default_value = "1")]
diff --git a/openraft/src/config/config_test.rs b/openraft/src/config/config_test.rs
index 05b8732ed..95bdbe3ac 100644
--- a/openraft/src/config/config_test.rs
+++ b/openraft/src/config/config_test.rs
@@ -56,9 +56,8 @@ fn test_build() -> anyhow::Result<()> {
         "--max-payload-entries=201",
         "--replication-lag-threshold=202",
         "--snapshot-policy=since_last:203",
-        "--keep-unsnapshoted-log",
         "--snapshot-max-chunk-size=204",
-        "--max-applied-log-to-keep=205",
+        "--max-in-snapshot-log-to-keep=205",
         "--purge-batch-size=207",
     ])?;
 
@@ -70,25 +69,13 @@ fn test_build() -> anyhow::Result<()> {
     assert_eq!(201, config.max_payload_entries);
     assert_eq!(202, config.replication_lag_threshold);
     assert_eq!(SnapshotPolicy::LogsSinceLast(203), config.snapshot_policy);
-    assert_eq!(true, config.keep_unsnapshoted_log);
     assert_eq!(204, config.snapshot_max_chunk_size);
-    assert_eq!(205, config.max_applied_log_to_keep);
+    assert_eq!(205, config.max_in_snapshot_log_to_keep);
     assert_eq!(207, config.purge_batch_size);
 
     Ok(())
 }
 
-#[test]
-fn test_config_keep_unsnapshoted_log() -> anyhow::Result<()> {
-    let config = Config::build(&["foo", "--keep-unsnapshoted-log"])?;
-    assert_eq!(true, config.keep_unsnapshoted_log);
-
-    let config = Config::build(&["foo"])?;
-    assert_eq!(false, config.keep_unsnapshoted_log);
-
-    Ok(())
-}
-
 #[test]
 fn test_config_enable_tick() -> anyhow::Result<()> {
     let config = Config::build(&["foo", "--enable-tick=false"])?;
diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs
index 079608bc1..ce33368c8 100644
--- a/openraft/src/core/raft_core.rs
+++ b/openraft/src/core/raft_core.rs
@@ -250,9 +250,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
         self.storage.save_vote(&state.vote).await?;
 
         self.engine = Engine::new(self.id, &state, EngineConfig {
-            max_applied_log_to_keep: self.config.max_applied_log_to_keep,
+            max_in_snapshot_log_to_keep: self.config.max_in_snapshot_log_to_keep,
             purge_batch_size: self.config.purge_batch_size,
-            keep_unsnapshoted_log: self.config.keep_unsnapshoted_log,
         });
 
         // Fetch the most recent snapshot in the system.
@@ -850,7 +849,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
         // TODO: add building-session id to identify different building
         match result {
             SnapshotResult::Ok(meta) => {
-                self.engine.update_snapshot(meta);
+                self.engine.finish_building_snapshot(meta);
                 self.run_engine_commands::<Entry<C>>(&[]).await?;
             }
             SnapshotResult::StorageError(sto_err) => {
@@ -868,6 +867,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
     /// If force is True, it will skip the threshold check and start creating snapshot as demanded.
     #[tracing::instrument(level = "debug", skip(self))]
     pub(crate) async fn trigger_log_compaction_if_needed(&mut self, force: bool) {
+        tracing::debug!("trigger_log_compaction_if_needed: force: {}", force);
+
         if let SnapshotState::None = self.snapshot_state {
             // Continue.
         } else {
diff --git a/openraft/src/core/replication.rs b/openraft/src/core/replication.rs
index 46f1a8372..08c0b3c5e 100644
--- a/openraft/src/core/replication.rs
+++ b/openraft/src/core/replication.rs
@@ -4,6 +4,12 @@ pub(crate) fn snapshot_is_within_half_of_threshold(
     last_log_index: &u64,
     threshold: &u64,
 ) -> bool {
+    tracing::debug!(
+        "snapshot_last_index:{}, last_log_index:{}, threshold: {}",
+        snapshot_last_index,
+        last_log_index,
+        threshold
+    );
     // Calculate distance from actor's last log index.
     let distance_from_line = last_log_index.saturating_sub(*snapshot_last_index);
 
diff --git a/openraft/src/engine/calc_purge_upto_test.rs b/openraft/src/engine/calc_purge_upto_test.rs
index b54dc237c..8d94d71d5 100644
--- a/openraft/src/engine/calc_purge_upto_test.rs
+++ b/openraft/src/engine/calc_purge_upto_test.rs
@@ -24,7 +24,8 @@ fn eng() -> Engine<u64, ()> {
 
 #[test]
 fn test_calc_purge_upto() -> anyhow::Result<()> {
-    // last_purged_log_id, committed, max_keep, want
+    // last_purged_log_id, last_snapshot_log_id, max_keep, want
+    // last_applied should not affect the purge
     let cases = vec![
         //
         (None, None, 0, None),
@@ -53,71 +54,23 @@ fn test_calc_purge_upto() -> anyhow::Result<()> {
         (Some(log_id(1, 2)), Some(log_id(3, 4)), 5, None),
     ];
 
-    for (last_purged, committed, max_keep, want) in cases {
+    for (last_purged, snapshot_last_log_id, max_keep, want) in cases {
         let mut eng = eng();
-        eng.config.keep_unsnapshoted_log = false;
-        eng.config.max_applied_log_to_keep = max_keep;
+        eng.config.max_in_snapshot_log_to_keep = max_keep;
         eng.config.purge_batch_size = 1;
 
         if let Some(last_purged) = last_purged {
             eng.state.log_ids.purge(&last_purged);
         }
-        eng.state.committed = committed;
+        eng.snapshot_meta.last_log_id = snapshot_last_log_id;
         let got = eng.calc_purge_upto();
 
         assert_eq!(
             want, got,
-            "case: last_purged: {:?}, last_applied: {:?}, max_keep: {}",
-            last_purged, committed, max_keep
+            "case: last_purged: {:?}, snapshot_last_log_id: {:?}, max_keep: {}",
+            last_purged, snapshot_last_log_id, max_keep
         );
     }
 
     Ok(())
 }
-
-#[test]
-// in this test, keep_unsnapshoted_log is set to true.
-// logs being purged should at most the last that was in the snapshot.
-fn test_keep_unsnapshoted() -> anyhow::Result<()> {
-    let cases = vec![
-        // last_deleted, last_applied, last_snapshoted, max_keep, want
-        // empty test
-        (None, None, None, 0, None),
-        (None, None, None, 1, None),
-        // nothing in snapshot
-        (Some(log_id(1, 1)), Some(log_id(2, 2)), None, 0, None),
-        (Some(log_id(1, 1)), Some(log_id(5, 5)), None, 0, None),
-        // snapshot kept up
-        (None, Some(log_id(5, 5)), Some(log_id(3, 4)), 0, Some(log_id(3, 4))),
-        (
-            Some(log_id(1, 1)),
-            Some(log_id(5, 5)),
-            Some(log_id(5, 5)),
-            0,
-            Some(log_id(5, 5)),
-        ),
-    ];
-
-    for (purged, committed, snapshot, keep, want) in cases {
-        let mut eng = eng();
-        eng.config.keep_unsnapshoted_log = true;
-        eng.config.max_applied_log_to_keep = keep;
-        eng.config.purge_batch_size = 1;
-
-        if let Some(last_purged) = purged {
-            eng.state.log_ids.purge(&last_purged);
-        }
-        eng.state.committed = committed;
-        eng.snapshot_meta.last_log_id = snapshot;
-
-        let got = eng.calc_purge_upto();
-
-        assert_eq!(
-            want, got,
-            "case: purged: {:?}, committed: {:?}, snapshot: {:?}, keep: {}",
-            purged, committed, snapshot, keep
-        )
-    }
-
-    Ok(())
-}
diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs
index d57dc5b46..af269fb70 100644
--- a/openraft/src/engine/engine_impl.rs
+++ b/openraft/src/engine/engine_impl.rs
@@ -33,22 +33,17 @@ use crate::Vote;
 #[derive(PartialEq, Eq)]
 pub(crate) struct EngineConfig {
     /// The maximum number of applied logs to keep before purging.
-    pub(crate) max_applied_log_to_keep: u64,
+    pub(crate) max_in_snapshot_log_to_keep: u64,
 
     /// The minimal number of applied logs to purge in a batch.
     pub(crate) purge_batch_size: u64,
-
-    /// whether to keep applied log that are not included in snapshots.
-    /// false by default
-    pub(crate) keep_unsnapshoted_log: bool,
 }
 
 impl Default for EngineConfig {
     fn default() -> Self {
         Self {
-            max_applied_log_to_keep: 1000,
+            max_in_snapshot_log_to_keep: 1000,
             purge_batch_size: 256,
-            keep_unsnapshoted_log: false,
         }
     }
 }
@@ -433,7 +428,6 @@ where
                 already_committed: prev_committed,
                 upto: committed.unwrap(),
             });
-            self.purge_applied_log();
         }
     }
 
@@ -540,13 +534,13 @@ where
         }
     }
 
-    /// Purge applied log if needed.
+    /// Purge logs that are already in snapshot if needed.
     ///
-    /// `max_applied_log_to_keep` specifies the number of applied logs to keep.
-    /// `max_applied_log_to_keep==0` means every applied log can be purged.
+    /// `max_in_snapshot_log_to_keep` specifies the number of logs already included in snapshot to keep.
+    /// `max_in_snapshot_log_to_keep==0` means to purge every log stored in snapshot.
     // NOTE: simple method, not tested.
     #[tracing::instrument(level = "debug", skip_all)]
-    pub(crate) fn purge_applied_log(&mut self) {
+    pub(crate) fn purge_in_snapshot_log(&mut self) {
         if let Some(purge_upto) = self.calc_purge_upto() {
             self.purge_log(purge_upto);
         }
@@ -562,20 +556,13 @@ where
     #[tracing::instrument(level = "debug", skip_all)]
     pub(crate) fn calc_purge_upto(&mut self) -> Option<LogId<NID>> {
         let st = &self.state;
-        let last_applied = &st.committed;
-        let max_keep = self.config.max_applied_log_to_keep;
+        let max_keep = self.config.max_in_snapshot_log_to_keep;
         let batch_size = self.config.purge_batch_size;
 
-        let mut purge_end = last_applied.next_index().saturating_sub(max_keep);
-
-        if self.config.keep_unsnapshoted_log {
-            let idx = self.snapshot_meta.last_log_id.next_index();
-            tracing::debug!("the very last log included in snapshots: {}", idx);
-            purge_end = idx.min(purge_end);
-        }
+        let purge_end = self.snapshot_meta.last_log_id.next_index().saturating_sub(max_keep);
 
         tracing::debug!(
-            last_applied = debug(last_applied),
+            snapshot_last_log_id = debug(self.snapshot_meta.last_log_id),
             max_keep,
             "try purge: (-oo, {})",
             purge_end
@@ -583,7 +570,7 @@ where
 
         if st.last_purged_log_id().next_index() + batch_size > purge_end {
             tracing::debug!(
-                last_applied = debug(last_applied),
+                snapshot_last_log_id = debug(self.snapshot_meta.last_log_id),
                 max_keep,
                 last_purged_log_id = display(st.last_purged_log_id().summary()),
                 batch_size,
@@ -640,6 +627,7 @@ where
         //   membership.log_id       = (10, 5);
         //   local_effective.log_id = (2, 10);
         if effective.log_id.index() <= m.log_id.index() {
+            // TODO: if effective membership changes, call `update_replication()`
             effective = m;
         }
 
@@ -743,7 +731,6 @@ where
                 already_committed: prev_committed,
                 upto: self.state.committed.unwrap(),
             });
-            self.purge_applied_log();
         }
     }
 
@@ -801,6 +788,18 @@ where
         self.purge_log(snap_last_log_id)
     }
 
+    #[tracing::instrument(level = "debug", skip_all)]
+    pub(crate) fn finish_building_snapshot(&mut self, meta: SnapshotMeta<NID, N>) {
+        tracing::info!("finish_building_snapshot: {:?}", meta);
+
+        let updated = self.update_snapshot(meta);
+        if !updated {
+            return;
+        }
+
+        self.purge_in_snapshot_log();
+    }
+
     /// Update engine state when a new snapshot is built or installed.
     ///
     /// Engine records only the metadata of a snapshot. Snapshot data is stored by RaftStorage implementation.
diff --git a/openraft/src/engine/follower_commit_entries_test.rs b/openraft/src/engine/follower_commit_entries_test.rs
index 5ab97dd13..d85307a5d 100644
--- a/openraft/src/engine/follower_commit_entries_test.rs
+++ b/openraft/src/engine/follower_commit_entries_test.rs
@@ -4,7 +4,6 @@ use maplit::btreeset;
 
 use crate::engine::Command;
 use crate::engine::Engine;
-use crate::engine::LogIdList;
 use crate::EffectiveMembership;
 use crate::Entry;
 use crate::EntryPayload;
@@ -175,55 +174,3 @@ fn test_follower_commit_entries_gt_last_entry() -> anyhow::Result<()> {
 
     Ok(())
 }
-
-#[test]
-fn test_follower_commit_entries_purge_to_committed() -> anyhow::Result<()> {
-    let mut eng = eng();
-    eng.state.log_ids = LogIdList::new([log_id(2, 2), log_id(2, 3)]);
-    eng.config.max_applied_log_to_keep = 0;
-    eng.config.purge_batch_size = 1;
-
-    eng.follower_commit_entries(Some(log_id(3, 1)), None, &[blank(2, 3)]);
-
-    assert_eq!(Some(log_id(2, 3)), eng.state.committed);
-    assert_eq!(Some(log_id(2, 3)), eng.state.last_purged_log_id());
-
-    assert_eq!(
-        vec![
-            Command::FollowerCommit {
-                already_committed: Some(log_id(1, 1)),
-                upto: log_id(2, 3)
-            },
-            Command::PurgeLog { upto: log_id(2, 3) },
-        ],
-        eng.commands
-    );
-
-    Ok(())
-}
-
-#[test]
-fn test_follower_commit_entries_purge_to_committed_minus_1() -> anyhow::Result<()> {
-    let mut eng = eng();
-    eng.state.log_ids = LogIdList::new([log_id(1, 1), log_id(2, 3)]);
-    eng.config.max_applied_log_to_keep = 1;
-    eng.config.purge_batch_size = 1;
-
-    eng.follower_commit_entries(Some(log_id(3, 1)), None, &[blank(2, 3)]);
-
-    assert_eq!(Some(log_id(2, 3)), eng.state.committed);
-    assert_eq!(Some(log_id(1, 2)), eng.state.last_purged_log_id());
-
-    assert_eq!(
-        vec![
-            Command::FollowerCommit {
-                already_committed: Some(log_id(1, 1)),
-                upto: log_id(2, 3)
-            },
-            Command::PurgeLog { upto: log_id(1, 2) },
-        ],
-        eng.commands
-    );
-
-    Ok(())
-}
diff --git a/openraft/src/engine/update_progress_test.rs b/openraft/src/engine/update_progress_test.rs
index ca8fb6e98..1cd5ae03f 100644
--- a/openraft/src/engine/update_progress_test.rs
+++ b/openraft/src/engine/update_progress_test.rs
@@ -4,7 +4,6 @@ use maplit::btreeset;
 
 use crate::engine::Command;
 use crate::engine::Engine;
-use crate::engine::LogIdList;
 use crate::EffectiveMembership;
 use crate::LeaderId;
 use crate::LogId;
@@ -102,65 +101,3 @@ fn test_update_progress_update_leader_progress() -> anyhow::Result<()> {
 
     Ok(())
 }
-
-#[test]
-fn test_update_progress_purge_upto_committed() -> anyhow::Result<()> {
-    let mut eng = eng();
-    eng.state.log_ids = LogIdList::new([log_id(2, 0), log_id(2, 5)]);
-    eng.config.max_applied_log_to_keep = 0;
-    eng.config.purge_batch_size = 1;
-
-    eng.state.new_leader();
-
-    // progress: None, (2,1), (2,3); committed: (2,1)
-    eng.update_progress(3, Some(log_id(1, 2)));
-    eng.update_progress(2, Some(log_id(2, 1)));
-    eng.update_progress(3, Some(log_id(2, 3)));
-    assert_eq!(Some(log_id(2, 1)), eng.state.committed);
-    assert_eq!(
-        vec![
-            Command::ReplicateCommitted {
-                committed: Some(log_id(2, 1))
-            },
-            Command::LeaderCommit {
-                already_committed: None,
-                upto: log_id(2, 1)
-            },
-            Command::PurgeLog { upto: log_id(2, 1) },
-        ],
-        eng.commands
-    );
-
-    Ok(())
-}
-
-#[test]
-fn test_update_progress_purge_upto_committed_minus_1() -> anyhow::Result<()> {
-    let mut eng = eng();
-    eng.state.log_ids = LogIdList::new([log_id(2, 0), log_id(2, 5)]);
-    eng.config.max_applied_log_to_keep = 1;
-    eng.config.purge_batch_size = 1;
-
-    eng.state.new_leader();
-
-    // progress: None, (2,1), (2,3); committed: (2,1)
-    eng.update_progress(3, Some(log_id(1, 2)));
-    eng.update_progress(2, Some(log_id(2, 2)));
-    eng.update_progress(3, Some(log_id(2, 4)));
-    assert_eq!(Some(log_id(2, 2)), eng.state.committed);
-    assert_eq!(
-        vec![
-            Command::ReplicateCommitted {
-                committed: Some(log_id(2, 2))
-            },
-            Command::LeaderCommit {
-                already_committed: None,
-                upto: log_id(2, 2)
-            },
-            Command::PurgeLog { upto: log_id(2, 1) },
-        ],
-        eng.commands
-    );
-
-    Ok(())
-}
diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs
index 6b79b2991..04d294d25 100644
--- a/openraft/src/raft.rs
+++ b/openraft/src/raft.rs
@@ -968,7 +968,7 @@ where
                 format!("InstallSnapshot: {}", rpc.summary())
             }
             RaftMsg::BuildingSnapshotResult { result: update } => {
-                format!("SnapshotUpdate: {:?}", update)
+                format!("BuildingSnapshotResult: {:?}", update)
             }
             RaftMsg::ClientWriteRequest { payload: rpc, .. } => {
                 format!("ClientWriteRequest: {}", rpc.summary())
diff --git a/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs b/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs
index 4431b59c1..a20360006 100644
--- a/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs
+++ b/openraft/tests/append_entries/t90_issue_216_stale_last_log_id.rs
@@ -23,7 +23,7 @@ async fn stale_last_log_id() -> Result<()> {
             election_timeout_min: 500,
             election_timeout_max: 1000,
             max_payload_entries: 1,
-            max_applied_log_to_keep: 0,
+            max_in_snapshot_log_to_keep: 0,
             purge_batch_size: 1,
             enable_heartbeat: false,
             ..Default::default()
diff --git a/openraft/tests/log_compaction/t10_compaction.rs b/openraft/tests/log_compaction/t10_compaction.rs
index 45b231710..a04f39894 100644
--- a/openraft/tests/log_compaction/t10_compaction.rs
+++ b/openraft/tests/log_compaction/t10_compaction.rs
@@ -38,7 +38,7 @@ async fn compaction() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
-            max_applied_log_to_keep: 2,
+            max_in_snapshot_log_to_keep: 2,
             purge_batch_size: 1,
             enable_tick: false,
             ..Default::default()
diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs
index edda69fa0..da8192586 100644
--- a/openraft/tests/membership/t10_add_learner.rs
+++ b/openraft/tests/membership/t10_add_learner.rs
@@ -24,7 +24,7 @@ async fn add_learner_basic() -> Result<()> {
     let config = Arc::new(
         Config {
             replication_lag_threshold: 0,
-            max_applied_log_to_keep: 2000, // prevent snapshot
+            max_in_snapshot_log_to_keep: 2000, // prevent snapshot
             purge_batch_size: 1,
             enable_tick: false,
             ..Default::default()
diff --git a/openraft/tests/snapshot/main.rs b/openraft/tests/snapshot/main.rs
index 68f3dd5d7..676c61de2 100644
--- a/openraft/tests/snapshot/main.rs
+++ b/openraft/tests/snapshot/main.rs
@@ -7,9 +7,10 @@ mod fixtures;
 mod t20_api_install_snapshot;
 mod t20_trigger_snapshot;
 mod t23_snapshot_chunk_size;
-mod t24_snapshot_ge_half_threshold;
+mod t24_snapshot_when_lacking_log;
 mod t25_snapshot_line_rate_to_snapshot;
 mod t40_after_snapshot_add_learner_and_request_a_log;
+mod t40_purge_in_snapshot_logs;
 mod t41_snapshot_overrides_membership;
 mod t42_snapshot_uses_prev_snap_membership;
 mod t43_snapshot_delete_conflict_logs;
diff --git a/openraft/tests/snapshot/t24_snapshot_ge_half_threshold.rs b/openraft/tests/snapshot/t24_snapshot_when_lacking_log.rs
similarity index 79%
rename from openraft/tests/snapshot/t24_snapshot_ge_half_threshold.rs
rename to openraft/tests/snapshot/t24_snapshot_when_lacking_log.rs
index 19f3aeb97..e67004ad0 100644
--- a/openraft/tests/snapshot/t24_snapshot_ge_half_threshold.rs
+++ b/openraft/tests/snapshot/t24_snapshot_when_lacking_log.rs
@@ -10,25 +10,20 @@ use openraft::SnapshotPolicy;
 use crate::fixtures::init_default_ut_tracing;
 use crate::fixtures::RaftRouter;
 
-/// A leader should create and send snapshot when snapshot is old and is not that old to trigger a snapshot, i.e.:
-/// `threshold/2 < leader.last_log_index - snapshot.applied_index < threshold`
-///
-/// What does this test do?
+/// A leader switches to snapshot replication if a log that a follower/learner needs has already been purged.
 ///
 /// - build a stable single node cluster.
 /// - send enough requests to the node that log compaction will be triggered.
-/// - send some other log after snapshot created, to make the `leader.last_log_index - snapshot.applied_index` big
-///   enough.
 /// - add learner and assert that they receive the snapshot and logs.
 #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
-async fn snapshot_ge_half_threshold() -> Result<()> {
-    let snapshot_threshold: u64 = 10;
-    let log_cnt = snapshot_threshold + 6;
+async fn switch_to_snapshot_replication_when_lacking_log() -> Result<()> {
+    let snapshot_threshold: u64 = 20;
+    let log_cnt = snapshot_threshold + 11;
 
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
-            max_applied_log_to_keep: 6,
+            max_in_snapshot_log_to_keep: 0,
             purge_batch_size: 1,
             enable_heartbeat: false,
             ..Default::default()
@@ -79,10 +74,15 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
         log_index += 1;
 
         router.wait_for_log(&btreeset![0, 1], Some(log_index), None, "add learner").await?;
-        let expected_snap = Some((log_index.into(), 1));
         router
-            .wait_for_snapshot(&btreeset![1], LogId::new(LeaderId::new(1, 0), log_index), None, "")
+            .wait_for_snapshot(
+                &btreeset![1],
+                LogId::new(LeaderId::new(1, 0), snapshot_threshold - 1),
+                None,
+                "",
+            )
             .await?;
+        let expected_snap = Some(((snapshot_threshold - 1).into(), 1));
         router
             .assert_storage_state(
                 1,
diff --git a/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs b/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs
index a07f21aa7..6f6cee41a 100644
--- a/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs
+++ b/openraft/tests/snapshot/t40_after_snapshot_add_learner_and_request_a_log.rs
@@ -19,7 +19,7 @@ async fn after_snapshot_add_learner_and_request_a_log() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
-            max_applied_log_to_keep: 2, // do not let add-learner log and client-write log to trigger a snapshot.
+            max_in_snapshot_log_to_keep: 2, // do not let add-learner log and client-write log to trigger a snapshot.
             purge_batch_size: 1,
             enable_heartbeat: false,
             ..Default::default()
diff --git a/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs b/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs
new file mode 100644
index 000000000..d4b4fe556
--- /dev/null
+++ b/openraft/tests/snapshot/t40_purge_in_snapshot_logs.rs
@@ -0,0 +1,84 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use maplit::btreeset;
+use openraft::Config;
+use openraft::LeaderId;
+use openraft::LogId;
+use openraft::RaftLogReader;
+
+use crate::fixtures::init_default_ut_tracing;
+use crate::fixtures::RaftRouter;
+
+/// Leader logs should be deleted up to `snapshot.last_log_id - max_in_snapshot_log_to_keep` after building a snapshot;
+/// a follower/learner should delete logs up to `snapshot.last_log_id` after installing a snapshot.
+#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
+async fn purge_in_snapshot_logs() -> Result<()> {
+    let max_keep = 2;
+
+    let config = Arc::new(
+        Config {
+            max_in_snapshot_log_to_keep: max_keep,
+            purge_batch_size: 1,
+            enable_tick: false,
+            ..Default::default()
+        }
+        .validate()?,
+    );
+
+    let mut router = RaftRouter::new(config.clone());
+    let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;
+
+    let leader = router.get_raft_handle(&0)?;
+    let learner = router.get_raft_handle(&1)?;
+
+    tracing::info!("--- build snapshot on leader, check purged log");
+    {
+        log_index += router.client_request_many(0, "0", 10).await?;
+        leader.trigger_snapshot().await?;
+        leader
+            .wait(timeout())
+            .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "building 1st snapshot")
+            .await?;
+        let mut sto0 = router.get_storage_handle(&0)?;
+        let logs = sto0.try_get_log_entries(..).await?;
+        assert_eq!(max_keep as usize, logs.len());
+    }
+
+    // Leader:  .......15..20
+    // Learner: 0..10
+    tracing::info!("--- block replication, build another snapshot");
+    {
+        router.isolate_node(1);
+
+        log_index += router.client_request_many(0, "0", 5).await?;
+        router.wait(&0, timeout()).log(Some(log_index), "write another 5 logs").await?;
+
+        leader.trigger_snapshot().await?;
+        leader
+            .wait(timeout())
+            .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "building 2nd snapshot")
+            .await?;
+    }
+
+    tracing::info!("--- restore replication, install the 2nd snapshot on learner");
+    {
+        router.restore_node(1);
+
+        learner
+            .wait(timeout())
+            .snapshot(LogId::new(LeaderId::new(1, 0), log_index), "learner install snapshot")
+            .await?;
+
+        let mut sto1 = router.get_storage_handle(&1)?;
+        let logs = sto1.try_get_log_entries(..).await?;
+        assert_eq!(0, logs.len());
+    }
+
+    Ok(())
+}
+
+fn timeout() -> Option<Duration> {
+    Some(Duration::from_millis(1_000))
+}
diff --git a/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs b/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs
index 834ca210b..763014e60 100644
--- a/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs
+++ b/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs
@@ -36,7 +36,7 @@ async fn snapshot_overrides_membership() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
-            max_applied_log_to_keep: 0,
+            max_in_snapshot_log_to_keep: 0,
             purge_batch_size: 1,
             enable_heartbeat: false,
             ..Default::default()
@@ -111,6 +111,8 @@ async fn snapshot_overrides_membership() -> Result<()> {
 
         tracing::info!("--- add learner to the cluster to receive snapshot, which overrides the learner storage");
         {
+            let snapshot_index = log_index;
+
             router.add_learner(0, 1).await.expect("failed to add new node as learner");
             log_index += 1;
 
@@ -118,10 +120,15 @@ async fn snapshot_overrides_membership() -> Result<()> {
 
             router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?;
             router
-                .wait_for_snapshot(&btreeset![1], LogId::new(LeaderId::new(1, 0), log_index), timeout(), "")
+                .wait_for_snapshot(
+                    &btreeset![1],
+                    LogId::new(LeaderId::new(1, 0), snapshot_index),
+                    timeout(),
+                    "",
+                )
                 .await?;
 
-            let expected_snap = Some((log_index.into(), 1));
+            let expected_snap = Some((snapshot_index.into(), 1));
 
             router
                 .assert_storage_state(
@@ -152,5 +159,5 @@ async fn snapshot_overrides_membership() -> Result<()> {
 }
 
 fn timeout() -> Option<Duration> {
-    Some(Duration::from_millis(5000))
+    Some(Duration::from_millis(1_000))
 }
diff --git a/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs b/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs
index 8eb394cb0..7608fb593 100644
--- a/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs
+++ b/openraft/tests/snapshot/t42_snapshot_uses_prev_snap_membership.rs
@@ -36,7 +36,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
             // Use 3, with 1 it triggers a compaction when replicating ent-1,
             // because ent-0 is removed.
-            max_applied_log_to_keep: 3,
+            max_in_snapshot_log_to_keep: 3,
             purge_batch_size: 1,
             enable_tick: false,
             ..Default::default()
@@ -67,7 +67,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
                 &btreeset![0],
                 LogId::new(LeaderId::new(1, 0), log_index),
                 timeout(),
-                "snapshot",
+                "1st snapshot",
             )
             .await?;
 
@@ -87,23 +87,6 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
             m.effective.membership,
             "membership "
         );
-
-        // TODO(xp): this assertion fails because when change-membership, a append-entries request does not update
-        //           voted_for and does not call save_vote.
-        //           Thus the storage layer does not know about the leader==Some(0).
-        //           Update voted_for whenever a new leader is seen would solve this issue.
-        // router
-        //     .assert_storage_state(
-        //         1,
-        //         want,
-        //         Some(0),
-        //         want,
-        //         Some((want.into(), 1, MembershipConfig {
-        //             members: btreeset![0, 1],
-        //             members_after_consensus: None,
-        //         })),
-        //     )
-        //     .await;
     }
 
     tracing::info!("--- send just enough logs to trigger the 2nd snapshot");
@@ -117,7 +100,7 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
                 &btreeset![0],
                 LogId::new(LeaderId::new(1, 0), log_index),
                 None,
-                "snapshot",
+                "2nd snapshot",
             )
             .await?;
     }
diff --git a/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs b/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs
index e9100f8f9..e1e95cd52 100644
--- a/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs
+++ b/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs
@@ -39,7 +39,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> {
     let config = Arc::new(
         Config {
             snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
-            max_applied_log_to_keep: 0,
+            max_in_snapshot_log_to_keep: 0,
             purge_batch_size: 1,
             enable_heartbeat: false,
             ..Default::default()
diff --git a/openraft/tests/state_machine/main.rs b/openraft/tests/state_machine/main.rs
index 601175407..f667e9b3b 100644
--- a/openraft/tests/state_machine/main.rs
+++ b/openraft/tests/state_machine/main.rs
@@ -9,4 +9,3 @@ mod fixtures;
 
 mod t10_total_order_apply;
 mod t20_state_machine_apply_membership;
-mod t40_clean_applied_logs;
diff --git a/openraft/tests/state_machine/t40_clean_applied_logs.rs b/openraft/tests/state_machine/t40_clean_applied_logs.rs
deleted file mode 100644
index e9835571b..000000000
--- a/openraft/tests/state_machine/t40_clean_applied_logs.rs
+++ /dev/null
@@ -1,64 +0,0 @@
-use std::sync::Arc;
-use std::time::Duration;
-
-use anyhow::Result;
-use maplit::btreeset;
-use openraft::Config;
-use openraft::RaftLogReader;
-
-use crate::fixtures::init_default_ut_tracing;
-use crate::fixtures::RaftRouter;
-
-/// Logs should be deleted by raft after applying them, on leader and learner.
-///
-/// - assert logs are deleted on leader after applying them.
-/// - assert logs are deleted on replication target after installing a snapshot.
-#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
-async fn clean_applied_logs() -> Result<()> {
-    let config = Arc::new(
-        Config {
-            max_applied_log_to_keep: 2,
-            purge_batch_size: 1,
-            enable_tick: false,
-            ..Default::default()
-        }
-        .validate()?,
-    );
-
-    let mut router = RaftRouter::new(config.clone());
-
-    let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?;
-
-    let count = (10 - log_index) as usize;
-    for idx in 0..count {
-        router.client_request(0, "0", idx as u64).await?;
-        log_index += 1;
-
-        // raft commit at once with a single leader cluster.
-        // If we send too fast, logs are removed before forwarding to learner.
-        // Then it triggers snapshot replication, which is not expected.
-        router
-            .wait_for_log(
-                &btreeset! {0,1},
-                Some(log_index),
-                timeout(),
-                "client write repliated to all nodes",
-            )
-            .await?;
-    }
-
-    tracing::info!("--- logs before max_applied_log_to_keep should be cleaned");
-    {
-        for node_id in 0..1 {
-            let mut sto = router.get_storage_handle(&node_id)?;
-            let logs = sto.get_log_entries(..).await?;
-            assert_eq!(2, logs.len(), "node {} should have only {} logs", node_id, 2);
-        }
-    }
-
-    Ok(())
-}
-
-fn timeout() -> Option<Duration> {
-    Some(Duration::from_millis(5000))
-}