Skip to content

Commit

Permalink
Refactor the relationship between the assorted web / websocket servers (
Browse files Browse the repository at this point in the history
#1844)

* Refactor the relationship between the servers and handles
* Rename RemoteViewerServer to WebViewerSink
* CLI arguments for specifying ports
* Proper typing for the ports
  • Loading branch information
jleibs authored Apr 14, 2023
1 parent 31237fe commit f6f5b7d
Show file tree
Hide file tree
Showing 18 changed files with 417 additions and 173 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_viewer/src/remote_viewer_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl RemoteViewerApp {
}
}
Err(err) => {
re_log::error!("Failed to parse message: {}", re_error::format(&err));
re_log::error!("Failed to parse message: {err}");
std::ops::ControlFlow::Break(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
url = param.clone();
}
if url.is_empty() {
re_ws_comms::default_server_url(&info.web_info.location.hostname)
re_ws_comms::server_url(&info.web_info.location.hostname, Default::default())
} else {
url
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ analytics = ["dep:re_analytics"]
[dependencies]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = [
"macros",
"rt-multi-thread",
Expand Down
118 changes: 110 additions & 8 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
#![forbid(unsafe_code)]
#![warn(clippy::all, rust_2018_idioms)]

use std::task::{Context, Poll};
use std::{
fmt::Display,
str::FromStr,
task::{Context, Poll},
};

use futures_util::future;
use hyper::{server::conn::AddrIncoming, service::Service, Body, Request, Response};

pub const DEFAULT_WEB_VIEWER_SERVER_PORT: u16 = 9090;

#[cfg(not(feature = "__ci"))]
mod data {
// If you add/remove/change the paths here, also update the include-list in `Cargo.toml`!
Expand All @@ -32,6 +38,21 @@ mod data {
pub const VIEWER_WASM_RELEASE: &[u8] = include_bytes!("../web_viewer/re_viewer_bg.wasm");
}

#[derive(thiserror::Error, Debug)]
pub enum WebViewerServerError {
#[error("Could not parse address: {0}")]
AddrParseFailed(#[from] std::net::AddrParseError),

#[error("failed to bind to port {0}: {1}")]
BindFailed(WebViewerServerPort, hyper::Error),

#[error("failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),

#[error("failed to serve web viewer: {0}")]
ServeFailed(hyper::Error),
}

struct Svc {
// NOTE: Optional because it is possible to have the `analytics` feature flag enabled
// while at the same time opting-out of analytics at run-time.
Expand Down Expand Up @@ -149,27 +170,108 @@ impl<T> Service<T> for MakeSvc {

// ----------------------------------------------------------------------------

/// Hosts the Web Viewer Wasm+HTML
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Typed port for use with [`WebViewerServer`]
pub struct WebViewerServerPort(pub u16);

impl Default for WebViewerServerPort {
fn default() -> Self {
Self(DEFAULT_WEB_VIEWER_SERVER_PORT)
}
}

impl Display for WebViewerServerPort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

// Needed for clap
impl FromStr for WebViewerServerPort {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.parse::<u16>() {
Ok(port) => Ok(WebViewerServerPort(port)),
Err(err) => Err(format!("Failed to parse port: {err}")),
}
}
}

/// HTTP host for the Rerun Web Viewer application
/// This serves the HTTP+Wasm+JS files that make up the web-viewer.
pub struct WebViewerServer {
server: hyper::Server<AddrIncoming, MakeSvc>,
}

impl WebViewerServer {
pub fn new(port: u16) -> Self {
let bind_addr = format!("0.0.0.0:{port}").parse().unwrap();
let server = hyper::Server::bind(&bind_addr).serve(MakeSvc);
Self { server }
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
///
/// A port of 0 will let the OS choose a free port.
pub fn new(port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
let bind_addr = format!("0.0.0.0:{port}").parse()?;
let server = hyper::Server::try_bind(&bind_addr)
.map_err(|e| WebViewerServerError::BindFailed(port, e))?
.serve(MakeSvc);
Ok(Self { server })
}

pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
) -> Result<(), WebViewerServerError> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
.await
.map_err(WebViewerServerError::ServeFailed)?;
Ok(())
}

pub fn port(&self) -> WebViewerServerPort {
WebViewerServerPort(self.server.local_addr().port())
}
}

/// Sync handle for the [`WebViewerServer`]
///
/// When dropped, the server will be shut down.
pub struct WebViewerServerHandle {
port: WebViewerServerPort,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for WebViewerServerHandle {
fn drop(&mut self) {
re_log::info!("Shutting down web server on port {}.", self.port);
self.shutdown_tx.send(()).ok();
}
}

impl WebViewerServerHandle {
/// Create new [`WebViewerServer`] to host the Rerun Web Viewer on a specified port.
/// Returns a [`WebViewerServerHandle`] that will shutdown the server when dropped.
///
/// A port of 0 will let the OS choose a free port.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
pub fn new(requested_port: WebViewerServerPort) -> Result<Self, WebViewerServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let web_server = WebViewerServer::new(requested_port)?;

let port = web_server.port();

tokio::spawn(async move { web_server.serve(shutdown_rx).await });

re_log::info!("Started web server on port {}.", port);

Ok(Self { port, shutdown_tx })
}

/// Get the port where the HTTP server is listening
pub fn port(&self) -> WebViewerServerPort {
self.port
}
}
3 changes: 2 additions & 1 deletion crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#[tokio::main]
async fn main() {
re_log::setup_native_logging();
let port = 9090;
let port = Default::default();
eprintln!("Hosting web-viewer on http://127.0.0.1:{port}");

// Shutdown server via Ctrl+C
Expand All @@ -16,6 +16,7 @@ async fn main() {
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.expect("Could not create web server")
.serve(shutdown_rx)
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/re_ws_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ re_log_types = { workspace = true, features = ["serde"] }
anyhow.workspace = true
bincode = "1.3"
document-features = "0.2"
thiserror.workspace = true

# Client:
ewebsock = { version = "0.2", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/re_ws_comms/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::ops::ControlFlow;

use ewebsock::{WsEvent, WsMessage, WsSender};

use crate::Result;
// TODO(jleibs): use thiserror
pub type Result<T> = anyhow::Result<T>;

/// Represents a connection to the server.
/// Disconnects on drop.
Expand Down
69 changes: 58 additions & 11 deletions crates/re_ws_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
#[cfg(feature = "client")]
mod client;
use std::{fmt::Display, str::FromStr};

#[cfg(feature = "client")]
pub use client::Connection;

#[cfg(feature = "server")]
mod server;
#[cfg(feature = "server")]
pub use server::Server;
pub use server::{RerunServer, RerunServerHandle};

use re_log_types::LogMsg;

pub type Result<T> = anyhow::Result<T>;

pub const DEFAULT_WS_SERVER_PORT: u16 = 9877;

#[cfg(feature = "tls")]
Expand All @@ -26,8 +26,58 @@ pub const PROTOCOL: &str = "wss";
#[cfg(not(feature = "tls"))]
pub const PROTOCOL: &str = "ws";

pub fn default_server_url(hostname: &str) -> String {
format!("{PROTOCOL}://{hostname}:{DEFAULT_WS_SERVER_PORT}")
// ----------------------------------------------------------------------------

#[derive(thiserror::Error, Debug)]
pub enum RerunServerError {
#[error("failed to bind to port {0}: {1}")]
BindFailed(RerunServerPort, std::io::Error),

#[error("received an invalid message")]
InvalidMessagePrefix,

#[error("received an invalid message")]
InvalidMessage(#[from] bincode::Error),

#[cfg(feature = "server")]
#[error("failed to join web viewer server task: {0}")]
JoinError(#[from] tokio::task::JoinError),

#[cfg(feature = "server")]
#[error("tokio error: {0}")]
TokioIoError(#[from] tokio::io::Error),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
/// Typed port for use with [`RerunServer`]
pub struct RerunServerPort(pub u16);

impl Default for RerunServerPort {
fn default() -> Self {
Self(DEFAULT_WS_SERVER_PORT)
}
}

impl Display for RerunServerPort {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

// Needed for clap
impl FromStr for RerunServerPort {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.parse::<u16>() {
Ok(port) => Ok(RerunServerPort(port)),
Err(err) => Err(format!("Failed to parse port: {err}")),
}
}
}

pub fn server_url(hostname: &str, port: RerunServerPort) -> String {
format!("{PROTOCOL}://{hostname}:{port}")
}

const PREFIX: [u8; 4] = *b"RR00";
Expand All @@ -41,14 +91,11 @@ pub fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
bytes
}

pub fn decode_log_msg(data: &[u8]) -> Result<LogMsg> {
pub fn decode_log_msg(data: &[u8]) -> Result<LogMsg, RerunServerError> {
let payload = data
.strip_prefix(&PREFIX)
.ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?;
.ok_or(RerunServerError::InvalidMessagePrefix)?;

use anyhow::Context as _;
use bincode::Options as _;
bincode::DefaultOptions::new()
.deserialize(payload)
.context("bincode")
Ok(bincode::DefaultOptions::new().deserialize(payload)?)
}
Loading

0 comments on commit f6f5b7d

Please sign in to comment.