diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index 005d2e1ea3..f08c97fee0 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3.0" -futures-util = { version = "0.3.25", optional = true } +futures-util = { version = "0.3.25" } hex = "0.4" iroh-base = { version = "0.17.0", path = "../iroh-base" } iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] } @@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] } strum = { version = "0.25", features = ["derive"] } tempfile = { version = "3.4" } thiserror = "1" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "rt", "time", "macros"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tracing = "0.1" @@ -57,7 +57,7 @@ test-strategy = "0.3.1" [features] default = ["net", "metrics", "engine"] -net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"] metrics = ["dep:iroh-metrics"] engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"] diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index bbe91181cb..a48e8f55b3 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -10,9 +10,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use futures_util::FutureExt; use iroh_base::hash::Hash; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinSet}; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -253,6 +254,7 @@ impl SyncHandle { states: Default::default(), action_rx, content_status_callback, + tasks: Default::default(), }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) @@ -570,22 +572,37 @@ struct Actor { states: OpenReplicas, action_rx: flume::Receiver, content_status_callback: Option, + tasks: JoinSet<()>, } impl Actor { - fn run(mut self) -> Result<()> { + fn run(self) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build()?; + let local_set = tokio::task::LocalSet::new(); + local_set.block_on(&rt, async move { self.run_async().await }) + } + async fn run_async(mut self) -> Result<()> { loop { - let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) { - Ok(action) => action, - Err(flume::RecvTimeoutError::Timeout) => { + let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + tokio::pin!(timeout); + let action = tokio::select! { + _ = &mut timeout => { if let Err(cause) = self.store.flush() { error!(?cause, "failed to flush store"); } continue; } - Err(flume::RecvTimeoutError::Disconnected) => { - debug!("action channel disconnected"); - break; + action = self.action_rx.recv_async() => { + match action { + Ok(action) => action, + Err(flume::RecvError::Disconnected) => { + debug!("action channel disconnected"); + break; + } + + } } }; trace!(%action, "tick"); @@ -607,6 +624,7 @@ impl Actor { } } } + self.tasks.abort_all(); debug!("shutdown"); Ok(()) } @@ -636,13 +654,21 @@ impl Actor { } Ok(id) }), - Action::ListAuthors { reply } => iter_to_channel( - reply, - self.store + Action::ListAuthors { reply } => { + let iter = self + .store .list_authors() - .map(|a| a.map(|a| a.map(|a| a.id()))), - ), - Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()), + .map(|a| a.map(|a| a.map(|a| a.id()))); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } + Action::ListReplicas { reply } => { + let iter = self.store.list_namespaces(); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ContentHashes { reply } => { send_reply_with(reply, self, |this| this.store.content_hashes()) } @@ -657,7 +683,9 @@ impl Actor { ) -> Result<(), SendReplyError> { match action { ReplicaAction::Open { reply, opts } => { + tracing::trace!("open in"); let res = self.open(namespace, opts); + tracing::trace!("open out"); send_reply(reply, res) } ReplicaAction::Close { reply } => { @@ -759,7 +787,9 @@ impl Actor { .states .ensure_open(&namespace) .and_then(|_| self.store.get_many(namespace, query)); - iter_to_channel(reply, iter) + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { this.close(namespace); @@ -921,15 +951,18 @@ impl OpenReplicas { } } -fn iter_to_channel( +async fn iter_to_channel_async( channel: flume::Sender>, iter: Result>>, ) -> Result<(), SendReplyError> { match iter { - Err(err) => channel.send(Err(err)).map_err(send_reply_error)?, + Err(err) => channel + .send_async(Err(err)) + .await + .map_err(send_reply_error)?, Ok(iter) => { for item in iter { - channel.send(item).map_err(send_reply_error)?; + channel.send_async(item).await.map_err(send_reply_error)?; } } } diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index ab1171b756..981143ca86 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -154,6 +154,22 @@ impl Store { } } + /// Get an owned read-only snapshot of the database. + /// + /// This will open a new read transaction. The read transaction won't be reused for other + /// reads. + /// + /// This has the side effect of committing any open write transaction, + /// so it can be used as a way to ensure that the data is persisted. + pub fn snapshot_owned(&mut self) -> Result { + // make sure the current transaction is committed + self.flush()?; + assert!(matches!(self.transaction, CurrentTransaction::None)); + let tx = self.db.begin_read()?; + let tables = ReadOnlyTables::new(tx)?; + Ok(tables) + } + /// Get access to the tables to read from them. /// /// The underlying transaction is a write transaction, but with a non-mut @@ -223,8 +239,6 @@ impl Store { } } -type AuthorsIter = std::vec::IntoIter>; -type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -297,18 +311,16 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let namespaces: Vec<_> = tables - .namespaces - .iter()? - .map(|res| { - let capability = parse_capability(res?.1.value())?; - Ok((capability.id(), capability.kind())) - }) - .collect(); - Ok(namespaces.into_iter()) + pub fn list_namespaces( + &mut self, + ) -> Result>> { + let snapshot = self.snapshot()?; + let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?; + let iter = iter.map(|res| { + let capability = parse_capability(res?.1.value())?; + Ok((capability.id(), capability.kind())) + }); + Ok(iter) } /// Get an author key from the store. @@ -340,19 +352,16 @@ impl Store { } /// List all author keys in this store. - pub fn list_authors(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let authors: Vec<_> = tables + pub fn list_authors(&mut self) -> Result>> { + let tables = self.snapshot()?; + let iter = tables .authors - .iter()? + .range::<&'static [u8; 32]>(..)? .map(|res| match res { Ok((_key, value)) => Ok(Author::from_bytes(value.value())), Err(err) => Err(err.into()), - }) - .collect(); - - Ok(authors.into_iter()) + }); + Ok(iter) } /// Import a new replica namespace. @@ -413,7 +422,8 @@ impl Store { namespace: NamespaceId, query: impl Into, ) -> Result { - QueryIterator::new(self.tables()?, namespace, query.into()) + let tables = self.snapshot_owned()?; + QueryIterator::new(tables, namespace, query.into()) } /// Get an entry by key and author. @@ -435,13 +445,8 @@ impl Store { /// Get all content hashes of all replicas in the store. pub fn content_hashes(&mut self) -> Result { - // make sure the current transaction is committed - self.flush()?; - assert!(matches!(self.transaction, CurrentTransaction::None)); - let tx = self.db.begin_read()?; - let tables = ReadOnlyTables::new(tx)?; - let records = tables.records; - ContentHashesIterator::all(records) + let tables = self.snapshot_owned()?; + ContentHashesIterator::all(&tables.records) } /// Get the latest entry for each author in a namespace. @@ -870,14 +875,6 @@ impl Iterator for ParentIterator { } } -self_cell::self_cell!( - struct ContentHashesIteratorInner { - owner: RecordsTable, - #[covariant] - dependent: RecordsRange, - } -); - /// Iterator for all content hashes /// /// Note that you might get duplicate hashes. Also, the iterator will keep @@ -886,13 +883,16 @@ self_cell::self_cell!( /// Also, this represents a snapshot of the database at the time of creation. /// It nees a copy of a redb::ReadOnlyTable to be self-contained. #[derive(derive_more::Debug)] -pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner); +pub struct ContentHashesIterator { + #[debug(skip)] + range: RecordsRange<'static>, +} impl ContentHashesIterator { /// Create a new iterator over all content hashes. - pub fn all(owner: RecordsTable) -> anyhow::Result { - let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?; - Ok(Self(inner)) + pub fn all(table: &RecordsTable) -> anyhow::Result { + let range = RecordsRange::all_static(table)?; + Ok(Self { range }) } } @@ -900,7 +900,7 @@ impl Iterator for ContentHashesIterator { type Item = Result; fn next(&mut self) -> Option { - let v = self.0.with_dependent_mut(|_, d| d.next())?; + let v = self.range.next()?; Some(v.map(|e| e.content_hash())) } } diff --git a/iroh-docs/src/store/fs/query.rs b/iroh-docs/src/store/fs/query.rs index a73dbcd8e7..f05b4ecfb3 100644 --- a/iroh-docs/src/store/fs/query.rs +++ b/iroh-docs/src/store/fs/query.rs @@ -3,6 +3,7 @@ use iroh_base::hash::Hash; use crate::{ store::{ + fs::tables::ReadOnlyTables, util::{IndexKind, LatestPerKeySelector, SelectorRes}, AuthorFilter, KeyFilter, Query, }, @@ -12,34 +13,33 @@ use crate::{ use super::{ bounds::{ByKeyBounds, RecordsBounds}, ranges::{RecordsByKeyRange, RecordsRange}, - tables::Tables, RecordsValue, }; /// A query iterator for entry queries. #[derive(Debug)] -pub struct QueryIterator<'a> { - range: QueryRange<'a>, +pub struct QueryIterator { + range: QueryRange, query: Query, offset: u64, count: u64, } #[derive(Debug)] -enum QueryRange<'a> { +enum QueryRange { AuthorKey { - range: RecordsRange<'a>, + range: RecordsRange<'static>, key_filter: KeyFilter, }, KeyAuthor { - range: RecordsByKeyRange<'a>, + range: RecordsByKeyRange, author_filter: AuthorFilter, selector: Option, }, } -impl<'a> QueryIterator<'a> { - pub fn new(tables: &'a Tables<'a>, namespace: NamespaceId, query: Query) -> Result { +impl QueryIterator { + pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result { let index_kind = IndexKind::from(&query); let range = match index_kind { IndexKind::AuthorKey { range, key_filter } => { @@ -53,7 +53,7 @@ impl<'a> QueryIterator<'a> { // no author set => full table scan with the provided key filter AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter), }; - let range = RecordsRange::with_bounds(&tables.records, bounds)?; + let range = RecordsRange::with_bounds_static(&tables.records, bounds)?; QueryRange::AuthorKey { range, key_filter: filter, @@ -65,11 +65,8 @@ impl<'a> QueryIterator<'a> { latest_per_key, } => { let bounds = ByKeyBounds::new(namespace, &range); - let range = RecordsByKeyRange::with_bounds( - &tables.records_by_key, - &tables.records, - bounds, - )?; + let range = + RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?; let selector = latest_per_key.then(LatestPerKeySelector::default); QueryRange::KeyAuthor { author_filter, @@ -88,7 +85,7 @@ impl<'a> QueryIterator<'a> { } } -impl<'a> Iterator for QueryIterator<'a> { +impl Iterator for QueryIterator { type Item = Result; fn next(&mut self) -> Option> { diff --git a/iroh-docs/src/store/fs/ranges.rs b/iroh-docs/src/store/fs/ranges.rs index 9219c620ac..f28d95ae63 100644 --- a/iroh-docs/src/store/fs/ranges.rs +++ b/iroh-docs/src/store/fs/ranges.rs @@ -1,6 +1,6 @@ //! Ranges and helpers for working with [`redb`] tables -use redb::{Key, Range, ReadableTable, Table, Value}; +use redb::{Key, Range, ReadOnlyTable, ReadableTable, Value}; use crate::{store::SortDirection, SignedEntry}; @@ -74,14 +74,9 @@ impl<'a, K: Key + 'static, V: Value + 'static> RangeExt for Range<'a, K, V #[debug("RecordsRange")] pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>); -impl<'a> RecordsRange<'a> { - pub(super) fn all( - records: &'a impl ReadableTable, RecordsValue<'static>>, - ) -> anyhow::Result { - let range = records.range::>(..)?; - Ok(Self(range)) - } +// pub type RecordsRange<'a> = Range<'a, RecordsId<'static>, RecordsValue<'static>>; +impl<'a> RecordsRange<'a> { pub(super) fn with_bounds( records: &'a impl ReadableTable, RecordsValue<'static>>, bounds: RecordsBounds, @@ -90,6 +85,7 @@ impl<'a> RecordsRange<'a> { Ok(Self(range)) } + // /// Get the next item in the range. /// /// Omit items for which the `matcher` function returns false. @@ -103,6 +99,22 @@ impl<'a> RecordsRange<'a> { } } +impl RecordsRange<'static> { + pub(super) fn all_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + ) -> anyhow::Result { + let range = records.range::>(..)?; + Ok(Self(range)) + } + pub(super) fn with_bounds_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + bounds: RecordsBounds, + ) -> anyhow::Result { + let range = records.range(bounds.as_ref())?; + Ok(Self(range)) + } +} + impl<'a> Iterator for RecordsRange<'a> { type Item = anyhow::Result; fn next(&mut self) -> Option { @@ -112,15 +124,15 @@ impl<'a> Iterator for RecordsRange<'a> { #[derive(derive_more::Debug)] #[debug("RecordsByKeyRange")] -pub struct RecordsByKeyRange<'a> { - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, - by_key_range: Range<'a, RecordsByKeyId<'static>, ()>, +pub struct RecordsByKeyRange { + records_table: ReadOnlyTable, RecordsValue<'static>>, + by_key_range: Range<'static, RecordsByKeyId<'static>, ()>, } -impl<'a> RecordsByKeyRange<'a> { +impl RecordsByKeyRange { pub fn with_bounds( - records_by_key_table: &'a impl ReadableTable, ()>, - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, + records_by_key_table: ReadOnlyTable, ()>, + records_table: ReadOnlyTable, RecordsValue<'static>>, bounds: ByKeyBounds, ) -> anyhow::Result { let by_key_range = records_by_key_table.range(bounds.as_ref())?; diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 556f5829a7..afa2591588 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -973,6 +973,44 @@ async fn sync_big() -> Result<()> { Ok(()) } +#[tokio::test] +#[cfg(feature = "test-utils")] +async fn test_list_docs_stream() -> Result<()> { + let node = Node::memory() + .node_discovery(iroh::node::DiscoveryConfig::None) + .relay_mode(iroh::net::relay::RelayMode::Disabled) + .spawn() + .await?; + let count = 200; + + // create docs + for _i in 0..count { + let doc = node.docs.create().await?; + doc.close().await?; + } + + // create doc stream + let mut stream = node.docs.list().await?; + + // process each doc and call into the docs actor. + // this makes sure that we don't deadlock the docs actor. + let mut i = 0; + let fut = async { + while let Some((id, _)) = stream.try_next().await.unwrap() { + let _doc = node.docs.open(id).await.unwrap().unwrap(); + i += 1; + } + }; + + tokio::time::timeout(Duration::from_secs(2), fut) + .await + .expect("not to timeout"); + + assert_eq!(i, count); + + Ok(()) +} + /// Get all entries of a document. async fn get_all(doc: &MemDoc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?;