From 52a37551a296de7379887af99401832f59931a4e Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Fri, 4 Aug 2023 19:28:59 +0200 Subject: [PATCH] wip: more methods --- iroh-sync/src/sync.rs | 16 ++++ iroh/src/commands/sync.rs | 96 +++++++++++++++++++++--- iroh/src/node.rs | 10 +-- iroh/src/rpc_protocol.rs | 84 +++++++++++---------- iroh/src/sync/content.rs | 2 + iroh/src/sync/live.rs | 34 ++++++++- iroh/src/sync/node.rs | 149 +++++++++++++++++++++++++++++++++----- 7 files changed, 319 insertions(+), 72 deletions(-) diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs index 9055133eae7..b3b815965f4 100644 --- a/iroh-sync/src/sync.rs +++ b/iroh-sync/src/sync.rs @@ -142,6 +142,18 @@ impl FromStr for AuthorId { } } +impl FromStr for NamespaceId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let pub_key: [u8; 32] = hex::decode(s)? + .try_into() + .map_err(|_| anyhow::anyhow!("failed to parse: invalid key length"))?; + let pub_key = VerifyingKey::from_bytes(&pub_key)?; + Ok(NamespaceId(pub_key)) + } +} + impl From for Author { fn from(priv_key: SigningKey) -> Self { Self { priv_key } @@ -351,6 +363,10 @@ impl> Replica { pub fn namespace(&self) -> NamespaceId { self.inner.read().namespace.id() } + + pub fn secret_key(&self) -> [u8; 32]{ + self.inner.read().namespace.to_bytes() + } } /// A signed entry. diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs index a361fdd9a07..033f69cea72 100644 --- a/iroh/src/commands/sync.rs +++ b/iroh/src/commands/sync.rs @@ -1,6 +1,14 @@ +use anyhow::anyhow; use clap::Parser; use futures::StreamExt; -use iroh::rpc_protocol::{AuthorCreateRequest, AuthorListRequest}; +use iroh::{ + rpc_protocol::{ + AuthorCreateRequest, AuthorListRequest, DocGetRequest, DocJoinRequest, DocSetRequest, + DocShareRequest, DocsImportRequest, ShareMode, + }, + sync::PeerSource, +}; +use iroh_sync::sync::{NamespaceId, AuthorId}; use super::RpcClient; @@ -10,12 +18,18 @@ pub enum Commands { #[clap(subcommand)] command: Author, }, + Doc { + id: NamespaceId, + #[clap(subcommand)] + command: Doc, + }, } impl Commands { pub async fn run(self, client: RpcClient) -> anyhow::Result<()> { match self { Commands::Author { command } => command.run(client).await, + Commands::Doc { command, id } => command.run(client, id).await, } } } @@ -24,13 +38,6 @@ impl Commands { pub enum Author { List, Create, - // Import { - // key: String, - // }, - // Share { - // mode: ShareMode, - // author_id: AuthorId, - // }, } impl Author { @@ -46,8 +53,77 @@ impl Author { let author = client.rpc(AuthorCreateRequest).await??; println!("{}", author.author_id); } - // Commands::AuthorImport { key } => todo!(), - // Commands::AuthorShare { mode, author_id } => todo!(), + } + Ok(()) + } +} + +#[derive(Debug, Clone, Parser)] +pub enum Doc { + Join { + peers: Vec, + }, + Import { + key: String, + peers: Vec, + }, + Share { + mode: ShareMode, + }, + Set { + author: AuthorId, + key: String, + value: String, + }, + Get { + key: String, + + #[clap(short, long)] + prefix: bool, + }, +} + +impl Doc { + pub async fn run(self, client: RpcClient, doc_id: NamespaceId) -> anyhow::Result<()> { + match self { + Doc::Join { peers } => { + // let peers = peers.map(|peer| PeerSource::try_from)?; + let res = client.rpc(DocJoinRequest { doc_id, peers }).await??; + println!("{:?}", res); + } + Doc::Import { key, peers } => { + let key = hex::decode(key)? + .try_into() + .map_err(|_| anyhow!("invalid length"))?; + let res = client.rpc(DocsImportRequest { key, peers }).await??; + println!("{:?}", res); + } + Doc::Share { mode } => { + let res = client.rpc(DocShareRequest { doc_id, mode }).await??; + println!("{:?}", res); + } + Doc::Set { author, key, value } => { + let res = client + .rpc(DocSetRequest { + author, + key: key.as_bytes().to_vec(), + value: value.as_bytes().to_vec(), + doc_id, + }) + .await??; + println!("{:?}", res); + } + Doc::Get { key, prefix } => { + let res = client + .rpc(DocGetRequest { + key: key.as_bytes().to_vec(), + doc_id, + author: None, + prefix, + }) + .await??; + println!("{:?}", res); + } } Ok(()) } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index e9213b84842..920aa2b1282 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -55,7 +55,7 @@ use crate::rpc_protocol::{ ProviderResponse, ProviderService, RpcResult, ShutdownRequest, ValidateRequest, VersionRequest, VersionResponse, WatchRequest, WatchResponse, }; -use crate::sync::{node::SyncNode, SYNC_ALPN}; +use crate::sync::{node::SyncNode, SYNC_ALPN, BlobStore}; const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; @@ -99,7 +99,7 @@ pub struct Builder< derp_map: Option, collection_parser: C, rt: Option, - docs: (S, Option), + docs: (S, PathBuf), } const PROTOCOLS: [&[u8]; 3] = [&iroh_bytes::protocol::ALPN, GOSSIP_ALPN, SYNC_ALPN]; @@ -157,7 +157,7 @@ impl Builder { auth_handler: Arc::new(NoopRequestAuthorizationHandler), collection_parser: NoCollectionParser, rt: None, - docs: (store, Some(blobs_path)), + docs: (store, blobs_path), } } } @@ -317,8 +317,8 @@ where gossip_cell.set(gossip.clone()).unwrap(); // spawn the sync engine - // let blobs = BlobStore::new(rt.clone(), blobs_path, endpoint.clone()); - let sync = SyncNode::spawn(rt.clone(), self.docs.0, endpoint.clone(), gossip.clone()); + let blobs = BlobStore::new(rt.clone(), self.docs.1, endpoint.clone()).await?; + let sync = SyncNode::spawn(rt.clone(), self.docs.0, endpoint.clone(), gossip.clone(), blobs); let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); let rt2 = rt.clone(); diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 9f9130fae46..87585e0ab1c 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -27,6 +27,8 @@ pub use iroh_bytes::{ util::RpcResult, }; +use crate::sync::PeerSource; + /// A 32-byte key or token pub type KeyBytes = [u8; 32]; @@ -207,7 +209,7 @@ pub struct PeerAddRequest { } impl RpcMsg for PeerAddRequest { - type Response = PeerAddResponse; + type Response = RpcResult; } /// todo @@ -223,7 +225,7 @@ impl Msg for PeerListRequest { } impl ServerStreamingMsg for PeerListRequest { - type Response = PeerListResponse; + type Response = RpcResult; } /// todo @@ -275,7 +277,7 @@ pub struct AuthorImportRequest { } impl RpcMsg for AuthorImportRequest { - type Response = AuthorImportResponse; + type Response = RpcResult; } /// todo @@ -301,7 +303,7 @@ pub enum ShareMode { } impl RpcMsg for AuthorShareRequest { - type Response = AuthorShareResponse; + type Response = RpcResult; } /// todo @@ -319,7 +321,7 @@ impl Msg for DocsListRequest { } impl ServerStreamingMsg for DocsListRequest { - type Response = DocsListResponse; + type Response = RpcResult; } /// todo @@ -334,7 +336,7 @@ pub struct DocsListResponse { pub struct DocsCreateRequest {} impl RpcMsg for DocsCreateRequest { - type Response = DocsCreateResponse; + type Response = RpcResult; } /// todo @@ -348,45 +350,46 @@ pub struct DocsCreateResponse { pub struct DocsImportRequest { // either a public or private key pub key: KeyBytes, - pub peers: Vec, + pub peers: Vec, } impl RpcMsg for DocsImportRequest { - type Response = DocsImportResponse; + type Response = RpcResult; } /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocsImportResponse { - pub id: NamespaceId, + pub doc_id: NamespaceId, } /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocShareRequest { - pub doc: NamespaceId, + pub doc_id: NamespaceId, pub mode: ShareMode, } impl RpcMsg for DocShareRequest { - type Response = DocShareResponse; + type Response = RpcResult; } /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocShareResponse { pub key: KeyBytes, + pub me: PeerSource, } /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocJoinRequest { - pub doc: NamespaceId, - pub peer: PeerId, + pub doc_id: NamespaceId, + pub peers: Vec, } impl RpcMsg for DocJoinRequest { - type Response = DocJoinResponse; + type Response = RpcResult; } /// todo @@ -396,7 +399,7 @@ pub struct DocJoinResponse {} /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocSetRequest { - pub doc: NamespaceId, + pub doc_id: NamespaceId, pub author: AuthorId, pub key: Vec, // todo: different forms to supply value @@ -404,7 +407,7 @@ pub struct DocSetRequest { } impl RpcMsg for DocSetRequest { - type Response = DocSetResponse; + type Response = RpcResult; } /// todo @@ -416,19 +419,22 @@ pub struct DocSetResponse { /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocGetRequest { - pub doc: NamespaceId, + pub doc_id: NamespaceId, pub author: Option, pub key: Vec, pub prefix: bool, } -impl Msg for DocGetRequest { - type Pattern = ServerStreaming; -} - -impl ServerStreamingMsg for DocGetRequest { - type Response = DocGetResponse; +impl RpcMsg for DocGetRequest { + type Response = RpcResult; } +// impl Msg for DocGetRequest { +// type Pattern = ServerStreaming; +// } +// +// impl ServerStreamingMsg for DocGetRequest { +// type Response = RpcResult; +// } /// todo #[derive(Serialize, Deserialize, Debug)] @@ -439,7 +445,7 @@ pub struct DocGetResponse { /// todo #[derive(Serialize, Deserialize, Debug)] pub struct DocListRequest { - pub doc: NamespaceId, + pub doc_id: NamespaceId, pub author: Option, pub prefix: Option, pub latest: bool, @@ -450,7 +456,7 @@ impl Msg for DocListRequest { } impl ServerStreamingMsg for DocListRequest { - type Response = DocListResponse; + type Response = RpcResult; } /// todo @@ -512,23 +518,23 @@ pub enum ProviderResponse { // TODO: I see I changed naming convention here but at least to me it becomes easier to parse // with the subject in front if there's many commands - PeerAdd(PeerAddResponse), - PeerList(PeerListResponse), + PeerAdd(RpcResult), + PeerList(RpcResult), AuthorList(RpcResult), AuthorCreate(RpcResult), - AuthorImport(AuthorImportResponse), - AuthorShare(AuthorShareResponse), - - DocsList(DocsListResponse), - DocsCreate(DocsCreateResponse), - DocsImport(DocsImportResponse), - - DocSet(DocSetResponse), - DocGet(DocGetResponse), - DocList(DocListResponse), - DocJoin(DocJoinResponse), - DocShare(DocShareResponse), + AuthorImport(RpcResult), + AuthorShare(RpcResult), + + DocsList(RpcResult), + DocsCreate(RpcResult), + DocsImport(RpcResult), + + DocSet(RpcResult), + DocGet(RpcResult), + DocList(RpcResult), + DocJoin(RpcResult), + DocShare(RpcResult), // DocGetContent(DocGetContentResponse), } diff --git a/iroh/src/sync/content.rs b/iroh/src/sync/content.rs index 677b7c808e4..09a7bb1b2ad 100644 --- a/iroh/src/sync/content.rs +++ b/iroh/src/sync/content.rs @@ -90,6 +90,8 @@ impl DocStore { /// TODO: Currently content is only downloaded from the author of a entry. /// We want to try other peers if the author is offline (or always). /// We'll need some heuristics which peers to try. +/// +/// TODO: remove #[derive(Clone, Debug)] pub struct Doc { replica: Replica, diff --git a/iroh/src/sync/live.rs b/iroh/src/sync/live.rs index 7290f63d983..65d784e5a36 100644 --- a/iroh/src/sync/live.rs +++ b/iroh/src/sync/live.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, str::FromStr}; use crate::sync::connect_and_sync; use anyhow::{anyhow, Result}; @@ -36,6 +36,38 @@ pub struct PeerSource { pub derp_region: Option, } +impl PeerSource { + /// Deserializes from bytes. + fn from_bytes(bytes: &[u8]) -> anyhow::Result { + postcard::from_bytes(bytes).map_err(Into::into) + } + /// Serializes to bytes. + pub fn to_bytes(&self) -> Vec { + postcard::to_stdvec(self).expect("postcard::to_stdvec is infallible") + } +} + +/// Serializes to base32. +impl fmt::Display for PeerSource { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let encoded = self.to_bytes(); + let mut text = data_encoding::BASE32_NOPAD.encode(&encoded); + text.make_ascii_lowercase(); + write!(f, "{text}") + } +} + +/// Deserializes from base32. +impl FromStr for PeerSource { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let bytes = data_encoding::BASE32_NOPAD.decode(s.to_ascii_uppercase().as_bytes())?; + let slf = Self::from_bytes(&bytes)?; + Ok(slf) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Op { Put(SignedEntry), diff --git a/iroh/src/sync/node.rs b/iroh/src/sync/node.rs index 95cd0f777ce..dc02a981a66 100644 --- a/iroh/src/sync/node.rs +++ b/iroh/src/sync/node.rs @@ -1,43 +1,54 @@ // use std::collections::HashMap; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, RwLock}, +}; + +use anyhow::{anyhow, bail}; use futures::Stream; use genawaiter::sync::Gen; use iroh_bytes::util::runtime::Handle; -use iroh_net::MagicEndpoint; -use iroh_sync::store::Store; +use iroh_net::{tls::PeerId, MagicEndpoint}; +use iroh_sync::{ + store::Store, + sync::{InsertOrigin, Namespace, NamespaceId, Replica}, +}; +use rand::rngs::OsRng; use crate::rpc_protocol::{ - AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse, RpcResult, + AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse, + DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, DocsCreateRequest, + DocsCreateResponse, DocsImportRequest, DocsImportResponse, RpcResult, ShareMode, }; -// use super::{BlobStore, Doc, LiveSync}; +use super::{BlobStore, Doc, DownloadMode, LiveSync, PeerSource}; /// Document synchronization engine #[derive(Debug, Clone)] pub struct SyncNode { - // rt: Handle, - // endpoint: MagicEndpoint, + endpoint: MagicEndpoint, store: S, - // live: LiveSync, - // blobs: BlobStore, - // open_docs: HashMap>, + live: LiveSync, + blobs: BlobStore, + // open_docs: Arc> } impl SyncNode { /// todo pub fn spawn( - _rt: Handle, + rt: Handle, store: S, - _endpoint: MagicEndpoint, - _gossip: iroh_gossip::net::GossipHandle, - // blobs: BlobStore, + endpoint: MagicEndpoint, + gossip: iroh_gossip::net::GossipHandle, + blobs: BlobStore, ) -> Self { - // let live = LiveSync::spawn(rt, endpoint, gossip); + let live = LiveSync::spawn(rt, endpoint.clone(), gossip); Self { store, - // live, - // blobs, - // open_docs: Default::default(), + live, + blobs, + endpoint, // open_docs: Default::default(), } } @@ -73,4 +84,108 @@ impl SyncNode { } }) } + + fn get_replica(&self, id: &NamespaceId) -> anyhow::Result> { + self.store + .get_replica(id)? + .ok_or_else(|| anyhow!("doc not found")) + } + + pub async fn doc_open(&self, id: &NamespaceId, peers: Vec) -> anyhow::Result<()> { + let replica = self.get_replica(id)?; + + let download_mode = DownloadMode::Always; + + // If download mode is set to always download: + // setup on_insert callback to trigger download on remote insert + if let DownloadMode::Always = download_mode { + let replica = replica.clone(); + let blobs = self.blobs.clone(); + replica.on_insert(Box::new(move |origin, entry| { + if matches!(origin, InsertOrigin::Sync) { + let hash = *entry.entry().record().content_hash(); + let peer_id = PeerId::from_bytes(entry.entry().id().author().as_bytes()) + .expect("failed to convert author to peer id"); + blobs.start_download(hash, peer_id); + } + })); + } + + self.live.add(replica, peers).await?; + + // Collect metrics + // replica.on_insert(Box::new(move |origin, entry| { + // let size = entry.entry().record().content_len(); + // match origin { + // InsertOrigin::Local => { + // inc!(Metrics, new_entries_local); + // inc_by!(Metrics, new_entries_local_size, size); + // } + // InsertOrigin::Sync => { + // inc!(Metrics, new_entries_remote); + // inc_by!(Metrics, new_entries_remote_size, size); + // } + // } + // })); + + Ok(()) + } + + pub fn docs_create(&self, req: DocsCreateRequest) -> RpcResult { + let doc = self.store.new_replica(Namespace::new(&mut OsRng {}))?; + Ok(DocsCreateResponse { + id: doc.namespace(), + }) + } + + pub async fn doc_share(&self, req: DocShareRequest) -> RpcResult { + let replica = self.get_replica(&req.doc_id)?; + let key = match req.mode { + ShareMode::Read => *replica.namespace().as_bytes(), + ShareMode::Write => replica.secret_key(), + }; + let me = PeerSource { + peer_id: self.endpoint.peer_id(), + derp_region: self.endpoint.my_derp().await, + addrs: self + .endpoint + .local_endpoints() + .await? + .into_iter() + .map(|ep| ep.addr) + .collect(), + }; + Ok(DocShareResponse { key, me }) + } + + pub fn doc_import(&self, req: DocsImportRequest) -> anyhow::Result { + let doc = match NamespaceId::from_bytes(&req.key) { + Ok(id) => bail!("importing read-only replicas is not yet supported"), + Err(_err) => { + let namespace = Namespace::from_bytes(&req.key); + self.store.new_replica(namespace)?; + todo!() + } + }; + } + + pub fn doc_set(&self, req: DocSetRequest) -> RpcResult { + todo!() + } + + // PeerAdd(PeerAddRequest), + // PeerList(PeerListRequest), + // + // AuthorImport(AuthorImportRequest), + // AuthorShare(AuthorShareRequest), + // + // DocsList(DocsListRequest), + // DocsCreate(DocsCreateRequest), + // DocsImport(DocsImportRequest), + // + // DocSet(DocSetRequest), + // DocGet(DocGetRequest), + // DocList(DocListRequest), + // DocJoin(DocJoinRequest), + // DocShare(DocShareRequest), }