Skip to content

Commit

Permalink
Implement external iteration for AMT range iteration (#1965)
Browse files Browse the repository at this point in the history
part of #1861

Co-authored-by: Rod Vagg <[email protected]>
  • Loading branch information
Stebalien and rvagg authored Jan 25, 2024
1 parent 9179b45 commit daefd6b
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 202 deletions.
50 changes: 19 additions & 31 deletions ipld/amt/src/amt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ where
/// assert_eq!(num_traversed, 3);
/// assert_eq!(next_idx, Some(10));
/// ```
#[deprecated = "use `.iter_from()` and `.take(limit)` instead"]
pub fn for_each_ranged<F>(
&self,
start_at: Option<u64>,
Expand All @@ -517,25 +518,16 @@ where
where
F: FnMut(u64, &V) -> anyhow::Result<()>,
{
if let Some(start_at) = start_at {
if start_at >= nodes_for_height(self.bit_width(), self.height() + 1) {
return Ok((0, None));
let mut num_traversed = 0;
for kv in self.iter_from(start_at.unwrap_or(0))? {
let (k, v) = kv?;
if limit.map(|l| num_traversed >= l).unwrap_or(false) {
return Ok((num_traversed, Some(k)));
}
num_traversed += 1;
f(k, v)?;
}

let (_, num_traversed, next_index) = self.root.node.for_each_while_ranged(
&self.block_store,
start_at,
limit,
self.height(),
self.bit_width(),
0,
&mut |i, v| {
f(i, v)?;
Ok(true)
},
)?;
Ok((num_traversed, next_index))
Ok((num_traversed, None))
}

/// Iterates over values in the Amt and runs a function on the values, for as long as that
Expand All @@ -547,6 +539,7 @@ where
/// `limit` elements have been traversed. Returns a tuple describing the number of elements
/// iterated over and optionally the index of the next element in the AMT if more elements
/// remain.
#[deprecated = "use `.iter_from()` and `.take(limit)` instead"]
pub fn for_each_while_ranged<F>(
&self,
start_at: Option<u64>,
Expand All @@ -556,22 +549,17 @@ where
where
F: FnMut(u64, &V) -> anyhow::Result<bool>,
{
if let Some(start_at) = start_at {
if start_at >= nodes_for_height(self.bit_width(), self.height() + 1) {
return Ok((0, None));
let mut num_traversed = 0;
let mut keep_going = true;
for kv in self.iter_from(start_at.unwrap_or(0))? {
let (k, v) = kv?;
if !keep_going || limit.map(|l| num_traversed >= l).unwrap_or(false) {
return Ok((num_traversed, Some(k)));
}
num_traversed += 1;
keep_going = f(k, v)?;
}

let (_, num_traversed, next_index) = self.root.node.for_each_while_ranged(
&self.block_store,
start_at,
limit,
self.height(),
self.bit_width(),
0,
&mut f,
)?;
Ok((num_traversed, next_index))
Ok((num_traversed, None))
}

/// Iterates over each value in the Amt and runs a function on the values that allows modifying
Expand Down
22 changes: 12 additions & 10 deletions ipld/amt/src/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore;
use serde::{de::DeserializeOwned, Serialize};

use crate::iter::Iter;
use crate::node::{CollapsedNode, Link};
use crate::root::version;

use super::*;

Expand Down Expand Up @@ -108,14 +110,14 @@ where
Node::Leaf { vals } => vals.len(),
Node::Link { links } => links.len(),
});
node.for_each_while(ctx.store, ctx.height, ctx.bit_width, offset, &mut |i, x| {
for kv in Iter::<_, _, version::V3>::new(node, ctx.store, ctx.height, ctx.bit_width, offset) {
let (k, v) = kv?;
changes.push(Change {
key: i,
key: k,
before: None,
after: Some(x.clone()),
after: Some(v.clone()),
});
Ok(true)
})?;
}

Ok(changes)
}
Expand All @@ -133,14 +135,14 @@ where
Node::Leaf { vals } => vals.len(),
Node::Link { links } => links.len(),
});
node.for_each_while(ctx.store, ctx.height, ctx.bit_width, offset, &mut |i, x| {
for kv in Iter::<_, _, version::V3>::new(node, ctx.store, ctx.height, ctx.bit_width, offset) {
let (k, v) = kv?;
changes.push(Change {
key: i,
before: Some(x.clone()),
key: k,
before: Some(v.clone()),
after: None,
});
Ok(true)
})?;
}

Ok(changes)
}
Expand Down
136 changes: 126 additions & 10 deletions ipld/amt/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::node::CollapsedNode;
use crate::node::{Link, Node};
use crate::MAX_INDEX;
use crate::{nodes_for_height, Error};
use anyhow::anyhow;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::ser::Serialize;
use fvm_ipld_encoding::CborStore;
Expand All @@ -30,7 +31,7 @@ where
/// let kvs: Vec<u64> = (0..=5).collect();
/// kvs
/// .iter()
/// .map(|k| amt.set(u64::try_from(*k).unwrap(), k.to_string()))
/// .map(|k| amt.set(*k, k.to_string()))
/// .collect::<Vec<_>>();
///
/// for kv in &amt {
Expand All @@ -41,17 +42,111 @@ where
/// # anyhow::Ok(())
/// ```
pub fn iter(&self) -> Iter<'_, V, &BS, Ver> {
Iter {
stack: vec![IterStack {
node: &self.root.node,
idx: 0,
}],
height: self.root.height,
Iter::new(
&self.root.node,
&self.block_store,
self.height(),
self.bit_width(),
0,
)
}

/// Iterate over the AMT from the given starting point.
///
/// ```rust
/// use fvm_ipld_amt::Amt;
/// use fvm_ipld_blockstore::MemoryBlockstore;
///
/// let store = MemoryBlockstore::default();
///
/// let mut amt = Amt::new(store);
/// let kvs: Vec<u64> = (0..=5).collect();
/// kvs
/// .iter()
/// .map(|k| amt.set(*k, k.to_string()))
/// .collect::<Vec<_>>();
///
/// for kv in amt.iter_from(3)? {
/// let (k, v) = kv?;
/// println!("{k:?}: {v:?}");
/// }
///
/// # anyhow::Ok(())
/// ```
pub fn iter_from(&self, start: u64) -> Result<Iter<'_, V, &BS, Ver>, Error> {
// Short-circuit when we're starting at 0.
if start == 0 {
return Ok(self.iter());
}

let height = self.height();
let bit_width = self.bit_width();

// Fast-path for case where start is beyond what we know this amt could currently contain.
if start >= nodes_for_height(bit_width, height + 1) {
return Ok(Iter {
height,
bit_width,
stack: Vec::new(),
blockstore: &self.block_store,
ver: PhantomData,
key: start,
});
}

let mut stack = Vec::with_capacity(height as usize);
let mut node = &self.root.node;
let mut offset = 0;
loop {
let start_idx = start.saturating_sub(offset);
match node {
Node::Leaf { vals } => {
if start_idx >= vals.len() as u64 {
// Not deep enough.
return Err(anyhow!("incorrect height for tree depth: expected values at depth {}, found them at {}", height, stack.len()).into());
}
stack.push(IterStack {
node,
idx: start_idx as usize,
});
break;
}
Node::Link { links } => {
let nfh =
nodes_for_height(self.bit_width(), self.height() - stack.len() as u32);
let idx: usize = (start_idx / nfh).try_into().expect("index overflow");
assert!(idx < links.len(), "miscalculated nodes for height");
let Some(l) = &links[idx] else {
// If there's nothing here, mark this as the starting point. We'll start
// scanning here when we iterate.
stack.push(IterStack { node, idx });
break;
};
let sub = match l {
Link::Dirty(sub) => sub,
Link::Cid { cid, cache } => cache.get_or_try_init(|| {
self.block_store
.get_cbor::<CollapsedNode<V>>(cid)?
.ok_or_else(|| Error::CidNotFound(cid.to_string()))?
.expand(self.bit_width())
.map(Box::new)
})?,
};
// Push idx+1 because we've already processed this node.
stack.push(IterStack { node, idx: idx + 1 });
node = sub;
offset += idx as u64 * nfh;
}
}
}
Ok(Iter {
stack,
height,
bit_width,
blockstore: &self.block_store,
bit_width: self.bit_width(),
ver: PhantomData,
key: 0,
}
key: start,
})
}
}

Expand All @@ -77,6 +172,27 @@ pub struct Iter<'a, V, BS, Ver> {
key: u64,
}

impl<'a, V, BS, Ver> Iter<'a, V, &'a BS, Ver> {
pub(crate) fn new(
node: &'a Node<V>,
blockstore: &'a BS,
height: u32,
bit_width: u32,
offset: u64,
) -> Self {
let mut stack = Vec::with_capacity(height as usize);
stack.push(IterStack { node, idx: 0 });
Iter {
stack,
height,
blockstore,
bit_width,
ver: PhantomData,
key: offset,
}
}
}

pub struct IterStack<'a, V> {
pub(crate) node: &'a Node<V>,
pub(crate) idx: usize,
Expand Down
Loading

0 comments on commit daefd6b

Please sign in to comment.