Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Revert "Use Subscription Manager from jsonrpc-pubsub" #6252

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 62 additions & 64 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion bin/node/browser-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache-2.0"
[dependencies]
futures-timer = "3.0.2"
libp2p = { version = "0.19.1", default-features = false }
jsonrpc-core = "14.2"
jsonrpc-core = "14.0.5"
serde = "1.0.106"
serde_json = "1.0.48"
wasm-bindgen = { version = "=0.2.62", features = ["serde-serialize"] }
Expand Down
2 changes: 1 addition & 1 deletion bin/node/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ codec = { package = "parity-scale-codec", version = "1.3.0" }
serde = { version = "1.0.102", features = ["derive"] }
futures = { version = "0.3.1", features = ["compat"] }
hex-literal = "0.2.1"
jsonrpc-core = "14.2"
jsonrpc-core = "14.0.3"
log = "0.4.8"
rand = "0.7.2"
structopt = { version = "0.3.8", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion bin/node/rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"]
env_logger = "0.7.0"
futures = "0.1.29"
hyper = "0.12.35"
jsonrpc-core-client = { version = "14.2", default-features = false, features = ["http"] }
jsonrpc-core-client = { version = "14.0.5", default-features = false, features = ["http"] }
log = "0.4.8"
node-primitives = { version = "2.0.0-rc2", path = "../primitives" }
sc-rpc = { version = "2.0.0-rc2", path = "../../../client/rpc" }
2 changes: 1 addition & 1 deletion bin/node/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
sc-client-api = { version = "2.0.0-rc2", path = "../../../client/api" }
jsonrpc-core = "14.2"
jsonrpc-core = "14.0.3"
node-primitives = { version = "2.0.0-rc2", path = "../primitives" }
node-runtime = { version = "2.0.0-rc2", path = "../runtime" }
sp-runtime = { version = "2.0.0-rc2", path = "../../../primitives/runtime" }
Expand Down
2 changes: 1 addition & 1 deletion bin/utils/subkey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rpassword = "4.0.1"
itertools = "0.8.2"
derive_more = { version = "0.99.2" }
sc-rpc = { version = "2.0.0-rc2", path = "../../../client/rpc" }
jsonrpc-core-client = { version = "14.2", features = ["http"] }
jsonrpc-core-client = { version = "14.0.3", features = ["http"] }
hyper = "0.12.35"
libp2p = { version = "0.19.1", default-features = false }
serde_json = "1.0"
Expand Down
6 changes: 3 additions & 3 deletions client/consensus/babe/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
sc-consensus-babe = { version = "0.8.0-rc2", path = "../" }
sc-rpc-api = { version = "0.8.0-rc2", path = "../../../rpc-api" }
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.5"
jsonrpc-derive = "14.0.3"
sp-consensus-babe = { version = "0.8.0-rc2", path = "../../../../primitives/consensus/babe" }
serde = { version = "1.0.104", features=["derive"] }
sp-blockchain = { version = "2.0.0-rc2", path = "../../../../primitives/blockchain" }
Expand Down
6 changes: 3 additions & 3 deletions client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
derive_more = "0.99.2"
futures = "0.3.4"
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-core = "14.0.5"
jsonrpc-core-client = "14.0.5"
jsonrpc-derive = "14.0.5"
log = "0.4.8"
parking_lot = "0.10.0"
serde = { version = "1.0", features=["derive"] }
Expand Down
6 changes: 3 additions & 3 deletions client/finality-grandpa/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
sc-finality-grandpa = { version = "0.8.0-rc2", path = "../" }
finality-grandpa = { version = "0.12.3", features = ["derive-codec"] }
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.3"
jsonrpc-derive = "14.0.3"
futures = { version = "0.3.4", features = ["compat"] }
serde = { version = "1.0.105", features = ["derive"] }
serde_json = "1.0.50"
Expand Down
8 changes: 4 additions & 4 deletions client/rpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ targets = ["x86_64-unknown-linux-gnu"]
codec = { package = "parity-scale-codec", version = "1.3.0" }
derive_more = "0.99.2"
futures = { version = "0.3.1", features = ["compat"] }
jsonrpc-core = "14.2"
jsonrpc-core-client = "14.2"
jsonrpc-derive = "14.2.1"
jsonrpc-pubsub = "14.2"
jsonrpc-core = "14.0.3"
jsonrpc-core-client = "14.0.5"
jsonrpc-derive = "14.0.3"
jsonrpc-pubsub = "14.0.3"
log = "0.4.8"
parking_lot = "0.10.0"
sp-core = { version = "2.0.0-rc2", path = "../../primitives/core" }
Expand Down
2 changes: 2 additions & 0 deletions client/rpc-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
mod errors;
mod helpers;
mod policy;
mod subscriptions;

pub use jsonrpc_core::IoHandlerExtension as RpcExtension;
pub use subscriptions::{Subscriptions, TaskExecutor};
pub use helpers::Receiver;
pub use policy::DenyUnsafe;

Expand Down
121 changes: 121 additions & 0 deletions client/rpc-api/src/subscriptions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// This file is part of Substrate.

// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::sync::{Arc, atomic::{self, AtomicUsize}};

use log::{error, warn};
use jsonrpc_pubsub::{SubscriptionId, typed::{Sink, Subscriber}};
use parking_lot::Mutex;
use jsonrpc_core::futures::sync::oneshot;
use jsonrpc_core::futures::{Future, future};

type Id = u64;

/// Alias for a an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;

/// Generate unique ids for subscriptions.
#[derive(Clone, Debug)]
pub struct IdProvider {
next_id: Arc<AtomicUsize>,
}
impl Default for IdProvider {
fn default() -> Self {
IdProvider {
next_id: Arc::new(AtomicUsize::new(1)),
}
}
}

impl IdProvider {
/// Returns next id for the subscription.
pub fn next_id(&self) -> Id {
self.next_id.fetch_add(1, atomic::Ordering::AcqRel) as u64
}
}

/// Subscriptions manager.
///
/// Takes care of assigning unique subscription ids and
/// driving the sinks into completion.
#[derive(Clone)]
pub struct Subscriptions {
next_id: IdProvider,
active_subscriptions: Arc<Mutex<HashMap<Id, oneshot::Sender<()>>>>,
executor: TaskExecutor,
}

impl Subscriptions {
/// Creates new `Subscriptions` object.
pub fn new(executor: TaskExecutor) -> Self {
Subscriptions {
next_id: Default::default(),
active_subscriptions: Default::default(),
executor,
}
}

/// Borrows the internal task executor.
///
/// This can be used to spawn additional tasks on the underlying event loop.
pub fn executor(&self) -> &TaskExecutor {
&self.executor
}

/// Creates new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber sink into a future.
/// This future will be driven to completion by the underlying event loop
/// or will be cancelled in case #cancel is invoked.
pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId where
G: FnOnce(Sink<T, E>) -> R,
R: future::IntoFuture<Future=F, Item=(), Error=()>,
F: future::Future<Item=(), Error=()> + Send + 'static,
{
let id = self.next_id.next_id();
let subscription_id: SubscriptionId = id.into();
if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
let (tx, rx) = oneshot::channel();
let future = into_future(sink)
.into_future()
.select(rx.map_err(|e| warn!("Error timeing out: {:?}", e)))
.then(|_| Ok(()));

self.active_subscriptions.lock().insert(id, tx);
if self.executor.execute(Box::new(future)).is_err() {
error!("Failed to spawn RPC subscription task");
}
}

subscription_id
}

/// Cancel subscription.
///
/// Returns true if subscription existed or false otherwise.
pub fn cancel(&self, id: SubscriptionId) -> bool {
if let SubscriptionId::Number(id) = id {
if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
let _ = tx.send(());
return true;
}
}
false
}
}
8 changes: 4 additions & 4 deletions client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
jsonrpc-core = "14.2"
pubsub = { package = "jsonrpc-pubsub", version = "14.2" }
jsonrpc-core = "14.0.3"
pubsub = { package = "jsonrpc-pubsub", version = "14.0.3" }
log = "0.4.8"
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc2", path = "../../primitives/runtime" }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
http = { package = "jsonrpc-http-server", version = "14.2" }
ws = { package = "jsonrpc-ws-server", version = "14.2" }
http = { package = "jsonrpc-http-server", version = "14.0.3" }
ws = { package = "jsonrpc-ws-server", version = "14.0.3" }
4 changes: 2 additions & 2 deletions client/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ sc-client-api = { version = "2.0.0-rc2", path = "../api" }
sp-api = { version = "2.0.0-rc2", path = "../../primitives/api" }
codec = { package = "parity-scale-codec", version = "1.3.0" }
futures = { version = "0.3.1", features = ["compat"] }
jsonrpc-pubsub = "14.2"
jsonrpc-pubsub = "14.0.3"
log = "0.4.8"
sp-core = { version = "2.0.0-rc2", path = "../../primitives/core" }
rpc = { package = "jsonrpc-core", version = "14.2" }
rpc = { package = "jsonrpc-core", version = "14.0.3" }
sp-version = { version = "2.0.0-rc2", path = "../../primitives/version" }
serde_json = "1.0.41"
sp-session = { version = "2.0.0-rc2", path = "../../primitives/session" }
Expand Down
9 changes: 5 additions & 4 deletions client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use rpc::futures::{
};
use futures::{StreamExt as _, compat::Compat};
use futures::future::{ready, FutureExt, TryFutureExt};
use sc_rpc_api::DenyUnsafe;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use sc_rpc_api::{DenyUnsafe, Subscriptions};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
use sp_core::{Bytes, traits::BareCryptoStorePtr};
use sp_api::ProvideRuntimeApi;
Expand All @@ -55,7 +55,7 @@ pub struct Author<P, Client> {
/// Transactions pool
pool: Arc<P>,
/// Subscriptions manager
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
/// The key store.
keystore: BareCryptoStorePtr,
/// Whether to deny unsafe calls
Expand All @@ -67,7 +67,7 @@ impl<P, Client> Author<P, Client> {
pub fn new(
client: Arc<Client>,
pool: Arc<P>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
keystore: BareCryptoStorePtr,
deny_unsafe: DenyUnsafe,
) -> Self {
Expand All @@ -81,6 +81,7 @@ impl<P, Client> Author<P, Client> {
}
}


/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
Expand Down
35 changes: 11 additions & 24 deletions client/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl TestSetup {
Author {
client: self.client.clone(),
pool: self.pool.clone(),
subscriptions: SubscriptionManager::new(Arc::new(crate::testing::TaskExecutor)),
subscriptions: Subscriptions::new(Arc::new(crate::testing::TaskExecutor)),
keystore: self.keystore.clone(),
deny_unsafe: DenyUnsafe::No,
}
Expand Down Expand Up @@ -133,14 +133,8 @@ fn should_watch_extrinsic() {
uxt(AccountKeyring::Alice, 0).encode().into(),
);

let id = executor::block_on(id_rx.compat()).unwrap().unwrap();
assert_matches!(id, SubscriptionId::String(_));

let id = match id {
SubscriptionId::String(id) => id,
_ => unreachable!(),
};

// then
assert_eq!(executor::block_on(id_rx.compat()), Ok(Ok(1.into())));
// check notifications
let replacement = {
let tx = Transfer {
Expand All @@ -153,22 +147,15 @@ fn should_watch_extrinsic() {
};
AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap();
let (res, data) = executor::block_on(data.into_future().compat()).unwrap();

let expected = Some(format!(
r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":"ready","subscription":"{}"}}}}"#,
id,
));
assert_eq!(res, expected);

assert_eq!(
res,
Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into())
);
let h = blake2_256(&replacement.encode());
let expected = Some(format!(
r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":"{}"}}}}"#,
HexDisplay::from(&h),
id,
));

let res = executor::block_on(data.into_future().compat()).unwrap().0;
assert_eq!(res, expected);
assert_eq!(
executor::block_on(data.into_future().compat()).unwrap().0,
Some(format!(r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":1}}}}"#, HexDisplay::from(&h)))
);
}

#[test]
Expand Down
8 changes: 4 additions & 4 deletions client/rpc/src/chain/chain_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

use std::sync::Arc;
use rpc::futures::future::result;
use jsonrpc_pubsub::manager::SubscriptionManager;

use sc_rpc_api::Subscriptions;
use sc_client_api::{BlockchainEvents, BlockBackend};
use sp_runtime::{generic::{BlockId, SignedBlock}, traits::{Block as BlockT}};

Expand All @@ -32,14 +32,14 @@ pub struct FullChain<Block: BlockT, Client> {
/// Substrate client.
client: Arc<Client>,
/// Current subscriptions.
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
/// phantom member to pin the block type
_phantom: PhantomData<Block>,
}

impl<Block: BlockT, Client> FullChain<Block, Client> {
/// Create new Chain API RPC handler.
pub fn new(client: Arc<Client>, subscriptions: SubscriptionManager) -> Self {
pub fn new(client: Arc<Client>, subscriptions: Subscriptions) -> Self {
Self {
client,
subscriptions,
Expand All @@ -56,7 +56,7 @@ impl<Block, Client> ChainBackend<Client, Block> for FullChain<Block, Client> whe
&self.client
}

fn subscriptions(&self) -> &SubscriptionManager {
fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}

Expand Down
Loading