Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a subscription manager #548

Merged
merged 16 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pubsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ log = "0.4"
parking_lot = "0.10.0"
jsonrpc-core = { version = "14.1", path = "../core" }
serde = "1.0"
rand = "0.7"

[dev-dependencies]
jsonrpc-tcp-server = { version = "14.1", path = "../tcp" }
Expand Down
1 change: 1 addition & 0 deletions pubsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern crate log;

mod delegates;
mod handler;
pub mod manager;
pub mod oneshot;
mod subscription;
pub mod typed;
Expand Down
203 changes: 203 additions & 0 deletions pubsub/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
//! The SubscriptionManager used to manage subscription based RPCs.
//!
//! The manager provides four main things in terms of functionality:
//!
//! 1. The ability to create unique subscription IDs through the
//! use of the `IdProvider` trait. Two implementations are availble
//! out of the box, a `NumericIdProvider` and a `RandomStringIdProvider`.
//!
//! 2. An executor with which to drive `Future`s to completion.
//!
//! 3. A way to add new subscriptions. Subscriptions should come in the form
//! of a `Stream`. These subscriptions will be transformed into notifications
//! by the manager, which can be consumed by the client.
//!
//! 4. A way to cancel any currently active subscription.

use std::collections::HashMap;
use std::iter;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};

use crate::core::futures::sync::oneshot;
use crate::core::futures::{future, Future};
use crate::{
typed::{Sink, Subscriber},
SubscriptionId,
};

use log::{error, warn};
use parking_lot::Mutex;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

/// Alias for an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn future::Executor<Box<dyn Future<Item = (), Error = ()> + Send>> + Send + Sync>;

type ActiveSubscriptions = Arc<Mutex<HashMap<SubscriptionId, oneshot::Sender<()>>>>;

/// Trait used to provide unique subscription IDs.
pub trait IdProvider {
/// A unique ID used to identify a subscription.
type Id: Default + Into<SubscriptionId>;

/// Returns the next ID for the subscription.
fn next_id(&self) -> Self::Id;
}

/// Provides a thread-safe incrementing integer which
/// can be used as a subscription ID.
#[derive(Debug)]
pub struct NumericIdProvider {
current_id: AtomicUsize,
}

impl NumericIdProvider {
/// Create a new NumericIdProvider.
pub fn new() -> Self {
Default::default()
}

/// Create a new NumericIdProvider starting from
/// the given ID.
pub fn with_id(id: AtomicUsize) -> Self {
Self { current_id: id }
}
}

impl IdProvider for NumericIdProvider {
type Id = usize;

fn next_id(&self) -> Self::Id {
self.current_id.fetch_add(1, Ordering::AcqRel)
}
}

impl Default for NumericIdProvider {
fn default() -> Self {
NumericIdProvider {
current_id: AtomicUsize::new(1),
}
}
}

/// Used to generate random strings for use as
/// subscription IDs.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct RandomStringIdProvider {
len: usize,
}

impl RandomStringIdProvider {
/// Create a new RandomStringIdProvider.
pub fn new() -> Self {
Default::default()
}

/// Create a new RandomStringIdProvider, which will generate
/// random id strings of the given length.
pub fn with_len(len: usize) -> Self {
Self { len }
}
}

impl IdProvider for RandomStringIdProvider {
type Id = String;

fn next_id(&self) -> Self::Id {
let mut rng = thread_rng();
let id: String = iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.take(self.len)
.collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice and elegant :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

id
}
}

impl Default for RandomStringIdProvider {
fn default() -> Self {
Self { len: 16 }
}
}

/// Subscriptions manager.
///
/// Takes care of assigning unique subscription ids and
/// driving the sinks into completion.
#[derive(Clone)]
pub struct SubscriptionManager<I: IdProvider = RandomStringIdProvider> {
id_provider: I,
active_subscriptions: ActiveSubscriptions,
executor: TaskExecutor, // Make generic?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO It's fine, in most workloads spawning subscription is not going to be a bottleneck so we can safely do a virtual call here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
executor: TaskExecutor, // Make generic?
executor: TaskExecutor,

}

impl<I: IdProvider> SubscriptionManager<I> {
/// Creates a new SubscriptionManager.
pub fn new_with_id_provider(id_provider: I, executor: TaskExecutor) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn new_with_id_provider(id_provider: I, executor: TaskExecutor) -> Self {
pub fn with_id_provider(id_provider: I, executor: TaskExecutor) -> Self {

AFAIK the overall convention is to skip new_ in with_ constructors.

Self {
id_provider,
active_subscriptions: Default::default(),
executor,
}
}

/// Borrows the internal task executor.
///
/// This can be used to spawn additional tasks on the underlying event loop.
pub fn executor(&self) -> &TaskExecutor {
&self.executor
}

/// Creates new subscription for given subscriber.
///
/// Second parameter is a function that converts Subscriber Sink into a Future.
/// This future will be driven to completion by the underlying event loop
pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId
where
G: FnOnce(Sink<T, E>) -> R,
R: future::IntoFuture<Future = F, Item = (), Error = ()>,
F: future::Future<Item = (), Error = ()> + Send + 'static,
{
let id = self.id_provider.next_id();
let subscription_id: SubscriptionId = id.into();
if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
let (tx, rx) = oneshot::channel();
let future = into_future(sink)
.into_future()
.select(rx.map_err(|e| warn!("Error timing out: {:?}", e)))
.then(|_| Ok(()));

self.active_subscriptions.lock().insert(subscription_id.clone(), tx);
if self.executor.execute(Box::new(future)).is_err() {
error!("Failed to spawn RPC subscription task");
}
}

subscription_id
}

/// Cancel subscription.
///
/// Returns true if subscription existed or false otherwise.
pub fn cancel(&self, id: SubscriptionId) -> bool {
if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
let _ = tx.send(());
return true;
}

false
}
}

