Skip to content

Commit

Permalink
fix: return bound endpoints after server started
Browse files Browse the repository at this point in the history
The impetus behind the change is to support adding monitoring endpoints with
an optional additional TCP listener. While refactoring the server it was
discovered that the run method moves the Server self, and so fixed bytecodealliance#112 to
also ensure that the API endpoint was running before tests could discover the
listener's locally bound port.

In addition a common cancellation token is added to the server to propagate
shutdown to both the core service as well as the API server endpoint. It can be
later used to support additional services later such as a monitoring
endpoint(s).
  • Loading branch information
kriswuollett committed May 12, 2023
1 parent 3d99311 commit eb1767a
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 42 deletions.
122 changes: 84 additions & 38 deletions crates/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tower_http::{
trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer},
LatencyUnit,
Expand All @@ -30,7 +32,7 @@ const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5);

/// The server configuration.
pub struct Config {
operator_key: PrivateKey,
operator_key: Option<PrivateKey>,
addr: Option<SocketAddr>,
data_store: Option<Box<dyn datastore::DataStore>>,
content_dir: Option<PathBuf>,
Expand Down Expand Up @@ -58,7 +60,7 @@ impl Config {
/// Creates a new server configuration.
pub fn new(operator_key: PrivateKey) -> Self {
Self {
operator_key,
operator_key: Some(operator_key),
addr: None,
data_store: None,
content_dir: None,
Expand Down Expand Up @@ -120,60 +122,67 @@ impl Config {
/// Represents the warg registry server.
pub struct Server {
config: Config,
listener: Option<TcpListener>,
endpoints: Option<Endpoints>,
token: CancellationToken,
tasks: JoinSet<Result<()>>,
}

/// The bound endpoints for the warg registry server.
#[derive(Clone)]
pub struct Endpoints {
pub api: SocketAddr,
}

impl Server {
/// Creates a new server with the given configuration.
pub fn new(config: Config) -> Self {
Self {
config,
listener: None,
token: CancellationToken::new(),
endpoints: None,
tasks: JoinSet::new(),
}
}

/// Binds the server to the configured address.
/// Starts the server on the configured address.
///
/// Returns the address the server bound to.
pub fn bind(&mut self) -> Result<SocketAddr> {
/// Returns the endpoints the server bound to.
pub async fn start(&mut self) -> Result<Endpoints> {
assert!(
self.endpoints.is_none(),
"cannot start server multiple times"
);

tracing::debug!(
"using server configuration: {config:?}",
config = self.config
);

let addr = self
.config
.addr
.to_owned()
.unwrap_or_else(|| DEFAULT_BIND_ADDRESS.parse().unwrap());

tracing::debug!("binding server to address `{addr}`");
tracing::debug!("binding api endpoint to address `{addr}`");
let listener = TcpListener::bind(addr)
.with_context(|| format!("failed to bind to address `{addr}`"))?;
.with_context(|| format!("failed to bind api endpoint to address `{addr}`"))?;

let addr = listener
.local_addr()
.context("failed to get local address for listen socket")?;

tracing::debug!("server bound to address `{addr}`");
self.config.addr = Some(addr);
self.listener = Some(listener);
Ok(addr)
}

/// Runs the server.
pub async fn run(mut self) -> Result<()> {
if self.listener.is_none() {
self.bind()?;
}

let listener = self.listener.unwrap();
.context("failed to get local address for api endpoint listen socket")?;
tracing::debug!("api endpoint bound to address `{addr}`");

tracing::debug!(
"using server configuration: {config:?}",
config = self.config
);
let endpoints = Endpoints { api: addr };
self.endpoints = Some(endpoints.to_owned());

let store = self
.config
.data_store
.take()
.unwrap_or_else(|| Box::<MemoryDataStore>::default());
let (core, handle) = CoreService::spawn(
self.config.operator_key,
self.config.operator_key.take().unwrap(),
store,
self.config
.checkpoint_interval
Expand All @@ -183,32 +192,69 @@ impl Server {

let server = axum::Server::from_tcp(listener)?.serve(
Self::create_router(
format!("http://{addr}", addr = self.config.addr.unwrap()),
self.config.content_dir,
format!("http://{addr}", addr = endpoints.api),
self.config.content_dir.take(),
core,
)?
.into_make_service(),
);

tracing::info!("listening on {addr}", addr = self.config.addr.unwrap());
tracing::info!("api endpoint listening on {addr}", addr = endpoints.api);

if let Some(shutdown) = self.config.shutdown {
tracing::debug!("server is running with a shutdown signal");
// Shut down core service when token cancelled.
let token = self.token.clone();
self.tasks.spawn(async move {
token.cancelled().await;
tracing::info!("waiting for core service to stop");
handle.stop().await;
Ok(())
});

// Shut down server when token cancelled.
let token: CancellationToken = self.token.clone();
self.tasks.spawn(async move {
tracing::info!("waiting for api endpoint to stop");
server
.with_graceful_shutdown(async move { shutdown.await })
.with_graceful_shutdown(async move { token.cancelled().await })
.await?;
Ok(())
});

// Cancel token if shutdown signal received.
if let Some(shutdown) = self.config.shutdown.take() {
tracing::debug!("server is running with a shutdown signal");
let token = self.token.clone();
tokio::spawn(async move {
tracing::info!("waiting for shutdown signal");
shutdown.await;
tracing::info!("shutting down server");
token.cancel();
});
} else {
tracing::debug!("server is running without a shutdown signal");
server.await?;
}

tracing::info!("waiting for core service to stop");
handle.stop().await;
Ok(endpoints)
}

/// Waits on a started server to shutdown.
pub async fn join(&mut self) -> Result<()> {
while (self.tasks.join_next().await).is_some() {}
tracing::info!("server shutdown complete");
Ok(())
}

/// Starts the server on the configured address and waits for completion.
pub async fn run(&mut self) -> Result<()> {
self.start().await?;
self.join().await?;
Ok(())
}

pub fn stop(&mut self) {
self.token.cancel();
}

fn create_router(
base_url: String,
content_dir: Option<PathBuf>,
Expand Down
1 change: 0 additions & 1 deletion tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{fs, str::FromStr};
use warg_client::{api, Config, FileSystemClient, StorageLockResult};
use warg_crypto::{signing::PrivateKey, Encode, Signable};
use wit_component::DecodedWasm;

mod support;

#[cfg(feature = "postgres")]
Expand Down
6 changes: 3 additions & 3 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ pub async fn spawn_server(
}

let mut server = Server::new(config);
let addr = server.bind()?;
let endpoints = server.start().await?;

let task = tokio::spawn(async move {
server.run().await.unwrap();
server.join().await.unwrap();
});

let instance = ServerInstance {
Expand All @@ -94,7 +94,7 @@ pub async fn spawn_server(
};

let config = warg_client::Config {
default_url: Some(format!("http://{addr}")),
default_url: Some(format!("http://{addr}", addr = endpoints.api)),
registries_dir: Some(root.join("registries")),
content_dir: Some(root.join("content")),
};
Expand Down

0 comments on commit eb1767a

Please sign in to comment.