Skip to content

Commit

Permalink
refactor(iroh): remove iroh-blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Nov 14, 2024
1 parent 4090f38 commit 430e19a
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 1,553 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use clap::Parser;
mod commands;
mod config;
mod logging;
mod progress;

use crate::commands::Cli;

Expand Down
File renamed without changes.
5 changes: 2 additions & 3 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ genawaiter = { version = "0.99", default-features = false, features = [
"futures03",
] }
hex = { version = "0.4.3" }
iroh-blobs = { version = "0.28.0", features = ["downloader"] }
iroh-base = { version = "0.28.0", features = ["key"] }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.28.0", optional = true }
Expand Down Expand Up @@ -81,8 +80,8 @@ serde-error = "0.1.3"

[features]
default = ["metrics", "fs-store"]
metrics = ["iroh-metrics", "iroh-blobs/metrics"]
fs-store = ["iroh-blobs/fs-store"]
metrics = ["iroh-metrics"]
fs-store = []
test = []
examples = ["dep:clap", "dep:indicatif"]
discovery-local-network = [
Expand Down
12 changes: 0 additions & 12 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub use crate::rpc_protocol::RpcService;

mod quic;

pub use iroh_blobs::rpc::client::{blobs, tags};

pub use self::net::NodeStatus;
pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN};
pub mod net;
Expand Down Expand Up @@ -57,16 +55,6 @@ impl Iroh {
self.rpc.clone()
}

/// Returns the blobs client.
pub fn blobs(&self) -> blobs::Client {
blobs::Client::new(self.rpc.clone().map().boxed())
}

/// Returns the tags client.
pub fn tags(&self) -> tags::Client {
tags::Client::new(self.rpc.clone().map().boxed())
}

