From a92764444525f846abf8bf5cbb247e3495a0f912 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 24 Nov 2022 16:58:38 +0000 Subject: [PATCH 1/5] rpc: Extend `RpcClientT` to return the subscription ID Signed-off-by: Alexandru Vasile --- subxt/src/rpc/mod.rs | 1 + subxt/src/rpc/rpc_client_t.rs | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/subxt/src/rpc/mod.rs b/subxt/src/rpc/mod.rs index ad08b55d59..af77eff8fd 100644 --- a/subxt/src/rpc/mod.rs +++ b/subxt/src/rpc/mod.rs @@ -67,6 +67,7 @@ pub use rpc_client_t::{ RpcClientT, RpcFuture, RpcSubscription, + RpcSubscriptionId, }; pub use rpc_client::{ diff --git a/subxt/src/rpc/rpc_client_t.rs b/subxt/src/rpc/rpc_client_t.rs index d32924ffbe..c8e4f687f3 100644 --- a/subxt/src/rpc/rpc_client_t.rs +++ b/subxt/src/rpc/rpc_client_t.rs @@ -52,7 +52,7 @@ pub trait RpcClientT: Send + Sync + 'static { sub: &'a str, params: Option>, unsub: &'a str, - ) -> RpcFuture<'a, RpcSubscription>; + ) -> RpcFuture<'a, (RpcSubscription, Option)>; } /// A boxed future that is returned from the [`RpcClientT`] methods. @@ -62,3 +62,6 @@ pub type RpcFuture<'a, T> = /// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method. pub type RpcSubscription = Pin, RpcError>> + Send + 'static>>; + +/// The ID associated with the [`RpcClientT`]'s `subscription`. +pub type RpcSubscriptionId = String; From ec8486eb3f6b1ceb053230c9fcd36bea758613b4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 24 Nov 2022 16:59:59 +0000 Subject: [PATCH 2/5] rpc: Return `RpcSubscriptionId` for jsonrpsee clients Signed-off-by: Alexandru Vasile --- subxt/src/rpc/jsonrpsee_impl.rs | 36 ++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/subxt/src/rpc/jsonrpsee_impl.rs b/subxt/src/rpc/jsonrpsee_impl.rs index 09dbfe663b..1e22aba22b 100644 --- a/subxt/src/rpc/jsonrpsee_impl.rs +++ b/subxt/src/rpc/jsonrpsee_impl.rs @@ -6,20 +6,25 @@ use super::{ RpcClientT, RpcFuture, RpcSubscription, + RpcSubscriptionId, }; use crate::error::RpcError; use futures::stream::{ StreamExt, TryStreamExt, }; -use jsonrpsee::core::{ - client::{ - Client, - ClientT, - SubscriptionClientT, +use jsonrpsee::{ + core::{ + client::{ + Client, + ClientT, + SubscriptionClientT, + SubscriptionKind, + }, + traits::ToRpcParams, + Error as JsonRpseeError, }, - traits::ToRpcParams, - Error as JsonRpseeError, + types::SubscriptionId, }; use serde_json::value::RawValue; @@ -50,7 +55,7 @@ impl RpcClientT for Client { sub: &'a str, params: Option>, unsub: &'a str, - ) -> RpcFuture<'a, RpcSubscription> { + ) -> RpcFuture<'a, (RpcSubscription, Option)> { Box::pin(async move { let sub = SubscriptionClientT::subscribe::, _>( self, @@ -59,10 +64,17 @@ impl RpcClientT for Client { unsub, ) .await - .map_err(|e| RpcError::ClientError(Box::new(e)))? - .map_err(|e| RpcError::ClientError(Box::new(e))) - .boxed(); - Ok(sub) + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let sub_id = match sub.kind() { + SubscriptionKind::Subscription(SubscriptionId::Str(id)) => { + Some(id.clone().into_owned()) + } + _ => None, + }; + + let sub = sub.map_err(|e| RpcError::ClientError(Box::new(e))).boxed(); + Ok((sub, sub_id)) }) } } From 8db0e98f0261c0071d2b069990851a729a49fc44 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 24 Nov 2022 17:00:48 +0000 Subject: [PATCH 3/5] rpc: Expose subscription ID via subxt subscription Signed-off-by: Alexandru Vasile --- subxt/src/rpc/rpc_client.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs index 56592c0363..00d801cf3c 100644 --- a/subxt/src/rpc/rpc_client.rs +++ b/subxt/src/rpc/rpc_client.rs @@ -5,6 +5,7 @@ use super::{ RpcClientT, RpcSubscription, + RpcSubscriptionId, }; use crate::error::Error; use futures::{ @@ -60,8 +61,8 @@ impl RpcClient { params: RpcParams, unsub: &str, ) -> Result, Error> { - let sub = self.0.subscribe_raw(sub, params.build(), unsub).await?; - Ok(Subscription::new(sub)) + let (sub, sub_id) = self.0.subscribe_raw(sub, params.build(), unsub).await?; + Ok(Subscription::new(sub, sub_id)) } } @@ -166,6 +167,7 @@ impl RpcParams { /// [`StreamExt`] extension trait. pub struct Subscription { inner: RpcSubscription, + sub_id: Option, _marker: std::marker::PhantomData, } @@ -179,12 +181,18 @@ impl std::fmt::Debug for Subscription { } impl Subscription { - fn new(inner: RpcSubscription) -> Self { + fn new(inner: RpcSubscription, sub_id: Option) -> Self { Self { inner, + sub_id, _marker: std::marker::PhantomData, } } + + /// Obtain the ID associated with this subscription. + pub fn subscription_id(&self) -> Option<&RpcSubscriptionId> { + self.sub_id.as_ref() + } } impl Subscription { From bd0ab67059ab62c332381c8df89ea69d6f3d9d7d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 24 Nov 2022 17:01:47 +0000 Subject: [PATCH 4/5] examples: Adjust example to return subscription ID Signed-off-by: Alexandru Vasile --- examples/examples/custom_rpc_client.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/examples/examples/custom_rpc_client.rs b/examples/examples/custom_rpc_client.rs index 52e0cd8227..a4be9a6a0a 100644 --- a/examples/examples/custom_rpc_client.rs +++ b/examples/examples/custom_rpc_client.rs @@ -16,6 +16,7 @@ use subxt::{ RpcClientT, RpcFuture, RpcSubscription, + RpcSubscriptionId, }, OnlineClient, PolkadotConfig, @@ -54,7 +55,7 @@ impl RpcClientT for MyLoggingClient { sub: &'a str, params: Option>, unsub: &'a str, - ) -> RpcFuture<'a, RpcSubscription> { + ) -> RpcFuture<'a, (RpcSubscription, Option)> { writeln!( self.log.lock().unwrap(), "{sub}({}) (unsub: {unsub})", @@ -68,7 +69,8 @@ impl RpcClientT for MyLoggingClient { let res = RawValue::from_string("[]".to_string()).unwrap(); let stream = futures::stream::once(async move { Ok(res) }); let stream: Pin + Send>> = Box::pin(stream); - Box::pin(std::future::ready(Ok(stream))) + // This subscription does not provide an ID. + Box::pin(std::future::ready(Ok((stream, None)))) } } From dc3143234d95554e502454c28eb0b9a390b81a79 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 25 Nov 2022 11:27:52 +0000 Subject: [PATCH 5/5] rpc: Add structure for subscription stream and subscription id Signed-off-by: Alexandru Vasile --- examples/examples/custom_rpc_client.rs | 5 ++--- subxt/src/rpc/jsonrpsee_impl.rs | 13 +++++++------ subxt/src/rpc/mod.rs | 1 + subxt/src/rpc/rpc_client.rs | 12 +++++------- subxt/src/rpc/rpc_client_t.rs | 12 ++++++++++-- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/examples/examples/custom_rpc_client.rs b/examples/examples/custom_rpc_client.rs index a4be9a6a0a..ca56a53e9c 100644 --- a/examples/examples/custom_rpc_client.rs +++ b/examples/examples/custom_rpc_client.rs @@ -16,7 +16,6 @@ use subxt::{ RpcClientT, RpcFuture, RpcSubscription, - RpcSubscriptionId, }, OnlineClient, PolkadotConfig, @@ -55,7 +54,7 @@ impl RpcClientT for MyLoggingClient { sub: &'a str, params: Option>, unsub: &'a str, - ) -> RpcFuture<'a, (RpcSubscription, Option)> { + ) -> RpcFuture<'a, RpcSubscription> { writeln!( self.log.lock().unwrap(), "{sub}({}) (unsub: {unsub})", @@ -70,7 +69,7 @@ impl RpcClientT for MyLoggingClient { let stream = futures::stream::once(async move { Ok(res) }); let stream: Pin + Send>> = Box::pin(stream); // This subscription does not provide an ID. - Box::pin(std::future::ready(Ok((stream, None)))) + Box::pin(std::future::ready(Ok(RpcSubscription { stream, id: None }))) } } diff --git a/subxt/src/rpc/jsonrpsee_impl.rs b/subxt/src/rpc/jsonrpsee_impl.rs index 1e22aba22b..e04a7828b8 100644 --- a/subxt/src/rpc/jsonrpsee_impl.rs +++ b/subxt/src/rpc/jsonrpsee_impl.rs @@ -6,7 +6,6 @@ use super::{ RpcClientT, RpcFuture, RpcSubscription, - RpcSubscriptionId, }; use crate::error::RpcError; use futures::stream::{ @@ -55,9 +54,9 @@ impl RpcClientT for Client { sub: &'a str, params: Option>, unsub: &'a str, - ) -> RpcFuture<'a, (RpcSubscription, Option)> { + ) -> RpcFuture<'a, RpcSubscription> { Box::pin(async move { - let sub = SubscriptionClientT::subscribe::, _>( + let stream = SubscriptionClientT::subscribe::, _>( self, sub, Params(params), @@ -66,15 +65,17 @@ impl RpcClientT for Client { .await .map_err(|e| RpcError::ClientError(Box::new(e)))?; - let sub_id = match sub.kind() { + let id = match stream.kind() { SubscriptionKind::Subscription(SubscriptionId::Str(id)) => { Some(id.clone().into_owned()) } _ => None, }; - let sub = sub.map_err(|e| RpcError::ClientError(Box::new(e))).boxed(); - Ok((sub, sub_id)) + let stream = stream + .map_err(|e| RpcError::ClientError(Box::new(e))) + .boxed(); + Ok(RpcSubscription { stream, id }) }) } } diff --git a/subxt/src/rpc/mod.rs b/subxt/src/rpc/mod.rs index af77eff8fd..db26f67702 100644 --- a/subxt/src/rpc/mod.rs +++ b/subxt/src/rpc/mod.rs @@ -68,6 +68,7 @@ pub use rpc_client_t::{ RpcFuture, RpcSubscription, RpcSubscriptionId, + RpcSubscriptionStream, }; pub use rpc_client::{ diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs index 00d801cf3c..2d49dc1926 100644 --- a/subxt/src/rpc/rpc_client.rs +++ b/subxt/src/rpc/rpc_client.rs @@ -61,8 +61,8 @@ impl RpcClient { params: RpcParams, unsub: &str, ) -> Result, Error> { - let (sub, sub_id) = self.0.subscribe_raw(sub, params.build(), unsub).await?; - Ok(Subscription::new(sub, sub_id)) + let sub = self.0.subscribe_raw(sub, params.build(), unsub).await?; + Ok(Subscription::new(sub)) } } @@ -167,7 +167,6 @@ impl RpcParams { /// [`StreamExt`] extension trait. pub struct Subscription { inner: RpcSubscription, - sub_id: Option, _marker: std::marker::PhantomData, } @@ -181,17 +180,16 @@ impl std::fmt::Debug for Subscription { } impl Subscription { - fn new(inner: RpcSubscription, sub_id: Option) -> Self { + fn new(inner: RpcSubscription) -> Self { Self { inner, - sub_id, _marker: std::marker::PhantomData, } } /// Obtain the ID associated with this subscription. pub fn subscription_id(&self) -> Option<&RpcSubscriptionId> { - self.sub_id.as_ref() + self.inner.id.as_ref() } } @@ -211,7 +209,7 @@ impl Stream for Subscription { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let res = futures::ready!(self.inner.poll_next_unpin(cx)); + let res = futures::ready!(self.inner.stream.poll_next_unpin(cx)); // Decode the inner RawValue to the type we're expecting and map // any errors to the right shape: diff --git a/subxt/src/rpc/rpc_client_t.rs b/subxt/src/rpc/rpc_client_t.rs index c8e4f687f3..7e1ed0f665 100644 --- a/subxt/src/rpc/rpc_client_t.rs +++ b/subxt/src/rpc/rpc_client_t.rs @@ -52,15 +52,23 @@ pub trait RpcClientT: Send + Sync + 'static { sub: &'a str, params: Option>, unsub: &'a str, - ) -> RpcFuture<'a, (RpcSubscription, Option)>; + ) -> RpcFuture<'a, RpcSubscription>; } /// A boxed future that is returned from the [`RpcClientT`] methods. pub type RpcFuture<'a, T> = Pin> + Send + 'a>>; +/// The RPC subscription returned from [`RpcClientT`]'s `subscription` method. +pub struct RpcSubscription { + /// The subscription stream. + pub stream: RpcSubscriptionStream, + /// The ID associated with the subscription. + pub id: Option, +} + /// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method. -pub type RpcSubscription = +pub type RpcSubscriptionStream = Pin, RpcError>> + Send + 'static>>; /// The ID associated with the [`RpcClientT`]'s `subscription`.