feat: remove node events
These seem to be mostly unused by consumers, and add complexity.
dignifiedquire committed May 10, 2024
1 parent ec48b0d commit b777472
Showing 5 changed files with 38 additions and 116 deletions.
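
For context, the API this commit removes looked roughly like the sketch below from a consumer's point of view. It is reconstructed from the deleted `Node::subscribe` method and `Event` enum in iroh/src/node.rs further down; the surrounding function and the printing are illustrative assumptions, not code from the repository.

    use futures_lite::FutureExt; // for .boxed()

    // Hypothetical consumer of the pre-commit event API.
    async fn log_events(node: &iroh::node::MemNode) -> anyhow::Result<()> {
        // `subscribe` registered a callback returning a boxed future; per its
        // docs, the callback had to complete quickly or it would block ongoing work.
        node.subscribe(|event| {
            async move {
                match event {
                    iroh::node::Event::ByteProvide(e) => println!("provider event: {e:?}"),
                    iroh::node::Event::Db(e) => println!("store event: {e:?}"),
                }
            }
            .boxed()
        })
        .await?;
        Ok(())
    }

After this commit, `Event`, `subscribe`, and the internal `Callbacks` plumbing are gone; the only remaining event path is the `EventSender` handed to iroh_blobs (see `MockEventSender` in builder.rs below).
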
68 changes: 6 additions & 62 deletions iroh/src/node.rs
@@ -11,7 +11,7 @@ use std::path::Path;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
-use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
+use futures_lite::StreamExt;
 use iroh_base::ticket::BlobTicket;
 use iroh_blobs::downloader::Downloader;
 use iroh_blobs::store::Store as BaoStore;
@@ -26,7 +26,6 @@ use iroh_net::{
 };
 use quic_rpc::transport::flume::FlumeConnection;
 use quic_rpc::RpcClient;
-use tokio::sync::{mpsc, RwLock};
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
 use tokio_util::task::LocalPoolHandle;
@@ -42,38 +41,6 @@ mod rpc_status;
 pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
 pub use self::rpc_status::RpcStatus;
 
-type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;
-
-#[derive(Default, derive_more::Debug, Clone)]
-struct Callbacks(#[debug("..")] Arc<RwLock<Vec<EventCallback>>>);
-
-impl Callbacks {
-    async fn push(&self, cb: EventCallback) {
-        self.0.write().await.push(cb);
-    }
-
-    #[allow(dead_code)]
-    async fn send(&self, event: Event) {
-        let cbs = self.0.read().await;
-        for cb in &*cbs {
-            cb(event.clone()).await;
-        }
-    }
-}
-
-impl iroh_blobs::provider::EventSender for Callbacks {
-    fn send(&self, event: iroh_blobs::provider::Event) -> BoxFuture<()> {
-        let this = self.clone();
-        async move {
-            let cbs = this.0.read().await;
-            for cb in &*cbs {
-                cb(Event::ByteProvide(event.clone())).await;
-            }
-        }
-        .boxed()
-    }
-}
-
 /// A server which implements the iroh node.
 ///
 /// Clients can connect to this server and requests hashes from it.
@@ -98,9 +65,6 @@ struct NodeInner<D> {
     secret_key: SecretKey,
     cancel_token: CancellationToken,
     controller: FlumeConnection<Response, Request>,
-    #[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
-    cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
-    callbacks: Callbacks,
     #[allow(dead_code)]
     gc_task: Option<AbortingJoinHandle<()>>,
     #[debug("rt")]
@@ -109,15 +73,6 @@ struct NodeInner<D> {
     downloader: Downloader,
 }
 
-/// Events emitted by the [`Node`] informing about the current status.
-#[derive(Debug, Clone)]
-pub enum Event {
-    /// Events from the iroh-blobs transfer protocol.
-    ByteProvide(iroh_blobs::provider::Event),
-    /// Events from database
-    Db(iroh_blobs::store::Event),
-}
-
 /// In memory node.
 pub type MemNode = Node<iroh_blobs::store::mem::Store>;
 
@@ -184,18 +139,6 @@ impl<D: BaoStore> Node<D> {
         self.inner.secret_key.public()
     }
 
-    /// Subscribe to [`Event`]s emitted from the node, informing about connections and
-    /// progress.
-    ///
-    /// Warning: The callback must complete quickly, as otherwise it will block ongoing work.
-    pub async fn subscribe<F: Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>(
-        &self,
-        cb: F,
-    ) -> Result<()> {
-        self.inner.cb_sender.send(Box::new(cb)).await?;
-        Ok(())
-    }
-
     /// Returns a handle that can be used to do RPC calls to the node internally.
     pub fn controller(&self) -> crate::client::MemRpcClient {
         RpcClient::new(self.inner.controller.clone())
@@ -335,7 +278,7 @@ mod tests {
 
         let _drop_guard = node.cancel_token().drop_guard();
 
-        let (r, mut s) = mpsc::channel(1);
+        /* let (r, mut s) = mpsc::channel(1);
         node.subscribe(move |event| {
             let r = r.clone();
             async move {
@@ -349,7 +292,8 @@ mod tests {
             }
             .boxed()
         })
-        .await?;
+        .await?;
+        */
 
         let got_hash = tokio::time::timeout(Duration::from_secs(1), async move {
             let mut stream = node
@@ -380,8 +324,8 @@ mod tests {
        .context("timeout")?
        .context("get failed")?;
 
-        let event_hash = s.recv().await.expect("missing add tagged blob event");
-        assert_eq!(got_hash, event_hash);
+        /*let event_hash = s.recv().await.expect("missing add tagged blob event");
+        assert_eq!(got_hash, event_hash);*/
 
         Ok(())
     }

44 changes: 14 additions & 30 deletions iroh/src/node/builder.rs
@@ -27,19 +27,18 @@ use quic_rpc::{
     RpcServer, ServiceEndpoint,
 };
 use serde::{Deserialize, Serialize};
-use tokio::sync::mpsc;
 use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
 use tracing::{debug, error, error_span, info, trace, warn, Instrument};
 
 use crate::{
     client::RPC_ALPN,
     docs_engine::Engine,
-    node::{Event, NodeInner},
+    node::NodeInner,
     rpc_protocol::{Request, Response, RpcService},
     util::{fs::load_secret_key, path::IrohPaths},
 };
 
-use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node};
+use super::{rpc, rpc_status::RpcStatus, Node};
 
 pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];

@@ -406,7 +405,6 @@ where
         let endpoint = endpoint.bind(bind_port).await?;
         trace!("created quinn endpoint");
 
-        let (cb_sender, cb_receiver) = mpsc::channel(8);
         let cancel_token = CancellationToken::new();
 
         debug!("rpc listening on: {:?}", self.rpc_endpoint.local_addr());
@@ -427,12 +425,10 @@
         );
         let sync_db = sync.sync.clone();
 
-        let callbacks = Callbacks::default();
         let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy {
             tracing::info!("Starting GC task with interval {:?}", gc_period);
             let db = self.blobs_store.clone();
-            let callbacks = callbacks.clone();
-            let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, callbacks));
+            let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period));
             Some(task.into())
         } else {
             None
@@ -446,8 +442,6 @@
             secret_key: self.secret_key,
             controller,
             cancel_token,
-            callbacks: callbacks.clone(),
-            cb_sender,
             gc_task,
             rt: lp.clone(),
             sync,
@@ -464,8 +458,6 @@
             async move {
                 Self::run(
                     ep,
-                    callbacks,
-                    cb_receiver,
                     handler,
                     self.rpc_endpoint,
                     internal_rpc,
@@ -508,8 +500,6 @@
     #[allow(clippy::too_many_arguments)]
     async fn run(
         server: MagicEndpoint,
-        callbacks: Callbacks,
-        mut cb_receiver: mpsc::Receiver<EventCallback>,
         handler: rpc::Handler<D>,
         rpc: E,
         internal_rpc: impl ServiceEndpoint<RpcService>,
@@ -586,10 +576,6 @@
                     }
                 });
             },
-            // Handle new callbacks
-            Some(cb) = cb_receiver.recv() => {
-                callbacks.push(cb).await;
-            }
             else => break,
         }
     }
@@ -605,12 +591,7 @@
             .ok();
     }
 
-    async fn gc_loop(
-        db: D,
-        ds: iroh_docs::actor::SyncHandle,
-        gc_period: Duration,
-        callbacks: Callbacks,
-    ) {
+    async fn gc_loop(db: D, ds: iroh_docs::actor::SyncHandle, gc_period: Duration) {
         let mut live = BTreeSet::new();
         tracing::debug!("GC loop starting {:?}", gc_period);
         'outer: loop {
@@ -623,9 +604,6 @@
             // do delay before the two phases of GC
             tokio::time::sleep(gc_period).await;
             tracing::debug!("Starting GC");
-            callbacks
-                .send(Event::Db(iroh_blobs::store::Event::GcStarted))
-                .await;
             live.clear();
             let doc_hashes = match ds.content_hashes().await {
                 Ok(hashes) => hashes,
@@ -680,9 +658,6 @@
                     }
                 }
             }
-            callbacks
-                .send(Event::Db(iroh_blobs::store::Event::GcCompleted))
-                .await;
         }
     }
 }
@@ -719,7 +694,7 @@ async fn handle_connection<D: BaoStore>(
             iroh_blobs::provider::handle_connection(
                 connection,
                 node.db.clone(),
-                node.callbacks.clone(),
+                MockEventSender,
                 node.rt.clone(),
             )
             .await
@@ -776,3 +751,12 @@ fn make_rpc_endpoint(
 
     Ok((rpc_endpoint, actual_rpc_port))
 }
+
+#[derive(Debug, Clone)]
+struct MockEventSender;
+
+impl iroh_blobs::provider::EventSender for MockEventSender {
+    fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
+        Box::pin(std::future::ready(()))
+    }
+}
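
Design note: `MockEventSender` is the minimal replacement for the deleted `Callbacks` fan-out. `iroh_blobs::provider::handle_connection` still takes an `EventSender`, so the node now hands it a sender whose `send` returns an already-completed future and drops the event; unlike the old callback list, it can never block a transfer.
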
9 changes: 1 addition & 8 deletions iroh/src/node/rpc.rs
@@ -51,7 +51,7 @@ use crate::rpc_protocol::{
     Request, RpcService, SetTagOption,
 };
 
-use super::{Event, NodeInner};
+use super::NodeInner;
 
 const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
 /// Chunk size for getting blobs over RPC
@@ -757,13 +757,6 @@ impl<D: BaoStore> Handler<D> {
                 tag: tag.clone(),
             })
             .await?;
-        self.inner
-            .callbacks
-            .send(Event::ByteProvide(
-                iroh_blobs::provider::Event::TaggedBlobAdded { hash, format, tag },
-            ))
-            .await;
-
         Ok(())
     }
 
8 changes: 4 additions & 4 deletions iroh/tests/gc.rs
@@ -54,7 +54,7 @@ async fn attach_db_events<D: iroh_blobs::store::Store>(
     node: &Node<D>,
 ) -> flume::Receiver<iroh_blobs::store::Event> {
     let (db_send, db_recv) = flume::unbounded();
-    node.subscribe(move |ev| {
+    /*node.subscribe(move |ev| {
         let db_send = db_send.clone();
         async move {
             if let iroh::node::Event::Db(ev) = ev {
@@ -64,7 +64,7 @@
         .boxed()
     })
     .await
-    .unwrap();
+    .unwrap();*/
     db_recv
 }
 
@@ -83,9 +83,9 @@ async fn step(evs: &flume::Receiver<iroh_blobs::store::Event>) {
     while evs.try_recv().is_ok() {}
     for _ in 0..3 {
         while let Ok(ev) = evs.recv_async().await {
-            if let iroh_blobs::store::Event::GcCompleted = ev {
+            /*if let iroh_blobs::store::Event::GcCompleted = ev {
                 break;
-            }
+            }*/
         }
     }
 }

(The diff for the fifth changed file did not load.)