fix(sync): properly drop rpc subscriptions, add doc status #1396

Merged · 2 commits · Aug 24, 2023
10 changes: 8 additions & 2 deletions iroh/src/client.rs
@@ -19,7 +19,7 @@ use crate::rpc_protocol::{
DocShareRequest, DocStartSyncRequest, DocStopSyncRequest, DocSubscribeRequest, DocTicket,
ProviderService, ShareMode, StatsGetRequest,
};
use crate::sync::{LiveEvent, PeerSource};
use crate::sync::{LiveEvent, LiveStatus, PeerSource};

pub mod mem;
#[cfg(feature = "cli")]
@@ -219,7 +219,13 @@ where
.rpc
.server_streaming(DocSubscribeRequest { doc_id: self.id })
.await?;
Ok(stream.map_ok(|res| res.event).map_err(Into::into))
Ok(flatten(stream).map_ok(|res| res.event).map_err(Into::into))
}

/// Get status info for this document
pub async fn status(&self) -> anyhow::Result<LiveStatus> {
let res = self.rpc.rpc(DocInfoRequest { doc_id: self.id }).await??;
Ok(res.status)
}
}

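A rough client-side sketch of the new surface (the concrete `Doc` handle type and how it is obtained are assumed here; only `status()` and the Result-yielding subscription stream come from this diff):

```rust
// Hypothetical usage; `Doc` stands in for the client document handle from
// iroh/src/client.rs, obtained e.g. via `client.create_doc()`.
use futures::StreamExt;

async fn watch(doc: &Doc) -> anyhow::Result<()> {
    // New in this PR: query the live-sync status of the document.
    let status = doc.status().await?;
    println!(
        "active: {}, subscriptions: {}",
        status.active, status.subscriptions
    );

    // The subscription stream now yields Results, since RPC-level errors are
    // forwarded to the client instead of being swallowed.
    let mut events = doc.subscribe().await?;
    while let Some(event) = events.next().await {
        println!("live event: {:?}", event?);
    }
    Ok(())
}
```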
11 changes: 7 additions & 4 deletions iroh/src/rpc_protocol.rs
@@ -27,7 +27,7 @@ use serde::{Deserialize, Serialize};

pub use iroh_bytes::{baomap::ValidateProgress, provider::ProvideProgress, util::RpcResult};

use crate::sync::{LiveEvent, PeerSource};
use crate::sync::{LiveEvent, LiveStatus, PeerSource};

/// A 32-byte key or token
pub type KeyBytes = [u8; 32];
@@ -343,7 +343,7 @@ impl Msg<ProviderService> for DocSubscribeRequest {
}

impl ServerStreamingMsg<ProviderService> for DocSubscribeRequest {
type Response = DocSubscribeResponse;
type Response = RpcResult<DocSubscribeResponse>;
}

/// Response to [`DocSubscribeRequest`]
@@ -473,7 +473,10 @@ impl RpcMsg<ProviderService> for DocInfoRequest {
/// Response to [`DocInfoRequest`]
// TODO: actually provide info
#[derive(Serialize, Deserialize, Debug)]
pub struct DocInfoResponse {}
pub struct DocInfoResponse {
/// Live sync status
pub status: LiveStatus,
}

/// Start to sync a doc with peers.
#[derive(Serialize, Deserialize, Debug)]
@@ -670,7 +673,7 @@ pub enum ProviderResponse {
DocShare(RpcResult<DocShareResponse>),
DocStartSync(RpcResult<DocStartSyncResponse>),
DocStopSync(RpcResult<DocStopSyncResponse>),
DocSubscribe(DocSubscribeResponse),
DocSubscribe(RpcResult<DocSubscribeResponse>),

BytesGet(RpcResult<BytesGetResponse>),

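Since `DocSubscribe` responses are now wrapped in `RpcResult`, the client flattens the transport error and the RPC error into one failure (the `flatten(stream)` call in client.rs above). That helper is not part of this diff; a minimal sketch of what it could look like:

```rust
// Hypothetical stream-flattening helper, assuming nested Results where the
// outer error is the RPC transport error and the inner one the server error.
use futures::{Stream, StreamExt};

fn flatten<T, E1, E2>(
    s: impl Stream<Item = Result<Result<T, E1>, E2>>,
) -> impl Stream<Item = anyhow::Result<T>>
where
    E1: std::error::Error + Send + Sync + 'static,
    E2: std::error::Error + Send + Sync + 'static,
{
    s.map(|item| match item {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(err)) => Err(err.into()),
        Err(err) => Err(err.into()),
    })
}
```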
84 changes: 73 additions & 11 deletions iroh/src/sync/live.rs
@@ -109,8 +109,21 @@ enum SyncState {
Failed(anyhow::Error),
}

/// Sync status for a document
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LiveStatus {
/// Whether this document is in the live sync
pub active: bool,
/// Number of event listeners registered
pub subscriptions: usize,
}

#[derive(derive_more::Debug)]
enum ToActor<S: store::Store> {
Status {
namespace: NamespaceId,
s: sync::oneshot::Sender<Option<LiveStatus>>,
},
StartSync {
replica: Replica<S::Instance>,
peers: Vec<PeerSource>,
@@ -136,9 +149,18 @@ enum ToActor<S: store::Store> {
},
}

/// Whether to keep a live event callback active.
#[derive(Debug)]
pub enum KeepCallback {
/// Keep active
Keep,
/// Drop this callback
Drop,
}

/// Callback used for tracking [`LiveEvent`]s.
pub type OnLiveEventCallback =
Box<dyn Fn(LiveEvent) -> BoxFuture<'static, ()> + Send + Sync + 'static>;
Box<dyn Fn(LiveEvent) -> BoxFuture<'static, KeepCallback> + Send + Sync + 'static>;

/// Events informing about actions of the live sync progress.
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -264,7 +286,7 @@ impl<S: store::Store> LiveSync<S> {
/// Subscribes `cb` to events on this `namespace`.
pub async fn subscribe<F>(&self, namespace: NamespaceId, cb: F) -> Result<RemovalToken>
where
F: Fn(LiveEvent) -> BoxFuture<'static, ()> + Send + Sync + 'static,
F: Fn(LiveEvent) -> BoxFuture<'static, KeepCallback> + Send + Sync + 'static,
{
let (s, r) = sync::oneshot::channel();
self.to_actor_tx
@@ -292,6 +314,16 @@ impl<S: store::Store> LiveSync<S> {
let token = r.await?;
Ok(token)
}

/// Get status for a document
pub async fn status(&self, namespace: NamespaceId) -> Result<Option<LiveStatus>> {
let (s, r) = sync::oneshot::channel();
self.to_actor_tx
.send(ToActor::<S>::Status { namespace, s })
.await?;
let status = r.await?;
Ok(status)
}
}

// Currently peers might double-sync in both directions.
@@ -320,7 +352,7 @@ struct Actor<S: store::Store, B: baomap::Store> {
}

/// Token needed to remove inserted callbacks.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct RemovalToken(u64);

impl<S: store::Store, B: baomap::Store> Actor<S, B> {
@@ -367,13 +399,17 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
Some(ToActor::StopSync { namespace }) => self.stop_sync(&namespace).await?,
Some(ToActor::JoinPeers { namespace, peers }) => self.join_gossip_and_start_initial_sync(&namespace, peers).await?,
Some(ToActor::Subscribe { namespace, cb, s }) => {
let subscribe_result = self.subscribe(&namespace, cb).await;
s.send(subscribe_result).ok();
let result = self.subscribe(&namespace, cb).await;
s.send(result).ok();
},
Some(ToActor::Unsubscribe { namespace, token, s }) => {
let result = self.unsubscribe(&namespace, token).await;
s.send(result).ok();
},
Some(ToActor::Status { namespace , s }) => {
let result = self.status(&namespace).await;
s.send(result).ok();
},
}
}
// new gossip message
@@ -389,7 +425,6 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
}
}
Some((topic, peer, res)) = self.pending_syncs.next() => {
// let (topic, peer, res) = res.context("task sync_with_peer paniced")?;
self.on_sync_finished(topic, peer, res);

}
@@ -403,7 +438,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
}
Some(res) = self.pending_downloads.next() => {
if let Some((topic, hash)) = res {
if let Some(subs) = self.event_subscriptions.get(&topic) {
if let Some(subs) = self.event_subscriptions.get_mut(&topic) {
let event = LiveEvent::ContentReady { hash };
notify_all(subs, event).await;
}
@@ -451,6 +486,23 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
Ok(())
}

async fn status(&mut self, namespace: &NamespaceId) -> Option<LiveStatus> {
let topic = TopicId::from_bytes(*namespace.as_bytes());
if self.replicas.contains_key(&topic) {
let subscriptions = self
.event_subscriptions
.get(&topic)
.map(|map| map.len())
.unwrap_or_default();
Some(LiveStatus {
active: true,
subscriptions,
})
} else {
None
}
}

async fn subscribe(
&mut self,
namespace: &NamespaceId,
@@ -474,7 +526,8 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
async fn unsubscribe(&mut self, namespace: &NamespaceId, token: RemovalToken) -> bool {
let topic = TopicId::from_bytes(*namespace.as_bytes());
if let Some(subs) = self.event_subscriptions.get_mut(&topic) {
return subs.remove(&token.0).is_some();
let res = subs.remove(&token.0).is_some();
return res;
}

false
@@ -591,7 +644,7 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
signed_entry: SignedEntry,
) -> Result<()> {
let topic = TopicId::from_bytes(*signed_entry.entry().namespace().as_bytes());
let subs = self.event_subscriptions.get(&topic);
let subs = self.event_subscriptions.get_mut(&topic);
match origin {
InsertOrigin::Local => {
let entry = signed_entry.entry().clone();
@@ -643,6 +696,15 @@ impl<S: store::Store, B: baomap::Store> Actor<S, B> {
}
}

async fn notify_all(subs: &HashMap<u64, OnLiveEventCallback>, event: LiveEvent) {
futures::future::join_all(subs.values().map(|sub| sub(event.clone()))).await;
async fn notify_all(subs: &mut HashMap<u64, OnLiveEventCallback>, event: LiveEvent) {
let res = futures::future::join_all(
subs.iter()
.map(|(idx, sub)| sub(event.clone()).map(|res| (*idx, res))),
)
.await;
for (idx, res) in res {
if matches!(res, KeepCallback::Drop) {
subs.remove(&idx);
}
}
}
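
With callbacks now returning `KeepCallback`, a subscriber can remove itself from the actor without an explicit `unsubscribe` call. A sketch of a one-shot listener built on the `LiveSync::subscribe` signature from this diff (imports and the surrounding setup are abbreviated):

```rust
// Hypothetical one-shot callback: fires on the first ContentReady event,
// then asks the actor to drop it by returning KeepCallback::Drop.
use futures::FutureExt;

async fn notify_once<S: store::Store>(
    live: &LiveSync<S>,
    namespace: NamespaceId,
) -> anyhow::Result<RemovalToken> {
    let token = live
        .subscribe(namespace, |event| {
            async move {
                match event {
                    LiveEvent::ContentReady { hash } => {
                        println!("content ready: {hash:?}");
                        KeepCallback::Drop
                    }
                    // Keep listening for anything else.
                    _ => KeepCallback::Keep,
                }
            }
            .boxed()
        })
        .await?;
    Ok(token)
}
```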
59 changes: 39 additions & 20 deletions iroh/src/sync/rpc.rs
@@ -7,16 +7,19 @@ use iroh_sync::{store::Store, sync::Namespace};
use itertools::Itertools;
use rand::rngs::OsRng;

use crate::rpc_protocol::{
AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse,
DocCreateRequest, DocCreateResponse, DocGetRequest, DocGetResponse, DocImportRequest,
DocImportResponse, DocInfoRequest, DocInfoResponse, DocListRequest, DocListResponse,
DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, DocStartSyncRequest,
DocStartSyncResponse, DocStopSyncRequest, DocStopSyncResponse, DocSubscribeRequest,
DocSubscribeResponse, DocTicket, RpcResult, ShareMode,
use crate::{
rpc_protocol::{
AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse,
DocCreateRequest, DocCreateResponse, DocGetRequest, DocGetResponse, DocImportRequest,
DocImportResponse, DocInfoRequest, DocInfoResponse, DocListRequest, DocListResponse,
DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, DocStartSyncRequest,
DocStartSyncResponse, DocStopSyncRequest, DocStopSyncResponse, DocSubscribeRequest,
DocSubscribeResponse, DocTicket, RpcResult, ShareMode,
},
sync::KeepCallback,
};

use super::{engine::SyncEngine, PeerSource};
use super::{engine::SyncEngine, LiveStatus, PeerSource};

/// Capacity for the flume channels to forward sync store iterators to async RPC streams.
const ITER_CHANNEL_CAP: usize = 64;
@@ -74,9 +77,13 @@ impl<S: Store> SyncEngine<S> {
}

pub async fn doc_info(&self, req: DocInfoRequest) -> RpcResult<DocInfoResponse> {
let replica = self.get_replica(&req.doc_id)?;
self.start_sync(replica.namespace(), vec![]).await?;
Ok(DocInfoResponse {})
let _replica = self.get_replica(&req.doc_id)?;
let status = self.live.status(req.doc_id).await?;
let status = status.unwrap_or(LiveStatus {
active: false,
subscriptions: 0,
});
Ok(DocInfoResponse { status })
}

pub async fn doc_share(&self, req: DocShareRequest) -> RpcResult<DocShareResponse> {
@@ -100,19 +107,31 @@ pub async fn doc_subscribe(
pub async fn doc_subscribe(
&self,
req: DocSubscribeRequest,
) -> impl Stream<Item = DocSubscribeResponse> {
) -> impl Stream<Item = RpcResult<DocSubscribeResponse>> {
let (s, r) = flume::bounded(64);
self.live
.subscribe(req.doc_id, move |event| {
let res = self
.live
.subscribe(req.doc_id, {
let s = s.clone();
async move {
s.send_async(DocSubscribeResponse { event }).await.ok();
move |event| {
let s = s.clone();
async move {
// Send event over the channel, unsubscribe if the channel is closed.
match s.send_async(Ok(DocSubscribeResponse { event })).await {
Err(_err) => KeepCallback::Drop,
Ok(()) => KeepCallback::Keep,
}
}
.boxed()
}
.boxed()
})
.await
.unwrap(); // TODO: handle error

.await;
match res {
Err(err) => {
s.send_async(Err(err.into())).await.ok();
}
Ok(_token) => {}
};
r.into_stream()
}

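The heart of the `doc_subscribe` change is that the live-sync callback forwards events into a bounded flume channel and removes itself once the RPC stream (the receiver side) has been dropped. A minimal, self-contained sketch of that pattern, with a stand-in item type instead of the real response types:

```rust
// Isolated sketch of the forward-and-drop pattern used in doc_subscribe.
// `KeepCallback` is the enum added in live.rs; the rest is simplified.
use futures::{future::BoxFuture, FutureExt};

fn forwarding_callback<T: Send + 'static>(
    tx: flume::Sender<T>,
) -> impl Fn(T) -> BoxFuture<'static, KeepCallback> {
    move |item| {
        let tx = tx.clone();
        async move {
            match tx.send_async(item).await {
                // The receiver (the RPC response stream) was dropped:
                // ask the actor to remove this callback.
                Err(_) => KeepCallback::Drop,
                Ok(()) => KeepCallback::Keep,
            }
        }
        .boxed()
    }
}
```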
29 changes: 29 additions & 0 deletions iroh/tests/sync.rs
@@ -146,6 +146,35 @@ async fn sync_full_basic() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn sync_subscribe_stop() -> Result<()> {
setup_logging();
let rt = test_runtime();
let node = spawn_node(rt).await?;
let client = node.client();

let doc = client.create_doc().await?;
let author = client.create_author().await?;
doc.start_sync(vec![]).await?;

let status = doc.status().await?;
assert!(status.active);
assert_eq!(status.subscriptions, 0);

let sub = doc.subscribe().await?;
let status = doc.status().await?;
assert_eq!(status.subscriptions, 1);
drop(sub);

doc.set_bytes(author, b"x".to_vec(), b"x".to_vec()).await?;
let status = doc.status().await?;
assert_eq!(status.subscriptions, 0);

node.shutdown();

Ok(())
}

async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) {
let content = get_latest(doc, key).await.unwrap();
assert_eq!(content, value.to_vec());