Skip to content

Commit

Permalink
Merge pull request #740 from http-rs/bind
Browse files Browse the repository at this point in the history
Add `Server::bind`
  • Loading branch information
yoshuawuyts authored Nov 13, 2020
2 parents 09a53f0 + 6daa700 commit bdf2639
Show file tree
Hide file tree
Showing 10 changed files with 578 additions and 246 deletions.
51 changes: 38 additions & 13 deletions src/listener/concurrent_listener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::listener::{Listener, ToListener};
use crate::listener::{ListenInfo, Listener, ToListener};
use crate::Server;

use std::fmt::{self, Debug, Display, Formatter};
Expand Down Expand Up @@ -33,12 +33,14 @@ use futures_util::stream::{futures_unordered::FuturesUnordered, StreamExt};
///```

#[derive(Default)]
pub struct ConcurrentListener<State>(Vec<Box<dyn Listener<State>>>);
pub struct ConcurrentListener<State> {
listeners: Vec<Box<dyn Listener<State>>>,
}

impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {
/// creates a new ConcurrentListener
pub fn new() -> Self {
Self(vec![])
Self { listeners: vec![] }
}

/// Adds any [`ToListener`](crate::listener::ToListener) to this
Expand All @@ -55,8 +57,11 @@ impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {
/// # std::mem::drop(tide::new().listen(listener)); // for the State generic
/// # Ok(()) }
/// ```
pub fn add<TL: ToListener<State>>(&mut self, listener: TL) -> io::Result<()> {
self.0.push(Box::new(listener.to_listener()?));
pub fn add<L>(&mut self, listener: L) -> io::Result<()>
where
L: ToListener<State>,
{
self.listeners.push(Box::new(listener.to_listener()?));
Ok(())
}

Expand All @@ -71,39 +76,59 @@ impl<State: Clone + Send + Sync + 'static> ConcurrentListener<State> {
/// .with_listener(async_std::net::TcpListener::bind("127.0.0.1:8081").await?),
/// ).await?;
/// # Ok(()) }) }
pub fn with_listener<TL: ToListener<State>>(mut self, listener: TL) -> Self {
pub fn with_listener<L>(mut self, listener: L) -> Self
where
L: ToListener<State>,
{
self.add(listener).expect("Unable to add listener");
self
}
}

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for ConcurrentListener<State> {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
impl<State> Listener<State> for ConcurrentListener<State>
where
State: Clone + Send + Sync + 'static,
{
async fn bind(&mut self, app: Server<State>) -> io::Result<()> {
for listener in self.listeners.iter_mut() {
listener.bind(app.clone()).await?;
}
Ok(())
}

async fn accept(&mut self) -> io::Result<()> {
let mut futures_unordered = FuturesUnordered::new();

for listener in self.0.iter_mut() {
let app = app.clone();
futures_unordered.push(listener.listen(app));
for listener in self.listeners.iter_mut() {
futures_unordered.push(listener.accept());
}

while let Some(result) = futures_unordered.next().await {
result?;
}
Ok(())
}

fn info(&self) -> Vec<ListenInfo> {
self.listeners
.iter()
.map(|listener| listener.info().into_iter())
.flatten()
.collect()
}
}

impl<State> Debug for ConcurrentListener<State> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.0)
write!(f, "{:?}", self.listeners)
}
}

impl<State> Display for ConcurrentListener<State> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let string = self
.0
.listeners
.iter()
.map(|l| l.to_string())
.collect::<Vec<_>>()
Expand Down
83 changes: 66 additions & 17 deletions src/listener/failover_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::fmt::{self, Debug, Display, Formatter};

use async_std::io;

use crate::listener::ListenInfo;

/// FailoverListener allows tide to attempt to listen in a sequential
/// order to any number of ports/addresses. The first successful
/// listener is used.
Expand All @@ -31,14 +33,22 @@ use async_std::io;
/// })
///}
///```

#[derive(Default)]
pub struct FailoverListener<State>(Vec<Box<dyn Listener<State>>>);
pub struct FailoverListener<State> {
listeners: Vec<Option<Box<dyn Listener<State>>>>,
index: Option<usize>,
}

impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {
impl<State> FailoverListener<State>
where
State: Clone + Send + Sync + 'static,
{
/// creates a new FailoverListener
pub fn new() -> Self {
Self(vec![])
Self {
listeners: vec![],
index: None,
}
}

/// Adds any [`ToListener`](crate::listener::ToListener) to this
Expand All @@ -57,8 +67,11 @@ impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {
/// # std::mem::drop(tide::new().listen(listener)); // for the State generic
/// # Ok(()) }
/// ```
pub fn add<TL: ToListener<State>>(&mut self, listener: TL) -> io::Result<()> {
self.0.push(Box::new(listener.to_listener()?));
pub fn add<L>(&mut self, listener: L) -> io::Result<()>
where
L: ToListener<State>,
{
self.listeners.push(Some(Box::new(listener.to_listener()?)));
Ok(())
}

Expand All @@ -73,21 +86,30 @@ impl<State: Clone + Send + Sync + 'static> FailoverListener<State> {
/// .with_listener(("localhost", 8081)),
/// ).await?;
/// # Ok(()) }) }
pub fn with_listener<TL: ToListener<State>>(mut self, listener: TL) -> Self {
pub fn with_listener<L>(mut self, listener: L) -> Self
where
L: ToListener<State>,
{
self.add(listener).expect("Unable to add listener");
self
}
}

#[async_trait::async_trait]
impl<State: Clone + Send + Sync + 'static> Listener<State> for FailoverListener<State> {
async fn listen(&mut self, app: Server<State>) -> io::Result<()> {
for listener in self.0.iter_mut() {
let app = app.clone();
match listener.listen(app).await {
Ok(_) => return Ok(()),
impl<State> Listener<State> for FailoverListener<State>
where
State: Clone + Send + Sync + 'static,
{
async fn bind(&mut self, app: Server<State>) -> io::Result<()> {
for (index, listener) in self.listeners.iter_mut().enumerate() {
let listener = listener.as_deref_mut().expect("bind called twice");
match listener.bind(app.clone()).await {
Ok(_) => {
self.index = Some(index);
return Ok(());
}
Err(e) => {
crate::log::info!("unable to listen", {
crate::log::info!("unable to bind", {
listener: listener.to_string(),
error: e.to_string()
});
Expand All @@ -100,20 +122,47 @@ impl<State: Clone + Send + Sync + 'static> Listener<State> for FailoverListener<
"unable to bind to any supplied listener spec",
))
}

async fn accept(&mut self) -> io::Result<()> {
match self.index {
Some(index) => {
let mut listener = self.listeners[index].take().expect("accept called twice");
listener.accept().await?;
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"unable to listen to any supplied listener spec",
)),
}
}

fn info(&self) -> Vec<ListenInfo> {
match self.index {
Some(index) => match self.listeners.get(index) {
Some(Some(listener)) => listener.info(),
_ => vec![],
},
None => vec![],
}
}
}

impl<State> Debug for FailoverListener<State> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.0)
write!(f, "{:?}", self.listeners)
}
}

impl<State> Display for FailoverListener<State> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let string = self
.0
.listeners
.iter()
.map(|l| l.to_string())
.map(|l| match l {
Some(l) => l.to_string(),
None => String::new(),
})
.collect::<Vec<_>>()
.join(", ");

Expand Down
101 changes: 90 additions & 11 deletions src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ mod to_listener_impls;
#[cfg(all(unix, feature = "h1-server"))]
mod unix_listener;

use crate::Server;
use std::fmt::{Debug, Display};

use async_std::io;
use async_trait::async_trait;

use crate::Server;

pub use concurrent_listener::ConcurrentListener;
pub use failover_listener::FailoverListener;
Expand All @@ -26,18 +30,46 @@ pub(crate) use tcp_listener::TcpListener;
#[cfg(all(unix, feature = "h1-server"))]
pub(crate) use unix_listener::UnixListener;

/// The Listener trait represents an implementation of http transport
/// for a tide application. In order to provide a Listener to tide,
/// you will also need to implement at least one [`ToListener`](crate::listener::ToListener) that
/// The Listener trait represents an implementation of http transport for a tide
/// application. In order to provide a Listener to tide, you will also need to
/// implement at least one [`ToListener`](crate::listener::ToListener) that
/// outputs your Listener type.
#[async_trait::async_trait]
pub trait Listener<State: 'static>:
std::fmt::Debug + std::fmt::Display + Send + Sync + 'static
#[async_trait]
pub trait Listener<State>: Debug + Display + Send + Sync + 'static
where
State: Send + Sync + 'static,
{
/// Bind the listener. This starts the listening process by opening the
/// necessary network ports, but not yet accepting incoming connections. This
/// method must be called before `accept`.
async fn bind(&mut self, app: Server<State>) -> io::Result<()>;

/// Start accepting incoming connections. This method must be called only
/// after `bind` has succeeded.
async fn accept(&mut self) -> io::Result<()>;

/// Expose information about the connection. This should always return valid
/// data after `bind` has succeeded.
fn info(&self) -> Vec<ListenInfo>;
}

#[async_trait]
impl<L, State> Listener<State> for Box<L>
where
L: Listener<State>,
State: Send + Sync + 'static,
{
/// This is the primary entrypoint for the Listener trait. listen
/// is called exactly once, and is expected to spawn tasks for
/// each incoming connection.
async fn listen(&mut self, app: Server<State>) -> io::Result<()>;
async fn bind(&mut self, app: Server<State>) -> io::Result<()> {
self.as_mut().bind(app).await
}

async fn accept(&mut self) -> io::Result<()> {
self.as_mut().accept().await
}

fn info(&self) -> Vec<ListenInfo> {
self.as_ref().info()
}
}

/// crate-internal shared logic used by tcp and unix listeners to
Expand All @@ -52,3 +84,50 @@ pub(crate) fn is_transient_error(e: &io::Error) -> bool {
ConnectionRefused | ConnectionAborted | ConnectionReset
)
}

/// Information about the `Listener`.
///
/// See [`Report`](../listener/trait.Report.html) for more.
#[derive(Debug, Clone)]
pub struct ListenInfo {
conn_string: String,
transport: String,
tls: bool,
}

impl ListenInfo {
/// Create a new instance of `ListenInfo`.
///
/// This method should only be called when implementing a new Tide `listener`
/// strategy.
pub fn new(conn_string: String, transport: String, tls: bool) -> Self {
Self {
conn_string,
transport,
tls,
}
}

/// Get the connection string.
pub fn connection(&self) -> &str {
self.conn_string.as_str()
}

/// The underlying transport this connection listens on.
///
/// Examples are: "tcp", "uds", etc.
pub fn transport(&self) -> &str {
self.transport.as_str()
}

/// Is the connection encrypted?
pub fn is_encrypted(&self) -> bool {
self.tls
}
}

impl Display for ListenInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.conn_string)
}
}
Loading

0 comments on commit bdf2639

Please sign in to comment.