
Update libp2p_kad::store::RecordStore trait to be amenable to a persistent implementation #4817

Open
nathanielc opened this issue Nov 7, 2023 · 17 comments

Comments

@nathanielc
Contributor

nathanielc commented Nov 7, 2023

Description

The libp2p_kad::store::RecordStore trait currently has a design that makes it difficult to implement a persistent backend.

There are challenges with the current design of the trait that make this difficult:

  • An Instant is used in the record types, which prevents valid serialization/deserialization to a persistent store.
  • The API assumes that all data fits easily in memory, i.e. there is no way to batch or otherwise partition the set of records.
  • The API is not async. It follows neither a poll model nor an async/await model that would allow efficient system IO against a persistent store.

Instant Serialization

Specifically, the ProviderRecord and Record types contain an Instant, which by design cannot be serialized/deserialized.

I suggest we change the time type from Instant to SystemTime. The trade-off is that SystemTime is not guaranteed to be monotonic, i.e. the system clock can be modified, so a time that was expected to be in the future may no longer be. However, it is possible to serialize/deserialize a SystemTime (e.g. as seconds since the Unix epoch). The time scales involved in record expiration are typically hours, and at that scale it is uncommon for non-monotonic jumps of a SystemTime to matter.
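
For illustration, a minimal sketch (not part of libp2p-kad; the helper names are hypothetical) of how a SystemTime expiry could be round-tripped through a persistent store:

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Store a record's expiry as seconds since the Unix epoch (e.g. in an
// INTEGER column) and reconstruct a SystemTime when loading it back.
fn expiry_to_secs(expires: SystemTime) -> u64 {
    expires
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0) // clock set before the epoch; treat as already expired
}

fn expiry_from_secs(secs: u64) -> SystemTime {
    UNIX_EPOCH + Duration::from_secs(secs)
}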

Memory Pressure

The provided method produces an iterator over all entries in the store. Without a mechanism to paginate or resume from a cursor, the iterator may block other concurrent requests to the underlying store (e.g. SQLite).

Async API

It's not clear from the trait API or docs how (or if) async system IO can be performed efficiently by an implementer. Are methods called concurrently? If system IO blocks the current thread, will that potentially create a deadlock in the calling code? A persistent implementation needs answers to these questions.

Motivation

We have a use case where we are storing on the order of 100K - 10M provider records. Storing all of this data in memory is inefficient. Additionally, we need the set of records to be persistent across restarts of the process.

Based on the design of the trait we have a few choices:

  • Persist the data in a different store and populate the memory store on startup. This has the challenge of keeping the two stores in sync and does not solve the memory pressure problem.
  • Implement the RecordStore trait using a persistent store, hacking around the Instant serialization problem and blocking the current thread on system IO. It's not clear what the performance impact of such a design would be.

Current Implementation

The current implementation has one other limitation. While the records and provided methods return an iterator over the data, the iterator is immediately cloned/collected into a heap-allocated vector. This means that not only the trait API but also the consuming code would need to change to be memory efficient.

Are you planning to do it yourself in a pull request?

Maybe

@thomaseizinger
Contributor

Related: #3035.

@thomaseizinger
Contributor

There is a long discussion in #3076 that is highly relevant for this.

I am still of the same opinion that I think we should design RecordStore to:

  • Implement Clone
  • Return Futures that don't have lifetimes
  • Take &self

As a result, we can delete a lot of code in kademlia that just does message-passing between handler and behaviour. It also means fallible and async implementations of the store are trivial. It works out of the box (contrary to the "always emit an event and have the user submit the response" model).
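
For concreteness, a rough sketch of that shape (not the current libp2p-kad API; Key and Record here are stand-ins for the real types):

use std::{future::Future, pin::Pin};

// Stand-ins for libp2p_kad::record::Key and libp2p_kad::Record.
#[derive(Clone)]
pub struct Key(pub Vec<u8>);
#[derive(Clone)]
pub struct Record { pub key: Key, pub value: Vec<u8> }

// Returned futures are boxed and 'static, so they own their data and carry
// no lifetime tied to the store.
pub type StoreFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// The store is Clone, methods take &self, and every operation is fallible.
pub trait RecordStore: Clone + Send + 'static {
    type Error: Send + 'static;

    fn get(&self, key: Key) -> StoreFuture<Result<Option<Record>, Self::Error>>;
    fn put(&self, record: Record) -> StoreFuture<Result<(), Self::Error>>;
}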

@mxinden thumbed up the last comment there, but I am not sure whether that means he agrees with this direction overall or just with that particular argument.

Happy to mentor you through this if you are open to sending a PR :)

@mxinden
Member

mxinden commented Nov 12, 2023

I am still of the same opinion that I think we should design RecordStore to:

  • Implement Clone

  • Return Futures that don't have lifetimes

  • Take &self

Sounds good to me.

@nathanielc
Contributor Author

nathanielc commented Nov 13, 2023

I am still of the same opinion that I think we should design RecordStore to:

  • Implement Clone
  • Return Futures that don't have lifetimes
  • Take &self

Sounds good to me.

Me too.

So, in summary, combining the feedback from these several issues we have:

  • Implement Clone
  • Return Futures that don't have lifetimes
  • Take &self
  • Change Instant to SystemTime
  • Return Result<_, _> from the Futures, so all operations can be fallible

However, with these changes we still do not have anything that addresses the memory pressure. Is there something we can change about the records and provided methods to allow for some kind of batching/paging of the records? Maybe make the trait generic over a Cursor type that is opaque to the kademlia code but is returned and passed back into the records and provided calls to resume where they left off?
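
One possible shape for the cursor idea, purely as an illustration (none of these names exist in libp2p-kad today):

// The trait is generic over an opaque cursor; `provided` returns one page of
// provider records plus the cursor to resume from, or None when iteration
// is complete.
pub trait PagedProviderStore {
    type Cursor;
    type ProviderRecord;

    fn provided(
        &self,
        resume_from: Option<Self::Cursor>,
        limit: usize,
    ) -> (Vec<Self::ProviderRecord>, Option<Self::Cursor>);
}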

@thomaseizinger
Contributor

thomaseizinger commented Nov 13, 2023

Looking at the interface, what are these functions actually used for? We don't need them within libp2p-kad, do we?

Thus we could also just remove them and move the responsibility of providing such a function to the concrete type that implements Store. If we don't have any functions that are abstract over these iterators, we don't need to include them in the interface.

@nathanielc
Contributor Author

They are used for the republish logic. The current implementation calls provided, collects all records into a Vec, and starts to publish them in small batches. In our use case we have disabled the republish logic and implemented it outside of the protocol implementation because of these limitations. I would support removing that logic and letting concrete types manage it externally.

@thomaseizinger
Contributor

Could we implement republishing by just returning a list of keys and then fetching each key individually? That would reduce the memory consumption to only having to fit all keys into memory.

It would result in O(n) queries to republish which could be expensive if we are talking to an external DB.

Alternatively, we could return a Stream from those functions, but then we have to be careful in regards to allocations and probably use Bytes or something internally.

@nathanielc
Contributor Author

For our use case we are using provider records exclusively, so the key is the significant portion of the data. We expect 10M+ keys for a single peer. That's a lot of memory to use when we only need to access the data about once every 12h and whenever it's queried.

A Stream sounds like a workable solution to me.

@thomaseizinger
Contributor

For our use case we are using provider records exclusively, so the key is the significant portion of the data. We expect 10M+ keys for a single peer. That's a lot of memory to use when we only need to access the data about once every 12h and whenever it's queried.

The memory would only be consumed when you want to re-publish. I was thinking of something like:

pub trait RecordStore {
	fn record_keys(&self) -> Vec<Key>;
}

For 10M+ keys, this would mean allocating ~320MB (assuming a key size of 32 bytes) in order to iterate over it, get every record from the store and republish it. But yeah, it is O(n) so not ideal.

One issue with making records and provided a Stream is that the PeriodicJob will need to poll it. Currently, this polling happens within NetworkBehaviour::poll. We should avoid doing any form of IO in there because it will impact the latency with which we can handle incoming events. So far, rust-libp2p offloads any form of IO or heavy computation to the ConnectionHandlers, which each run in their own task.

We can say that the Futures and Streams returned from RecordStore MUST NOT perform any IO but use channels or something like that. We could enforce that by having RecordStore return an mpsc::Receiver<Record>.
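
A sketch of that idea for provider records (everything here is hypothetical: Db stands in for some persistent backend, and tokio and futures are assumed as dependencies):

use futures::{channel::mpsc, SinkExt};
use std::sync::Arc;

#[derive(Clone)]
pub struct ProviderRecord { pub key: Vec<u8> } // stand-in for the real type

pub struct Db; // e.g. an SQLite connection pool

impl Db {
    async fn load_provider_records(&self) -> Vec<ProviderRecord> {
        Vec::new() // stub: the real query would run here
    }
}

// The store performs the IO on its own task and immediately hands back a
// bounded receiver, so NetworkBehaviour::poll only ever polls a channel.
pub fn provided(db: Arc<Db>) -> mpsc::Receiver<ProviderRecord> {
    let (mut tx, rx) = mpsc::channel(64);
    tokio::spawn(async move {
        for record in db.load_provider_records().await {
            if tx.send(record).await.is_err() {
                break; // receiver dropped, stop reading
            }
        }
    });
    rx
}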

@nathanielc
Contributor Author

Thanks for the context re IO in behaviours vs handlers, that's really helpful to understand.

Currently we use a custom implementation to publish the records. Instead of waking up once every publish interval and starting to publish the records, we continuously publish small batches of records throughout the whole publish interval. This way we amortize the work across the interval. We have enough records that it can take the whole 12-hour interval to publish them. With the current implementation that would basically mean we always have a copy of the keys both on disk and in memory.

What if we split these concerns of re-publishing and the store into two different traits?

  • RecordStore - only does get/put of records
  • PublishJob - a job that can be pulled for a batch of records that need to be re-published

The RecordStore would not need to have the records/provided methods anymore, keeping it simple and focused on serving queries from the network.

The PublishJob trait would be a Stream that returns a set of keys. Once those keys are re-published (or replicated) the Stream is polled for a new batch. This way the kademlia protocol doesn't have to concern itself with passing data around from the RecordStore to the publisher logic.
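
A rough sketch of that split (illustrative names only; Key stands in for libp2p_kad::record::Key):

use futures::Stream;

pub struct Key(pub Vec<u8>); // stand-in for libp2p_kad::record::Key

// Only serves queries from the network: get/put of records, no iteration.
pub trait RecordStore {
    type Record;

    fn get(&self, key: &Key) -> Option<Self::Record>;
    fn put(&mut self, key: Key, record: Self::Record);
}

// Yields batches of keys that are due for (re-)publishing; the kademlia
// behaviour polls this Stream and publishes each batch it receives.
pub trait PublishJob: Stream<Item = Vec<Key>> {}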

The current implementation of the publish job can be refactored in terms of this trait, so most users can simply consume that logic. Then for more complicated use cases like ours we can implement our own version of the PublishJob to play our amortization games etc.

It seems like having two traits could actually be simpler than having one more complex trait. Thoughts?

@nathanielc
Contributor Author

As an aside, once you have enough records it can become more efficient to invert the republish logic. The current implementation iterates through each record and contacts the closest peers in the DHT to republish it. Instead, you can walk the peers in the DHT and query which records you need to publish to each node. Once the number of records is significantly greater than the number of peers in the network, this inverted logic is much more efficient.
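
In rough Rust pseudocode (all types and helpers below are placeholders, not rust-libp2p APIs):

use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash)]
pub struct PeerId(String);
pub struct Key(Vec<u8>);

// Walk the peers once and send each peer every key it is responsible for,
// instead of doing one closest-peers query per record.
pub fn republish_inverted(
    peers: &[PeerId],
    keys_for_peer: &HashMap<PeerId, Vec<Key>>, // keys each peer is closest to
) {
    for peer in peers {
        if let Some(keys) = keys_for_peer.get(peer) {
            send_add_provider(peer, keys);
        }
    }
}

fn send_add_provider(_peer: &PeerId, _keys: &[Key]) {
    // Stub: would send the provider records for all keys over one connection.
}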

So I can see a world in which there are two long-lived implementations of the PublishJob for the kad protocol: one where it's expected that the number of peers >> number of records, and another where the number of records >> number of peers. It would be reasonable to have both of these implementations as part of the libp2p_kad crate itself.

@thomaseizinger
Contributor

What if we split these concerns of re-publishing and the store into two different traits?

* RecordStore - only does get/put of records

* PublishJob - a job that can be pulled for a batch of records that need to be re-published

How does the PublishJob learn about the records that need to be re-published? Would it internally connect to the same record store, e.g. by accessing a shared hashmap or using the same DB connection?

We'd have the most flexibility if we'd remove the RecordStore entirely from libp2p-kad and have the user handle this. But that is a pretty low-effort solution because it means libp2p-kad doesn't work out of the box.

Perhaps a wrapper around libp2p_kad::Behaviour would be ideal? We could have a "low-level" behaviour that doesn't have a record store and emits events for each incoming message & requires passing the response back. The wrapper would also handle re-publishing.

More elaborate use-cases could then create their own wrapper behaviour.

What do you think?

@mxinden
Member

mxinden commented Nov 15, 2023

As an aside, once you have enough records it can become more efficient to invert the republish logic. The current implementation iterates through each record and contacts the closest peers in the DHT to republish it. Instead, you can walk the peers in the DHT and query which records you need to publish to each node. Once the number of records is significantly greater than the number of peers in the network, this inverted logic is much more efficient.

The go-libp2p accelerated DHT client does something similar, where it sorts the to-be-published records by key and then walks the DHT, sending multiple records to the same peer in one go.

Agreed that this is a cool optimization. That said, I suggest narrowing the scope of this issue, i.e. do this in a future effort.

@nathanielc
Contributor Author

How does the PublishJob learn about the records that need to be re-published? Would it internally connect to the same record store, e.g. by accessing a shared hashmap or using the same DB connection?

This is what I was thinking, however I like the low-level behavior design as well. Is the idea that rust-libp2p would contain two kad behaviors? The low-level one and a wrapper that does what the current implementation does using an in-memory store? I.e. split the existing one into two behaviors?

Agreed that this is a cool optimization. That said, I suggest narrowing the scope of this issue, i.e. do this in a future effort.

Agreed, at this point I am trying to understand the general direction. Adding an optimized kad republish logic is definitely a separate issue.

@nathanielc
Contributor Author

Another question re the two behaviours design.

My understanding is that passing messages between behaviours happens outside the swarm, meaning whatever type contains the swarm is responsible for calling the methods on the wrapping behaviour when it gets events from the low-level behaviour. How would we ship that out of the box?

@thomaseizinger
Contributor

Is the idea that rust-libp2p would contain two kad behaviors? The low-level one and a wrapper that does what the current implementation does using an in-memory store? I.e. split the existing one into two behaviors?

Yes, that is the idea. Similar to how e.g. libp2p-rendezvous wraps the libp2p-request-response behaviour:

inner: libp2p_request_response::Behaviour<crate::codec::Codec>,

My understanding is that passing messages between behaviours happens outside the swarm, meaning whatever type contains the swarm is responsible for calling the methods on the wrapping behaviour when it gets events from the low-level behaviour. How would we ship that out of the box?

Not quite. Because one wraps the other, it receives all events emitted by the inner one and can selectively answer them and/or forward them. Each behaviour would probably have its own Event type. I'd lean towards making the one with the MemoryStore the "default" behaviour, i.e. libp2p_kad::Behaviour, and exposing an additional libp2p_kad::low_level::{Behaviour,Event} that allows for more customization.
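
Roughly, and purely as an illustration of the layering (not the current crate layout):

// A hypothetical store-less low-level behaviour plus the "default" wrapper
// that owns the MemoryStore and handles store access and re-publishing by
// intercepting the inner behaviour's events.
mod low_level {
    pub struct Behaviour;
    pub enum Event {}
}

pub struct MemoryStore;

pub struct Behaviour {
    inner: low_level::Behaviour,
    store: MemoryStore,
}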

@thomaseizinger
Contributor

Another alternative could be to entirely replace the current MemoryStore with SQLite and, by default, use the SQLite :memory: database.

That would avoid the need for creating several layers of abstraction but be a more closed design, yet likely easier to ship.

Splitting the behaviour, on the other hand, gives users more freedom, but it also means they might have to write more code and makes it less likely that we can provide something for that out of the box. For example, if access to the record store requires IO, we'd have to pull in an executor for the republishing logic, whereas you can just unconditionally depend on tokio in your application or create yourself a libp2p-kad-ceramic crate that combines tokio, SQLite and an advanced republish logic.
