Skip to content

Commit

Permalink
Change: instead of a slice, RaftStorage::append_to_log() now accept…
Browse files Browse the repository at this point in the history
…s an `IntoIterator`

Using an `IntoIterator` is more generic than using a slice, and
could avoid potential memory allocation for the slice.

Upgrade tip:

Update the method signature in the `RaftStorage` implementation and
ensure that it compiles without errors.
The method body may require minimal modifications as as the new input
type is just a more general type.
  • Loading branch information
drmingdrmer committed Mar 30, 2023
1 parent f4f582a commit 285e622
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 70 deletions.
5 changes: 3 additions & 2 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,11 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(&mut self, entries: &[Entry<ExampleTypeConfig>]) -> Result<(), StorageError<ExampleNodeId>> {
async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<ExampleNodeId>>
where I: IntoIterator<Item = Entry<ExampleTypeConfig>> + Send {
let mut log = self.log.write().await;
for entry in entries {
log.insert(entry.log_id.index, (*entry).clone());
log.insert(entry.log_id.index, entry);
}
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,15 +431,16 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(&mut self, entries: &[Entry<ExampleTypeConfig>]) -> StorageResult<()> {
async fn append_to_log<I>(&mut self, entries: I) -> StorageResult<()>
where I: IntoIterator<Item = Entry<ExampleTypeConfig>> + Send {
for entry in entries {
let id = id_to_bin(entry.log_id.index);
assert_eq!(bin_to_id(&id), entry.log_id.index);
self.db
.put_cf(
self.logs(),
id,
serde_json::to_vec(entry).map_err(|e| StorageIOError::write_logs(&e))?,
serde_json::to_vec(&entry).map_err(|e| StorageIOError::write_logs(&e))?,
)
.map_err(|e| StorageIOError::write_logs(&e))?;
}
Expand Down
5 changes: 3 additions & 2 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,12 @@ impl RaftStorage<Config> for Arc<MemStore> {
}

#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log(&mut self, entries: &[Entry<Config>]) -> Result<(), StorageError<MemNodeId>> {
async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<MemNodeId>>
where I: IntoIterator<Item = Entry<Config>> + Send {
let mut log = self.log.write().await;
for entry in entries {
let s =
serde_json::to_string(entry).map_err(|e| StorageIOError::write_log_entry(*entry.get_log_id(), &e))?;
serde_json::to_string(&entry).map_err(|e| StorageIOError::write_log_entry(*entry.get_log_id(), &e))?;
log.insert(entry.log_id.index, s);
}
Ok(())
Expand Down
18 changes: 11 additions & 7 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1330,17 +1330,21 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
// The entries before `range.start` are discarded.
self.input_entries.drain(..range.start);

let entries = self.input_entries.drain(..(range.end - range.start)).collect::<Vec<_>>();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries));

if !entries.is_empty() {
self.storage.append_to_log(&entries).await?
if range.end > range.start {
tracing::debug!(
"AppendInputEntries: {},..,{}",
self.input_entries.get(0).unwrap(),
self.input_entries.get(range.end - range.start - 1).unwrap()
);
let entries = self.input_entries.drain(..(range.end - range.start));

self.storage.append_to_log(entries).await?
}
}
Command::AppendBlankLog { log_id } => {
let ent = C::Entry::new_blank(log_id);
let entry_refs = vec![ent];
self.storage.append_to_log(&entry_refs).await?
let entries = [ent];
self.storage.append_to_log(entries).await?
}
Command::SaveVote { vote } => {
self.storage.save_vote(&vote).await?;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ where C: RaftTypeConfig
///
/// - There must not be a **hole** in logs. Because Raft only examine the last log id to ensure
/// correctness.
async fn append_to_log(&mut self, entries: &[C::Entry]) -> Result<(), StorageError<C::NodeId>>;
async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<C::NodeId>>
where I: IntoIterator<Item = C::Entry> + Send;

/// Delete conflict log entries since `log_id`, inclusive.
///
Expand Down
27 changes: 19 additions & 8 deletions openraft/src/store_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,25 @@ where
self.inner().purge_logs_upto(log_id).await
}

#[tracing::instrument(level = "trace", skip(self, entries), fields(entries=display(DisplaySlice::<_>(entries))))]
async fn append_to_log(&mut self, entries: &[C::Entry]) -> Result<(), StorageError<C::NodeId>> {
self.defensive_nonempty_input(entries).await?;
self.defensive_consecutive_input(entries).await?;
self.defensive_append_log_index_is_last_plus_one(entries).await?;
self.defensive_append_log_id_gt_last(entries).await?;

self.inner().append_to_log(entries).await
#[tracing::instrument(level = "trace", skip(self, entries))]
async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<C::NodeId>>
where I: IntoIterator<Item = C::Entry> + Send {
if self.is_defensive() {
let entries_vec = entries.into_iter().collect::<Vec<_>>();
tracing::debug!(
"Defensive check: append_to_log: entries={}",
DisplaySlice::<_>(&entries_vec)
);

self.defensive_nonempty_input(&entries_vec).await?;
self.defensive_consecutive_input(&entries_vec).await?;
self.defensive_append_log_index_is_last_plus_one(&entries_vec).await?;
self.defensive_append_log_id_gt_last(&entries_vec).await?;

self.inner().append_to_log(entries_vec).await
} else {
self.inner().append_to_log(entries).await
}
}

#[tracing::instrument(level = "trace", skip(self, entries), fields(entries=display(DisplaySlice::<_>(entries))))]
Expand Down
Loading

0 comments on commit 285e622

Please sign in to comment.