Skip to content

Commit

Permalink
wip: integrate iroh sync and gossip
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Aug 4, 2023
1 parent da9ce49 commit 48f9562
Show file tree
Hide file tree
Showing 15 changed files with 622 additions and 42 deletions.
3 changes: 3 additions & 0 deletions iroh-sync/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static {
fn get_replica(&self, namespace: &NamespaceId) -> Result<Option<Replica<Self::Instance>>>;
fn get_author(&self, author: &AuthorId) -> Result<Option<Author>>;
fn new_author<R: CryptoRngCore + ?Sized>(&self, rng: &mut R) -> Result<Author>;

// TODO: return iterator
fn list_authors(&self) -> Result<Vec<Author>>;
fn new_replica(&self, namespace: Namespace) -> Result<Replica<Self::Instance>>;

/// Gets all entries matching this key and author.
Expand Down
15 changes: 15 additions & 0 deletions iroh-sync/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,21 @@ impl super::Store for Store {
Ok(author)
}

/// Generates a new author, using the passed in randomness.
fn list_authors(&self) -> Result<Vec<Author>> {
let read_tx = self.db.begin_read()?;
let author_table = read_tx.open_table(AUTHORS_TABLE)?;

let mut authors = vec![];
let iter = author_table.iter()?;
for entry in iter {
let (_key, value) = entry?;
let author = Author::from_bytes(value.value());
authors.push(author);
}
Ok(authors)
}

fn new_replica(&self, namespace: Namespace) -> Result<Replica<Self::Instance>> {
let id = namespace.id();
self.insert_namespace(namespace.clone())?;
Expand Down
4 changes: 4 additions & 0 deletions iroh-sync/src/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl super::Store for Store {
Ok(author)
}

fn list_authors(&self) -> Result<Vec<Author>> {
Ok(self.authors.read().values().cloned().collect())
}

fn new_replica(&self, namespace: Namespace) -> Result<Replica<ReplicaStoreInstance>> {
let id = namespace.id();
let replica = Replica::new(namespace, ReplicaStoreInstance::new(id, self.clone()));
Expand Down
12 changes: 12 additions & 0 deletions iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ impl FromStr for Author {
}
}

impl FromStr for AuthorId {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let pub_key: [u8; 32] = hex::decode(s)?
.try_into()
.map_err(|_| anyhow::anyhow!("failed to parse: invalid key length"))?;
let pub_key = VerifyingKey::from_bytes(&pub_key)?;
Ok(AuthorId(pub_key))
}
}

impl From<SigningKey> for Author {
fn from(priv_key: SigningKey) -> Self {
Self { priv_key }
Expand Down
5 changes: 3 additions & 2 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ bytes = "1"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into"] }
flume = "0.10.14"
futures = "0.3.25"
genawaiter = { version = "0.99", default-features = false, features = ["futures03"] }
hex = { version = "0.4.3" }
iroh-io = { version = "0.2.1" }
iroh-net = { version = "0.5.1", path = "../iroh-net" }
Expand All @@ -29,6 +30,7 @@ num_cpus = { version = "1.15.0" }
portable-atomic = "1"
iroh-sync = { path = "../iroh-sync" }
iroh-gossip = { path = "../iroh-gossip" }
once_cell = "1.18.0"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quic-rpc = { version = "0.6", default-features = false, features = ["flume-transport"] }
quinn = "0.10"
Expand All @@ -55,7 +57,6 @@ data-encoding = "2.4.0"
url = { version = "2.4", features = ["serde"] }

# Examples
once_cell = { version = "1.18.0", optional = true }
ed25519-dalek = { version = "=2.0.0-rc.3", features = ["serde", "rand_core"], optional = true }
shell-words = { version = "1.1.0", optional = true }
shellexpand = { version = "3.1.0", optional = true }
Expand All @@ -70,7 +71,7 @@ flat-db = []
mem-db = []
iroh-collection = []
test = []
example-sync = ["cli", "ed25519-dalek", "once_cell", "shell-words", "shellexpand", "sync", "rustyline"]
example-sync = ["cli", "ed25519-dalek", "shell-words", "shellexpand", "sync", "rustyline"]

[dev-dependencies]
anyhow = { version = "1", features = ["backtrace"] }
Expand Down
3 changes: 2 additions & 1 deletion iroh/examples/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ async fn run(args: Args) -> anyhow::Result<()> {
let docs = DocStore::new(blobs.clone(), author, docs_path)?;

// create the live syncer
let live_sync = LiveSync::<store::fs::Store>::spawn(endpoint.clone(), gossip.clone());
let live_sync =
LiveSync::<store::fs::Store>::spawn(rt.clone(), endpoint.clone(), gossip.clone());

// construct the state that is passed to the endpoint loop and from there cloned
// into to the connection handler task for incoming connections.
Expand Down
24 changes: 18 additions & 6 deletions iroh/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use iroh::rpc_protocol::*;
use iroh_bytes::{protocol::RequestToken, util::runtime, Hash};
use iroh_net::tls::{Keypair, PeerId};
use quic_rpc::transport::quinn::QuinnConnection;
use quic_rpc::RpcClient;

use crate::config::Config;

Expand All @@ -27,8 +26,13 @@ pub mod doctor;
pub mod get;
pub mod list;
pub mod provide;
pub mod sync;
pub mod validate;

/// RPC client to an iroh node.
pub type RpcClient =
quic_rpc::RpcClient<ProviderService, QuinnConnection<ProviderResponse, ProviderRequest>>;

/// Send data.
///
/// The iroh command line tool has two modes: provide and get.
Expand Down Expand Up @@ -150,6 +154,10 @@ impl Cli {
Ok(())
}
Commands::Doctor { command } => self::doctor::run(command, config).await,
Commands::Sync { command, rpc_port } => {
let client = make_rpc_client(rpc_port).await?;
command.run(client).await
}
}
}
}
Expand Down Expand Up @@ -263,18 +271,22 @@ pub enum Commands {
#[clap(long, default_value_t = DEFAULT_RPC_PORT)]
rpc_port: u16,
},
Sync {
/// RPC port
#[clap(long, default_value_t = DEFAULT_RPC_PORT)]
rpc_port: u16,
#[clap(subcommand)]
command: sync::Commands,
},
}

