Skip to content

Commit

Permalink
wip: more methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Aug 4, 2023
1 parent 48f9562 commit 52a3755
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 72 deletions.
16 changes: 16 additions & 0 deletions iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ impl FromStr for AuthorId {
}
}

impl FromStr for NamespaceId {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let pub_key: [u8; 32] = hex::decode(s)?
.try_into()
.map_err(|_| anyhow::anyhow!("failed to parse: invalid key length"))?;
let pub_key = VerifyingKey::from_bytes(&pub_key)?;
Ok(NamespaceId(pub_key))
}
}

impl From<SigningKey> for Author {
fn from(priv_key: SigningKey) -> Self {
Self { priv_key }
Expand Down Expand Up @@ -351,6 +363,10 @@ impl<S: ranger::Store<RecordIdentifier, SignedEntry>> Replica<S> {
pub fn namespace(&self) -> NamespaceId {
self.inner.read().namespace.id()
}

pub fn secret_key(&self) -> [u8; 32]{
self.inner.read().namespace.to_bytes()
}
}

/// A signed entry.
Expand Down
96 changes: 86 additions & 10 deletions iroh/src/commands/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use anyhow::anyhow;
use clap::Parser;
use futures::StreamExt;
use iroh::rpc_protocol::{AuthorCreateRequest, AuthorListRequest};
use iroh::{
rpc_protocol::{
AuthorCreateRequest, AuthorListRequest, DocGetRequest, DocJoinRequest, DocSetRequest,
DocShareRequest, DocsImportRequest, ShareMode,
},
sync::PeerSource,
};
use iroh_sync::sync::{NamespaceId, AuthorId};

use super::RpcClient;

Expand All @@ -10,12 +18,18 @@ pub enum Commands {
#[clap(subcommand)]
command: Author,
},
Doc {
id: NamespaceId,
#[clap(subcommand)]
command: Doc,
},
}

impl Commands {
pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
match self {
Commands::Author { command } => command.run(client).await,
Commands::Doc { command, id } => command.run(client, id).await,
}
}
}
Expand All @@ -24,13 +38,6 @@ impl Commands {
pub enum Author {
List,
Create,
// Import {
// key: String,
// },
// Share {
// mode: ShareMode,
// author_id: AuthorId,
// },
}

impl Author {
Expand All @@ -46,8 +53,77 @@ impl Author {
let author = client.rpc(AuthorCreateRequest).await??;
println!("{}", author.author_id);
}
// Commands::AuthorImport { key } => todo!(),
// Commands::AuthorShare { mode, author_id } => todo!(),
}
Ok(())
}
}

#[derive(Debug, Clone, Parser)]
pub enum Doc {
Join {
peers: Vec<PeerSource>,
},
Import {
key: String,
peers: Vec<PeerSource>,
},
Share {
mode: ShareMode,
},
Set {
author: AuthorId,
key: String,
value: String,
},
Get {
key: String,

#[clap(short, long)]
prefix: bool,
},
}

impl Doc {
pub async fn run(self, client: RpcClient, doc_id: NamespaceId) -> anyhow::Result<()> {
match self {
Doc::Join { peers } => {
// let peers = peers.map(|peer| PeerSource::try_from)?;
let res = client.rpc(DocJoinRequest { doc_id, peers }).await??;
println!("{:?}", res);
}
Doc::Import { key, peers } => {
let key = hex::decode(key)?
.try_into()
.map_err(|_| anyhow!("invalid length"))?;
let res = client.rpc(DocsImportRequest { key, peers }).await??;
println!("{:?}", res);
}
Doc::Share { mode } => {
let res = client.rpc(DocShareRequest { doc_id, mode }).await??;
println!("{:?}", res);
}
Doc::Set { author, key, value } => {
let res = client
.rpc(DocSetRequest {
author,
key: key.as_bytes().to_vec(),
value: value.as_bytes().to_vec(),
doc_id,
})
.await??;
println!("{:?}", res);
}
Doc::Get { key, prefix } => {
let res = client
.rpc(DocGetRequest {
key: key.as_bytes().to_vec(),
doc_id,
author: None,
prefix,
})
.await??;
println!("{:?}", res);
}
}
Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::rpc_protocol::{
ProviderResponse, ProviderService, RpcResult, ShutdownRequest, ValidateRequest, VersionRequest,
VersionResponse, WatchRequest, WatchResponse,
};
use crate::sync::{node::SyncNode, SYNC_ALPN};
use crate::sync::{node::SyncNode, SYNC_ALPN, BlobStore};

const MAX_CONNECTIONS: u32 = 1024;
const MAX_STREAMS: u64 = 10;
Expand Down Expand Up @@ -99,7 +99,7 @@ pub struct Builder<
derp_map: Option<DerpMap>,
collection_parser: C,
rt: Option<runtime::Handle>,
docs: (S, Option<PathBuf>),
docs: (S, PathBuf),
}

const PROTOCOLS: [&[u8]; 3] = [&iroh_bytes::protocol::ALPN, GOSSIP_ALPN, SYNC_ALPN];
Expand Down Expand Up @@ -157,7 +157,7 @@ impl<D: BaoMap, S: Store> Builder<D, S> {
auth_handler: Arc::new(NoopRequestAuthorizationHandler),
collection_parser: NoCollectionParser,
rt: None,
docs: (store, Some(blobs_path)),
docs: (store, blobs_path),
}
}
}
Expand Down Expand Up @@ -317,8 +317,8 @@ where
gossip_cell.set(gossip.clone()).unwrap();

// spawn the sync engine
// let blobs = BlobStore::new(rt.clone(), blobs_path, endpoint.clone());
let sync = SyncNode::spawn(rt.clone(), self.docs.0, endpoint.clone(), gossip.clone());
let blobs = BlobStore::new(rt.clone(), self.docs.1, endpoint.clone()).await?;
let sync = SyncNode::spawn(rt.clone(), self.docs.0, endpoint.clone(), gossip.clone(), blobs);

let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
let rt2 = rt.clone();
Expand Down
Loading

0 comments on commit 52a3755

Please sign in to comment.