Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Automatically unsubscribe storage listeners when they're dropped (RCP…
Browse files Browse the repository at this point in the history
… node memory leak fix) (#10454)

* Automatically unsubscribe storage listeners when they're dropped

* Fix tests' compilation in `sc-client-api`

* Add an extra test

* Align to review comments; cleanups

* Update client/api/src/notifications.rs

* FMT

Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2021
1 parent a634b44 commit be75e55
Showing 1 changed file with 174 additions and 55 deletions.
229 changes: 174 additions & 55 deletions client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,29 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
pin::Pin,
sync::{Arc, Weak},
task::Poll,
};

use fnv::{FnvHashMap, FnvHashSet};
use futures::Stream;
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, Opts, Registry, U64};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_core::storage::{StorageData, StorageKey};
use sp_core::{
hexdisplay::HexDisplay,
storage::{StorageData, StorageKey},
};
use sp_runtime::traits::Block as BlockT;

/// Storage change set
#[derive(Debug)]
pub struct StorageChangeSet {
changes: Arc<Vec<(StorageKey, Option<StorageData>)>>,
child_changes: Arc<Vec<(StorageKey, Vec<(StorageKey, Option<StorageData>)>)>>,
filter: Option<HashSet<StorageKey>>,
child_filters: Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
filter: Keys,
child_filters: ChildKeys,
}

impl StorageChangeSet {
Expand Down Expand Up @@ -74,15 +81,60 @@ impl StorageChangeSet {
}

/// Type that implements `futures::Stream` of storage change events.
pub type StorageEventStream<H> = TracingUnboundedReceiver<(H, StorageChangeSet)>;
pub struct StorageEventStream<H> {
rx: TracingUnboundedReceiver<(H, StorageChangeSet)>,
storage_notifications: Weak<Mutex<StorageNotificationsImpl<H>>>,
was_triggered: bool,
id: u64,
}

impl<H> Stream for StorageEventStream<H> {
type Item = <TracingUnboundedReceiver<(H, StorageChangeSet)> as Stream>::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let result = Stream::poll_next(Pin::new(&mut self.rx), cx);
if result.is_ready() {
self.was_triggered = true;
}
result
}
}

impl<H> Drop for StorageEventStream<H> {
fn drop(&mut self) {
if let Some(storage_notifications) = self.storage_notifications.upgrade() {
if let Some((keys, child_keys)) =
storage_notifications.lock().remove_subscriber(self.id)
{
if !self.was_triggered {
log::trace!(
target: "storage_notifications",
"Listener was never triggered: id={}, keys={:?}, child_keys={:?}",
self.id,
PrintKeys(&keys),
PrintChildKeys(&child_keys),
);
}
}
}
}
}

type SubscriberId = u64;

type SubscribersGauge = CounterVec<U64>;

/// Manages storage listeners.
#[derive(Debug)]
pub struct StorageNotifications<Block: BlockT> {
pub struct StorageNotifications<Block: BlockT>(Arc<Mutex<StorageNotificationsImpl<Block::Hash>>>);

type Keys = Option<HashSet<StorageKey>>;
type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;

#[derive(Debug)]
struct StorageNotificationsImpl<Hash> {
metrics: Option<SubscribersGauge>,
next_id: SubscriberId,
wildcard_listeners: FnvHashSet<SubscriberId>,
Expand All @@ -93,15 +145,17 @@ pub struct StorageNotifications<Block: BlockT> {
>,
sinks: FnvHashMap<
SubscriberId,
(
TracingUnboundedSender<(Block::Hash, StorageChangeSet)>,
Option<HashSet<StorageKey>>,
Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>,
),
(TracingUnboundedSender<(Hash, StorageChangeSet)>, Keys, ChildKeys),
>,
}

impl<Block: BlockT> Default for StorageNotifications<Block> {
fn default() -> Self {
Self(Default::default())
}
}

impl<Hash> Default for StorageNotificationsImpl<Hash> {
fn default() -> Self {
Self {
metrics: Default::default(),
Expand All @@ -114,10 +168,68 @@ impl<Block: BlockT> Default for StorageNotifications<Block> {
}
}

struct PrintKeys<'a>(&'a Keys);
impl<'a> std::fmt::Debug for PrintKeys<'a> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(keys) = self.0 {
fmt.debug_list().entries(keys.iter().map(HexDisplay::from)).finish()
} else {
write!(fmt, "None")
}
}
}

struct PrintChildKeys<'a>(&'a ChildKeys);
impl<'a> std::fmt::Debug for PrintChildKeys<'a> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(map) = self.0 {
fmt.debug_map()
.entries(map.iter().map(|(key, values)| (HexDisplay::from(key), PrintKeys(values))))
.finish()
} else {
write!(fmt, "None")
}
}
}

impl<Block: BlockT> StorageNotifications<Block> {
/// Initialize a new StorageNotifications
/// optionally pass a prometheus registry to send subscriber metrics to
pub fn new(prometheus_registry: Option<Registry>) -> Self {
StorageNotifications(Arc::new(Mutex::new(StorageNotificationsImpl::new(
prometheus_registry,
))))
}

/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(
&mut self,
hash: &Block::Hash,
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
>,
) {
self.0.lock().trigger(hash, changeset, child_changeset);
}

/// Start listening for particular storage keys.
pub fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
let (id, rx) = self.0.lock().listen(filter_keys, filter_child_keys);
let storage_notifications = Arc::downgrade(&self.0);
StorageEventStream { rx, storage_notifications, was_triggered: false, id }
}
}

