diff --git a/Cargo.lock b/Cargo.lock
index 02e6a9747db..f9fb440204d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1050,6 +1050,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "either"
+version = "1.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07"
+
 [[package]]
 name = "elliptic-curve"
 version = "0.13.5"
@@ -1791,10 +1797,12 @@ dependencies = [
  "iroh-metrics",
  "iroh-net",
  "iroh-sync",
+ "itertools",
  "multibase",
  "nix",
  "num_cpus",
  "once_cell",
+ "parking_lot",
  "portable-atomic",
  "postcard",
  "proptest",
@@ -2020,6 +2028,15 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "itertools"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
+dependencies = [
+ "either",
+]
+
 [[package]]
 name = "itoa"
 version = "1.0.9"
diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs
index c4821e228d5..c351572a44f 100644
--- a/iroh-gossip/src/net.rs
+++ b/iroh-gossip/src/net.rs
@@ -22,7 +22,7 @@ use crate::proto::{self, TopicId};
 pub mod util;
 
 /// ALPN protocol name
-pub const GOSSIP_ALPN: &[u8] = b"n0/iroh-gossip/0";
+pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0";
 
 /// Maximum message size is limited to 1024 bytes for now.
 pub const MAX_MESSAGE_SIZE: usize = 1024;
diff --git a/iroh-sync/Cargo.toml b/iroh-sync/Cargo.toml
index e0132aed13c..31db3a2dea3 100644
--- a/iroh-sync/Cargo.toml
+++ b/iroh-sync/Cargo.toml
@@ -35,4 +35,4 @@ tempfile = "3.4"
 
 [features]
 default = ["fs-store"]
-fs-store = ["redb", "ouroboros"]
\ No newline at end of file
+fs-store = ["redb", "ouroboros"]
diff --git a/iroh-sync/src/store.rs b/iroh-sync/src/store.rs
index f2970903e81..67c372f4715 100644
--- a/iroh-sync/src/store.rs
+++ b/iroh-sync/src/store.rs
@@ -23,6 +23,8 @@ pub trait Store: std::fmt::Debug + Clone + Send + Sync + 'static {
         Self: 'a;
 
     fn get_replica(&self, namespace: &NamespaceId) -> Result<Option<Replica<Self::Instance>>>;
+    // TODO: return iterator
+    fn list_replicas(&self) -> Result<Vec<NamespaceId>>;
     fn get_author(&self, author: &AuthorId) -> Result<Option<Author>>;
     fn new_author<R: CryptoRngCore + ?Sized>(&self, rng: &mut R) -> Result<Author>;
diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs
index d8c231e1603..5ebab86dc4b 100644
--- a/iroh-sync/src/store/fs.rs
+++ b/iroh-sync/src/store/fs.rs
@@ -110,6 +110,17 @@ impl super::Store for Store {
         Ok(Some(replica))
     }
 
+    // TODO: return iterator
+    fn list_replicas(&self) -> Result<Vec<NamespaceId>> {
+        let read_tx = self.db.begin_read()?;
+        let namespace_table = read_tx.open_table(NAMESPACES_TABLE)?;
+        let namespaces = namespace_table
+            .iter()?
+            .filter_map(|entry| entry.ok())
+            .map(|(_key, value)| Namespace::from_bytes(value.value()).id());
+        Ok(namespaces.collect())
+    }
+
     fn get_author(&self, author_id: &AuthorId) -> Result<Option<Author>> {
         let read_tx = self.db.begin_read()?;
         let author_table = read_tx.open_table(AUTHORS_TABLE)?;
diff --git a/iroh-sync/src/store/memory.rs b/iroh-sync/src/store/memory.rs
index 4351ad592b0..4337d2ffcda 100644
--- a/iroh-sync/src/store/memory.rs
+++ b/iroh-sync/src/store/memory.rs
@@ -37,6 +37,10 @@ impl super::Store for Store {
         Ok(replicas.get(namespace).cloned())
     }
 
+    fn list_replicas(&self) -> Result<Vec<NamespaceId>> {
+        Ok(self.replicas.read().keys().cloned().collect())
+    }
+
     fn get_author(&self, author: &AuthorId) -> Result<Option<Author>> {
         let authors = &*self.authors.read();
         Ok(authors.get(author).cloned())
diff --git a/iroh-sync/src/sync.rs b/iroh-sync/src/sync.rs
index b3b815965f4..7a045c14147 100644
--- a/iroh-sync/src/sync.rs
+++ b/iroh-sync/src/sync.rs
@@ -205,7 +205,7 @@ pub struct NamespaceId(VerifyingKey);
 
 impl Display for NamespaceId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "NamespaceId({})", hex::encode(self.0.as_bytes()))
+        write!(f, "{}", hex::encode(self.0.as_bytes()))
     }
 }
 
@@ -364,7 +364,7 @@ impl<S: ranger::Store<RecordIdentifier, SignedEntry>> Replica<S> {
         self.inner.read().namespace.id()
     }
 
-    pub fn secret_key(&self) -> [u8; 32]{
+    pub fn secret_key(&self) -> [u8; 32] {
         self.inner.read().namespace.to_bytes()
     }
 }
diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml
index 08fa8c2801c..7b2f91a6a8c 100644
--- a/iroh/Cargo.toml
+++ b/iroh/Cargo.toml
@@ -31,6 +31,7 @@ portable-atomic = "1"
 iroh-sync = { path = "../iroh-sync" }
 iroh-gossip = { path = "../iroh-gossip" }
 once_cell = "1.18.0"
+parking_lot = "0.12.1"
 postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
 quic-rpc = { version = "0.6", default-features = false, features = ["flume-transport"] }
 quinn = "0.10"
@@ -61,6 +62,7 @@ ed25519-dalek = { version = "=2.0.0-rc.3", features = ["serde", "rand_core"], op
 shell-words = { version = "1.1.0", optional = true }
 shellexpand = { version = "3.1.0", optional = true }
 rustyline = { version = "12.0.0", optional = true }
+itertools = "0.11.0"
 
 [features]
 default = ["cli", "metrics", "sync"]
diff --git a/iroh/src/commands/sync.rs b/iroh/src/commands/sync.rs
index 033f69cea72..3747b99dbac 100644
--- a/iroh/src/commands/sync.rs
+++ b/iroh/src/commands/sync.rs
@@ -1,14 +1,16 @@
 use anyhow::anyhow;
 use clap::Parser;
 use futures::StreamExt;
+use indicatif::HumanBytes;
 use iroh::{
     rpc_protocol::{
-        AuthorCreateRequest, AuthorListRequest, DocGetRequest, DocJoinRequest, DocSetRequest,
-        DocShareRequest, DocsImportRequest, ShareMode,
+        AuthorCreateRequest, AuthorListRequest, DocGetRequest, DocGetResponse, DocJoinRequest,
+        DocListRequest, DocListResponse, DocSetRequest, DocShareRequest, DocShareResponse,
+        DocsCreateRequest, DocsImportRequest, DocsListRequest, ShareMode,
     },
     sync::PeerSource,
 };
-use iroh_sync::sync::{NamespaceId, AuthorId};
+use iroh_sync::sync::{AuthorId, NamespaceId, SignedEntry};
 
 use super::RpcClient;
 
@@ -18,6 +20,10 @@ pub enum Commands {
         #[clap(subcommand)]
         command: Author,
     },
+    Docs {
+        #[clap(subcommand)]
+        command: Docs,
+    },
     Doc {
         id: NamespaceId,
         #[clap(subcommand)]
@@ -29,6 +35,7 @@ impl Commands {
     pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
         match self {
             Commands::Author { command } => command.run(client).await,
+            Commands::Docs { command } => command.run(client).await,
             Commands::Doc { command, id } => command.run(client, id).await,
         }
     }
@@ -59,12 +66,44 @@ impl Author {
 }
 
 #[derive(Debug, Clone, Parser)]
-pub enum Doc {
-    Join {
-        peers: Vec<PeerSource>,
-    },
+pub enum Docs {
+    List,
+    Create,
     Import {
         key: String,
+        #[clap(short, long)]
+        peers: Vec<PeerSource>,
+    },
+}
+
+impl Docs {
+    pub async fn run(self, client: RpcClient) -> anyhow::Result<()> {
+        match self {
+            Docs::Create => {
+                let res = client.rpc(DocsCreateRequest {}).await??;
+                println!("{}", res.id);
+            }
+            Docs::Import { key, peers } => {
+                let key = hex::decode(key)?
+                    .try_into()
+                    .map_err(|_| anyhow!("invalid length"))?;
+                let res = client.rpc(DocsImportRequest { key, peers }).await??;
+                println!("{:?}", res);
+            }
+            Docs::List => {
+                let mut iter = client.server_streaming(DocsListRequest {}).await?;
+                while let Some(doc) = iter.next().await {
+                    println!("{}", doc??.id)
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone, Parser)]
+pub enum Doc {
+    Join {
         peers: Vec<PeerSource>,
     },
     Share {
@@ -80,6 +119,16 @@ pub enum Doc {
         #[clap(short, long)]
         prefix: bool,
+        #[clap(short, long)]
+        author: Option<AuthorId>,
+        /// Include old entries for keys.
+        #[clap(short, long)]
+        old: bool,
+    },
+    List {
+        /// Include old entries for keys.
+        #[clap(short, long)]
+        old: bool,
     },
 }
 
@@ -91,40 +140,75 @@ impl Doc {
                 let res = client.rpc(DocJoinRequest { doc_id, peers }).await??;
                 println!("{:?}", res);
             }
-            Doc::Import { key, peers } => {
-                let key = hex::decode(key)?
-                    .try_into()
-                    .map_err(|_| anyhow!("invalid length"))?;
-                let res = client.rpc(DocsImportRequest { key, peers }).await??;
-                println!("{:?}", res);
-            }
             Doc::Share { mode } => {
-                let res = client.rpc(DocShareRequest { doc_id, mode }).await??;
-                println!("{:?}", res);
+                let DocShareResponse { key, me } =
+                    client.rpc(DocShareRequest { doc_id, mode }).await??;
+                println!("key: {}", hex::encode(key));
+                println!("me: {}", me);
             }
             Doc::Set { author, key, value } => {
                 let res = client
                     .rpc(DocSetRequest {
-                        author,
+                        author_id: author,
                         key: key.as_bytes().to_vec(),
                         value: value.as_bytes().to_vec(),
                         doc_id,
                     })
                     .await??;
-                println!("{:?}", res);
+                println!("{}", fmt_entry(&res.entry));
             }
-            Doc::Get { key, prefix } => {
-                let res = client
-                    .rpc(DocGetRequest {
+            Doc::Get {
+                key,
+                prefix,
+                author,
+                old: all,
+            } => {
+                let mut stream = client
+                    .server_streaming(DocGetRequest {
                         key: key.as_bytes().to_vec(),
                         doc_id,
-                        author: None,
+                        author_id: author,
                         prefix,
+                        // todo: support option
+                        latest: !all,
                     })
-                    .await??;
-                println!("{:?}", res);
+                    .await?;
+                while let Some(res) = stream.next().await {
+                    let DocGetResponse { entry } = res??;
+                    println!("{}", fmt_entry(&entry));
+                }
+            }
+            Doc::List { old: all } => {
+                let mut stream = client
+                    // TODO: fields
+                    .server_streaming(DocListRequest {
+                        doc_id,
+                        latest: !all, // author: None,
+                        // prefix: None,
+                    })
+                    .await?;
+                while let Some(res) = stream.next().await {
+                    let DocListResponse { entry } = res??;
+                    println!("{}", fmt_entry(&entry));
+                }
             }
         }
         Ok(())
     }
 }
+
+fn fmt_entry(entry: &SignedEntry) -> String {
+    let id = entry.entry().id();
+    let key = std::str::from_utf8(id.key()).unwrap_or("<bad key>");
+    let author = fmt_hash(id.author().as_bytes());
+    let hash = entry.entry().record().content_hash();
+    let hash = fmt_hash(hash.as_bytes());
+    let len = HumanBytes(entry.entry().record().content_len());
+    format!("@{author}: {key} = {hash} ({len})",)
+}
+
+fn fmt_hash(hash: impl AsRef<[u8]>) -> String {
+    let mut text = data_encoding::BASE32_NOPAD.encode(&hash.as_ref()[..5]);
+    text.make_ascii_lowercase();
+    format!("{}…", &text)
+}
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 920aa2b1282..6d7ed42c560 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -14,7 +14,7 @@ use std::sync::Arc;
 use std::task::Poll;
 use std::time::Duration;
 
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use bytes::Bytes;
 use futures::future::{BoxFuture, Shared};
 use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
@@ -30,6 +30,7 @@ use iroh_bytes::{
     util::Hash,
 };
 use iroh_gossip::net::{GossipHandle, GOSSIP_ALPN};
+use iroh_net::magic_endpoint::get_alpn;
 use iroh_net::{
     config::Endpoint,
     derp::DerpMap,
@@ -49,13 +50,12 @@ use tracing::{debug, trace};
 
 use crate::dial::Ticket;
 use crate::rpc_protocol::{
-    AddrsRequest, AddrsResponse, AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest,
-    AuthorListResponse, IdRequest, IdResponse, ListBlobsRequest, ListBlobsResponse,
+    AddrsRequest, AddrsResponse, IdRequest, IdResponse, ListBlobsRequest, ListBlobsResponse,
     ListCollectionsRequest, ListCollectionsResponse, ProvideRequest, ProviderRequest,
-    ProviderResponse, ProviderService, RpcResult, ShutdownRequest, ValidateRequest, VersionRequest,
+    ProviderResponse, ProviderService, ShutdownRequest, ValidateRequest, VersionRequest,
     VersionResponse, WatchRequest, WatchResponse,
 };
-use crate::sync::{node::SyncNode, SYNC_ALPN, BlobStore};
+use crate::sync::{node::SyncNode, BlobStore, SYNC_ALPN};
 
 const MAX_CONNECTIONS: u32 = 1024;
 const MAX_STREAMS: u64 = 10;
@@ -318,7 +318,13 @@ where
 
         // spawn the sync engine
         let blobs = BlobStore::new(rt.clone(), self.docs.1, endpoint.clone()).await?;
-        let sync = SyncNode::spawn(rt.clone(), self.docs.0, endpoint.clone(), gossip.clone(), blobs);
+        let sync = SyncNode::spawn(
+            rt.clone(),
+            self.docs.0,
+            endpoint.clone(),
+            gossip.clone(),
+            blobs,
+        );
 
         let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
         let rt2 = rt.clone();
@@ -433,7 +439,6 @@ where
             },
             // handle incoming p2p connections
             Some(mut connecting) = server.accept() => {
-
                 let alpn = match get_alpn(&mut connecting).await {
                     Ok(alpn) => alpn,
                     Err(err) => {
@@ -441,19 +446,8 @@ where
                         continue;
                     }
                 };
-                if alpn.as_bytes() == iroh_bytes::protocol::ALPN.as_ref() {
-                    let db = handler.inner.db.clone();
-                    let custom_get_handler = custom_get_handler.clone();
-                    let auth_handler = auth_handler.clone();
-                    let collection_parser = collection_parser.clone();
-                    let rt2 = rt.clone();
-                    let callbacks = callbacks.clone();
-                    rt.main().spawn(iroh_bytes::provider::handle_connection(connecting, db, callbacks, collection_parser, custom_get_handler, auth_handler, rt2));
-                } else {
-                    tracing::error!("unknown protocol: {}", alpn);
-                    continue;
-                }
-            }
+                rt.main().spawn(handle_connection(connecting, alpn, handler.inner.clone(), gossip.clone(), collection_parser.clone(), custom_get_handler.clone(), auth_handler.clone()));
+            },
             // Handle new callbacks
             Some(cb) = cb_receiver.recv() => {
                 callbacks.push(cb).await;
@@ -474,15 +468,33 @@ where
     }
 }
 
-async fn get_alpn(connecting: &mut quinn::Connecting) -> Result<String> {
-    let data = connecting.handshake_data().await?;
-    match data.downcast::<quinn::crypto::rustls::HandshakeData>() {
-        Ok(data) => match data.protocol {
-            Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into),
-            None => anyhow::bail!("no ALPN protocol available"),
-        },
-        Err(_) => anyhow::bail!("unknown handshake type"),
+async fn handle_connection<D: BaoMap, S: Store, C: CollectionParser>(
+    connecting: quinn::Connecting,
+    alpn: String,
+    node: Arc<NodeInner<D, S>>,
+    gossip: GossipHandle,
+    collection_parser: C,
+    custom_get_handler: Arc<dyn CustomGetHandler>,
+    auth_handler: Arc<dyn RequestAuthorizationHandler>,
+) -> Result<()> {
+    match alpn.as_bytes() {
+        GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?,
+        SYNC_ALPN => crate::sync::handle_connection(connecting, node.sync.store.clone()).await?,
+        alpn if alpn == iroh_bytes::protocol::ALPN => {
+            iroh_bytes::provider::handle_connection(
+                connecting,
+                node.db.clone(),
+                node.callbacks.clone(),
+                collection_parser,
+                custom_get_handler,
+                auth_handler,
+                node.rt.clone(),
+            )
+            .await
+        }
+        _ => bail!("ignoring connection: unsupported ALPN protocol"),
     }
+    Ok(())
 }
 
 type EventCallback = Box<dyn Fn(Event) -> BoxFuture<'static, ()> + 'static + Sync + Send>;
@@ -544,7 +556,7 @@ struct NodeInner {
     #[allow(dead_code)]
     callbacks: Callbacks,
     rt: runtime::Handle,
-    sync: SyncNode<S>,
+    pub(crate) sync: SyncNode<S>,
 }
 
 /// Events emitted by the [`Node`] informing about the current status.
@@ -846,19 +858,6 @@ impl RpcHandler
-
-    /// todo
-    pub fn author_create(self, req: AuthorCreateRequest) -> RpcResult<AuthorCreateResponse> {
-        let res = self.inner.sync.author_create(req)?;
-        Ok(res)
-    }
-
-    /// todo
-    pub fn author_list(
-        self,
-        req: AuthorListRequest,
-    ) -> impl Stream<Item = RpcResult<AuthorListResponse>> {
-        self.inner.sync.author_list(req)
-    }
 }
 
 fn handle_rpc_request<
@@ -900,25 +899,67 @@ fn handle_rpc_request<
             PeerAdd(_msg) => todo!(),
             PeerList(_msg) => todo!(),
             AuthorList(msg) => {
-                chan.server_streaming(msg, handler, RpcHandler::author_list)
-                    .await
+                chan.server_streaming(msg, handler, |handler, req| {
+                    handler.inner.sync.author_list(req)
+                })
+                .await
             }
             AuthorCreate(msg) => {
                 chan.rpc(msg, handler, |handler, req| async move {
-                    handler.author_create(req)
+                    handler.inner.sync.author_create(req)
                })
                 .await
             }
-            AuthorImport(_msg) => todo!(),
+            AuthorImport(_msg) => {
+                todo!()
+            }
             AuthorShare(_msg) => todo!(),
-            DocsList(_msg) => todo!(),
-            DocsCreate(_msg) => todo!(),
-            DocsImport(_msg) => todo!(),
-            DocSet(_msg) => todo!(),
-            DocGet(_msg) => todo!(),
-            DocList(_msg) => todo!(),
-            DocJoin(_msg) => todo!(),
-            DocShare(_msg) => todo!(),
+            DocsList(msg) => {
+                chan.server_streaming(msg, handler, |handler, req| {
+                    handler.inner.sync.docs_list(req)
+                })
+                .await
+            }
+            DocsCreate(msg) => {
+                chan.rpc(msg, handler, |handler, req| async move {
+                    handler.inner.sync.docs_create(req)
+                })
+                .await
+            }
+            DocsImport(msg) => {
+                chan.rpc(msg, handler, |handler, req| async move {
+                    handler.inner.sync.docs_import(req).await
+                })
+                .await
+            }
+            DocSet(msg) => {
+                chan.rpc(msg, handler, |handler, req| async move {
+                    handler.inner.sync.doc_set(req).await
+                })
+                .await
+            }
+            DocGet(msg) => {
+                chan.server_streaming(msg, handler, |handler, req| handler.inner.sync.doc_get(req))
+                    .await
+            }
+            DocList(msg) => {
+                chan.server_streaming(msg, handler, |handler, req| {
+                    handler.inner.sync.doc_list(req)
+                })
+                .await
+            }
+            DocJoin(msg) => {
+                chan.rpc(msg, handler, |handler, req| async move {
+                    handler.inner.sync.doc_join(req).await
+                })
+                .await
+            }
+            DocShare(msg) => {
+                chan.rpc(msg, handler, |handler, req| async move {
+                    handler.inner.sync.doc_share(req).await
+                })
+                .await
+            }
         }
     });
 }
diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs
index 87585e0ab1c..9feea4b1f48 100644
--- a/iroh/src/rpc_protocol.rs
+++ b/iroh/src/rpc_protocol.rs
@@ -15,7 +15,7 @@ use derive_more::{From, TryInto};
 use iroh_bytes::Hash;
 use iroh_net::tls::PeerId;
-use iroh_sync::sync::{Author, AuthorId, NamespaceId, SignedEntry};
+use iroh_sync::sync::{AuthorId, NamespaceId, SignedEntry};
 use quic_rpc::{
     message::{Msg, RpcMsg, ServerStreaming, ServerStreamingMsg},
     Service,
@@ -328,7 +328,7 @@ impl ServerStreamingMsg<ProviderService> for DocsListRequest {
 #[derive(Serialize, Deserialize, Debug)]
 pub struct DocsListResponse {
     pub id: NamespaceId,
-    pub writable: bool,
+    // pub writable: bool,
 }
 
 /// todo
@@ -400,7 +400,7 @@ pub struct DocJoinResponse {}
 #[derive(Serialize, Deserialize, Debug)]
 pub struct DocSetRequest {
     pub doc_id: NamespaceId,
-    pub author: AuthorId,
+    pub author_id: AuthorId,
     pub key: Vec<u8>,
     // todo: different forms to supply value
     pub value: Vec<u8>,
@@ -420,21 +420,19 @@ pub struct DocSetResponse {
 #[derive(Serialize, Deserialize, Debug)]
 pub struct DocGetRequest {
     pub doc_id: NamespaceId,
-    pub author: Option<AuthorId>,
+    pub author_id: Option<AuthorId>,
     pub key: Vec<u8>,
     pub prefix: bool,
+    pub latest: bool,
+}
+
+impl Msg<ProviderService> for DocGetRequest {
+    type Pattern = ServerStreaming;
 }
 
-impl RpcMsg<ProviderService> for DocGetRequest {
+impl ServerStreamingMsg<ProviderService> for DocGetRequest {
     type Response = RpcResult<DocGetResponse>;
 }
 
-// impl Msg<ProviderService> for DocGetRequest {
-//     type Pattern = ServerStreaming;
-// }
-//
-// impl ServerStreamingMsg<ProviderService> for DocGetRequest {
-//     type Response = RpcResult<DocGetResponse>;
-// }
 
 /// todo
 #[derive(Serialize, Deserialize, Debug)]
@@ -446,9 +444,9 @@ pub struct DocGetResponse {
 #[derive(Serialize, Deserialize, Debug)]
 pub struct DocListRequest {
     pub doc_id: NamespaceId,
-    pub author: Option<AuthorId>,
-    pub prefix: Option<Vec<u8>>,
     pub latest: bool,
+    // pub author: Option<AuthorId>,
+    // pub prefix: Option<Vec<u8>>,
 }
 
 impl Msg<ProviderService> for DocListRequest {
diff --git a/iroh/src/sync/live.rs b/iroh/src/sync/live.rs
index 65d784e5a36..5a83d0d8dcc 100644
--- a/iroh/src/sync/live.rs
+++ b/iroh/src/sync/live.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, fmt, net::SocketAddr, sync::Arc, str::FromStr};
+use std::{collections::HashMap, fmt, net::SocketAddr, str::FromStr, sync::Arc};
 
 use crate::sync::connect_and_sync;
 use anyhow::{anyhow, Result};
diff --git a/iroh/src/sync/node.rs b/iroh/src/sync/node.rs
index dc02a981a66..20b78931a50 100644
--- a/iroh/src/sync/node.rs
+++ b/iroh/src/sync/node.rs
@@ -1,37 +1,35 @@
-// use std::collections::HashMap;
+use std::{collections::HashSet, sync::Arc};
 
-use std::{
-    collections::{HashMap, HashSet},
-    sync::{Arc, RwLock},
-};
-
-use anyhow::{anyhow, bail};
+use anyhow::anyhow;
 use futures::Stream;
 use genawaiter::sync::Gen;
 use iroh_bytes::util::runtime::Handle;
 use iroh_net::{tls::PeerId, MagicEndpoint};
 use iroh_sync::{
     store::Store,
-    sync::{InsertOrigin, Namespace, NamespaceId, Replica},
+    sync::{Author, AuthorId, InsertOrigin, Namespace, NamespaceId, Replica, SignedEntry},
 };
+use parking_lot::Mutex;
 use rand::rngs::OsRng;
 
 use crate::rpc_protocol::{
     AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse,
-    DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, DocsCreateRequest,
-    DocsCreateResponse, DocsImportRequest, DocsImportResponse, RpcResult, ShareMode,
+    DocGetRequest, DocGetResponse, DocJoinRequest, DocJoinResponse, DocListRequest,
+    DocListResponse, DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse,
+    DocsCreateRequest, DocsCreateResponse, DocsImportRequest, DocsImportResponse, DocsListRequest,
+    DocsListResponse, RpcResult, ShareMode,
 };
 
-use super::{BlobStore, Doc, DownloadMode, LiveSync, PeerSource};
+use super::{BlobStore, DownloadMode, LiveSync, PeerSource};
 
 /// Document synchronization engine
 #[derive(Debug, Clone)]
 pub struct SyncNode<S: Store> {
     endpoint: MagicEndpoint,
-    store: S,
-    live: LiveSync<S>,
+    pub(crate) store: S,
+    pub(crate) live: LiveSync<S>,
     blobs: BlobStore,
-    // open_docs: Arc<RwLock<HashSet<NamespaceId>>>
+    open_docs: Arc<Mutex<HashSet<NamespaceId>>>,
 }
 
 impl<S: Store> SyncNode<S> {
@@ -48,12 +46,13 @@ impl<S: Store> SyncNode<S> {
             store,
             live,
             blobs,
-            endpoint, // open_docs: Default::default(),
+            endpoint,
+            open_docs: Default::default(),
         }
     }
 
     /// todo
-    pub fn author_create(&self, _req: AuthorCreateRequest) -> anyhow::Result<AuthorCreateResponse> {
+    pub fn author_create(&self, _req: AuthorCreateRequest) -> RpcResult<AuthorCreateResponse> {
         // TODO: pass rng
         let author = self.store.new_author(&mut rand::rngs::OsRng {})?;
         Ok(AuthorCreateResponse {
@@ -71,16 +70,14 @@ impl<S: Store> SyncNode<S> {
             match store.list_authors() {
                 Ok(authors) => {
                     for author in authors {
-                        let author = AuthorListResponse {
+                        let res = AuthorListResponse {
                             author_id: author.id(),
                             writable: true,
                         };
-                        co.yield_(Ok(author)).await;
+                        co.yield_(Ok(res)).await;
                     }
                 }
-                Err(err) => {
-                    co.yield_(Err(err.into())).await;
-                }
+                Err(err) => co.yield_(Err(err.into())).await,
             }
         })
     }
@@ -91,9 +88,24 @@ impl<S: Store> SyncNode<S> {
             .ok_or_else(|| anyhow!("doc not found"))
     }
 
+    fn get_author(&self, id: &AuthorId) -> anyhow::Result<Author> {
+        self.store
+            .get_author(id)?
+            .ok_or_else(|| anyhow!("author not found"))
+    }
+
     pub async fn doc_open(&self, id: &NamespaceId, peers: Vec<PeerSource>) -> anyhow::Result<()> {
         let replica = self.get_replica(id)?;
 
+        {
+            let mut open_docs = self.open_docs.lock();
+            if open_docs.contains(id) {
+                return Ok(());
+            } else {
+                open_docs.insert(id.clone());
+            }
+        }
+
         let download_mode = DownloadMode::Always;
 
         // If download mode is set to always download:
@@ -131,17 +143,38 @@ impl<S: Store> SyncNode<S> {
         Ok(())
     }
 
-    pub fn docs_create(&self, req: DocsCreateRequest) -> RpcResult<DocsCreateResponse> {
+    pub fn docs_create(&self, _req: DocsCreateRequest) -> RpcResult<DocsCreateResponse> {
         let doc = self.store.new_replica(Namespace::new(&mut OsRng {}))?;
         Ok(DocsCreateResponse {
             id: doc.namespace(),
         })
     }
 
+    pub fn docs_list(
+        &self,
+        _req: DocsListRequest,
+    ) -> impl Stream<Item = RpcResult<DocsListResponse>> {
+        let store = self.store.clone();
+        Gen::new(|co| async move {
+            match store.list_replicas() {
+                Ok(namespaces) => {
+                    for id in namespaces {
+                        co.yield_(Ok(DocsListResponse { id })).await;
+                    }
+                }
+                Err(err) => co.yield_(Err(err.into())).await,
+            }
+        })
+    }
+
     pub async fn doc_share(&self, req: DocShareRequest) -> RpcResult<DocShareResponse> {
         let replica = self.get_replica(&req.doc_id)?;
+        self.doc_open(&replica.namespace(), vec![]).await?;
         let key = match req.mode {
-            ShareMode::Read => *replica.namespace().as_bytes(),
+            // ShareMode::Read => *replica.namespace().as_bytes(),
+            ShareMode::Read => {
+                return Err(anyhow!("creating read-only shares is not yet supported").into())
+            }
             ShareMode::Write => replica.secret_key(),
         };
         let me = PeerSource {
@@ -158,34 +191,161 @@ impl<S: Store> SyncNode<S> {
         Ok(DocShareResponse { key, me })
     }
 
-    pub fn doc_import(&self, req: DocsImportRequest) -> anyhow::Result<DocsImportResponse> {
-        let doc = match NamespaceId::from_bytes(&req.key) {
-            Ok(id) => bail!("importing read-only replicas is not yet supported"),
-            Err(_err) => {
-                let namespace = Namespace::from_bytes(&req.key);
-                self.store.new_replica(namespace)?;
-                todo!()
+    pub async fn docs_import(&self, req: DocsImportRequest) -> RpcResult<DocsImportResponse> {
+        let DocsImportRequest { key, peers } = req;
+        // let namespace = match NamespaceId::from_bytes(&key) {
+        //     Ok(id) => {
+        //         return Err(anyhow!("importing read-only replicas is not yet supported").into())
+        //     }
+        //     Err(_err) => Namespace::from_bytes(&key),
+        // };
+        let namespace = Namespace::from_bytes(&key);
+        let id = namespace.id();
+        let replica = self.store.new_replica(namespace)?;
+        self.doc_open(&id, peers.clone()).await?;
+        self.live.add(replica, peers).await?;
+        Ok(DocsImportResponse { doc_id: id })
+    }
+
+    pub async fn doc_join(&self, req: DocJoinRequest) -> RpcResult<DocJoinResponse> {
+        let DocJoinRequest { doc_id, peers } = req;
+        let replica = self.get_replica(&doc_id)?;
+        self.doc_open(&doc_id, vec![]).await?;
+        self.live.add(replica, peers).await?;
+        Ok(DocJoinResponse {})
+    }
+
+    pub async fn doc_set(&self, req: DocSetRequest) -> RpcResult<DocSetResponse> {
+        let DocSetRequest {
+            doc_id,
+            author_id,
+            key,
+            value,
+        } = req;
+        let replica = self.get_replica(&doc_id)?;
+        let author = self.get_author(&author_id)?;
+        let (hash, len) = self.blobs.put_bytes(value.into()).await?;
+        replica
+            .insert(&key, &author, hash, len)
+            .map_err(|err| anyhow!(err))?;
+        let entry = self
+            .store
+            .get_latest_by_key_and_author(replica.namespace(), author.id(), &key)?
+            .expect("inserted successfully");
+        Ok(DocSetResponse { entry })
+    }
+
+    pub fn doc_get(&self, req: DocGetRequest) -> impl Stream<Item = RpcResult<DocGetResponse>> {
+        let namespace = req.doc_id;
+        let latest = req.latest;
+        let filter = ListFilter::from(req);
+        let ite = DocIter::new(&self.store, namespace, filter, latest);
+        let ite = inline_error(ite);
+        let ite = ite.map(|entry| entry.map(|entry| DocGetResponse { entry }));
+        // TODO: avoid collect? but the iterator is not Send and has a lifetime on the store.
+        let entries = ite.collect::<Vec<_>>();
+        futures::stream::iter(entries.into_iter())
+    }
+
+    pub fn doc_list(&self, req: DocListRequest) -> impl Stream<Item = RpcResult<DocListResponse>> {
+        let namespace = req.doc_id;
+        let latest = req.latest;
+        let filter = ListFilter::from(req);
+        let ite = DocIter::new(&self.store, namespace, filter, latest);
+        let ite = inline_error(ite);
+        let ite = ite.map(|entry| entry.map(|entry| DocListResponse { entry }));
+        // TODO: avoid collect? but the iterator is not Send and has a lifetime on the store.
+        let entries = ite.collect::<Vec<_>>();
+        futures::stream::iter(entries.into_iter())
+    }
+}
+
+// TODO: Move to iroh-sync
+#[derive(Debug)]
+enum ListFilter {
+    All,
+    Prefix(Vec<u8>),
+    Key(Vec<u8>),
+    KeyAndAuthor(Vec<u8>, AuthorId),
+}
+impl From<DocListRequest> for ListFilter {
+    fn from(_req: DocListRequest) -> Self {
+        ListFilter::All
+    }
+}
+impl From<DocGetRequest> for ListFilter {
+    fn from(req: DocGetRequest) -> Self {
+        match (req.prefix, req.author_id) {
+            (true, None) => ListFilter::Prefix(req.key),
+            (false, None) => ListFilter::Key(req.key),
+            (false, Some(author)) => ListFilter::KeyAndAuthor(req.key, author),
+            // TODO: support get_all|latest_by_prefix_and_author
+            (true, Some(_author)) => {
+                unimplemented!("get by prefix and author is not yet implemented")
             }
-        };
     }
+}
+
+// TODO: Move to iroh-sync
+enum DocIter<'s, S: Store> {
+    All(S::GetAllIter<'s>),
+    Latest(S::GetLatestIter<'s>),
+    Single(std::option::IntoIter<anyhow::Result<SignedEntry>>),
+}
-    pub fn doc_set(&self, req: DocSetRequest) -> RpcResult<DocSetResponse> {
-        todo!()
-    }
-
-    // PeerAdd(PeerAddRequest),
-    // PeerList(PeerListRequest),
-    //
-    // AuthorImport(AuthorImportRequest),
-    // AuthorShare(AuthorShareRequest),
-    //
-    // DocsList(DocsListRequest),
-    // DocsCreate(DocsCreateRequest),
-    // DocsImport(DocsImportRequest),
-    //
-    // DocSet(DocSetRequest),
-    // DocGet(DocGetRequest),
-    // DocList(DocListRequest),
-    // DocJoin(DocJoinRequest),
-    // DocShare(DocShareRequest),
+impl<'s, S: Store> Iterator for DocIter<'s, S> {
+    type Item = anyhow::Result<SignedEntry>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self {
+            DocIter::All(iter) => iter.next().map(|x| x.map(|(_id, entry)| entry)),
+            DocIter::Latest(iter) => iter.next().map(|x| x.map(|(_id, entry)| entry)),
+            DocIter::Single(iter) => iter.next(),
+        }
+    }
+}
+
+impl<'s, S: Store> DocIter<'s, S> {
+    pub fn new(
+        store: &'s S,
+        namespace: NamespaceId,
+        filter: ListFilter,
+        latest: bool,
+    ) -> anyhow::Result<Self> {
+        Ok(match latest {
+            false => match filter {
+                ListFilter::All => Self::All(store.get_all(namespace)?),
+                ListFilter::Prefix(prefix) => {
+                    Self::All(store.get_all_by_prefix(namespace, &prefix)?)
+                }
+                ListFilter::Key(key) => Self::All(store.get_all_by_key(namespace, key)?),
+                ListFilter::KeyAndAuthor(key, author) => {
+                    Self::All(store.get_all_by_key_and_author(namespace, author, key)?)
+                }
+            },
+            true => match filter {
+                ListFilter::All => Self::Latest(store.get_latest(namespace)?),
+                ListFilter::Prefix(prefix) => {
+                    Self::Latest(store.get_latest_by_prefix(namespace, &prefix)?)
+                }
+                ListFilter::Key(key) => Self::Latest(store.get_latest_by_key(namespace, key)?),
+                ListFilter::KeyAndAuthor(key, author) => Self::Single(
+                    store
+                        .get_latest_by_key_and_author(namespace, author, key)?
+                        .map(|entry| Ok(entry))
+                        .into_iter(),
+                ),
+            },
+        })
+    }
+}
+
+fn inline_error<T>(
+    ite: anyhow::Result<impl Iterator<Item = anyhow::Result<T>>>,
+) -> impl Iterator<Item = RpcResult<T>> {
+    match ite {
+        Ok(ite) => itertools::Either::Left(ite.map(|item| item.map_err(|err| err.into()))),
+        Err(err) => itertools::Either::Right(Some(Err(err.into())).into_iter()),
+    }
 }
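
Editor's appendix (not part of the patch): a minimal sketch of how the new `Store::list_replicas` method can be exercised against the in-memory store. It assumes `iroh_sync::store::memory::Store` can be constructed via `Default` and that `rand` is available; every other call mirrors code visible in the hunks above.

```rust
use anyhow::Result;
use iroh_sync::store::{memory, Store};
use iroh_sync::sync::Namespace;

fn main() -> Result<()> {
    // Assumption: the memory store derives `Default`.
    let store = memory::Store::default();

    // `new_replica` persists the namespace, making it visible to
    // `list_replicas` afterwards.
    for _ in 0..2 {
        store.new_replica(Namespace::new(&mut rand::rngs::OsRng {}))?;
    }

    // The new trait method: ids of all stored replicas. The fs store reads
    // them from NAMESPACES_TABLE; the memory store from its replica map.
    for id in store.list_replicas()? {
        // With the `Display` change in iroh-sync/src/sync.rs, this prints
        // the id as plain hex rather than `NamespaceId(…)`.
        println!("{id}");
    }
    Ok(())
}
```

The `// TODO: return iterator` on the trait is worth noting: returning a `Vec` sidesteps the same store-lifetime issue that `doc_get` and `doc_list` work around by collecting entries before wrapping them in a stream.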