fix(docs): prevent deadlocks with streams returned from docs actor #2346

Merged · 4 commits · Jun 6, 2024

Changes from 3 commits
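Why this deadlocked: the docs actor runs on its own thread and used to answer List*/GetMany requests by pushing every item of an iterator into a bounded flume channel with a blocking send(). If the caller issued another request and awaited its reply before draining that stream, the channel filled up, the blocking send() parked the actor thread, and the next request was never processed. A minimal sketch of that shape (hypothetical names and channel sizes, not iroh code):

use std::thread;

fn main() {
    let (reply_tx, reply_rx) = flume::bounded::<u64>(2);
    let (ack_tx, ack_rx) = flume::bounded::<()>(1);

    let actor = thread::spawn(move || {
        // Step 1: stream results; parks once the buffer holds 2 items.
        for item in 0..100 {
            reply_tx.send(item).unwrap();
        }
        // Step 2: answer the next request -- never reached.
        ack_tx.send(()).unwrap();
    });

    // The caller awaits the next request's reply before draining the stream:
    ack_rx.recv().unwrap(); // deadlock: the actor is parked in send()
    for item in reply_rx.iter() {
        println!("{item}");
    }
    actor.join().unwrap();
}

The fix below decouples the two sides: the actor drives a single-threaded tokio runtime, spawns the forwarding loop onto a background task, and replaces the blocking send() with send_async(), so the action loop keeps running while consumers drain streams at their own pace.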
6 changes: 3 additions & 3 deletions iroh-docs/Cargo.toml
@@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
-futures-util = { version = "0.3.25", optional = true }
+futures-util = { version = "0.3.25" }
hex = "0.4"
iroh-base = { version = "0.17.0", path = "../iroh-base" }
iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] }
@@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = { version = "3.4" }
thiserror = "1"
-tokio = { version = "1", features = ["sync"] }
+tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tracing = "0.1"
@@ -57,7 +57,7 @@ test-strategy = "0.3.1"

[features]
default = ["net", "metrics", "engine"]
-net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"]
+net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"]
metrics = ["dep:iroh-metrics"]
engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"]

69 changes: 51 additions & 18 deletions iroh-docs/src/actor.rs
@@ -10,9 +10,10 @@ use std::{

use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
+use futures_util::FutureExt;
use iroh_base::hash::Hash;
use serde::{Deserialize, Serialize};
-use tokio::sync::oneshot;
+use tokio::{sync::oneshot, task::JoinSet};
use tracing::{debug, error, error_span, trace, warn};

use crate::{
@@ -253,6 +254,7 @@ impl SyncHandle {
states: Default::default(),
action_rx,
content_status_callback,
+tasks: Default::default(),
};
let join_handle = std::thread::Builder::new()
.name("sync-actor".to_string())
@@ -570,22 +572,37 @@ struct Actor {
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
+tasks: JoinSet<()>,
}

impl Actor {
-fn run(mut self) -> Result<()> {
+fn run(self) -> Result<()> {
+let rt = tokio::runtime::Builder::new_current_thread()
+.enable_time()
+.build()?;
+let local_set = tokio::task::LocalSet::new();
+local_set.block_on(&rt, async move { self.run_async().await })
+}
+async fn run_async(mut self) -> Result<()> {
loop {
-let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) {
-Ok(action) => action,
-Err(flume::RecvTimeoutError::Timeout) => {
+let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
+tokio::pin!(timeout);
+let action = tokio::select! {
+_ = &mut timeout => {
if let Err(cause) = self.store.flush() {
error!(?cause, "failed to flush store");
}
continue;
}
-Err(flume::RecvTimeoutError::Disconnected) => {
-debug!("action channel disconnected");
-break;
+action = self.action_rx.recv_async() => {
+match action {
+Ok(action) => action,
+Err(flume::RecvError::Disconnected) => {
+debug!("action channel disconnected");
+break;
+}
+
+}
}
};
trace!(%action, "tick");
@@ -607,6 +624,7 @@ impl Actor {
}
}
}
+self.tasks.abort_all();
debug!("shutdown");
Ok(())
}
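Taken together, the new driving code amounts to the following shape (a consolidated sketch of the diff above, with the action handling elided):

use tokio::task::{JoinSet, LocalSet};

fn run_actor_thread() -> anyhow::Result<()> {
    // Single-threaded runtime on the actor's own OS thread; enable_time()
    // is what the new tokio "rt"/"time" features in Cargo.toml are for.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_time()
        .build()?;
    let local_set = LocalSet::new();
    local_set.block_on(&rt, async move {
        let mut tasks: JoinSet<()> = JoinSet::new();
        // In the real actor: a select! loop over recv_async() and a
        // MAX_COMMIT_DELAY timer, calling tasks.spawn_local(...) for each
        // returned stream. spawn_local keeps !Send futures usable here.
        tasks.abort_all();
    });
    Ok(())
}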
@@ -636,13 +654,21 @@ impl Actor {
}
Ok(id)
}),
-Action::ListAuthors { reply } => iter_to_channel(
-reply,
-self.store
+Action::ListAuthors { reply } => {
+let iter = self
+.store
.list_authors()
-.map(|a| a.map(|a| a.map(|a| a.id()))),
-),
-Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()),
+.map(|a| a.map(|a| a.map(|a| a.id())));
+self.tasks
+.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
+Ok(())
+}
+Action::ListReplicas { reply } => {
+let iter = self.store.list_namespaces();
+self.tasks
+.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
+Ok(())
+}
Action::ContentHashes { reply } => {
send_reply_with(reply, self, |this| this.store.content_hashes())
}
@@ -657,7 +683,9 @@ impl Actor {
) -> Result<(), SendReplyError> {
match action {
ReplicaAction::Open { reply, opts } => {
tracing::trace!("open in");
let res = self.open(namespace, opts);
tracing::trace!("open out");
send_reply(reply, res)
}
ReplicaAction::Close { reply } => {
@@ -759,7 +787,9 @@ impl Actor {
.states
.ensure_open(&namespace)
.and_then(|_| self.store.get_many(namespace, query));
-iter_to_channel(reply, iter)
+self.tasks
+.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
+Ok(())
}
ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
this.close(namespace);
@@ -921,15 +951,18 @@ impl OpenReplicas {
}
}

-fn iter_to_channel<T: Send + 'static>(
+async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
-Err(err) => channel.send(Err(err)).map_err(send_reply_error)?,
+Err(err) => channel
+.send_async(Err(err))
+.await
+.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
-channel.send(item).map_err(send_reply_error)?;
+channel.send_async(item).await.map_err(send_reply_error)?;
}
}
}
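On the consumer side, nothing changes structurally: callers still get the receiving end of a flume channel and can drain it as a stream. A hedged usage sketch (hypothetical helper, assuming flume's default "async" feature):

use futures_lite::StreamExt;

// Collect a reply stream produced by iter_to_channel_async. Because the
// actor now forwards items from a spawned task with send_async, the caller
// may issue further actor requests between reads without wedging the actor.
async fn collect_all<T>(rx: flume::Receiver<anyhow::Result<T>>) -> anyhow::Result<Vec<T>> {
    let mut stream = rx.into_stream();
    let mut out = Vec::new();
    while let Some(item) = stream.next().await {
        out.push(item?);
    }
    Ok(out)
}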
88 changes: 44 additions & 44 deletions iroh-docs/src/store/fs.rs
@@ -154,6 +154,22 @@ impl Store {
}
}

+/// Get an owned read-only snapshot of the database.
+///
+/// This will open a new read transaction. The read transaction won't be reused for other
+/// reads.
+///
+/// This has the side effect of committing any open write transaction,
+/// so it can be used as a way to ensure that the data is persisted.
+pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
+// make sure the current transaction is committed
+self.flush()?;
+assert!(matches!(self.transaction, CurrentTransaction::None));
Review comment · @rklaehn (Contributor) · Jun 5, 2024:

What if we are already in a read transaction? (We don't use that frequently, but maybe we should.)

Then flush is a noop and this assertion will fail as far as I can see.

E.g. you want multiple iterators without writing something.

Ah, never mind. Flush takes the transaction. But we should maybe not use flush here but just ensure that we are in read mode...

Review comment · @rklaehn (Contributor):

I think you should be able to just open another read txn if you are already in read mode, and then wrap that in a ReadonlyTables...

+let tx = self.db.begin_read()?;
+let tables = ReadOnlyTables::new(tx)?;
+Ok(tables)
+}

/// Get access to the tables to read from them.
///
/// The underlying transaction is a write transaction, but with a non-mut
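A note on the review thread above: a snapshot that must not force a commit could open an independent read transaction directly, since redb allows readers concurrent with a writer. A sketch of the reviewer's suggestion (hypothetical method name, same types as this file):

pub fn snapshot_owned_no_flush(&mut self) -> Result<ReadOnlyTables> {
    // No flush: the read transaction simply sees the last committed state.
    let tx = self.db.begin_read()?;
    let tables = ReadOnlyTables::new(tx)?;
    Ok(tables)
}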
@@ -223,8 +239,6 @@ impl Store {
}
}

-type AuthorsIter = std::vec::IntoIter<Result<Author>>;
-type NamespaceIter = std::vec::IntoIter<Result<(NamespaceId, CapabilityKind)>>;
type PeersIter = std::vec::IntoIter<PeerIdBytes>;

impl Store {
@@ -297,18 +311,16 @@ impl Store {
}

/// List all replica namespaces in this store.
-pub fn list_namespaces(&mut self) -> Result<NamespaceIter> {
-// TODO: avoid collect
-let tables = self.tables()?;
-let namespaces: Vec<_> = tables
-.namespaces
-.iter()?
-.map(|res| {
-let capability = parse_capability(res?.1.value())?;
-Ok((capability.id(), capability.kind()))
-})
-.collect();
-Ok(namespaces.into_iter())
+pub fn list_namespaces(
+&mut self,
+) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
+let snapshot = self.snapshot()?;
+let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
+let iter = iter.map(|res| {
+let capability = parse_capability(res?.1.value())?;
+Ok((capability.id(), capability.kind()))
+});
+Ok(iter)
}

/// Get an author key from the store.
@@ -340,19 +352,16 @@ impl Store {
}

/// List all author keys in this store.
-pub fn list_authors(&mut self) -> Result<AuthorsIter> {
-// TODO: avoid collect
-let tables = self.tables()?;
-let authors: Vec<_> = tables
+pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
+let tables = self.snapshot()?;
+let iter = tables
.authors
-.iter()?
+.range::<&'static [u8; 32]>(..)?
.map(|res| match res {
Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
Err(err) => Err(err.into()),
-})
-.collect();
-
-Ok(authors.into_iter())
+});
+Ok(iter)
}

/// Import a new replica namespace.
@@ -413,7 +422,8 @@ impl Store {
namespace: NamespaceId,
query: impl Into<Query>,
) -> Result<QueryIterator> {
-QueryIterator::new(self.tables()?, namespace, query.into())
+let tables = self.snapshot_owned()?;
+QueryIterator::new(tables, namespace, query.into())
}

/// Get an entry by key and author.
@@ -435,13 +445,8 @@ impl Store {

/// Get all content hashes of all replicas in the store.
pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
-// make sure the current transaction is committed
-self.flush()?;
-assert!(matches!(self.transaction, CurrentTransaction::None));
-let tx = self.db.begin_read()?;
-let tables = ReadOnlyTables::new(tx)?;
-let records = tables.records;
-ContentHashesIterator::all(records)
+let tables = self.snapshot_owned()?;
+ContentHashesIterator::all(&tables.records)
}

/// Get the latest entry for each author in a namespace.
@@ -870,14 +875,6 @@ impl Iterator for ParentIterator {
}
}

-self_cell::self_cell!(
-struct ContentHashesIteratorInner {
-owner: RecordsTable,
-#[covariant]
-dependent: RecordsRange,
-}
-);

/// Iterator for all content hashes
///
/// Note that you might get duplicate hashes. Also, the iterator will keep
@@ -886,21 +883,24 @@ self_cell::self_cell!(
/// Also, this represents a snapshot of the database at the time of creation.
/// It needs a copy of a redb::ReadOnlyTable to be self-contained.
#[derive(derive_more::Debug)]
-pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner);
+pub struct ContentHashesIterator {
+#[debug(skip)]
+range: RecordsRange<'static>,
+}

impl ContentHashesIterator {
/// Create a new iterator over all content hashes.
-pub fn all(owner: RecordsTable) -> anyhow::Result<Self> {
-let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?;
-Ok(Self(inner))
+pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
+let range = RecordsRange::all_static(table)?;
+Ok(Self { range })
}
}

impl Iterator for ContentHashesIterator {
type Item = Result<Hash>;

fn next(&mut self) -> Option<Self::Item> {
-let v = self.0.with_dependent_mut(|_, d| d.next())?;
+let v = self.range.next()?;
Some(v.map(|e| e.content_hash()))
}
}
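The self_cell dependency was only there to hold a range next to the table it borrows from, which is a self-referential struct that safe Rust cannot express directly. A simplified sketch of the old problem (hypothetical types, not the redb API):

struct Table {
    data: Vec<u8>,
}

struct Range<'a> {
    rest: &'a [u8],
}

// The shape self_cell emulated; this does not compile as plain Rust:
// struct Snapshot {
//     owner: Table,
//     dependent: Range<'owner>, // <- no such lifetime
// }

fn main() {
    let table = Table { data: vec![1, 2, 3] };
    let range = Range { rest: &table.data };
    // Fine while `table` is alive; the hard part was returning both from a
    // function as one value. RecordsRange<'static> sidesteps the borrow by
    // owning what it needs, so the owner no longer travels with the range.
    println!("{} bytes", range.rest.len());
}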