impl<Hash> StorageNotificationsImpl<Hash> {
fn new(prometheus_registry: Option<Registry>) -> Self {
let metrics = prometheus_registry.and_then(|r| {
CounterVec::new(
Opts::new(
Expand All @@ -130,7 +242,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
.ok()
});

StorageNotifications {
StorageNotificationsImpl {
metrics,
next_id: Default::default(),
wildcard_listeners: Default::default(),
Expand All @@ -139,18 +251,16 @@ impl<Block: BlockT> StorageNotifications<Block> {
sinks: Default::default(),
}
}
/// Trigger notification to all listeners.
///
/// Note the changes are going to be filtered by listener's filter key.
/// In fact no event might be sent if clients are not interested in the changes.
pub fn trigger(
fn trigger(
&mut self,
hash: &Block::Hash,
hash: &Hash,
changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
child_changeset: impl Iterator<
Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
>,
) {
) where
Hash: Clone,
{
let has_wildcard = !self.wildcard_listeners.is_empty();

// early exit if no listeners
Expand Down Expand Up @@ -244,7 +354,7 @@ impl<Block: BlockT> StorageNotifications<Block> {

fn remove_subscriber_from(
subscriber: &SubscriberId,
filters: &Option<HashSet<StorageKey>>,
filters: &Keys,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
) {
Expand All @@ -269,42 +379,43 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}

fn remove_subscriber(&mut self, subscriber: SubscriberId) {
if let Some((_, filters, child_filters)) = self.sinks.remove(&subscriber) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut self.listeners,
&mut self.wildcard_listeners,
);
if let Some(child_filters) = child_filters.as_ref() {
for (c_key, filters) in child_filters {
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut *listeners,
&mut *wildcards,
);

if listeners.is_empty() && wildcards.is_empty() {
self.child_listeners.remove(&c_key);
}
fn remove_subscriber(&mut self, subscriber: SubscriberId) -> Option<(Keys, ChildKeys)> {
let (_, filters, child_filters) = self.sinks.remove(&subscriber)?;
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut self.listeners,
&mut self.wildcard_listeners,
);
if let Some(child_filters) = child_filters.as_ref() {
for (c_key, filters) in child_filters {
if let Some((listeners, wildcards)) = self.child_listeners.get_mut(&c_key) {
Self::remove_subscriber_from(
&subscriber,
&filters,
&mut *listeners,
&mut *wildcards,
);

if listeners.is_empty() && wildcards.is_empty() {
self.child_listeners.remove(&c_key);
}
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}
}
if let Some(m) = self.metrics.as_ref() {
m.with_label_values(&[&"removed"]).inc();
}

Some((filters, child_filters))
}

fn listen_from(
current_id: SubscriberId,
filter_keys: &Option<impl AsRef<[StorageKey]>>,
listeners: &mut HashMap<StorageKey, FnvHashSet<SubscriberId>>,
wildcards: &mut FnvHashSet<SubscriberId>,
) -> Option<HashSet<StorageKey>> {
) -> Keys {
match filter_keys {
None => {
wildcards.insert(current_id);
Expand All @@ -325,12 +436,11 @@ impl<Block: BlockT> StorageNotifications<Block> {
}
}

/// Start listening for particular storage keys.
pub fn listen(
fn listen(
&mut self,
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
) -> (u64, TracingUnboundedReceiver<(Hash, StorageChangeSet)>) {
self.next_id += 1;
let current_id = self.next_id;

Expand Down Expand Up @@ -364,7 +474,7 @@ impl<Block: BlockT> StorageNotifications<Block> {
m.with_label_values(&[&"added"]).inc();
}

rx
(current_id, rx)
}
}

Expand Down Expand Up @@ -517,9 +627,9 @@ mod tests {
let _recv3 = futures::executor::block_on_stream(notifications.listen(None, None));
let _recv4 =
futures::executor::block_on_stream(notifications.listen(None, Some(&child_filter)));
assert_eq!(notifications.listeners.len(), 2);
assert_eq!(notifications.wildcard_listeners.len(), 2);
assert_eq!(notifications.child_listeners.len(), 1);
assert_eq!(notifications.0.lock().listeners.len(), 2);
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 2);
assert_eq!(notifications.0.lock().child_listeners.len(), 1);
}

// when
Expand All @@ -528,9 +638,18 @@ mod tests {
notifications.trigger(&Hash::from_low_u64_be(1), changeset.into_iter(), c_changeset);

// then
assert_eq!(notifications.listeners.len(), 0);
assert_eq!(notifications.wildcard_listeners.len(), 0);
assert_eq!(notifications.child_listeners.len(), 0);
assert_eq!(notifications.0.lock().listeners.len(), 0);
assert_eq!(notifications.0.lock().wildcard_listeners.len(), 0);
assert_eq!(notifications.0.lock().child_listeners.len(), 0);
}

#[test]
fn should_cleanup_subscriber_if_stream_is_dropped() {
let mut notifications = StorageNotifications::<Block>::default();
let stream = notifications.listen(None, None);
assert_eq!(notifications.0.lock().sinks.len(), 1);
std::mem::drop(stream);
assert_eq!(notifications.0.lock().sinks.len(), 0);
}

#[test]
Expand Down

0 comments on commit be75e55

Please sign in to comment.