diff --git a/block-filter/src/filter.rs b/block-filter/src/filter.rs index 06a3b4f963..f11eb00b5a 100644 --- a/block-filter/src/filter.rs +++ b/block-filter/src/filter.rs @@ -31,15 +31,14 @@ impl BlockFilter { async_handle.spawn_blocking(move || filter_data_builder.build_filter_data()); async_handle.spawn(async move { - let mut new_block_receiver = notify_controller - .subscribe_new_block(NAME.to_string()) - .await; + let mut new_block_watcher = notify_controller.watch_new_block(NAME.to_string()).await; let _build_filter_data_finished = build_filter_data.await; loop { tokio::select! { - Some(_) = new_block_receiver.recv() => { + Ok(_) = new_block_watcher.changed() => { block_in_place(|| self.build_filter_data()); + new_block_watcher.borrow_and_update(); } _ = &mut stop_rx => break, else => break, diff --git a/notify/src/lib.rs b/notify/src/lib.rs index 007bd255ae..b5fe0f7d7f 100644 --- a/notify/src/lib.rs +++ b/notify/src/lib.rs @@ -3,16 +3,19 @@ use ckb_app_config::NotifyConfig; use ckb_async_runtime::Handle; use ckb_logger::{debug, error, trace}; use ckb_stop_handler::{SignalSender, StopHandler}; +use ckb_types::packed::Byte32; use ckb_types::{ core::{tx_pool::Reject, BlockView}, packed::Alert, }; -use std::collections::HashMap; -use std::process::Command; +use std::{collections::HashMap, time::Duration}; +use tokio::process::Command; +use tokio::sync::watch; use tokio::sync::{ mpsc::{self, Receiver, Sender}, oneshot, }; +use tokio::time::timeout; pub use ckb_types::core::service::PoolTransactionEntry; @@ -48,11 +51,46 @@ pub const NOTIFY_CHANNEL_SIZE: usize = 128; /// TODO(doc): @quake pub type NotifyRegister = Sender>>; +/// watcher request type alias +pub type NotifyWatcher = Sender>>; + +/// Notify timeout config +#[derive(Copy, Clone)] +pub(crate) struct NotifyTimeout { + pub(crate) tx: Duration, + pub(crate) alert: Duration, + pub(crate) script: Duration, +} + +const DEFAULT_TX_NOTIFY_TIMEOUT: Duration = Duration::from_millis(300); +const DEFAULT_ALERT_NOTIFY_TIMEOUT: Duration = Duration::from_millis(10_000); +const DEFAULT_SCRIPT_TIMEOUT: Duration = Duration::from_millis(10_000); + +impl NotifyTimeout { + pub(crate) fn new(config: &NotifyConfig) -> Self { + NotifyTimeout { + tx: config + .notify_tx_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_TX_NOTIFY_TIMEOUT), + alert: config + .notify_alert_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_ALERT_NOTIFY_TIMEOUT), + script: config + .script_timeout + .map(Duration::from_millis) + .unwrap_or(DEFAULT_SCRIPT_TIMEOUT), + } + } +} + /// TODO(doc): @quake #[derive(Clone)] pub struct NotifyController { stop: StopHandler<()>, new_block_register: NotifyRegister, + new_block_watcher: NotifyWatcher, new_block_notifier: Sender, new_transaction_register: NotifyRegister, new_transaction_notifier: Sender, @@ -75,31 +113,42 @@ impl Drop for NotifyController { pub struct NotifyService { config: NotifyConfig, new_block_subscribers: HashMap>, + new_block_watchers: HashMap>, new_transaction_subscribers: HashMap>, proposed_transaction_subscribers: HashMap>, reject_transaction_subscribers: HashMap>, network_alert_subscribers: HashMap>, + timeout: NotifyTimeout, + handle: Handle, } impl NotifyService { /// TODO(doc): @quake - pub fn new(config: NotifyConfig) -> Self { + pub fn new(config: NotifyConfig, handle: Handle) -> Self { + let timeout = NotifyTimeout::new(&config); + Self { config, new_block_subscribers: HashMap::default(), + new_block_watchers: HashMap::default(), new_transaction_subscribers: HashMap::default(), proposed_transaction_subscribers: HashMap::default(), reject_transaction_subscribers: HashMap::default(), network_alert_subscribers: HashMap::default(), + timeout, + handle, } } /// start background tokio spawned task. - pub fn start(mut self, handle: Handle) -> NotifyController { + pub fn start(mut self) -> NotifyController { let (signal_sender, mut signal_receiver) = oneshot::channel(); + let handle = self.handle.clone(); let (new_block_register, mut new_block_register_receiver) = mpsc::channel(REGISTER_CHANNEL_SIZE); + let (new_block_watcher, mut new_block_watcher_receiver) = + mpsc::channel(REGISTER_CHANNEL_SIZE); let (new_block_sender, mut new_block_receiver) = mpsc::channel(NOTIFY_CHANNEL_SIZE); let (new_transaction_register, mut new_transaction_register_receiver) = @@ -128,15 +177,16 @@ impl NotifyService { break; } Some(msg) = new_block_register_receiver.recv() => { self.handle_register_new_block(msg) }, - Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg).await }, + Some(msg) = new_block_watcher_receiver.recv() => { self.handle_watch_new_block(msg) }, + Some(msg) = new_block_receiver.recv() => { self.handle_notify_new_block(msg) }, Some(msg) = new_transaction_register_receiver.recv() => { self.handle_register_new_transaction(msg) }, - Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg).await }, + Some(msg) = new_transaction_receiver.recv() => { self.handle_notify_new_transaction(msg) }, Some(msg) = proposed_transaction_register_receiver.recv() => { self.handle_register_proposed_transaction(msg) }, - Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg).await }, + Some(msg) = proposed_transaction_receiver.recv() => { self.handle_notify_proposed_transaction(msg) }, Some(msg) = reject_transaction_register_receiver.recv() => { self.handle_register_reject_transaction(msg) }, - Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg).await }, + Some(msg) = reject_transaction_receiver.recv() => { self.handle_notify_reject_transaction(msg) }, Some(msg) = network_alert_register_receiver.recv() => { self.handle_register_network_alert(msg) }, - Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg).await }, + Some(msg) = network_alert_receiver.recv() => { self.handle_notify_network_alert(msg) }, else => break, } } @@ -144,6 +194,7 @@ impl NotifyService { NotifyController { new_block_register, + new_block_watcher, new_block_notifier: new_block_sender, new_transaction_register, new_transaction_notifier: new_transaction_sender, @@ -162,6 +213,17 @@ impl NotifyService { } } + fn handle_watch_new_block(&mut self, msg: Request>) { + let Request { + responder, + arguments: name, + } = msg; + debug!("Watch new_block {:?}", name); + let (sender, receiver) = watch::channel(Byte32::zero()); + self.new_block_watchers.insert(name, sender); + let _ = responder.send(receiver); + } + fn handle_register_new_block(&mut self, msg: Request>) { let Request { responder, @@ -173,22 +235,44 @@ impl NotifyService { let _ = responder.send(receiver); } - async fn handle_notify_new_block(&mut self, block: BlockView) { + fn handle_notify_new_block(&self, block: BlockView) { trace!("event new block {:?}", block); + let block_hash = block.hash(); // notify all subscribers for subscriber in self.new_block_subscribers.values() { - let _ = subscriber.send(block.clone()).await; + let block = block.clone(); + let subscriber = subscriber.clone(); + self.handle.spawn(async move { + if let Err(e) = subscriber.send(block).await { + error!("notify new block error {}", e); + } + }); } - // notify script - if let Some(script) = self.config.new_block_notify_script.as_ref() { - let args = [format!("{:#x}", block.hash())]; - if let Err(err) = Command::new(script).args(&args).status() { - error!( - "failed to run new_block_notify_script: {} {}, error: {}", - script, args[0], err - ); + + // notify all watchers + for watcher in self.new_block_watchers.values() { + if let Err(e) = watcher.send(block_hash.clone()) { + error!("notify new block watcher error {}", e); } } + + // notify script + if let Some(script) = self.config.new_block_notify_script.clone() { + let script_timeout = self.timeout.script; + self.handle.spawn(async move { + let args = [format!("{:#x}", block_hash)]; + match timeout(script_timeout, Command::new(&script).args(&args).status()).await { + Ok(ret) => match ret { + Ok(status) => debug!("the new_block_notify script exited with: {}", status), + Err(e) => error!( + "failed to run new_block_notify_script: {} {:?}, error: {}", + script, args[0], e + ), + }, + Err(_) => ckb_logger::warn!("new_block_notify_script {} timed out", script), + } + }); + } } fn handle_register_new_transaction( @@ -205,11 +289,19 @@ impl NotifyService { let _ = responder.send(receiver); } - async fn handle_notify_new_transaction(&mut self, tx_entry: PoolTransactionEntry) { + fn handle_notify_new_transaction(&self, tx_entry: PoolTransactionEntry) { trace!("event new tx {:?}", tx_entry); // notify all subscribers + let tx_timeout = self.timeout.tx; + // notify all subscribers for subscriber in self.new_transaction_subscribers.values() { - let _ = subscriber.send(tx_entry.clone()).await; + let tx_entry = tx_entry.clone(); + let subscriber = subscriber.clone(); + self.handle.spawn(async move { + if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await { + error!("notify new transaction error {}", e); + } + }); } } @@ -227,11 +319,19 @@ impl NotifyService { let _ = responder.send(receiver); } - async fn handle_notify_proposed_transaction(&mut self, tx_entry: PoolTransactionEntry) { + fn handle_notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) { trace!("event proposed tx {:?}", tx_entry); // notify all subscribers + let tx_timeout = self.timeout.tx; + // notify all subscribers for subscriber in self.proposed_transaction_subscribers.values() { - let _ = subscriber.send(tx_entry.clone()).await; + let tx_entry = tx_entry.clone(); + let subscriber = subscriber.clone(); + self.handle.spawn(async move { + if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await { + error!("notify proposed transaction error {}", e); + } + }); } } @@ -249,11 +349,19 @@ impl NotifyService { let _ = responder.send(receiver); } - async fn handle_notify_reject_transaction(&mut self, tx_entry: (PoolTransactionEntry, Reject)) { + fn handle_notify_reject_transaction(&self, tx_entry: (PoolTransactionEntry, Reject)) { trace!("event reject tx {:?}", tx_entry); // notify all subscribers + let tx_timeout = self.timeout.tx; + // notify all subscribers for subscriber in self.reject_transaction_subscribers.values() { - let _ = subscriber.send(tx_entry.clone()).await; + let tx_entry = tx_entry.clone(); + let subscriber = subscriber.clone(); + self.handle.spawn(async move { + if let Err(e) = subscriber.send_timeout(tx_entry, tx_timeout).await { + error!("notify reject transaction error {}", e); + } + }); } } @@ -268,27 +376,45 @@ impl NotifyService { let _ = responder.send(receiver); } - async fn handle_notify_network_alert(&mut self, alert: Alert) { + fn handle_notify_network_alert(&self, alert: Alert) { trace!("event network alert {:?}", alert); + let alert_timeout = self.timeout.alert; + let message = alert + .as_reader() + .raw() + .message() + .as_utf8() + .expect("alert message should be utf8") + .to_owned(); // notify all subscribers for subscriber in self.network_alert_subscribers.values() { - let _ = subscriber.send(alert.clone()).await; + let subscriber = subscriber.clone(); + let alert = alert.clone(); + self.handle.spawn(async move { + if let Err(e) = subscriber.send_timeout(alert, alert_timeout).await { + error!("notify network_alert error {}", e); + } + }); } + // notify script - if let Some(script) = self.config.network_alert_notify_script.as_ref() { - let args = [alert - .as_reader() - .raw() - .message() - .as_utf8() - .expect("alert message should be utf8") - .to_owned()]; - if let Err(err) = Command::new(script).args(&args).status() { - error!( - "failed to run network_alert_notify_script: {} {}, error: {}", - script, args[0], err - ); - } + if let Some(script) = self.config.network_alert_notify_script.clone() { + let script_timeout = self.timeout.script; + self.handle.spawn(async move { + let args = [message]; + match timeout(script_timeout, Command::new(&script).args(&args).status()).await { + Ok(ret) => match ret { + Ok(status) => { + debug!("the network_alert_notify script exited with: {}", status) + } + Err(e) => error!( + "failed to run network_alert_notify_script: {} {}, error: {}", + script, args[0], e + ), + }, + Err(_) => ckb_logger::warn!("network_alert_notify_script {} timed out", script), + } + }); } } } @@ -301,11 +427,20 @@ impl NotifyController { .expect("Subscribe new block should be OK") } + /// watch new block notify + pub async fn watch_new_block(&self, name: S) -> watch::Receiver { + Request::call(&self.new_block_watcher, name.to_string()) + .await + .expect("Watch new block should be OK") + } + /// TODO(doc): @quake pub fn notify_new_block(&self, block: BlockView) { let new_block_notifier = self.new_block_notifier.clone(); self.handle.spawn(async move { - let _ = new_block_notifier.send(block).await; + if let Err(e) = new_block_notifier.send(block).await { + error!("notify_new_block channel is closed: {}", e); + } }); } @@ -323,7 +458,9 @@ impl NotifyController { pub fn notify_new_transaction(&self, tx_entry: PoolTransactionEntry) { let new_transaction_notifier = self.new_transaction_notifier.clone(); self.handle.spawn(async move { - let _ = new_transaction_notifier.send(tx_entry).await; + if let Err(e) = new_transaction_notifier.send(tx_entry).await { + error!("notify_new_transaction channel is closed: {}", e); + } }); } @@ -341,7 +478,9 @@ impl NotifyController { pub fn notify_proposed_transaction(&self, tx_entry: PoolTransactionEntry) { let proposed_transaction_notifier = self.proposed_transaction_notifier.clone(); self.handle.spawn(async move { - let _ = proposed_transaction_notifier.send(tx_entry).await; + if let Err(e) = proposed_transaction_notifier.send(tx_entry).await { + error!("notify_proposed_transaction channel is closed: {}", e); + } }); } @@ -359,7 +498,9 @@ impl NotifyController { pub fn notify_reject_transaction(&self, tx_entry: PoolTransactionEntry, reject: Reject) { let reject_transaction_notifier = self.reject_transaction_notifier.clone(); self.handle.spawn(async move { - let _ = reject_transaction_notifier.send((tx_entry, reject)).await; + if let Err(e) = reject_transaction_notifier.send((tx_entry, reject)).await { + error!("notify_reject_transaction channel is closed: {}", e); + } }); } @@ -374,7 +515,9 @@ impl NotifyController { pub fn notify_network_alert(&self, alert: Alert) { let network_alert_notifier = self.network_alert_notifier.clone(); self.handle.spawn(async move { - let _ = network_alert_notifier.send(alert).await; + if let Err(e) = network_alert_notifier.send(alert).await { + error!("notify_network_alert channel is closed: {}", e); + } }); } } diff --git a/rpc/src/tests/examples.rs b/rpc/src/tests/examples.rs index 1f86112bc0..fafaf95b39 100644 --- a/rpc/src/tests/examples.rs +++ b/rpc/src/tests/examples.rs @@ -188,7 +188,7 @@ fn setup_rpc_test_suite(height: u64) -> RpcTestSuite { )); let notify_controller = - NotifyService::new(Default::default()).start(shared.async_handle().clone()); + NotifyService::new(Default::default(), shared.async_handle().clone()).start(); let (alert_notifier, alert_verifier) = { let alert_relayer = AlertRelayer::new( "0.1.0".to_string(), diff --git a/util/app-config/src/configs/notify.rs b/util/app-config/src/configs/notify.rs index 3772c9f472..dddfc80628 100644 --- a/util/app-config/src/configs/notify.rs +++ b/util/app-config/src/configs/notify.rs @@ -11,4 +11,58 @@ pub struct Config { /// /// The script is called with the alert message as the argument. pub network_alert_notify_script: Option, + + /// Notify tx timeout in milliseconds + #[serde(default, deserialize_with = "at_least_100")] + pub notify_tx_timeout: Option, + + /// Notify alert timeout in milliseconds + #[serde(default, deserialize_with = "at_least_100")] + pub notify_alert_timeout: Option, + + /// Notify alert timeout in milliseconds + #[serde(default, deserialize_with = "at_least_100")] + pub script_timeout: Option, +} + +fn at_least_100<'de, D>(d: D) -> Result, D::Error> +where + D: serde::de::Deserializer<'de>, +{ + let op = Option::::deserialize(d)?; + + if let Some(ref value) = op { + if value < &100 { + return Err(serde::de::Error::invalid_value( + serde::de::Unexpected::Unsigned(*value), + &"a value at least 100", + )); + } + } + Ok(op) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_deserialize() { + let s = r#" + new_block_notify_script = "dasd" + network_alert_notify_script = "dasd" + script_timeout = 1 + "#; + + let ret = toml::from_str::(s); + assert!(ret.is_err()); + + let s = r#" + new_block_notify_script = "dasd" + network_alert_notify_script = "dasd" + script_timeout = 100 + "#; + let ret = toml::from_str::(s); + assert!(ret.is_ok()); + } } diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index d6abe36628..8e20791dcb 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -188,20 +188,21 @@ impl IndexerService { let poll_service = self.clone(); self.async_handle.spawn(async move { let _initial_finished = initial_syncing.await; - let mut new_block_receiver = notify_controller - .subscribe_new_block(SUBSCRIBER_NAME.to_string()) + let mut new_block_watcher = notify_controller + .watch_new_block(SUBSCRIBER_NAME.to_string()) .await; let mut interval = time::interval(poll_service.poll_interval); interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); loop { tokio::select! { - Some(_) = new_block_receiver.recv() => { + Ok(_) = new_block_watcher.changed() => { let service = poll_service.clone(); if let Err(e) = async_handle.spawn_blocking(move || { service.try_loop_sync() }).await { error!("ckb indexer syncing join error {:?}", e); } + new_block_watcher.borrow_and_update(); }, _ = interval.tick() => { let service = poll_service.clone(); diff --git a/util/launcher/src/shared_builder.rs b/util/launcher/src/shared_builder.rs index ee3a11f8fc..87fa8d1c4b 100644 --- a/util/launcher/src/shared_builder.rs +++ b/util/launcher/src/shared_builder.rs @@ -406,7 +406,7 @@ impl SharedPackage { } fn start_notify_service(notify_config: NotifyConfig, handle: Handle) -> NotifyController { - NotifyService::new(notify_config).start(handle) + NotifyService::new(notify_config, handle).start() } fn build_store( diff --git a/util/network-alert/src/tests/test_notifier.rs b/util/network-alert/src/tests/test_notifier.rs index 955affc585..6b6a1f9bd8 100644 --- a/util/network-alert/src/tests/test_notifier.rs +++ b/util/network-alert/src/tests/test_notifier.rs @@ -31,13 +31,15 @@ fn new_notifier(version: &str) -> Notifier { } let notify_controller = RUNTIME_HANDLE.with(|runtime| { - NotifyService::new(Default::default()).start( + NotifyService::new( + Default::default(), runtime .borrow() .get_or_init(new_background_runtime) .0 .clone(), ) + .start() }); Notifier::new(version.into(), notify_controller) }