Skip to content

Commit

Permalink
http: Shutdown cleanly, support ephemeral binding on port 0
Browse files Browse the repository at this point in the history
Kind of unrelated, but the changes are interleaved.
  • Loading branch information
shesek committed Nov 6, 2020
1 parent 3b3f5b8 commit d13fdef
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 15 deletions.
69 changes: 54 additions & 15 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,27 @@ use std::{net, thread};

use serde::{Deserialize, Deserializer};
use tokio::stream::{self, Stream, StreamExt};
use tokio::sync::mpsc as tmpsc;
use tokio::sync::{mpsc as tmpsc, oneshot};
use warp::http::{header, StatusCode};
use warp::sse::ServerSentEvent;
use warp::{reply, Filter, Reply};
use warp::{self, reply, Filter, Reply};

use bitcoin::{Address, BlockHash, OutPoint, Txid};
use bitcoin_hashes::hex::FromHex;

use crate::error::{fmt_error_chain, BwtError, Error, OptionExt};
use crate::types::{BlockId, ScriptHash};
use crate::util::descriptor::Checksum;
use crate::util::{block_on_future, descriptor::Checksum};
use crate::{store, util::banner, IndexChange, Query};

type SyncChanSender = Arc<Mutex<mpsc::Sender<()>>>;

#[tokio::main]
async fn run(
addr: net::SocketAddr,
fn setup(
cors: Option<String>,
query: Arc<Query>,
sync_tx: SyncChanSender,
listeners: Listeners,
) {
) -> warp::Server<impl warp::Filter<Extract = impl warp::Reply> + Clone> {
let query = warp::any().map(move || Arc::clone(&query));
let sync_tx = warp::any().map(move || Arc::clone(&sync_tx));
let listeners = warp::any().map(move || Arc::clone(&listeners));
Expand Down Expand Up @@ -464,14 +462,34 @@ async fn run(
.with(warp::log("bwt::http"))
.with(warp::reply::with::headers(headers));

info!("HTTP REST API server starting on http://{}/", addr);
warp::serve(handlers)
}

#[tokio::main]
async fn spawn<S>(
warp_server: warp::Server<S>,
addr: net::SocketAddr,
addr_tx: oneshot::Sender<net::SocketAddr>,
shutdown_rx: oneshot::Receiver<()>,
) where
S: warp::Filter + Clone + Send + Sync + 'static,
S::Extract: warp::Reply,
{
let (bound_addr, server_ft) = warp_server.bind_with_graceful_shutdown(addr, async {
shutdown_rx.await.ok();
});

// Send back the bound address, useful for binding on port 0
addr_tx.send(bound_addr).unwrap();

warp::serve(handlers).run(addr).await
server_ft.await
}

pub struct HttpServer {
_thread: thread::JoinHandle<()>,
addr: net::SocketAddr,
listeners: Listeners,
shutdown_tx: Option<oneshot::Sender<()>>,
thread: Option<thread::JoinHandle<()>>,
}

impl HttpServer {
Expand All @@ -481,16 +499,25 @@ impl HttpServer {
query: Arc<Query>,
sync_tx: mpsc::Sender<()>,
) -> Self {
let listeners = Arc::new(Mutex::new(Vec::new()));
let sync_tx = Arc::new(Mutex::new(sync_tx));
let warp_server = setup(cors, query, sync_tx, listeners.clone());

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (addr_tx, addr_rx) = oneshot::channel();

let listeners: Listeners = Arc::new(Mutex::new(Vec::new()));
let thr_listeners = Arc::clone(&listeners);
let thread = thread::spawn(move || {
spawn(warp_server, addr, addr_tx, shutdown_rx);
});

let bound_addr = block_on_future(addr_rx).unwrap();
info!("HTTP REST API server running on http://{}/", bound_addr);

HttpServer {
_thread: thread::spawn(move || {
run(addr, cors, query, sync_tx, thr_listeners);
}),
listeners,
addr: bound_addr,
shutdown_tx: Some(shutdown_tx),
thread: Some(thread),
}
}

Expand All @@ -512,6 +539,18 @@ impl HttpServer {
.all(|change| listener.tx.send(change.clone()).is_ok())
})
}

pub fn addr(&self) -> &net::SocketAddr {
&self.addr
}
}

impl Drop for HttpServer {
fn drop(&mut self) {
trace!("HTTP server shutting down");
self.shutdown_tx.take().unwrap().send(()).unwrap();
self.thread.take().unwrap().join().unwrap();
}
}

type Listeners = Arc<Mutex<Vec<Listener>>>;
Expand Down
11 changes: 11 additions & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ pub fn debounce_sender(forward_tx: mpsc::Sender<()>, duration: u64) -> mpsc::Sen
debounce_tx
}

/// Wait for the future to resolve, blocking the current thread until it does
#[cfg(feature = "tokio")]
pub fn block_on_future<F: std::future::Future>(future: F) -> F::Output {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
rt.block_on(future)
}

pub trait BoolThen {
// Similar to https://doc.rust-lang.org/std/primitive.bool.html#method.then (nightly only)
fn do_then<T>(self, f: impl FnOnce() -> T) -> Option<T>;
Expand Down

0 comments on commit d13fdef

Please sign in to comment.