From 8a454bcadc933bc3e7e1fd7f1e71e29609943c49 Mon Sep 17 00:00:00 2001 From: Shamil Gadelshin Date: Thu, 3 Nov 2022 14:07:24 +0700 Subject: [PATCH] kad: Refactor record store. - adding Result to the `remove` operation for the RecordStore trait. --- protocols/kad/CHANGELOG.md | 2 ++ protocols/kad/src/behaviour.rs | 16 +++++++++++----- protocols/kad/src/behaviour/test.rs | 6 +++++- protocols/kad/src/jobs.rs | 5 ++++- protocols/kad/src/record/store.rs | 6 +++++- protocols/kad/src/record/store/memory.rs | 8 ++++++-- 6 files changed, 33 insertions(+), 10 deletions(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 3d996c677a3..e5e70b843f8 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -4,6 +4,8 @@ - Update to `libp2p-swarm` `v0.41.0`. +- Change interface of the `RecordStore` trait. Add missed results for its operations. + # 0.41.0 - Remove deprecated `set_protocol_name()` from `KademliaConfig` & `KademliaProtocolConfig`. diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index b267f87d386..8d376618065 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -46,7 +46,7 @@ use libp2p_swarm::{ dial_opts::{self, DialOpts}, DialError, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use smallvec::SmallVec; use std::collections::{BTreeMap, HashSet, VecDeque}; use std::fmt; @@ -686,7 +686,9 @@ where if let Some(record) = self.store.get(&key) { if record.is_expired(Instant::now()) { - self.store.remove(&key) + if let Err(ref err) = self.store.remove(&key) { + warn!("Record removal failed: {:?}", err); + }; } else { records.push(PeerRecord { peer: None, @@ -815,12 +817,14 @@ where /// This is a _local_ operation. However, it also has the effect that /// the record will no longer be periodically re-published, allowing the /// record to eventually expire throughout the DHT. - pub fn remove_record(&mut self, key: &record::Key) { + pub fn remove_record(&mut self, key: &record::Key) -> store::Result<()> { if let Some(r) = self.store.get(key) { if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) { - self.store.remove(key) + return self.store.remove(key); } } + + Ok(()) } /// Gets a mutable reference to the record store. @@ -2094,7 +2098,9 @@ where let record = match self.store.get(&key) { Some(record) => { if record.is_expired(Instant::now()) { - self.store.remove(&key); + if let Err(ref err) = self.store.remove(&key) { + error!("Record removal failed: {:?}", err); + }; None } else { Some(record.into_owned()) diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index c61ffaf158f..e8903ce1bf6 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -711,7 +711,11 @@ fn put_record() { ); assert_eq!(swarms[0].behaviour_mut().queries.size(), 0); for k in records.keys() { - swarms[0].behaviour_mut().store.remove(k); + swarms[0] + .behaviour_mut() + .store + .remove(k) + .expect("Valid response from MemoryStore."); } assert_eq!(swarms[0].behaviour_mut().store.records().count(), 0); // All records have been republished, thus the test is complete. diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index 8855026e8d5..83096248707 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -66,6 +66,7 @@ use futures::prelude::*; use futures_timer::Delay; use instant::Instant; use libp2p_core::PeerId; +use log::error; use std::collections::HashSet; use std::pin::Pin; use std::task::{Context, Poll}; @@ -229,7 +230,9 @@ impl PutRecordJob { if let PeriodicJobState::Running(records) = &mut self.inner.state { for r in records { if r.is_expired(now) { - store.remove(&r.key) + if let Err(ref err) = store.remove(&r.key) { + error!("Record removal failed: {:?}", err); + }; } else { return Poll::Ready(r); } diff --git a/protocols/kad/src/record/store.rs b/protocols/kad/src/record/store.rs index 9e75e5a3e8a..43df77f65bc 100644 --- a/protocols/kad/src/record/store.rs +++ b/protocols/kad/src/record/store.rs @@ -44,6 +44,10 @@ pub enum Error { /// The store cannot store this value because it is too large. #[error("the value is too large to be stored")] ValueTooLarge, + + /// The store cannot remove the value. + #[error("can't remove the value from the store")] + RemoveValueError, } /// Trait for types implementing a record store. @@ -75,7 +79,7 @@ pub trait RecordStore<'a> { fn put(&'a mut self, r: Record) -> Result<()>; /// Removes the record with the given key from the store. - fn remove(&'a mut self, k: &Key); + fn remove(&'a mut self, k: &Key) -> Result<()>; /// Gets an iterator over all (value-) records currently stored. fn records(&'a self) -> Self::RecordsIter; diff --git a/protocols/kad/src/record/store/memory.rs b/protocols/kad/src/record/store/memory.rs index 39d17d37c2b..189611b84e1 100644 --- a/protocols/kad/src/record/store/memory.rs +++ b/protocols/kad/src/record/store/memory.rs @@ -131,8 +131,10 @@ impl<'a> RecordStore<'a> for MemoryStore { Ok(()) } - fn remove(&'a mut self, k: &Key) { + fn remove(&'a mut self, k: &Key) -> Result<()> { self.records.remove(k); + + Ok(()) } fn records(&'a self) -> Self::RecordsIter { @@ -234,7 +236,9 @@ mod tests { let mut store = MemoryStore::new(PeerId::random()); assert!(store.put(r.clone()).is_ok()); assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key)); - store.remove(&r.key); + store + .remove(&r.key) + .expect("Valid response from MemoryStore."); assert!(store.get(&r.key).is_none()); } quickcheck(prop as fn(_))