Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Change RecordStore trait interface - add results to methods. #3076

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Update to `libp2p-swarm` `v0.41.0`.

- Change interface of the `RecordStore` trait. Add missed results for its operations.

# 0.41.0

- Remove deprecated `set_protocol_name()` from `KademliaConfig` & `KademliaProtocolConfig`.
Expand Down
16 changes: 11 additions & 5 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use libp2p_swarm::{
dial_opts::{self, DialOpts},
DialError, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use log::{debug, info, warn};
use log::{debug, error, info, warn};
use smallvec::SmallVec;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -686,7 +686,9 @@ where

if let Some(record) = self.store.get(&key) {
if record.is_expired(Instant::now()) {
self.store.remove(&key)
if let Err(ref err) = self.store.remove(&key) {
warn!("Record removal failed: {:?}", err);
};
} else {
records.push(PeerRecord {
peer: None,
Expand Down Expand Up @@ -815,12 +817,14 @@ where
/// This is a _local_ operation. However, it also has the effect that
/// the record will no longer be periodically re-published, allowing the
/// record to eventually expire throughout the DHT.
pub fn remove_record(&mut self, key: &record::Key) {
pub fn remove_record(&mut self, key: &record::Key) -> store::Result<()> {
if let Some(r) = self.store.get(key) {
if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
self.store.remove(key)
return self.store.remove(key);
}
}

Ok(())
}

/// Gets a mutable reference to the record store.
Expand Down Expand Up @@ -2094,7 +2098,9 @@ where
let record = match self.store.get(&key) {
Some(record) => {
if record.is_expired(Instant::now()) {
self.store.remove(&key);
if let Err(ref err) = self.store.remove(&key) {
error!("Record removal failed: {:?}", err);
};
None
} else {
Some(record.into_owned())
Expand Down
6 changes: 5 additions & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,11 @@ fn put_record() {
);
assert_eq!(swarms[0].behaviour_mut().queries.size(), 0);
for k in records.keys() {
swarms[0].behaviour_mut().store.remove(k);
swarms[0]
.behaviour_mut()
.store
.remove(k)
.expect("Valid response from MemoryStore.");
}
assert_eq!(swarms[0].behaviour_mut().store.records().count(), 0);
// All records have been republished, thus the test is complete.
Expand Down
5 changes: 4 additions & 1 deletion protocols/kad/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
use libp2p_core::PeerId;
use log::error;
use std::collections::HashSet;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -229,7 +230,9 @@ impl PutRecordJob {
if let PeriodicJobState::Running(records) = &mut self.inner.state {
for r in records {
if r.is_expired(now) {
store.remove(&r.key)
if let Err(ref err) = store.remove(&r.key) {
error!("Record removal failed: {:?}", err);
};
} else {
return Poll::Ready(r);
}
Expand Down
6 changes: 5 additions & 1 deletion protocols/kad/src/record/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ pub enum Error {
/// The store cannot store this value because it is too large.
#[error("the value is too large to be stored")]
ValueTooLarge,

/// The store cannot remove the value.
#[error("can't remove the value from the store")]
RemoveValueError,
}

/// Trait for types implementing a record store.
Expand Down Expand Up @@ -75,7 +79,7 @@ pub trait RecordStore<'a> {
fn put(&'a mut self, r: Record) -> Result<()>;

/// Removes the record with the given key from the store.
fn remove(&'a mut self, k: &Key);
fn remove(&'a mut self, k: &Key) -> Result<()>;

/// Gets an iterator over all (value-) records currently stored.
fn records(&'a self) -> Self::RecordsIter;
Expand Down
8 changes: 6 additions & 2 deletions protocols/kad/src/record/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,10 @@ impl<'a> RecordStore<'a> for MemoryStore {
Ok(())
}

fn remove(&'a mut self, k: &Key) {
fn remove(&'a mut self, k: &Key) -> Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain in more detail, what benefits you are expecting to see from this?

Removing an element should be an idempotent operation in my opinion. What failure scenarios would you like to express here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current RecordStore implies the MemoryStore implementation that could be considered infallible which is not the case for most other custom implementations. Custom stores implemented with DB or file layers are always fallible for multiple reasons.

Just an example of a possible error for the remove operation: "No write permissions for the database file".

Also, the remove operation is just an example of the whole RecordStore API. Specifically, the most problems I have are with the records operation. My database returns a Result for such iterations and I needed to create a special empty iterator with some error logging which is not a good practice IMHO.

The original issue contains a similar concern from other users: #3035 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that DB or file system APIs are fallible. But what is the NetworkBehaviour supposed to do with the error? Log it? If there is no reasonable error handling strategy, we might as well not allow the implementation to return one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An implementation of RecordStore that interacts with a DB could also just represent a Handle that sends commands to the actual implementation, similar to how SQL DBs usually write to a journal first before modifying data on disk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that DB or file system APIs are fallible. But what is the NetworkBehaviour supposed to do with the error? Log it? If there is no reasonable error handling strategy, we might as well not allow the implementation to return one.

That depends on the use case and libp2p implementation (architecture). Some use cases are compatible with the Result already, for example, remove_record from Behaviour will return it to the caller. Some with the current code are hard to work with and logging seems the easiest change. I agree, that it will add complexity to the design, however, IMHO it makes the API more idiomatic and reflects a broader set of use cases.

An implementation of RecordStore that interacts with a DB could also just represent a Handle that sends commands to the actual implementation, similar to how SQL DBs usually write to a journal first before modifying data on disk.

I'm not sure I follow here. There are a lot of ways to implement the storage indeed and some will hide the errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One important property I’d like to uphold is that NetworkBehaviour is about networking stuff, connection handlers care about handling connections, and other things live outside these abstractions — at least in terms of code module structure. Kademlia might require an implementation of RecordStore in its constructor, and it might use the provided functions from the NetworkBehaviour or from the ConnectionHandler, but the actual RecordStore should live outside these two pieces — also regarding resource usage.

One somewhat similar case is bitswap, where our store also is structured like this, but it has a fully synchronous API and is spawned inside a dedicated thread, with communication over async channels. Failures will need to be bubbled up to the network peer, plus logging (with careful tuning of levels — only things like “disk full” warrant a WARN or ERROR, everything else should stay below INFO, or preferably below DEBUG for per-interaction events).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would vote for option 2 (direct integration into the ConnectionHandler). However, it seems like a major change. So, my question is: whether there is any benefit from this PR as the preparation for that refactoring (that unlikely will make it in the nearest working plan)? Otherwise, feel free to reject it.

Copy link
Contributor Author

@shamil-gadelshin shamil-gadelshin Nov 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kademlia might require an implementation of RecordStore in its constructor

Did you mean this one?

pub fn new(id: PeerId, store: TStore) -> Self {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One important property I’d like to uphold is that NetworkBehaviour is about networking stuff, connection handlers care about handling connections, and other things live outside these abstractions — at least in terms of code module structure.

Agreed.

Copy link
Contributor

@thomaseizinger thomaseizinger Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One important property I’d like to uphold is that NetworkBehaviour is about networking stuff, connection handlers care about handling connections, and other things live outside these abstractions — at least in terms of code module structure. Kademlia might require an implementation of RecordStore in its constructor, and it might use the provided functions from the NetworkBehaviour or from the ConnectionHandler, but the actual RecordStore should live outside these two pieces — also regarding resource usage.

The reason I am challenging it is because we pay a very big price for this separation and the gains are hard to quantify for me:

  • There is a lot of message passing between ConnectionHandlers and NetworkBehaviours, only because we can't access a shared resource directly from the handlers.
  • Because of the message passing, we can't use async-await to describe our protocols, despite them being mostly request-response.

The interface to RecordStore could still look something like this:

trait RecordStore: Clone {
	fn get_value(&self, key: Key) -> GetValueFuture;
}

This would force implementations to use channels or other things to communicate with the centralised store (self receiver is not mutable and future doesn't have a lifetime). Thus, we would still uphold the "share memory by communicating" mantra and the connections themselves would only wait for a channel wake-up but not perform any other work.

At the same time though, this RecordStore can be referenced within each ConnectionHandler, the actual network protocol can be expressed in a few LoC and even adding things like timeouts for retrieving something from the store could be added trivially.

self.records.remove(k);

Ok(())
}

fn records(&'a self) -> Self::RecordsIter {
Expand Down Expand Up @@ -234,7 +236,9 @@ mod tests {
let mut store = MemoryStore::new(PeerId::random());
assert!(store.put(r.clone()).is_ok());
assert_eq!(Some(Cow::Borrowed(&r)), store.get(&r.key));
store.remove(&r.key);
store
.remove(&r.key)
.expect("Valid response from MemoryStore.");
assert!(store.get(&r.key).is_none());
}
quickcheck(prop as fn(_))
Expand Down