Skip to content

Commit

Permalink
refactor: Avoid having to build a giant vec from the store iterators
Browse files Browse the repository at this point in the history
Filtering is now done one by one
  • Loading branch information
rklaehn committed Oct 13, 2022
1 parent 8d63f49 commit e834d6b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 36 deletions.
41 changes: 19 additions & 22 deletions protocols/kad/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
//! > out of the job to the consumer, where they can be dropped after being sent.
use crate::record::{self, store::RecordStore, ProviderRecord, Record};
use crate::store::{ProviderRecordsIter, RecordsIter};
use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
Expand Down Expand Up @@ -135,7 +136,7 @@ pub struct PutRecordJob {
publish_interval: Option<Duration>,
record_ttl: Option<Duration>,
skipped: HashSet<record::Key>,
inner: PeriodicJob<vec::IntoIter<Record>>,
inner: PeriodicJob<RecordsIter>,
}

impl PutRecordJob {
Expand Down Expand Up @@ -197,32 +198,28 @@ impl PutRecordJob {
{
if self.inner.check_ready(cx, now) {
let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub);
let records = store
.records()
.filter_map(|r| {
let is_publisher = r.publisher.as_ref() == Some(&self.local_id);
if self.skipped.contains(&r.key) || (!publish && is_publisher) {
None
} else {
let mut record = r;
if publish && is_publisher {
record.expires = record
.expires
.or_else(|| self.record_ttl.map(|ttl| now + ttl));
}
Some(record)
let mut skipped = Default::default();
std::mem::swap(&mut skipped, &mut self.skipped);
let record_ttl = self.record_ttl;
let local_id = self.local_id;
let records = Box::new(store.records().filter_map(move |r| {
let is_publisher = r.publisher.as_ref() == Some(&local_id);
if skipped.contains(&r.key) || (!publish && is_publisher) {
None
} else {
let mut record = r;
if publish && is_publisher {
record.expires = record.expires.or_else(|| record_ttl.map(|ttl| now + ttl));
}
})
.collect::<Vec<_>>()
.into_iter();
Some(record)
}
}));

// Schedule the next publishing run.
if publish {
self.next_publish = self.publish_interval.map(|i| now + i);
}

self.skipped.clear();

self.inner.state = PeriodicJobState::Running(records);
}

Expand Down Expand Up @@ -251,7 +248,7 @@ impl PutRecordJob {

/// Periodic job for replicating provider records.
pub struct AddProviderJob {
inner: PeriodicJob<vec::IntoIter<ProviderRecord>>,
inner: PeriodicJob<ProviderRecordsIter>,
}

impl AddProviderJob {
Expand Down Expand Up @@ -297,7 +294,7 @@ impl AddProviderJob {
T: RecordStore,
{
if self.inner.check_ready(cx, now) {
let records = store.provided().collect::<Vec<_>>().into_iter();
let records = store.provided();
self.inner.state = PeriodicJobState::Running(records);
}

Expand Down
10 changes: 5 additions & 5 deletions protocols/kad/src/record/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub enum Error {
ValueTooLarge,
}

pub type RecordsIter = Box<dyn Iterator<Item = Record> + Send + Sync + 'static>;
pub type ProviderRecordsIter = Box<dyn Iterator<Item = ProviderRecord> + Send + Sync + 'static>;

/// Trait for types implementing a record store.
///
/// There are two types of records managed by a `RecordStore`:
Expand All @@ -64,9 +67,6 @@ pub enum Error {
/// to the closest nodes to the key.
///
pub trait RecordStore {
type RecordsIter: Iterator<Item = Record>;
type ProvidedIter: Iterator<Item = ProviderRecord>;

/// Gets a record from the store, given its key.
fn get(&self, k: &Key) -> Option<Record>;

Expand All @@ -77,7 +77,7 @@ pub trait RecordStore {
fn remove(&mut self, k: &Key);

/// Gets an iterator over all (value-) records currently stored.
fn records(&self) -> Self::RecordsIter;
fn records(&self) -> RecordsIter;

/// Adds a provider record to the store.
///
Expand All @@ -91,7 +91,7 @@ pub trait RecordStore {

/// Gets an iterator over all stored provider records for which the
/// node owning the store is itself the provider.
fn provided(&self) -> Self::ProvidedIter;
fn provided(&self) -> ProviderRecordsIter;

/// Removes a provider record from the store.
fn remove_provider(&mut self, k: &Key, p: &PeerId);
Expand Down
19 changes: 10 additions & 9 deletions protocols/kad/src/record/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use super::*;
use crate::kbucket;
use libp2p_core::PeerId;
use smallvec::SmallVec;
use std::collections::{hash_map, hash_set, HashMap, HashSet};
use std::iter;
use std::collections::{hash_map, HashMap, HashSet};

/// In-memory implementation of a `RecordStore`.
pub struct MemoryStore {
Expand Down Expand Up @@ -96,10 +95,6 @@ impl MemoryStore {
}

impl RecordStore for MemoryStore {
type RecordsIter = Box<dyn Iterator<Item = Record>>;

type ProvidedIter = Box<dyn Iterator<Item = ProviderRecord>>;

fn get(&self, k: &Key) -> Option<Record> {
self.records.get(k).cloned()
}
Expand Down Expand Up @@ -130,7 +125,7 @@ impl RecordStore for MemoryStore {
self.records.remove(k);
}

fn records(&self) -> Self::RecordsIter {
fn records(&self) -> RecordsIter {
Box::new(
self.records
.values()
Expand Down Expand Up @@ -196,8 +191,14 @@ impl RecordStore for MemoryStore {
.map_or_else(Vec::new, |ps| ps.clone().into_vec())
}

fn provided(&self) -> Self::ProvidedIter {
Box::new(self.provided.iter().cloned())
fn provided(&self) -> ProviderRecordsIter {
Box::new(
self.provided
.iter()
.cloned()
.collect::<Vec<_>>()
.into_iter(),
)
}

fn remove_provider(&mut self, key: &Key, provider: &PeerId) {
Expand Down

0 comments on commit e834d6b

Please sign in to comment.