From d13fdef776f750ca986bb84aa2c2e4695e72383d Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Thu, 5 Nov 2020 23:59:04 +0200 Subject: [PATCH] http: Shutdown cleanly, support ephemeral binding on port 0 Kind of unrelated, but the changes are interleaved. --- src/http.rs | 69 ++++++++++++++++++++++++++++++++++++++----------- src/util/mod.rs | 11 ++++++++ 2 files changed, 65 insertions(+), 15 deletions(-) diff --git a/src/http.rs b/src/http.rs index a9705bd..c3002f5 100644 --- a/src/http.rs +++ b/src/http.rs @@ -3,29 +3,27 @@ use std::{net, thread}; use serde::{Deserialize, Deserializer}; use tokio::stream::{self, Stream, StreamExt}; -use tokio::sync::mpsc as tmpsc; +use tokio::sync::{mpsc as tmpsc, oneshot}; use warp::http::{header, StatusCode}; use warp::sse::ServerSentEvent; -use warp::{reply, Filter, Reply}; +use warp::{self, reply, Filter, Reply}; use bitcoin::{Address, BlockHash, OutPoint, Txid}; use bitcoin_hashes::hex::FromHex; use crate::error::{fmt_error_chain, BwtError, Error, OptionExt}; use crate::types::{BlockId, ScriptHash}; -use crate::util::descriptor::Checksum; +use crate::util::{block_on_future, descriptor::Checksum}; use crate::{store, util::banner, IndexChange, Query}; type SyncChanSender = Arc>>; -#[tokio::main] -async fn run( - addr: net::SocketAddr, +fn setup( cors: Option, query: Arc, sync_tx: SyncChanSender, listeners: Listeners, -) { +) -> warp::Server + Clone> { let query = warp::any().map(move || Arc::clone(&query)); let sync_tx = warp::any().map(move || Arc::clone(&sync_tx)); let listeners = warp::any().map(move || Arc::clone(&listeners)); @@ -464,14 +462,34 @@ async fn run( .with(warp::log("bwt::http")) .with(warp::reply::with::headers(headers)); - info!("HTTP REST API server starting on http://{}/", addr); + warp::serve(handlers) +} + +#[tokio::main] +async fn spawn( + warp_server: warp::Server, + addr: net::SocketAddr, + addr_tx: oneshot::Sender, + shutdown_rx: oneshot::Receiver<()>, +) where + S: warp::Filter + Clone + Send + Sync + 'static, + S::Extract: warp::Reply, +{ + let (bound_addr, server_ft) = warp_server.bind_with_graceful_shutdown(addr, async { + shutdown_rx.await.ok(); + }); + + // Send back the bound address, useful for binding on port 0 + addr_tx.send(bound_addr).unwrap(); - warp::serve(handlers).run(addr).await + server_ft.await } pub struct HttpServer { - _thread: thread::JoinHandle<()>, + addr: net::SocketAddr, listeners: Listeners, + shutdown_tx: Option>, + thread: Option>, } impl HttpServer { @@ -481,16 +499,25 @@ impl HttpServer { query: Arc, sync_tx: mpsc::Sender<()>, ) -> Self { + let listeners = Arc::new(Mutex::new(Vec::new())); let sync_tx = Arc::new(Mutex::new(sync_tx)); + let warp_server = setup(cors, query, sync_tx, listeners.clone()); + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (addr_tx, addr_rx) = oneshot::channel(); - let listeners: Listeners = Arc::new(Mutex::new(Vec::new())); - let thr_listeners = Arc::clone(&listeners); + let thread = thread::spawn(move || { + spawn(warp_server, addr, addr_tx, shutdown_rx); + }); + + let bound_addr = block_on_future(addr_rx).unwrap(); + info!("HTTP REST API server running on http://{}/", bound_addr); HttpServer { - _thread: thread::spawn(move || { - run(addr, cors, query, sync_tx, thr_listeners); - }), listeners, + addr: bound_addr, + shutdown_tx: Some(shutdown_tx), + thread: Some(thread), } } @@ -512,6 +539,18 @@ impl HttpServer { .all(|change| listener.tx.send(change.clone()).is_ok()) }) } + + pub fn addr(&self) -> &net::SocketAddr { + &self.addr + } +} + +impl Drop for HttpServer { + fn drop(&mut self) { + trace!("HTTP server shutting down"); + self.shutdown_tx.take().unwrap().send(()).unwrap(); + self.thread.take().unwrap().join().unwrap(); + } } type Listeners = Arc>>; diff --git a/src/util/mod.rs b/src/util/mod.rs index b951517..dc176d6 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -112,6 +112,17 @@ pub fn debounce_sender(forward_tx: mpsc::Sender<()>, duration: u64) -> mpsc::Sen debounce_tx } +/// Wait for the future to resolve, blocking the current thread until it does +#[cfg(feature = "tokio")] +pub fn block_on_future(future: F) -> F::Output { + let mut rt = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + rt.block_on(future) +} + pub trait BoolThen { // Similar to https://doc.rust-lang.org/std/primitive.bool.html#method.then (nightly only) fn do_then(self, f: impl FnOnce() -> T) -> Option;