/// Returns the net client.
pub fn net(&self) -> &net::Client {
net::Client::ref_cast(&self.rpc)
Expand Down
2 changes: 0 additions & 2 deletions iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@
#[doc(inline)]
pub use iroh_base as base;
#[doc(inline)]
pub use iroh_blobs as blobs;
#[doc(inline)]
pub use iroh_net as net;
#[doc(inline)]
pub use iroh_router as router;
Expand Down
84 changes: 15 additions & 69 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
use std::{
collections::BTreeSet,
fmt::Debug,
marker::PhantomData,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
Expand All @@ -49,11 +48,6 @@ use anyhow::{anyhow, Result};
use futures_lite::StreamExt;
use futures_util::future::{MapErr, Shared};
use iroh_base::key::PublicKey;
use iroh_blobs::{
net_protocol::Blobs as BlobsProtocol,
store::Store as BaoStore,
util::local_pool::{LocalPool, LocalPoolHandle},
};
use iroh_net::{
endpoint::{DirectAddrsStream, RemoteInfo},
AddrInfo, Endpoint, NodeAddr,
Expand All @@ -73,9 +67,7 @@ mod rpc_status;

pub(crate) use self::rpc::RpcResult;
pub use self::{
builder::{
Builder, DiscoveryConfig, GcPolicy, ProtocolBuilder, StorageConfig, DEFAULT_RPC_ADDR,
},
builder::{Builder, DiscoveryConfig, ProtocolBuilder, StorageConfig, DEFAULT_RPC_ADDR},
rpc_status::RpcStatus,
};

Expand All @@ -101,8 +93,8 @@ pub type IrohServerEndpoint = quic_rpc::transport::boxed::BoxedListener<
/// await the [`Node`] struct directly, it will complete when the task completes. If
/// this is dropped the node task is not stopped but keeps running.
#[derive(Debug, Clone)]
pub struct Node<D> {
inner: Arc<NodeInner<D>>,
pub struct Node {
inner: Arc<NodeInner>,
// `Node` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl.
// So we need
// - `Shared` so we can `task.await` from all `Node` clones
Expand All @@ -116,43 +108,37 @@ pub struct Node<D> {
pub(crate) type JoinErrToStr = Box<dyn Fn(JoinError) -> String + Send + Sync + 'static>;

#[derive(derive_more::Debug)]
struct NodeInner<D> {
db: PhantomData<D>,
struct NodeInner {
rpc_addr: Option<SocketAddr>,
endpoint: Endpoint,
cancel_token: CancellationToken,
client: crate::client::Iroh,
local_pool_handle: LocalPoolHandle,
}

/// In memory node.
pub type MemNode = Node<iroh_blobs::store::mem::Store>;
#[deprecated]
pub type MemNode = Node;

/// Persistent node.
pub type FsNode = Node<iroh_blobs::store::fs::Store>;
#[deprecated]
pub type FsNode = Node;

impl MemNode {
impl Node {
/// Returns a new builder for the [`Node`], by default configured to run in memory.
///
/// Once done with the builder call [`Builder::spawn`] to create the node.
pub fn memory() -> Builder<iroh_blobs::store::mem::Store> {
Builder::default()
pub fn memory() -> Builder {
Builder::memory()
}
}

impl FsNode {
/// Returns a new builder for the [`Node`], configured to persist all data
/// from the given path.
///
/// Once done with the builder call [`Builder::spawn`] to create the node.
pub async fn persistent(
root: impl AsRef<Path>,
) -> Result<Builder<iroh_blobs::store::fs::Store>> {
Builder::default().persist(root).await
pub async fn persistent(root: impl AsRef<Path>) -> Result<Builder> {
Builder::memory().persist(root).await
}
}

impl<D: BaoStore> Node<D> {
/// Returns the [`Endpoint`] of the node.
///
/// This can be used to establish connections to other nodes under any
Expand Down Expand Up @@ -196,11 +182,6 @@ impl<D: BaoStore> Node<D> {
&self.inner.client
}

/// Returns a reference to the used `LocalPoolHandle`.
pub fn local_pool_handle(&self) -> &LocalPoolHandle {
&self.inner.local_pool_handle
}

/// Get the relay server we are connected to.
pub fn home_relay(&self) -> Option<iroh_net::RelayUrl> {
self.inner.endpoint.home_relay()
Expand Down Expand Up @@ -243,15 +224,15 @@ impl<D: BaoStore> Node<D> {
}
}

impl<D> std::ops::Deref for Node<D> {
impl std::ops::Deref for Node {
type Target = crate::client::Iroh;

fn deref(&self) -> &Self::Target {
&self.inner.client
}
}

impl<D: iroh_blobs::store::Store> NodeInner<D> {
impl NodeInner {
async fn local_endpoint_addresses(&self) -> Result<Vec<SocketAddr>> {
let endpoints = self
.endpoint
Expand All @@ -268,10 +249,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
external_rpc: IrohServerEndpoint,
internal_rpc: IrohServerEndpoint,
router: Router,
gc_policy: GcPolicy,
gc_done_callback: Option<Box<dyn Fn() + Send>>,
nodes_data_path: Option<PathBuf>,
local_pool: LocalPool,
) {
let (ipv4, ipv6) = self.endpoint.bound_sockets();
debug!(
Expand All @@ -287,37 +265,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
let external_rpc = RpcServer::new(external_rpc);
let internal_rpc = RpcServer::new(internal_rpc);

// Spawn a task for the garbage collection.
if let GcPolicy::Interval(gc_period) = gc_policy {
let router = router.clone();
let handle = local_pool.spawn(move || async move {
let blobs = router
.get_protocol::<BlobsProtocol<D>>(iroh_blobs::protocol::ALPN)
.expect("missing blobs");

blobs
.store()
.gc_run(
iroh_blobs::store::GcConfig {
period: gc_period,
done_callback: gc_done_callback,
},
|| async move { BTreeSet::default() },
)
.await;
});
// We cannot spawn tasks that run on the local pool directly into the join set,
// so instead we create a new task that supervises the local task.
join_set.spawn({
async move {
if let Err(err) = handle.await {
return Err(anyhow::Error::from(err));
}
Ok(())
}
});
}

if let Some(nodes_data_path) = nodes_data_path {
let ep = self.endpoint.clone();
let token = self.cancel_token.clone();
Expand Down Expand Up @@ -419,7 +366,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {

// Abort remaining local tasks.
tracing::info!("Shutting down local pool");
local_pool.shutdown().await;
}
}

Expand Down
Loading

0 comments on commit 430e19a

Please sign in to comment.