Skip to content

Commit

Permalink
variant with no ToListener for discussion
Browse files Browse the repository at this point in the history
  • Loading branch information
jbr committed Jun 27, 2020
1 parent e80c51c commit e86e957
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 444 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ kv-log-macro = "1.0.4"
serde = "1.0.102"
serde_json = "1.0.41"
route-recognizer = "0.2.0"
futures = "0.3.5"

[dev-dependencies]
async-std = { version = "1.6.0", features = ["unstable", "attributes"] }
Expand Down
21 changes: 2 additions & 19 deletions src/listener/mod.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
//! Types that represent HTTP transports and binding
mod multi_listener;
mod parsed_listener;
mod tcp_listener;
mod to_listener;
#[cfg(unix)]
mod unix_listener;

use crate::utils::BoxFuture;
use crate::Server;
use async_std::io;

pub use multi_listener::MultiListener;
pub use to_listener::ToListener;
pub use to_listener::Listener;
pub(crate) use to_listener::ResultFuture;

pub(crate) use parsed_listener::ParsedListener;
pub(crate) use tcp_listener::TcpListener;
#[cfg(unix)]
pub(crate) use unix_listener::UnixListener;

/// The Listener trait represents an implementation of http transport
/// for a tide application. In order to provide a Listener to tide,
/// you will also need to implement at least one [`ToListener`](crate::listener::ToListener) that
/// outputs your Listener type.
pub trait Listener<State: 'static>:
std::fmt::Debug + std::fmt::Display + Send + Sync + 'static
{
/// This is the primary entrypoint for the Listener trait. listen
/// is called exactly once, and is expected to spawn tasks for
/// each incoming connection.
fn listen<'a>(&'a mut self, app: Server<State>) -> BoxFuture<'a, io::Result<()>>;
}

pub(crate) fn is_transient_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
Expand Down
103 changes: 25 additions & 78 deletions src/listener/multi_listener.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::listener::{Listener, ToListener};
use crate::utils::BoxFuture;
use crate::listener::{Listener, ResultFuture};
use crate::Server;
use async_std::{io, task};

use std::fmt::{self, Debug, Display, Formatter};

use async_std::io;
use async_std::prelude::*;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};

/// MultiListener allows tide to listen on any number of transports
/// simultaneously (such as tcp ports, unix sockets, or tls).
Expand All @@ -19,10 +16,10 @@ use async_std::prelude::*;
/// app.at("/").get(|_| async { Ok("Hello, world!") });
///
/// let mut multi = tide::listener::MultiListener::new();
/// multi.add("127.0.0.1:8000")?;
/// multi.add(async_std::net::TcpListener::bind("127.0.0.1:8001").await?)?;
/// multi.bind("127.0.0.1:8000")?;
/// multi.bind(async_std::net::TcpListener::bind("127.0.0.1:8001").await?)?;
/// # if cfg!(unix) {
/// multi.add("unix://unix.socket")?;
/// multi.bind("unix://unix.socket")?;
/// # }
///
/// # if false {
Expand All @@ -33,84 +30,34 @@ use async_std::prelude::*;
///}
///```
#[derive(Default)]
pub struct MultiListener<State>(Vec<Box<dyn Listener<State>>>);
pub struct MultiListener<State>(
Server<State>,
FuturesUnordered<task::JoinHandle<io::Result<()>>>,
);

impl<State: Send + Sync + 'static> MultiListener<State> {
/// creates a new MultiListener
pub fn new() -> Self {
Self(vec![])
impl<State> std::fmt::Debug for MultiListener<State> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MultiListener").finish()
}
}

/// Adds any [`ToListener`](crate::listener::ToListener) to this
/// MultiListener. An error result represents a failure to convert
/// the [`ToListener`](crate::listener::ToListener) into a
/// [`Listener`](crate::listener::Listener).
///
/// ```rust
/// # fn main() -> std::io::Result<()> {
/// let mut multi = tide::listener::MultiListener::new();
/// multi.add("127.0.0.1:8000")?;
/// multi.add(("localhost", 8001))?;
/// multi.add(std::net::TcpListener::bind(("localhost", 8002))?)?;
/// # std::mem::drop(tide::new().listen(multi)); // for the State generic
/// # Ok(()) }
/// ```
pub fn add<TL: ToListener<State>>(&mut self, listener: TL) -> io::Result<()> {
self.0.push(Box::new(listener.to_listener()?));
Ok(())
impl<State: Send + Sync + 'static> MultiListener<State> {
pub fn new(app: Server<State>) -> Self {
Self(app, FuturesUnordered::new())
}

/// `MultiListener::with_listener` allows for chained construction of a MultiListener:
/// ```rust,no_run
/// # use tide::listener::MultiListener;
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async move {
/// # let app = tide::new();
/// app.listen(
/// MultiListener::new()
/// .with_listener("127.0.0.1:8080")
/// .with_listener(async_std::net::TcpListener::bind("127.0.0.1:8081").await?),
/// ).await?;
/// # Ok(()) }) }
pub fn with_listener<TL: ToListener<State>>(mut self, listener: TL) -> Self {
self.add(listener).expect("Unable to add listener");
self
pub fn bind<TL: Listener<State> + Send + Sync + 'static>(&mut self, listener: TL) {
self.1.push(task::spawn(listener.listen(self.0.clone())));
}
}

impl<State: Send + Sync + 'static> Listener<State> for MultiListener<State> {
fn listen<'a>(&'a mut self, app: Server<State>) -> BoxFuture<'a, io::Result<()>> {
let mut fut: Option<BoxFuture<'a, io::Result<()>>> = None;

for listener in self.0.iter_mut() {
let app = app.clone();
let listened = listener.listen(app);
if let Some(f) = fut {
fut = Some(Box::pin(f.race(listened)));
} else {
fut = Some(Box::pin(listened));
fn listen(mut self, _: Server<State>) -> ResultFuture {
Box::pin(async move {
while let Some(result) = self.1.next().await {
result?;
}
}

fut.expect("at least one listener must be provided to a MultiListener")
}
}

impl<State> Debug for MultiListener<State> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}

impl<State> Display for MultiListener<State> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let string = self
.0
.iter()
.map(|l| l.to_string())
.collect::<Vec<_>>()
.join(", ");

writeln!(f, "{}", string)
Ok(())
})
}
}
33 changes: 0 additions & 33 deletions src/listener/parsed_listener.rs

This file was deleted.

5 changes: 2 additions & 3 deletions src/listener/tcp_listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::is_transient_error;

use crate::listener::Listener;
use crate::utils::BoxFuture;
use crate::listener::{Listener, ResultFuture};
use crate::{log, Server};

use std::fmt::{self, Display, Formatter};
Expand Down Expand Up @@ -62,7 +61,7 @@ fn handle_tcp<State: Send + Sync + 'static>(app: Server<State>, stream: TcpStrea
}

impl<State: Send + Sync + 'static> Listener<State> for TcpListener {
fn listen<'a>(&'a mut self, app: Server<State>) -> BoxFuture<'a, async_std::io::Result<()>> {
fn listen(mut self, app: Server<State>) -> ResultFuture {
Box::pin(async move {
self.connect().await?;
let listener = self.listener()?;
Expand Down
Loading

0 comments on commit e86e957

Please sign in to comment.