Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Feature: New RaftEntry methods: log_id() and index()
Browse files Browse the repository at this point in the history
This commit adds two auto-implemented method to `RaftEntry`
implementation:
- `RaftEntry::log_id()` returns an owned `LogId<C>` instance.
- `RaftEntry::index()` returns the log index of this entry.

And populate these two method to the codebase for simplicity.
drmingdrmer committed Jan 28, 2025

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 8dfe854 commit 4635a94
Showing 16 changed files with 72 additions and 60 deletions.
5 changes: 3 additions & 2 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ use std::sync::Arc;

use openraft::alias::LogIdOf;
use openraft::alias::SnapshotDataOf;
use openraft::entry::RaftEntry;
use openraft::storage::IOFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
@@ -188,7 +189,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {

let last = match last_serialized {
None => None,
Some(ent) => Some(*ent.get_log_id()),
Some(ent) => Some(ent.log_id()),
};

let last_purged = self.last_purged_log_id.read().await.clone();
@@ -237,7 +238,7 @@ impl RaftLogStorage<TypeConfig> for Arc<LogStore> {
where I: IntoIterator<Item = Entry<TypeConfig>> + Send {
{
let mut log = self.log.write().await;
log.extend(entries.into_iter().map(|entry| (entry.get_log_id().index(), entry)));
log.extend(entries.into_iter().map(|entry| (entry.index(), entry)));
}
callback.io_completed(Ok(()));
Ok(())
6 changes: 3 additions & 3 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
@@ -8,9 +8,9 @@ use std::sync::Arc;

use openraft::alias::LogIdOf;
use openraft::alias::VoteOf;
use openraft::entry::RaftEntry;
use openraft::storage::IOFlushed;
use openraft::LogState;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use tokio::sync::Mutex;
@@ -60,7 +60,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
}

async fn get_log_state(&mut self) -> Result<LogState<C>, StorageError<C>> {
let last = self.log.iter().next_back().map(|(_, ent)| ent.get_log_id().clone());
let last = self.log.iter().next_back().map(|(_, ent)| ent.log_id());

let last_purged = self.last_purged_log_id.clone();

@@ -97,7 +97,7 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
where I: IntoIterator<Item = C::Entry> {
// Simple implementation that calls the flush-before-return `append_to_log`.
for entry in entries {
self.log.insert(entry.get_log_id().index(), entry);
self.log.insert(entry.index(), entry);
}
callback.io_completed(Ok(()));

4 changes: 2 additions & 2 deletions examples/rocksstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -18,13 +18,13 @@ use std::sync::Arc;

use log_store::RocksLogStore;
use openraft::alias::SnapshotDataOf;
use openraft::entry::RaftEntry;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::RaftLogId;
use openraft::RaftSnapshotBuilder;
use openraft::RaftTypeConfig;
use openraft::SnapshotMeta;
@@ -179,7 +179,7 @@ impl RaftStateMachine<TypeConfig> for RocksStateMachine {
for entry in entries_iter {
tracing::debug!(%entry.log_id, "replicate to sm");

sm.last_applied_log = Some(*entry.get_log_id());
sm.last_applied_log = Some(entry.log_id());

match entry.payload {
EntryPayload::Blank => res.push(RocksResponse { value: None }),
10 changes: 5 additions & 5 deletions examples/rocksstore/src/log_store.rs
Original file line number Diff line number Diff line change
@@ -11,11 +11,11 @@ use meta::StoreMeta;
use openraft::alias::EntryOf;
use openraft::alias::LogIdOf;
use openraft::alias::VoteOf;
use openraft::entry::RaftEntry;
use openraft::storage::IOFlushed;
use openraft::storage::RaftLogStorage;
use openraft::LogState;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftTypeConfig;
use openraft::StorageError;
@@ -103,7 +103,7 @@ where C: RaftTypeConfig

let entry: EntryOf<C> = serde_json::from_slice(&val).map_err(read_logs_err)?;

assert_eq!(id, entry.get_log_id().index());
assert_eq!(id, entry.index());

res.push(entry);
}
@@ -128,7 +128,7 @@ where C: RaftTypeConfig
Some(res) => {
let (_log_index, entry_bytes) = res.map_err(read_logs_err)?;
let ent = serde_json::from_slice::<EntryOf<C>>(&entry_bytes).map_err(read_logs_err)?;
Some(ent.get_log_id().clone())
Some(ent.log_id())
}
};

@@ -158,8 +158,8 @@ where C: RaftTypeConfig
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), StorageError<C>>
where I: IntoIterator<Item = EntryOf<C>> + Send {
for entry in entries {
let id = id_to_bin(entry.get_log_id().index());
assert_eq!(bin_to_id(&id), entry.get_log_id().index());
let id = id_to_bin(entry.index());
assert_eq!(bin_to_id(&id), entry.index());
self.db
.put_cf(
self.cf_logs(),
8 changes: 3 additions & 5 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use crate::core::ApplyResult;
use crate::core::ApplyingEntry;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySliceExt;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::storage::RaftStateMachine;
use crate::storage::Snapshot;
@@ -23,7 +24,6 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::TypeConfigExt;
use crate::RaftLogId;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
@@ -184,10 +184,8 @@ where

// Fake complain: avoid using `collect()` when not needed
#[allow(clippy::needless_collect)]
let applying_entries = entries
.iter()
.map(|e| ApplyingEntry::new(e.get_log_id().clone(), e.get_membership()))
.collect::<Vec<_>>();
let applying_entries =
entries.iter().map(|e| ApplyingEntry::new(e.log_id(), e.get_membership())).collect::<Vec<_>>();

let n_entries = end - since;

14 changes: 6 additions & 8 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::error::RejectAppendEntries;
use crate::raft_state::IOId;
@@ -69,10 +70,10 @@ where C: RaftTypeConfig
);

if let Some(first_ent) = entries.first() {
debug_assert!(first_ent.get_log_id().index() == prev_log_id.next_index());
debug_assert!(first_ent.index() == prev_log_id.next_index());
}

let last_log_id = entries.last().map(|ent| ent.get_log_id().clone());
let last_log_id = entries.last().map(|ent| ent.log_id());
let last_log_id = std::cmp::max(prev_log_id, last_log_id);

let prev_accepted = self.state.accept_io(IOId::new_log_io(self.leader_vote.clone(), last_log_id.clone()));
@@ -84,7 +85,7 @@ where C: RaftTypeConfig
// the entries after it has to be deleted first.
// Raft requires log ids are in total order by (term,index).
// Otherwise the log id with max index makes committed entry invisible in election.
self.truncate_logs(entries[since].get_log_id().index());
self.truncate_logs(entries[since].index());

let entries = entries.split_off(since);
self.do_append_entries(entries);
@@ -143,10 +144,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self, entries))]
pub(crate) fn do_append_entries(&mut self, entries: Vec<C::Entry>) {
debug_assert!(!entries.is_empty());
debug_assert_eq!(
entries[0].get_log_id().index(),
self.state.log_ids.last().cloned().next_index(),
);
debug_assert_eq!(entries[0].index(), self.state.log_ids.last().cloned().next_index(),);
debug_assert!(Some(entries[0].get_log_id()) > self.state.log_ids.last());

self.state.extend_log_ids(&entries);
@@ -343,7 +341,7 @@ where C: RaftTypeConfig
// Find the last 2 membership config entries: the committed and the effective.
for ent in entries.rev() {
if let Some(m) = ent.get_membership() {
memberships.insert(0, StoredMembership::new(Some(ent.get_log_id().clone()), m));
memberships.insert(0, StoredMembership::new(Some(ent.log_id()), m));
if memberships.len() == 2 {
break;
}
4 changes: 2 additions & 2 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::Command;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::proposer::Leader;
use crate::proposer::LeaderQuorumSet;
@@ -10,7 +11,6 @@ use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::LogIdOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;

@@ -67,7 +67,7 @@ where C: RaftTypeConfig
membership_entry.is_none(),
"only one membership entry is allowed in a batch"
);
membership_entry = Some((entry.get_log_id().clone(), m));
membership_entry = Some((entry.log_id(), m));
}
}

17 changes: 17 additions & 0 deletions openraft/src/entry/traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Debug;
use std::fmt::Display;

use openraft_macros::since;

use crate::base::finalized::Final;
use crate::base::OptionalFeatures;
use crate::log_id::RaftLogId;
use crate::type_config::alias::LogIdOf;
@@ -34,6 +37,20 @@ where
///
/// The returned instance must return `Some()` for `Self::get_membership()`.
fn new_membership(log_id: LogIdOf<C>, m: Membership<C>) -> Self;

/// Returns the `LogId` of this entry.
#[since(version = "0.10.0")]
fn log_id(&self) -> LogIdOf<C>
where Self: Final {
self.get_log_id().clone()
}

/// Returns the index of this log entry.
#[since(version = "0.10.0")]
fn index(&self) -> u64
where Self: Final {
self.get_log_id().index()
}
}

/// Build a raft log entry from app data.
13 changes: 6 additions & 7 deletions openraft/src/proposer/leader.rs
Original file line number Diff line number Diff line change
@@ -234,7 +234,6 @@ mod tests {
use crate::type_config::TypeConfigExt;
use crate::vote::raft_vote::RaftVoteExt;
use crate::Entry;
use crate::RaftLogId;
use crate::Vote;

#[test]
@@ -297,8 +296,8 @@ mod tests {
leader.assign_log_ids(&mut entries);

assert_eq!(
entries[0].get_log_id(),
&log_id(2, 2, 4),
entries[0].log_id(),
log_id(2, 2, 4),
"entry log id assigned following last-log-id"
);
assert_eq!(Some(log_id(2, 2, 4)), leader.last_log_id);
@@ -312,7 +311,7 @@ mod tests {
let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1)];
leading.assign_log_ids(&mut entries);

assert_eq!(entries[0].get_log_id(), &log_id(0, 0, 0),);
assert_eq!(entries[0].log_id(), log_id(0, 0, 0),);
assert_eq!(Some(log_id(0, 0, 0)), leading.last_log_id);
}

@@ -336,9 +335,9 @@ mod tests {
let mut entries: Vec<Entry<UTConfig>> = vec![blank_ent(1, 1, 1), blank_ent(1, 1, 1), blank_ent(1, 1, 1)];

leading.assign_log_ids(&mut entries);
assert_eq!(entries[0].get_log_id(), &log_id(2, 2, 9));
assert_eq!(entries[1].get_log_id(), &log_id(2, 2, 10));
assert_eq!(entries[2].get_log_id(), &log_id(2, 2, 11));
assert_eq!(entries[0].log_id(), log_id(2, 2, 9));
assert_eq!(entries[1].log_id(), log_id(2, 2, 10));
assert_eq!(entries[2].log_id(), log_id(2, 2, 11));
assert_eq!(Some(log_id(2, 2, 11)), leading.last_log_id);
}

3 changes: 2 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ use crate::core::notification::Notification;
use crate::core::sm::handle::SnapshotReader;
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOptionExt;
use crate::entry::RaftEntry;
use crate::error::HigherVote;
use crate::error::PayloadTooLarge;
use crate::error::RPCError;
@@ -398,7 +399,7 @@ where
let logs = self.log_reader.limited_get_log_entries(start, end).await?;

let first = logs.first().map(|x| x.get_log_id()).unwrap();
let last = logs.last().map(|x| x.get_log_id().clone()).unwrap();
let last = logs.last().map(|x| x.log_id()).unwrap();

debug_assert!(
!logs.is_empty() && logs.len() <= (end - start) as usize,
8 changes: 4 additions & 4 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
@@ -7,8 +7,8 @@ use validit::Valid;

use crate::display_ext::DisplayOptionExt;
use crate::engine::LogIdList;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::log_id::RaftLogId;
use crate::raft_state::IOState;
use crate::storage::log_reader_ext::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
@@ -193,8 +193,8 @@ where
let chunk_end = std::cmp::min(end, start + chunk_size);
let entries = log_reader.try_get_log_entries(start..chunk_end).await?;

let first = entries.first().map(|x| x.get_log_id().index());
let last = entries.last().map(|x| x.get_log_id().index());
let first = entries.first().map(|ent| ent.index());
let last = entries.last().map(|ent| ent.index());

let make_err = || {
let err = AnyError::error(format!(
@@ -299,7 +299,7 @@ where

for ent in entries.iter().rev() {
if let Some(mem) = ent.get_membership() {
let em = StoredMembership::new(Some(ent.get_log_id().clone()), mem);
let em = StoredMembership::new(Some(ent.log_id()), mem);
res.insert(0, em);
if res.len() == 2 {
return Ok(res);
4 changes: 2 additions & 2 deletions openraft/src/storage/log_reader_ext.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyerror::AnyError;
use openraft_macros::add_async_trait;

use crate::entry::RaftEntry;
use crate::type_config::alias::LogIdOf;
use crate::RaftLogId;
use crate::RaftLogReader;
use crate::RaftTypeConfig;
use crate::StorageError;
@@ -30,7 +30,7 @@ where C: RaftTypeConfig
));
}

Ok(entries[0].get_log_id().clone())
Ok(entries[0].log_id())
}
}

4 changes: 2 additions & 2 deletions openraft/src/storage/v2/raft_log_storage_ext.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use openraft_macros::add_async_trait;
use crate::async_runtime::MpscUnboundedReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::core::notification::Notification;
use crate::log_id::RaftLogId;
use crate::entry::RaftEntry;
use crate::raft_state::io_state::io_id::IOId;
use crate::storage::IOFlushed;
use crate::storage::RaftLogStorage;
@@ -31,7 +31,7 @@ where C: RaftTypeConfig
{
let entries = entries.into_iter().collect::<Vec<_>>();

let last_log_id = entries.last().unwrap().get_log_id().clone();
let last_log_id = entries.last().unwrap().log_id();

let (tx, mut rx) = C::mpsc_unbounded();

21 changes: 10 additions & 11 deletions openraft/src/testing/log/suite.rs
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@ use crate::async_runtime::MpscUnboundedReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::core::notification::Notification;
use crate::entry::RaftEntry;
use crate::log_id::RaftLogId;
use crate::membership::EffectiveMembership;
use crate::raft_state::io_state::io_id::IOId;
use crate::raft_state::LogStateReader;
@@ -845,8 +844,8 @@ where
let logs = store.try_get_log_entries(5..7).await?;

assert_eq!(logs.len(), 2);
assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5));
assert_eq!(*logs[1].get_log_id(), log_id_0(1, 6));
assert_eq!(logs[0].log_id(), log_id_0(1, 5));
assert_eq!(logs[1].log_id(), log_id_0(1, 6));
}

Ok(())
@@ -867,7 +866,7 @@ where

assert!(!logs.is_empty());
assert!(logs.len() <= 2);
assert_eq!(*logs[0].get_log_id(), log_id_0(1, 5));
assert_eq!(logs[0].log_id(), log_id_0(1, 5));
}

Ok(())
@@ -883,13 +882,13 @@ where
C::sleep(Duration::from_millis(1_000)).await;

let ent = store.try_get_log_entry(3).await?;
assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.get_log_id().clone()));
assert_eq!(Some(log_id_0(1, 3)), ent.map(|x| x.log_id()));