impl<I: Default + IdProvider> SubscriptionManager<I> {
/// Creates a new SubscriptionManager.
pub fn new(executor: TaskExecutor) -> Self {
Self {
id_provider: Default::default(),
active_subscriptions: Default::default(),
executor,
}
}
}
104 changes: 80 additions & 24 deletions pubsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ impl<T: PubSubMetadata> PubSubMetadata for Option<T> {
}

/// Unique subscription id.
///
/// NOTE Assigning same id to different requests will cause the previous request to be unsubscribed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscriptionId {
/// U64 number
/// A numerical ID, represented by a `u64`.
Number(u64),
/// String
/// A non-numerical ID, for example a hash.
String(String),
}

Expand All @@ -61,9 +62,11 @@ impl From<String> for SubscriptionId {
}
}

impl From<u64> for SubscriptionId {
fn from(other: u64) -> Self {
SubscriptionId::Number(other)
// TODO: Don't unwrap, and probably implement TryFrom instead of From
use std::convert::TryInto;
impl From<usize> for SubscriptionId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think instead of providing this From<usize> implementation we should rather change the NumericIdProvider to convert usize to u64 internally. The reason is that it's highly unlikely for NumericIdProvider to reach the case where this conversion might fail (given it runs on 128-bit processor anyway O.o) and here we are making stronger assumption that usize can be safely converted to SubscriptionId.

fn from(id: usize) -> Self {
SubscriptionId::Number(id.try_into().unwrap())
}
}

Expand All @@ -76,30 +79,83 @@ impl From<SubscriptionId> for core::Value {
}
}

macro_rules! impl_from_num {
($num:ty) => {
impl From<$num> for SubscriptionId {
fn from(other: $num) -> Self {
SubscriptionId::Number(other.into())
}
}
};
}

impl_from_num!(u8);
impl_from_num!(u16);
impl_from_num!(u32);
impl_from_num!(u64);

#[cfg(test)]
mod tests {
use super::SubscriptionId;
use crate::core::Value;

#[test]
fn should_convert_between_value_and_subscription_id() {
// given
let val1 = Value::Number(5.into());
let val2 = Value::String("asdf".into());
let val3 = Value::Null;

// when
let res1 = SubscriptionId::parse_value(&val1);
let res2 = SubscriptionId::parse_value(&val2);
let res3 = SubscriptionId::parse_value(&val3);

// then
assert_eq!(res1, Some(SubscriptionId::Number(5)));
assert_eq!(res2, Some(SubscriptionId::String("asdf".into())));
assert_eq!(res3, None);

// and back
assert_eq!(Value::from(res1.unwrap()), val1);
assert_eq!(Value::from(res2.unwrap()), val2);
fn should_convert_between_number_value_and_subscription_id() {
let val = Value::Number(5.into());
let res = SubscriptionId::parse_value(&val);

assert_eq!(res, Some(SubscriptionId::Number(5)));
assert_eq!(Value::from(res.unwrap()), val);
}

#[test]
fn should_convert_between_string_value_and_subscription_id() {
let val = Value::String("asdf".into());
let res = SubscriptionId::parse_value(&val);

assert_eq!(res, Some(SubscriptionId::String("asdf".into())));
assert_eq!(Value::from(res.unwrap()), val);
}

#[test]
fn should_convert_between_null_value_and_subscription_id() {
let val = Value::Null;
let res = SubscriptionId::parse_value(&val);
assert_eq!(res, None);
}

#[test]
fn should_convert_from_u8_to_subscription_id() {
let val = 5u8;
let res: SubscriptionId = val.into();
assert_eq!(res, SubscriptionId::Number(5));
}

#[test]
fn should_convert_from_u16_to_subscription_id() {
let val = 5u16;
let res: SubscriptionId = val.into();
assert_eq!(res, SubscriptionId::Number(5));
}

#[test]
fn should_convert_from_u32_to_subscription_id() {
let val = 5u32;
let res: SubscriptionId = val.into();
assert_eq!(res, SubscriptionId::Number(5));
}

#[test]
fn should_convert_from_u64_to_subscription_id() {
let val = 5u64;
let res: SubscriptionId = val.into();
assert_eq!(res, SubscriptionId::Number(5));
}

#[test]
fn should_convert_from_string_to_subscription_id() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the test ❤️

let val = "String".to_string();
let res: SubscriptionId = val.into();
assert_eq!(res, SubscriptionId::String("String".to_string()));
}
}