wip: integration of iroh-sync in iroh
Frando committed Aug 7, 2023
1 parent 52a3755 commit 1d8f2d9
Showing 13 changed files with 466 additions and 147 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion iroh-gossip/src/net.rs
@@ -22,7 +22,7 @@ use crate::proto::{self, TopicId};
pub mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"n0/iroh-gossip/0";
pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0";
/// Maximum message size is limited to 1024 bytes for now.
pub const MAX_MESSAGE_SIZE: usize = 1024;

2 changes: 1 addition & 1 deletion iroh-sync/Cargo.toml
@@ -35,4 +35,4 @@ tempfile = "3.4"

[features]
default = ["fs-store"]
fs-store = ["redb", "ouroboros"]
fs-store = ["redb", "ouroboros"]
2 changes: 2 additions & 0 deletions iroh-sync/src/store.rs
@@ -23,6 +23,8 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static {
Self: 'a;

fn get_replica(&self, namespace: &NamespaceId) -> Result<Option<Replica<Self::Instance>>>;
// TODO: return iterator
fn list_replicas(&self) -> Result<Vec<NamespaceId>>;
fn get_author(&self, author: &AuthorId) -> Result<Option<Author>>;
fn new_author<R: CryptoRngCore + ?Sized>(&self, rng: &mut R) -> Result<Author>;

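As a rough sketch of how the new list_replicas method might be consumed from outside the crate (illustrative only, not code from this commit; it assumes the trait path iroh_sync::store::Store, anyhow's Result, and the Display impl for NamespaceId changed in sync.rs further down):

use anyhow::Result;
use iroh_sync::store::Store;
use iroh_sync::sync::NamespaceId;

// Hypothetical helper: print every namespace (replica) id a store knows about.
// The TODO above notes list_replicas may later return an iterator instead of a Vec.
fn print_replicas<S: Store>(store: &S) -> Result<()> {
    let namespaces: Vec<NamespaceId> = store.list_replicas()?;
    for namespace in namespaces {
        println!("{}", namespace);
    }
    Ok(())
}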
11 changes: 11 additions & 0 deletions iroh-sync/src/store/fs.rs
@@ -110,6 +110,17 @@ impl super::Store for Store {
Ok(Some(replica))
}

// TODO: return iterator
fn list_replicas(&self) -> Result<Vec<NamespaceId>> {
let read_tx = self.db.begin_read()?;
let namespace_table = read_tx.open_table(NAMESPACES_TABLE)?;
let namespaces = namespace_table
.iter()?
.filter_map(|entry| entry.ok())
.map(|(_key, value)| Namespace::from_bytes(value.value()).id());
Ok(namespaces.collect())
}

fn get_author(&self, author_id: &AuthorId) -> Result<Option<Author>> {
let read_tx = self.db.begin_read()?;
let author_table = read_tx.open_table(AUTHORS_TABLE)?;
4 changes: 4 additions & 0 deletions iroh-sync/src/store/memory.rs
@@ -37,6 +37,10 @@ impl super::Store for Store {
Ok(replicas.get(namespace).cloned())
}

fn list_replicas(&self) -> Result<Vec<NamespaceId>> {
Ok(self.replicas.read().keys().cloned().collect())
}

fn get_author(&self, author: &AuthorId) -> Result<Option<Author>> {
let authors = &*self.authors.read();
Ok(authors.get(author).cloned())
4 changes: 2 additions & 2 deletions iroh-sync/src/sync.rs
@@ -205,7 +205,7 @@ pub struct NamespaceId(VerifyingKey);

impl Display for NamespaceId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "NamespaceId({})", hex::encode(self.0.as_bytes()))
write!(f, "{}", hex::encode(self.0.as_bytes()))
}
}

@@ -364,7 +364,7 @@ impl<S: ranger::Store<RecordIdentifier, SignedEntry>> Replica<S> {
self.inner.read().namespace.id()
}

pub fn secret_key(&self) -> [u8; 32]{
pub fn secret_key(&self) -> [u8; 32] {
self.inner.read().namespace.to_bytes()
}
}
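The Display change above is what lets the new CLI commands below print ids with plain {} formatting; a minimal sketch of the effect (assumed crate path and a hypothetical helper, not part of this commit):

use iroh_sync::sync::NamespaceId;

// With the new Display impl this yields bare lowercase hex, with no "NamespaceId(…)" wrapper.
fn fmt_namespace(id: &NamespaceId) -> String {
    format!("{}", id)
}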
2 changes: 2 additions & 0 deletions iroh/Cargo.toml
@@ -31,6 +31,7 @@ portable-atomic = "1"
iroh-sync = { path = "../iroh-sync" }
iroh-gossip = { path = "../iroh-gossip" }
once_cell = "1.18.0"
parking_lot = "0.12.1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quic-rpc = { version = "0.6", default-features = false, features = ["flume-transport"] }
quinn = "0.10"
@@ -61,6 +62,7 @@ ed25519-dalek = { version = "=2.0.0-rc.3", features = ["serde", "rand_core"], op
shell-words = { version = "1.1.0", optional = true }
shellexpand = { version = "3.1.0", optional = true }
rustyline = { version = "12.0.0", optional = true }
itertools = "0.11.0"

[features]
default = ["cli", "metrics", "sync"]
132 changes: 108 additions & 24 deletions iroh/src/commands/sync.rs
@@ -1,14 +1,16 @@
use anyhow::anyhow;
use clap::Parser;
use futures::StreamExt;
use indicatif::HumanBytes;
use iroh::{
rpc_protocol::{
AuthorCreateRequest, AuthorListRequest, DocGetRequest, DocJoinRequest, DocSetRequest,
DocShareRequest, DocsImportRequest, ShareMode,
AuthorCreateRequest, AuthorListRequest, DocGetRequest, DocGetResponse, DocJoinRequest,
DocListRequest, DocListResponse, DocSetRequest, DocShareRequest, DocShareResponse,
DocsCreateRequest, DocsImportRequest, DocsListRequest, ShareMode,
},
sync::PeerSource,
};
use iroh_sync::sync::{NamespaceId, AuthorId};
use iroh_sync::sync::{AuthorId, NamespaceId, SignedEntry};

use super::RpcClient;

@@ -18,6 +20,10 @@ pub enum Commands {
#[clap(subcommand)]
command: Author,
},
Docs {
#[clap(subcommand)]
command: Docs,
},
Doc {
id: NamespaceId,
#[clap(subcommand)]
@@ -29,6 +35,7 @@ impl Commands {
pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
match self {
Commands::Author { command } => command.run(client).await,
Commands::Docs { command } => command.run(client).await,
Commands::Doc { command, id } => command.run(client, id).await,
}
}
@@ -59,12 +66,44 @@ impl Author {
}

#[derive(Debug, Clone, Parser)]
pub enum Doc {
Join {
peers: Vec<PeerSource>,
},
pub enum Docs {
List,
Create,
Import {
key: String,
#[clap(short, long)]
peers: Vec<PeerSource>,
},
}

impl Docs {
pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
match self {
Docs::Create => {
let res = client.rpc(DocsCreateRequest {}).await??;
println!("{}", res.id);
}
Docs::Import { key, peers } => {
let key = hex::decode(key)?
.try_into()
.map_err(|_| anyhow!("invalid length"))?;
let res = client.rpc(DocsImportRequest { key, peers }).await??;
println!("{:?}", res);
}
Docs::List => {
let mut iter = client.server_streaming(DocsListRequest {}).await?;
while let Some(doc) = iter.next().await {
println!("{}", doc??.id)
}
}
}
Ok(())
}
}

#[derive(Debug, Clone, Parser)]
pub enum Doc {
Join {
peers: Vec<PeerSource>,
},
Share {
@@ -80,6 +119,16 @@ pub enum Doc {

#[clap(short, long)]
prefix: bool,
#[clap(short, long)]
author: Option<AuthorId>,
/// Include old entries for keys.
#[clap(short, long)]
old: bool,
},
List {
/// Include old entries for keys.
#[clap(short, long)]
old: bool,
},
}

@@ -91,40 +140,75 @@ impl Doc {
let res = client.rpc(DocJoinRequest { doc_id, peers }).await??;
println!("{:?}", res);
}
Doc::Import { key, peers } => {
let key = hex::decode(key)?
.try_into()
.map_err(|_| anyhow!("invalid length"))?;
let res = client.rpc(DocsImportRequest { key, peers }).await??;
println!("{:?}", res);
}
Doc::Share { mode } => {
let res = client.rpc(DocShareRequest { doc_id, mode }).await??;
println!("{:?}", res);
let DocShareResponse { key, me } =
client.rpc(DocShareRequest { doc_id, mode }).await??;
println!("key: {}", hex::encode(key));
println!("me: {}", me);
}
Doc::Set { author, key, value } => {
let res = client
.rpc(DocSetRequest {
author,
author_id: author,
key: key.as_bytes().to_vec(),
value: value.as_bytes().to_vec(),
doc_id,
})
.await??;
println!("{:?}", res);
println!("{}", fmt_entry(&res.entry));
}
Doc::Get { key, prefix } => {
let res = client
.rpc(DocGetRequest {
Doc::Get {
key,
prefix,
author,
old: all,
} => {
let mut stream = client
.server_streaming(DocGetRequest {
key: key.as_bytes().to_vec(),
doc_id,
author: None,
author_id: author,
prefix,
// todo: support option
latest: !all,
})
.await??;
println!("{:?}", res);
.await?;
while let Some(res) = stream.next().await {
let DocGetResponse { entry } = res??;
println!("{}", fmt_entry(&entry));
}
}
Doc::List { old: all } => {
let mut stream = client
// TODO: fields
.server_streaming(DocListRequest {
doc_id,
latest: !all, // author: None,
// prefix: None,
})
.await?;
while let Some(res) = stream.next().await {
let DocListResponse { entry } = res??;
println!("{}", fmt_entry(&entry));
}
}
}
Ok(())
}
}

fn fmt_entry(entry: &SignedEntry) -> String {
let id = entry.entry().id();
let key = std::str::from_utf8(id.key()).unwrap_or("<bad key>");
let author = fmt_hash(id.author().as_bytes());
let hash = entry.entry().record().content_hash();
let hash = fmt_hash(hash.as_bytes());
let len = HumanBytes(entry.entry().record().content_len());
format!("@{author}: {key} = {hash} ({len})",)
}

fn fmt_hash(hash: impl AsRef<[u8]>) -> String {
let mut text = data_encoding::BASE32_NOPAD.encode(&hash.as_ref()[..5]);
text.make_ascii_lowercase();
format!("{}…", &text)
}
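To make the output of the formatting helpers above concrete, a small test sketch (not part of the commit; it would live in the same module and only exercises fmt_hash as written):

#[cfg(test)]
mod fmt_tests {
    use super::*;

    #[test]
    fn fmt_hash_is_short_lowercase_base32() {
        // BASE32_NOPAD over the first 5 bytes, lowercased, plus a trailing ellipsis:
        // five zero bytes encode to "AAAAAAAA", so the result is "aaaaaaaa…".
        assert_eq!(fmt_hash([0u8; 32]), "aaaaaaaa…");
    }
}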