Skip to content

Commit

Permalink
Add poll_accept & struct that impl Future
Browse files Browse the repository at this point in the history
  • Loading branch information
cecton committed Jan 13, 2022
1 parent d0dba7a commit c2ec467
Showing 1 changed file with 47 additions and 36 deletions.
83 changes: 47 additions & 36 deletions tokio-util/src/net/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::either::Either;
use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A trait for a listener: `TcpListener` and `UnixListener`.
pub trait Listener: Send + Unpin {
Expand All @@ -12,10 +13,16 @@ pub trait Listener: Send + Unpin {
/// The socket address type of this listener.
type Addr;

/// Polls to accept a new incoming connection to this listener.
fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(Self::Io, Self::Addr)>>;

/// Accepts a new incoming connection from this listener.
fn accept<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Result<(Self::Io, Self::Addr)>> + Send + 'a>>;
fn accept<'a>(&'a mut self) -> ListenerAcceptFut<'a, Self>
where
Self: Sized + Unpin,
{
ListenerAcceptFut::new(self)
}

/// Returns the local address that this listener is bound to.
fn local_addr(&self) -> Result<Self::Addr>;
Expand All @@ -25,14 +32,8 @@ impl Listener for tokio::net::TcpListener {
type Io = tokio::net::TcpStream;
type Addr = std::net::SocketAddr;

fn accept<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Result<(Self::Io, Self::Addr)>> + Send + 'a>> {
let accept = self.accept();
Box::pin(async {
let (stream, addr) = accept.await?;
Ok((stream, addr.into()))
})
fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(Self::Io, Self::Addr)>> {
Self::poll_accept(self, cx)
}

fn local_addr(&self) -> Result<Self::Addr> {
Expand All @@ -44,14 +45,8 @@ impl Listener for tokio::net::UnixListener {
type Io = tokio::net::UnixStream;
type Addr = tokio::net::unix::SocketAddr;

fn accept<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Result<(Self::Io, Self::Addr)>> + Send + 'a>> {
let accept = self.accept();
Box::pin(async {
let (stream, addr) = accept.await?;
Ok((stream, addr.into()))
})
fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(Self::Io, Self::Addr)>> {
Self::poll_accept(self, cx)
}

fn local_addr(&self) -> Result<Self::Addr> {
Expand All @@ -67,24 +62,14 @@ where
type Io = Either<<L as Listener>::Io, <R as Listener>::Io>;
type Addr = Either<<L as Listener>::Addr, <R as Listener>::Addr>;

fn accept<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Result<(Self::Io, Self::Addr)>> + Send + 'a>> {
fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<Result<(Self::Io, Self::Addr)>> {
match self {
Either::Left(listener) => {
let fut = listener.accept();
Box::pin(async move {
let (stream, addr) = fut.await?;
Ok((Either::Left(stream), Either::Left(addr)))
})
}
Either::Right(listener) => {
let fut = listener.accept();
Box::pin(async move {
let (stream, addr) = fut.await?;
Ok((Either::Right(stream), Either::Right(addr)))
})
}
Either::Left(listener) => listener
.poll_accept(cx)
.map(|res| res.map(|(stream, addr)| (Either::Left(stream), Either::Left(addr)))),
Either::Right(listener) => listener
.poll_accept(cx)
.map(|res| res.map(|(stream, addr)| (Either::Right(stream), Either::Right(addr)))),
}
}

Expand All @@ -101,3 +86,29 @@ where
}
}
}

/// Future for accepting a new connection from a listener.
#[derive(Debug)]
pub struct ListenerAcceptFut<'a, L> {
listener: &'a L,
}

impl<'a, L> ListenerAcceptFut<'a, L>
where
L: Listener,
{
fn new(listener: &'a L) -> Self {
Self { listener }
}
}

impl<'a, L> Future for ListenerAcceptFut<'a, L>
where
L: Listener,
{
type Output = Result<(L::Io, L::Addr)>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.listener.poll_accept(cx)
}
}

0 comments on commit c2ec467

Please sign in to comment.