async fn make_rpc_client(
rpc_port: u16,
) -> anyhow::Result<RpcClient<ProviderService, QuinnConnection<ProviderResponse, ProviderRequest>>>
{
async fn make_rpc_client(rpc_port: u16) -> anyhow::Result<RpcClient> {
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into();
let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?;
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port);
let server_name = "localhost".to_string();
let connection = QuinnConnection::new(endpoint, addr, server_name);
let client = RpcClient::<ProviderService, _>::new(connection);
let client = RpcClient::new(connection);
// Do a version request to check if the server is running.
let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(VersionRequest))
.await
Expand Down
18 changes: 14 additions & 4 deletions iroh/src/commands/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use iroh::{
};
use iroh_bytes::{protocol::RequestToken, provider::BaoReadonlyDb, util::runtime};
use iroh_net::{derp::DerpMap, tls::Keypair};
use iroh_sync::store::Store;
use quic_rpc::{transport::quinn::QuinnServerEndpoint, ServiceEndpoint};
use tokio::io::AsyncWriteExt;
use tracing::{info_span, Instrument};
Expand All @@ -26,6 +27,10 @@ use super::{
MAX_RPC_CONNECTIONS, MAX_RPC_STREAMS, RPC_ALPN,
};

/// File name inside `IROH_DATA_DIR` where docs stored.
/// TODO: Move some other place
pub const DOCS_PATH: &str = "docs";

#[derive(Debug)]
pub struct ProvideOptions {
pub addr: SocketAddr,
Expand Down Expand Up @@ -60,9 +65,12 @@ pub async fn run(rt: &runtime::Handle, path: Option<PathBuf>, opts: ProvideOptio
Database::default()
}
};
let store = iroh_sync::store::fs::Store::new(iroh_data_root.join(DOCS_PATH))?;
let blobs_path = iroh_data_root.join("blobstemp");

let key = Some(iroh_data_root.join("keypair"));
let token = opts.request_token.clone();
let provider = provide(db.clone(), rt, key, opts).await?;
let provider = provide(db.clone(), store, blobs_path, rt, key, opts).await?;
let controller = provider.controller();
if let Some(t) = token.as_ref() {
println!("Request token: {}", t);
Expand Down Expand Up @@ -122,15 +130,17 @@ pub async fn run(rt: &runtime::Handle, path: Option<PathBuf>, opts: ProvideOptio
Ok(())
}

async fn provide<D: BaoReadonlyDb>(
async fn provide<D: BaoReadonlyDb, S: Store>(
db: D,
store: S,
blobs_path: PathBuf,
rt: &runtime::Handle,
key: Option<PathBuf>,
opts: ProvideOptions,
) -> Result<Node<D>> {
) -> Result<Node<D, S>> {
let keypair = get_keypair(key).await?;

let mut builder = Node::builder(db)
let mut builder = Node::builder(db, store, blobs_path)
.collection_parser(IrohCollectionParser)
.custom_auth_handler(Arc::new(StaticTokenAuthHandler::new(opts.request_token)))
.keylog(opts.keylog);
Expand Down
54 changes: 54 additions & 0 deletions iroh/src/commands/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use clap::Parser;
use futures::StreamExt;
use iroh::rpc_protocol::{AuthorCreateRequest, AuthorListRequest};

use super::RpcClient;

#[derive(Debug, Clone, Parser)]
pub enum Commands {
Author {
#[clap(subcommand)]
command: Author,
},
}

impl Commands {
pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
match self {
Commands::Author { command } => command.run(client).await,
}
}
}

#[derive(Debug, Clone, Parser)]
pub enum Author {
List,
Create,
// Import {
// key: String,
// },
// Share {
// mode: ShareMode,
// author_id: AuthorId,
// },
}

impl Author {
pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
match self {
Author::List => {
let mut stream = client.server_streaming(AuthorListRequest {}).await?;
while let Some(author) = stream.next().await {
println!("{}", author??.author_id);
}
}
Author::Create => {
let author = client.rpc(AuthorCreateRequest).await??;
println!("{}", author.author_id);
}
// Commands::AuthorImport { key } => todo!(),
// Commands::AuthorShare { mode, author_id } => todo!(),
}
Ok(())
}
}
Loading

0 comments on commit 48f9562

Please sign in to comment.