From 0addb8306f5c002e8318150bf23863b7434fd5db Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 4 Jun 2024 23:09:38 +0200 Subject: [PATCH 1/4] fix: deadlock for list_docs --- iroh-docs/src/actor.rs | 67 +++++++++++++++++++++++++++++++++------ iroh-docs/src/store/fs.rs | 33 +++++++++++-------- iroh/src/node/rpc/docs.rs | 2 ++ 3 files changed, 79 insertions(+), 23 deletions(-) diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index bbe91181cb..cba2c2bec8 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -10,9 +10,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use futures_util::FutureExt; use iroh_base::hash::Hash; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinSet}; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -253,6 +254,7 @@ impl SyncHandle { states: Default::default(), action_rx, content_status_callback, + tasks: Default::default(), }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) @@ -275,8 +277,12 @@ impl SyncHandle { pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> { let (reply, rx) = oneshot::channel(); let action = ReplicaAction::Open { reply, opts }; + tracing::debug!("SyncHandle::open IN"); self.send_replica(namespace, action).await?; - rx.await? + tracing::debug!("SyncHandle::open MID"); + let res = rx.await?; + tracing::debug!("SyncHandle::open OUT"); + res } pub async fn close(&self, namespace: NamespaceId) -> Result { @@ -570,22 +576,37 @@ struct Actor { states: OpenReplicas, action_rx: flume::Receiver, content_status_callback: Option, + tasks: JoinSet<()>, } impl Actor { - fn run(mut self) -> Result<()> { + fn run(self) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build()?; + let local_set = tokio::task::LocalSet::new(); + local_set.block_on(&rt, async move { self.run_async().await }) + } + async fn run_async(mut self) -> Result<()> { loop { - let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) { - Ok(action) => action, - Err(flume::RecvTimeoutError::Timeout) => { + let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + tokio::pin!(timeout); + let action = tokio::select! { + _ = &mut timeout => { if let Err(cause) = self.store.flush() { error!(?cause, "failed to flush store"); } continue; } - Err(flume::RecvTimeoutError::Disconnected) => { - debug!("action channel disconnected"); - break; + action = self.action_rx.recv_async() => { + match action { + Ok(action) => action, + Err(flume::RecvError::Disconnected) => { + debug!("action channel disconnected"); + break; + } + + } } }; trace!(%action, "tick"); @@ -607,6 +628,7 @@ impl Actor { } } } + self.tasks.abort_all(); debug!("shutdown"); Ok(()) } @@ -642,7 +664,12 @@ impl Actor { .list_authors() .map(|a| a.map(|a| a.map(|a| a.id()))), ), - Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()), + Action::ListReplicas { reply } => { + let iter = self.store.list_namespaces(); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ContentHashes { reply } => { send_reply_with(reply, self, |this| this.store.content_hashes()) } @@ -657,7 +684,9 @@ impl Actor { ) -> Result<(), SendReplyError> { match action { ReplicaAction::Open { reply, opts } => { + tracing::trace!("open in"); let res = self.open(namespace, opts); + tracing::trace!("open out"); send_reply(reply, res) } ReplicaAction::Close { reply } => { @@ -936,6 +965,24 @@ fn iter_to_channel( Ok(()) } +async fn iter_to_channel_async( + channel: flume::Sender>, + iter: Result>>, +) -> Result<(), SendReplyError> { + match iter { + Err(err) => channel + .send_async(Err(err)) + .await + .map_err(send_reply_error)?, + Ok(iter) => { + for item in iter { + channel.send_async(item).await.map_err(send_reply_error)?; + } + } + } + Ok(()) +} + fn get_author(store: &mut Store, id: &AuthorId) -> Result { store.get_author(id)?.context("author not found") } diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index ab1171b756..a81b38a779 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -224,7 +224,6 @@ impl Store { } type AuthorsIter = std::vec::IntoIter>; -type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -297,18 +296,26 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let namespaces: Vec<_> = tables - .namespaces - .iter()? - .map(|res| { - let capability = parse_capability(res?.1.value())?; - Ok((capability.id(), capability.kind())) - }) - .collect(); - Ok(namespaces.into_iter()) + pub fn list_namespaces( + &mut self, + ) -> Result>> { + let snapshot = self.snapshot()?; + let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?; + let iter = iter.map(|res| { + let capability = parse_capability(res?.1.value())?; + Ok((capability.id(), capability.kind())) + }); + Ok(iter) + // let tables = self.tables()?; + // let namespaces: Vec<_> = tables + // .namespaces + // .iter()? + // .map(|res| { + // let capability = parse_capability(res?.1.value())?; + // Ok((capability.id(), capability.kind())) + // }) + // .collect(); + // Ok(namespaces.into_iter()) } /// Get an author key from the store. diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index a0433a803e..2b64bf0b3c 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -126,7 +126,9 @@ impl DocsEngine { } pub async fn doc_open(&self, req: DocOpenRequest) -> RpcResult { + tracing::debug!("doc_open IN"); self.sync.open(req.doc_id, Default::default()).await?; + tracing::debug!("doc_open OUT"); Ok(DocOpenResponse {}) } From 04137daa359919539b9942b3e4688671ba8063bd Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 5 Jun 2024 00:03:09 +0200 Subject: [PATCH 2/4] fix: make all iterators static and spawn them on tasks --- iroh-docs/src/actor.rs | 32 +++++++------------- iroh-docs/src/store/fs.rs | 50 ++++++++++++++------------------ iroh-docs/src/store/fs/query.rs | 27 ++++++++--------- iroh-docs/src/store/fs/ranges.rs | 40 ++++++++++++++++--------- 4 files changed, 71 insertions(+), 78 deletions(-) diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index cba2c2bec8..6e8cef8f6e 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -658,12 +658,15 @@ impl Actor { } Ok(id) }), - Action::ListAuthors { reply } => iter_to_channel( - reply, - self.store + Action::ListAuthors { reply } => { + let iter = self + .store .list_authors() - .map(|a| a.map(|a| a.map(|a| a.id()))), - ), + .map(|a| a.map(|a| a.map(|a| a.id()))); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ListReplicas { reply } => { let iter = self.store.list_namespaces(); self.tasks @@ -788,7 +791,9 @@ impl Actor { .states .ensure_open(&namespace) .and_then(|_| self.store.get_many(namespace, query)); - iter_to_channel(reply, iter) + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { this.close(namespace); @@ -950,21 +955,6 @@ impl OpenReplicas { } } -fn iter_to_channel( - channel: flume::Sender>, - iter: Result>>, -) -> Result<(), SendReplyError> { - match iter { - Err(err) => channel.send(Err(err)).map_err(send_reply_error)?, - Ok(iter) => { - for item in iter { - channel.send(item).map_err(send_reply_error)?; - } - } - } - Ok(()) -} - async fn iter_to_channel_async( channel: flume::Sender>, iter: Result>>, diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index a81b38a779..6e2cc2ddcc 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -38,7 +38,7 @@ pub(crate) mod tables; use self::{ bounds::{ByKeyBounds, RecordsBounds}, - ranges::RangeExt, + ranges::{RangeExt, RecordsRange}, tables::{RecordsTable, TransactionAndTables}, }; use self::{ @@ -48,8 +48,6 @@ use self::{ }, }; -pub use self::ranges::RecordsRange; - /// Manages the replicas and authors for an instance. #[derive(Debug)] pub struct Store { @@ -223,7 +221,6 @@ impl Store { } } -type AuthorsIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -347,19 +344,16 @@ impl Store { } /// List all author keys in this store. - pub fn list_authors(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let authors: Vec<_> = tables + pub fn list_authors(&mut self) -> Result>> { + let tables = self.snapshot()?; + let iter = tables .authors - .iter()? + .range::<&'static [u8; 32]>(..)? .map(|res| match res { Ok((_key, value)) => Ok(Author::from_bytes(value.value())), Err(err) => Err(err.into()), - }) - .collect(); - - Ok(authors.into_iter()) + }); + Ok(iter) } /// Import a new replica namespace. @@ -420,7 +414,12 @@ impl Store { namespace: NamespaceId, query: impl Into, ) -> Result { - QueryIterator::new(self.tables()?, namespace, query.into()) + // make sure the current transaction is committed + self.flush()?; + assert!(matches!(self.transaction, CurrentTransaction::None)); + let tx = self.db.begin_read()?; + let tables = ReadOnlyTables::new(tx)?; + QueryIterator::new(tables, namespace, query.into()) } /// Get an entry by key and author. @@ -448,7 +447,7 @@ impl Store { let tx = self.db.begin_read()?; let tables = ReadOnlyTables::new(tx)?; let records = tables.records; - ContentHashesIterator::all(records) + ContentHashesIterator::all(&records) } /// Get the latest entry for each author in a namespace. @@ -877,14 +876,6 @@ impl Iterator for ParentIterator { } } -self_cell::self_cell!( - struct ContentHashesIteratorInner { - owner: RecordsTable, - #[covariant] - dependent: RecordsRange, - } -); - /// Iterator for all content hashes /// /// Note that you might get duplicate hashes. Also, the iterator will keep @@ -893,13 +884,16 @@ self_cell::self_cell!( /// Also, this represents a snapshot of the database at the time of creation. /// It nees a copy of a redb::ReadOnlyTable to be self-contained. #[derive(derive_more::Debug)] -pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner); +pub struct ContentHashesIterator { + #[debug(skip)] + range: RecordsRange<'static>, +} impl ContentHashesIterator { /// Create a new iterator over all content hashes. - pub fn all(owner: RecordsTable) -> anyhow::Result { - let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?; - Ok(Self(inner)) + pub fn all(table: &RecordsTable) -> anyhow::Result { + let range = RecordsRange::all_static(&table)?; + Ok(Self { range }) } } @@ -907,7 +901,7 @@ impl Iterator for ContentHashesIterator { type Item = Result; fn next(&mut self) -> Option { - let v = self.0.with_dependent_mut(|_, d| d.next())?; + let v = self.range.next()?; Some(v.map(|e| e.content_hash())) } } diff --git a/iroh-docs/src/store/fs/query.rs b/iroh-docs/src/store/fs/query.rs index a73dbcd8e7..f05b4ecfb3 100644 --- a/iroh-docs/src/store/fs/query.rs +++ b/iroh-docs/src/store/fs/query.rs @@ -3,6 +3,7 @@ use iroh_base::hash::Hash; use crate::{ store::{ + fs::tables::ReadOnlyTables, util::{IndexKind, LatestPerKeySelector, SelectorRes}, AuthorFilter, KeyFilter, Query, }, @@ -12,34 +13,33 @@ use crate::{ use super::{ bounds::{ByKeyBounds, RecordsBounds}, ranges::{RecordsByKeyRange, RecordsRange}, - tables::Tables, RecordsValue, }; /// A query iterator for entry queries. #[derive(Debug)] -pub struct QueryIterator<'a> { - range: QueryRange<'a>, +pub struct QueryIterator { + range: QueryRange, query: Query, offset: u64, count: u64, } #[derive(Debug)] -enum QueryRange<'a> { +enum QueryRange { AuthorKey { - range: RecordsRange<'a>, + range: RecordsRange<'static>, key_filter: KeyFilter, }, KeyAuthor { - range: RecordsByKeyRange<'a>, + range: RecordsByKeyRange, author_filter: AuthorFilter, selector: Option, }, } -impl<'a> QueryIterator<'a> { - pub fn new(tables: &'a Tables<'a>, namespace: NamespaceId, query: Query) -> Result { +impl QueryIterator { + pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result { let index_kind = IndexKind::from(&query); let range = match index_kind { IndexKind::AuthorKey { range, key_filter } => { @@ -53,7 +53,7 @@ impl<'a> QueryIterator<'a> { // no author set => full table scan with the provided key filter AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter), }; - let range = RecordsRange::with_bounds(&tables.records, bounds)?; + let range = RecordsRange::with_bounds_static(&tables.records, bounds)?; QueryRange::AuthorKey { range, key_filter: filter, @@ -65,11 +65,8 @@ impl<'a> QueryIterator<'a> { latest_per_key, } => { let bounds = ByKeyBounds::new(namespace, &range); - let range = RecordsByKeyRange::with_bounds( - &tables.records_by_key, - &tables.records, - bounds, - )?; + let range = + RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?; let selector = latest_per_key.then(LatestPerKeySelector::default); QueryRange::KeyAuthor { author_filter, @@ -88,7 +85,7 @@ impl<'a> QueryIterator<'a> { } } -impl<'a> Iterator for QueryIterator<'a> { +impl Iterator for QueryIterator { type Item = Result; fn next(&mut self) -> Option> { diff --git a/iroh-docs/src/store/fs/ranges.rs b/iroh-docs/src/store/fs/ranges.rs index 9219c620ac..f28d95ae63 100644 --- a/iroh-docs/src/store/fs/ranges.rs +++ b/iroh-docs/src/store/fs/ranges.rs @@ -1,6 +1,6 @@ //! Ranges and helpers for working with [`redb`] tables -use redb::{Key, Range, ReadableTable, Table, Value}; +use redb::{Key, Range, ReadOnlyTable, ReadableTable, Value}; use crate::{store::SortDirection, SignedEntry}; @@ -74,14 +74,9 @@ impl<'a, K: Key + 'static, V: Value + 'static> RangeExt for Range<'a, K, V #[debug("RecordsRange")] pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>); -impl<'a> RecordsRange<'a> { - pub(super) fn all( - records: &'a impl ReadableTable, RecordsValue<'static>>, - ) -> anyhow::Result { - let range = records.range::>(..)?; - Ok(Self(range)) - } +// pub type RecordsRange<'a> = Range<'a, RecordsId<'static>, RecordsValue<'static>>; +impl<'a> RecordsRange<'a> { pub(super) fn with_bounds( records: &'a impl ReadableTable, RecordsValue<'static>>, bounds: RecordsBounds, @@ -90,6 +85,7 @@ impl<'a> RecordsRange<'a> { Ok(Self(range)) } + // /// Get the next item in the range. /// /// Omit items for which the `matcher` function returns false. @@ -103,6 +99,22 @@ impl<'a> RecordsRange<'a> { } } +impl RecordsRange<'static> { + pub(super) fn all_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + ) -> anyhow::Result { + let range = records.range::>(..)?; + Ok(Self(range)) + } + pub(super) fn with_bounds_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + bounds: RecordsBounds, + ) -> anyhow::Result { + let range = records.range(bounds.as_ref())?; + Ok(Self(range)) + } +} + impl<'a> Iterator for RecordsRange<'a> { type Item = anyhow::Result; fn next(&mut self) -> Option { @@ -112,15 +124,15 @@ impl<'a> Iterator for RecordsRange<'a> { #[derive(derive_more::Debug)] #[debug("RecordsByKeyRange")] -pub struct RecordsByKeyRange<'a> { - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, - by_key_range: Range<'a, RecordsByKeyId<'static>, ()>, +pub struct RecordsByKeyRange { + records_table: ReadOnlyTable, RecordsValue<'static>>, + by_key_range: Range<'static, RecordsByKeyId<'static>, ()>, } -impl<'a> RecordsByKeyRange<'a> { +impl RecordsByKeyRange { pub fn with_bounds( - records_by_key_table: &'a impl ReadableTable, ()>, - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, + records_by_key_table: ReadOnlyTable, ()>, + records_table: ReadOnlyTable, RecordsValue<'static>>, bounds: ByKeyBounds, ) -> anyhow::Result { let by_key_range = records_by_key_table.range(bounds.as_ref())?; From ada1d0a5ea035a58c3eb5ffda0a1e34d9d197d76 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 5 Jun 2024 00:15:53 +0200 Subject: [PATCH 3/4] fixup & cleanup --- iroh-docs/Cargo.toml | 6 ++--- iroh-docs/src/actor.rs | 6 +---- iroh-docs/src/store/fs.rs | 47 +++++++++++++++++++-------------------- iroh/src/node/rpc/docs.rs | 2 -- 4 files changed, 27 insertions(+), 34 deletions(-) diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index 005d2e1ea3..f08c97fee0 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3.0" -futures-util = { version = "0.3.25", optional = true } +futures-util = { version = "0.3.25" } hex = "0.4" iroh-base = { version = "0.17.0", path = "../iroh-base" } iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] } @@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] } strum = { version = "0.25", features = ["derive"] } tempfile = { version = "3.4" } thiserror = "1" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "rt", "time", "macros"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tracing = "0.1" @@ -57,7 +57,7 @@ test-strategy = "0.3.1" [features] default = ["net", "metrics", "engine"] -net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"] metrics = ["dep:iroh-metrics"] engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"] diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index 6e8cef8f6e..a48e8f55b3 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -277,12 +277,8 @@ impl SyncHandle { pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> { let (reply, rx) = oneshot::channel(); let action = ReplicaAction::Open { reply, opts }; - tracing::debug!("SyncHandle::open IN"); self.send_replica(namespace, action).await?; - tracing::debug!("SyncHandle::open MID"); - let res = rx.await?; - tracing::debug!("SyncHandle::open OUT"); - res + rx.await? } pub async fn close(&self, namespace: NamespaceId) -> Result { diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index 6e2cc2ddcc..981143ca86 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -38,7 +38,7 @@ pub(crate) mod tables; use self::{ bounds::{ByKeyBounds, RecordsBounds}, - ranges::{RangeExt, RecordsRange}, + ranges::RangeExt, tables::{RecordsTable, TransactionAndTables}, }; use self::{ @@ -48,6 +48,8 @@ use self::{ }, }; +pub use self::ranges::RecordsRange; + /// Manages the replicas and authors for an instance. #[derive(Debug)] pub struct Store { @@ -152,6 +154,22 @@ impl Store { } } + /// Get an owned read-only snapshot of the database. + /// + /// This will open a new read transaction. The read transaction won't be reused for other + /// reads. + /// + /// This has the side effect of committing any open write transaction, + /// so it can be used as a way to ensure that the data is persisted. + pub fn snapshot_owned(&mut self) -> Result { + // make sure the current transaction is committed + self.flush()?; + assert!(matches!(self.transaction, CurrentTransaction::None)); + let tx = self.db.begin_read()?; + let tables = ReadOnlyTables::new(tx)?; + Ok(tables) + } + /// Get access to the tables to read from them. /// /// The underlying transaction is a write transaction, but with a non-mut @@ -303,16 +321,6 @@ impl Store { Ok((capability.id(), capability.kind())) }); Ok(iter) - // let tables = self.tables()?; - // let namespaces: Vec<_> = tables - // .namespaces - // .iter()? - // .map(|res| { - // let capability = parse_capability(res?.1.value())?; - // Ok((capability.id(), capability.kind())) - // }) - // .collect(); - // Ok(namespaces.into_iter()) } /// Get an author key from the store. @@ -414,11 +422,7 @@ impl Store { namespace: NamespaceId, query: impl Into, ) -> Result { - // make sure the current transaction is committed - self.flush()?; - assert!(matches!(self.transaction, CurrentTransaction::None)); - let tx = self.db.begin_read()?; - let tables = ReadOnlyTables::new(tx)?; + let tables = self.snapshot_owned()?; QueryIterator::new(tables, namespace, query.into()) } @@ -441,13 +445,8 @@ impl Store { /// Get all content hashes of all replicas in the store. pub fn content_hashes(&mut self) -> Result { - // make sure the current transaction is committed - self.flush()?; - assert!(matches!(self.transaction, CurrentTransaction::None)); - let tx = self.db.begin_read()?; - let tables = ReadOnlyTables::new(tx)?; - let records = tables.records; - ContentHashesIterator::all(&records) + let tables = self.snapshot_owned()?; + ContentHashesIterator::all(&tables.records) } /// Get the latest entry for each author in a namespace. @@ -892,7 +891,7 @@ pub struct ContentHashesIterator { impl ContentHashesIterator { /// Create a new iterator over all content hashes. pub fn all(table: &RecordsTable) -> anyhow::Result { - let range = RecordsRange::all_static(&table)?; + let range = RecordsRange::all_static(table)?; Ok(Self { range }) } } diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index 2b64bf0b3c..a0433a803e 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -126,9 +126,7 @@ impl DocsEngine { } pub async fn doc_open(&self, req: DocOpenRequest) -> RpcResult { - tracing::debug!("doc_open IN"); self.sync.open(req.doc_id, Default::default()).await?; - tracing::debug!("doc_open OUT"); Ok(DocOpenResponse {}) } From d73f28b4951dd9e6b0c22d6967882eeb2290fe25 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 5 Jun 2024 15:23:46 +0200 Subject: [PATCH 4/4] add test with many docs --- iroh/tests/sync.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 556f5829a7..afa2591588 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -973,6 +973,44 @@ async fn sync_big() -> Result<()> { Ok(()) } +#[tokio::test] +#[cfg(feature = "test-utils")] +async fn test_list_docs_stream() -> Result<()> { + let node = Node::memory() + .node_discovery(iroh::node::DiscoveryConfig::None) + .relay_mode(iroh::net::relay::RelayMode::Disabled) + .spawn() + .await?; + let count = 200; + + // create docs + for _i in 0..count { + let doc = node.docs.create().await?; + doc.close().await?; + } + + // create doc stream + let mut stream = node.docs.list().await?; + + // process each doc and call into the docs actor. + // this makes sure that we don't deadlock the docs actor. + let mut i = 0; + let fut = async { + while let Some((id, _)) = stream.try_next().await.unwrap() { + let _doc = node.docs.open(id).await.unwrap().unwrap(); + i += 1; + } + }; + + tokio::time::timeout(Duration::from_secs(2), fut) + .await + .expect("not to timeout"); + + assert_eq!(i, count); + + Ok(()) +} + /// Get all entries of a document. async fn get_all(doc: &MemDoc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?;