let ent = store.try_get_log_entry(0).await?;
assert_eq!(None, ent.map(|x| x.get_log_id().clone()));
assert_eq!(None, ent.map(|x| x.log_id()));

let ent = store.try_get_log_entry(11).await?;
assert_eq!(None, ent.map(|x| x.get_log_id().clone()));
assert_eq!(None, ent.map(|x| x.log_id()));

Ok(())
}
@@ -1072,7 +1071,7 @@ where

let logs = store.try_get_log_entries(0..100).await?;
assert_eq!(logs.len(), 10);
assert_eq!(logs[0].get_log_id().index(), 1);
assert_eq!(logs[0].index(), 1);

assert_eq!(
LogState {
@@ -1097,7 +1096,7 @@ where

let logs = store.try_get_log_entries(0..100).await?;
assert_eq!(logs.len(), 5);
assert_eq!(logs[0].get_log_id().index(), 6);
assert_eq!(logs[0].index(), 6);

assert_eq!(
LogState {
@@ -1189,7 +1188,7 @@ where
let last = store.try_get_log_entries(0..).await?.into_iter().last().unwrap();

assert_eq!(l, 11, "expected 11 entries to exist in the log");
assert_eq!(*last.get_log_id(), log_id_0(2, 11), "unexpected log id");
assert_eq!(last.log_id(), log_id_0(2, 11), "unexpected log id");
Ok(())
}

@@ -1441,7 +1440,7 @@ where
{
let entries = entries.into_iter().collect::<Vec<_>>();

let last_log_id = entries.last().unwrap().get_log_id().clone();
let last_log_id = entries.last().unwrap().log_id();

let (tx, mut rx) = C::mpsc_unbounded();

7 changes: 3 additions & 4 deletions stores/memstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use std::sync::Arc;
use std::sync::Mutex;

use openraft::alias::SnapshotDataOf;
use openraft::entry::RaftEntry;
use openraft::storage::IOFlushed;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
@@ -26,7 +27,6 @@ use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
use openraft::OptionalSend;
use openraft::RaftLogId;
use openraft::SnapshotMeta;
use openraft::StorageError;
use openraft::StoredMembership;
@@ -336,7 +336,7 @@ impl RaftLogStorage<TypeConfig> for Arc<MemLogStore> {
Some(serialized) => {
let ent: Entry<TypeConfig> =
serde_json::from_str(serialized).map_err(|e| StorageError::read_logs(&e))?;
Some(*ent.get_log_id())
Some(ent.log_id())
}
};

@@ -392,8 +392,7 @@ impl RaftLogStorage<TypeConfig> for Arc<MemLogStore> {
where I: IntoIterator<Item = Entry<TypeConfig>> + OptionalSend {
let mut log = self.log.write().await;
for entry in entries {
let s =
serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(*entry.get_log_id(), &e))?;
let s = serde_json::to_string(&entry).map_err(|e| StorageError::write_log_entry(entry.log_id(), &e))?;
log.insert(entry.log_id.index(), s);
}

4 changes: 2 additions & 2 deletions tests/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
@@ -52,7 +52,6 @@ use openraft::LogIdOptionExt;
use openraft::OptionalSend;
use openraft::RPCTypes;
use openraft::Raft;
use openraft::RaftLogId;
use openraft::RaftLogReader;
use openraft::RaftMetrics;
use openraft::RaftState;
@@ -193,6 +192,7 @@ impl fmt::Display for Direction {

use openraft::alias::LogIdOf;
use openraft::alias::VoteOf;
use openraft::entry::RaftEntry;
use openraft::network::v2::RaftNetworkV2;
use openraft::vote::RaftLeaderId;
use openraft::vote::RaftLeaderIdExt;
@@ -1052,7 +1052,7 @@ impl RaftNetworkV2<MemConfig> for RaftRouterNetwork {
rpc.entries.truncate(quota as usize);
*x = Some(0);
if let Some(last) = rpc.entries.last() {
Some(Some(*last.get_log_id()))
Some(Some(last.log_id()))
} else {
Some(rpc.prev_log_id)
}

0 comments on commit 4635a94

Please sign in to comment.