Skip to content

Commit

Permalink
Cleanly shutdown the App syncer
Browse files Browse the repository at this point in the history
Listens for SIGINT and SIGTERM signals by default,
also supports manual shutdown via an mpsc channel.
  • Loading branch information
shesek committed Nov 6, 2020
1 parent a0d5097 commit 3b3f5b8
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 15 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ tokio = { version = "0.2.20", features = ["macros"], optional = true }
warp = { version = "0.2.3", optional = true }
reqwest = { version = "0.10.6", optional = true, features = ["json", "blocking"] }

[target.'cfg(unix)'.dependencies]
signal-hook = "0.1.16"

# Statically link OpenSSL when cross-compiling to ARM
# OpenSSL is currently disabled on ARM, see https://github.com/shesek/bwt/issues/52
Expand Down
15 changes: 12 additions & 3 deletions examples/use-from-rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,24 @@ use structopt::StructOpt;
// Use the HTTP API if you want stronger backwards compatibility guarantees.

fn main() -> Result<()> {
let config = Config::from_args(); // or construct manually with Config { ... }
// Initialize the config
let config = Config::from_args();
config.setup_logger(); // optional

// Boot up bwt. Blocks the thread until the initial sync is completed.
let app = App::boot(config)?;

// The index is now ready for querying
let query = app.query();
println!("{:?}", query.get_tip()?);

std::thread::spawn(move || app.sync());
// Start syncing new blocks/transactions in the background
let (shutdown_tx, shutdown_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || app.sync(Some(shutdown_rx)));

println!("{:?}", query.get_tip()?);
// You can shutdown the app by sending a message to `shutdown_tx`.
// This will also happen automatically when its dropped out of scope.
shutdown_tx.send(());

Ok(())
}
60 changes: 55 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl App {

let (sync_tx, sync_rx) = mpsc::channel();
// debounce sync message rate to avoid excessive indexing when bitcoind catches up
let sync_tx = debounce_sender(sync_tx, DEBOUNCE_SEC);
let debounced_sync_tx = debounce_sender(sync_tx.clone(), DEBOUNCE_SEC);

#[cfg(feature = "electrum")]
let electrum = ElectrumServer::start(
Expand All @@ -80,13 +80,13 @@ impl App {
config.http_server_addr,
config.http_cors.clone(),
query.clone(),
sync_tx.clone(),
debounced_sync_tx.clone(),
);

#[cfg(unix)]
{
if let Some(listener_path) = &config.unix_listener_path {
listener::start(listener_path.clone(), sync_tx.clone());
listener::start(listener_path.clone(), debounced_sync_tx.clone());
}
}

Expand All @@ -108,8 +108,19 @@ impl App {
}

/// Start a sync loop blocking the current thread
pub fn sync(self) {
pub fn sync(&self, shutdown_rx: Option<mpsc::Receiver<()>>) {
let shutdown_rx = shutdown_rx
.map(|rx| self.pipe_shutdown(rx))
.or_else(|| self.default_shutdown_signal());

loop {
if let Some(shutdown_rx) = &shutdown_rx {
match shutdown_rx.try_recv() {
Err(mpsc::TryRecvError::Empty) => (),
Ok(()) | Err(mpsc::TryRecvError::Disconnected) => break,
}
}

#[allow(clippy::option_map_unit_fn)]
match self.indexer.write().unwrap().sync() {
Ok(updates) if !updates.is_empty() => {
Expand All @@ -128,7 +139,8 @@ impl App {
Err(e) => warn!("error while updating index: {:#?}", e),
}

// wait for poll_interval seconds, or until we receive a sync notification message
// wait for poll_interval seconds, or until we receive a sync notification message,
// or until the shutdown signal is emitted
self.sync_chan
.1
.recv_timeout(self.config.poll_interval)
Expand All @@ -140,6 +152,44 @@ impl App {
pub fn query(&self) -> Arc<Query> {
self.query.clone()
}

// Pipe the shutdown receiver `rx` to trigger `sync_tx`. This is needed to start the next
// sync loop run immediately, which will then process the shutdown signal itself. Without
// this, the shutdown signal will only be noticed after a delay.
fn pipe_shutdown(&self, rx: mpsc::Receiver<()>) -> mpsc::Receiver<()> {
let sync_tx = self.sync_chan.0.clone();
let (c_tx, c_rx) = mpsc::sync_channel(1);
thread::spawn(move || {
rx.recv().ok();
c_tx.send(()).unwrap();
sync_tx.send(()).unwrap();
});
c_rx
}

#[cfg(unix)]
fn default_shutdown_signal(&self) -> Option<mpsc::Receiver<()>> {
use signal_hook::iterator::Signals;

let signals = Signals::new(&[signal_hook::SIGINT, signal_hook::SIGTERM]).unwrap();
let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);
let sync_tx = self.sync_chan.0.clone();

thread::spawn(move || {
let signal = signals.into_iter().next().unwrap();
trace!("received shutdown signal {}", signal);
shutdown_tx.send(()).unwrap();
// Need to also trigger `sync_tx`, see rational above
sync_tx.send(()).unwrap();
});

Some(shutdown_rx)
}

#[cfg(not(unix))]
fn default_shutdown_signal(&self) -> Option<mpsc::Receiver<()>> {
None
}
}

// Load the specified wallet, ignore "wallet is already loaded" errors
Expand Down
6 changes: 5 additions & 1 deletion src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ fn bind_listener(socket_path: PathBuf, sync_tx: mpsc::Sender<()>) -> std::io::Re
let listener = UnixListener::bind(socket_path)?;
for stream in listener.incoming() {
trace!("received sync notification via unix socket");
sync_tx.send(()).unwrap();
// drop the connection, ignore any errors
stream.and_then(|s| s.shutdown(net::Shutdown::Both)).ok();

if sync_tx.send(()).is_err() {
break;
}
// FIXME the listener thread won't be closed until it receives a connection and attempts to send()
}
Ok(())
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() -> Result<()> {
config.setup_logger();

let app = App::boot(config)?;
app.sync();
app.sync(None);

Ok(())
}
15 changes: 10 additions & 5 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::collections::hash_map::{Entry, HashMap};
use std::time::{Duration, Instant};
use std::{sync::mpsc, thread};

use bitcoin::secp256k1::{self, Secp256k1};
use serde_json::Value;

use bitcoin::secp256k1::{self, Secp256k1};
use bitcoin::Txid;

#[macro_use]
Expand Down Expand Up @@ -82,10 +82,12 @@ pub fn debounce_sender(forward_tx: mpsc::Sender<()>, duration: u64) -> mpsc::Sen
let (debounce_tx, debounce_rx) = mpsc::channel();

thread::spawn(move || {
loop {
'outer: loop {
let tick_start = Instant::now();
// always wait for the first sync message to arrive first
debounce_rx.recv().unwrap();
if debounce_rx.recv().is_err() {
break 'outer;
}
if tick_start.elapsed() < duration {
// if duration hasn't passed, debounce for another `duration` seconds
loop {
Expand All @@ -95,13 +97,16 @@ pub fn debounce_sender(forward_tx: mpsc::Sender<()>, duration: u64) -> mpsc::Sen
Ok(()) => continue,
// if we timed-out, we're good!
Err(mpsc::RecvTimeoutError::Timeout) => break,
Err(mpsc::RecvTimeoutError::Disconnected) => panic!(),
Err(mpsc::RecvTimeoutError::Disconnected) => break 'outer,
}
}
}
info!(target: "bwt::real-time", "triggering real-time index sync");
forward_tx.send(()).unwrap();
if forward_tx.send(()).is_err() {
break 'outer;
}
}
trace!(target: "bwt::real-time", "debounce sync thread shutting down");
});

debounce_tx
Expand Down
1 change: 1 addition & 0 deletions src/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl WebHookNotifier {
.ok();
}
}
trace!("webhooks shutting down");
}),
tx,
num_urls,
Expand Down

0 comments on commit 3b3f5b8

Please sign in to comment.