Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): allow ListenerId to be user-controlled #3567

Merged
merged 34 commits into from
May 14, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
db8dbec
refactor(transport): Provide ListenerId in Transport::listen_on
dariusc93 Mar 7, 2023
3b4c37a
fix: Fix test
dariusc93 Mar 7, 2023
b35c50f
fix: Additional fixes
dariusc93 Mar 7, 2023
de5bd32
fix: Additional fixes
dariusc93 Mar 7, 2023
e55f4ed
fix: Remove ListenerId from Result
dariusc93 Mar 7, 2023
90446c2
fix: Additional fixes
dariusc93 Mar 7, 2023
a651083
Merge branch 'master' into listen_on_with_id
dariusc93 Mar 9, 2023
cc2b41c
chore: Formatted code
dariusc93 Mar 9, 2023
5178876
Merge branch 'master' into listen_on_with_id
dariusc93 Mar 10, 2023
117cf08
Merge branch 'master' into listen_on_with_id
dariusc93 Mar 12, 2023
c755ec0
chore: Update comments
dariusc93 Mar 12, 2023
2657ee9
Merge branch 'master' into listen_on_with_id
dariusc93 May 3, 2023
e55144f
chore: Formatted code
dariusc93 May 3, 2023
9d32aee
chore: Update example
dariusc93 May 4, 2023
861b7cc
chore: Use ListenerId instead of Default
dariusc93 May 4, 2023
579e002
chore: Use atomicu64 instead of random numbers
dariusc93 May 4, 2023
f17fe54
Merge branch 'master' into listen_on_with_id
dariusc93 May 4, 2023
0842eed
chore: format code
dariusc93 May 4, 2023
6acd5ac
chore: Import `ListenerId`
dariusc93 May 4, 2023
188fb15
Merge branch 'master' into listen_on_with_id
dariusc93 May 6, 2023
450a2d5
refactor: Add `ListenerId::next` and deprecate `ListenerId::new` and …
dariusc93 May 6, 2023
562d3b5
chore: formatted code
dariusc93 May 6, 2023
5d5df5c
chore: Use ListenerId::next instead of default
dariusc93 May 6, 2023
1e43396
chore: Use ListenerId::next
dariusc93 May 6, 2023
114c271
Update transports/websocket/src/framed.rs
dariusc93 May 6, 2023
07a7071
chore: Update CHANGELOG.md
dariusc93 May 6, 2023
cf10cc6
Merge branch 'master' into listen_on_with_id
dariusc93 May 8, 2023
3d65a69
Merge branch 'master' into listen_on_with_id
dariusc93 May 9, 2023
a3fc4c3
Merge branch 'master' into listen_on_with_id
dariusc93 May 13, 2023
ffe19ec
fix: Correct merged code
dariusc93 May 13, 2023
3199300
Update core/CHANGELOG.md
dariusc93 May 14, 2023
7d2a295
Update core/CHANGELOG.md
dariusc93 May 14, 2023
b1ebf65
chore: Use AtomicUsize
dariusc93 May 14, 2023
08ceb02
Update core/CHANGELOG.md
mxinden May 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,18 @@ where
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
use TransportError::*;
match self {
Either::Left(a) => a.listen_on(addr).map_err(|e| match e {
Either::Left(a) => a.listen_on(id, addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(Either::Left(err)),
}),
Either::Right(b) => b.listen_on(addr).map_err(|e| match e {
Either::Right(b) => b.listen_on(id, addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(Either::Right(err)),
}),
Expand Down
28 changes: 21 additions & 7 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
error::Error,
fmt,
pin::Pin,
sync::atomic::{AtomicU64, Ordering},
task::{Context, Poll},
};

Expand All @@ -54,6 +55,8 @@ pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
pub use self::upgrade::Upgrade;

static NEXT_LISTENER_ID: AtomicU64 = AtomicU64::new(1);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change this to AtomicUsize and use usize in ListenerId instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it really matters, happy to leave it as a u64. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding to the above the argument of portability.

PowerPC and MIPS platforms with 32-bit pointers do not have AtomicU64 or AtomicI64 types.

https://doc.rust-lang.org/std/sync/atomic/index.html#portability

Thus slight preference for AtomicUsize from my end. Also consistent with ConnectionId.


/// A transport provides connection-oriented communication between two peers
/// through ordered streams of data (i.e. connections).
///
Expand Down Expand Up @@ -108,8 +111,12 @@ pub trait Transport {
/// obtained from [dialing](Transport::dial).
type Dial: Future<Output = Result<Self::Output, Self::Error>>;

/// Listens on the given [`Multiaddr`] for inbound connections.
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>>;
/// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>>;

/// Remove a listener.
///
Expand Down Expand Up @@ -243,15 +250,22 @@ pub trait Transport {
pub struct ListenerId(u64);

impl ListenerId {
#[deprecated(note = "Renamed to ` ListenerId::next`.")]
#[allow(clippy::new_without_default)]
/// Creates a new `ListenerId`.
pub fn new() -> Self {
dariusc93 marked this conversation as resolved.
Show resolved Hide resolved
ListenerId(rand::random())
ListenerId::next()
}

/// Creates a new `ListenerId`.
pub fn next() -> Self {
ListenerId(NEXT_LISTENER_ID.fetch_add(1, Ordering::SeqCst))
}
}

impl Default for ListenerId {
fn default() -> Self {
Self::new()
#[deprecated(note = "Use ` ListenerId::next` instead.")]
#[allow(clippy::should_implement_trait)]
pub fn default() -> Self {
Self::next()
}
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ where
type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
type Dial = AndThenFuture<T::Dial, C, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.transport
.listen_on(addr)
.listen_on(id, addr)
.map_err(|err| err.map(Either::Left))
}

Expand Down
22 changes: 17 additions & 5 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ type Dial<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;

trait Abstract<O> {
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>>;
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<io::Error>>;
fn remove_listener(&mut self, id: ListenerId) -> bool;
fn dial(&mut self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn dial_as_listener(&mut self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
Expand All @@ -70,8 +74,12 @@ where
T::Dial: Send + 'static,
T::ListenerUpgrade: Send + 'static,
{
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
Transport::listen_on(self, addr).map_err(|e| e.map(box_err))
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<io::Error>> {
Transport::listen_on(self, id, addr).map_err(|e| e.map(box_err))
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down Expand Up @@ -123,8 +131,12 @@ impl<O> Transport for Boxed<O> {
type ListenerUpgrade = ListenerUpgrade<O>;
type Dial = Dial<O>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.inner.listen_on(addr)
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.inner.listen_on(id, addr)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
10 changes: 7 additions & 3 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,17 @@ where
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
let addr = match self.0.listen_on(addr) {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let addr = match self.0.listen_on(id, addr) {
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
res => return res.map_err(|err| err.map(Either::Left)),
};

let addr = match self.1.listen_on(addr) {
let addr = match self.1.listen_on(id, addr) {
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
res => return res.map_err(|err| err.map(Either::Right)),
};
Expand Down
6 changes: 5 additions & 1 deletion core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl<TOut> Transport for DummyTransport<TOut> {
type ListenerUpgrade = futures::future::Pending<Result<Self::Output, io::Error>>;
type Dial = futures::future::Pending<Result<Self::Output, io::Error>>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
_id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ where
type ListenerUpgrade = MapFuture<T::ListenerUpgrade, F>;
type Dial = MapFuture<T::Dial, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.transport.listen_on(addr)
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.transport.listen_on(id, addr)
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
10 changes: 8 additions & 2 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,15 @@ where
type ListenerUpgrade = MapErrListenerUpgrade<T, F>;
type Dial = MapErrDial<T, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let map = self.map.clone();
self.transport.listen_on(addr).map_err(|err| err.map(map))
self.transport
.listen_on(id, addr)
.map_err(|err| err.map(map))
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down
55 changes: 38 additions & 17 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ impl Transport for MemoryTransport {
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
type Dial = DialFuture;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
let port = if let Ok(port) = parse_memory_addr(&addr) {
port
} else {
Expand All @@ -191,7 +195,6 @@ impl Transport for MemoryTransport {
None => return Err(TransportError::Other(MemoryTransportError::Unreachable)),
};

let id = ListenerId::new();
let listener = Listener {
id,
port,
Expand All @@ -201,7 +204,7 @@ impl Transport for MemoryTransport {
};
self.listeners.push_back(Box::pin(listener));

Ok(id)
Ok(())
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
Expand Down Expand Up @@ -457,30 +460,40 @@ mod tests {
let addr_1: Multiaddr = "/memory/1639174018481".parse().unwrap();
let addr_2: Multiaddr = "/memory/8459375923478".parse().unwrap();

let listener_id_1 = transport.listen_on(addr_1.clone()).unwrap();
let listener_id_1 = ListenerId::next();

transport.listen_on(listener_id_1, addr_1.clone()).unwrap();
assert!(
transport.remove_listener(listener_id_1),
"Listener doesn't exist."
);

let listener_id_2 = transport.listen_on(addr_1.clone()).unwrap();
let listener_id_3 = transport.listen_on(addr_2.clone()).unwrap();
let listener_id_2 = ListenerId::next();
transport.listen_on(listener_id_2, addr_1.clone()).unwrap();
let listener_id_3 = ListenerId::next();
transport.listen_on(listener_id_3, addr_2.clone()).unwrap();

assert!(transport.listen_on(addr_1.clone()).is_err());
assert!(transport.listen_on(addr_2.clone()).is_err());
assert!(transport
.listen_on(ListenerId::next(), addr_1.clone())
.is_err());
assert!(transport
.listen_on(ListenerId::next(), addr_2.clone())
.is_err());

assert!(
transport.remove_listener(listener_id_2),
"Listener doesn't exist."
);
assert!(transport.listen_on(addr_1).is_ok());
assert!(transport.listen_on(addr_2.clone()).is_err());
assert!(transport.listen_on(ListenerId::next(), addr_1).is_ok());
assert!(transport
.listen_on(ListenerId::next(), addr_2.clone())
.is_err());

assert!(
transport.remove_listener(listener_id_3),
"Listener doesn't exist."
);
assert!(transport.listen_on(addr_2).is_ok());
assert!(transport.listen_on(ListenerId::next(), addr_2).is_ok());
}

#[test]
Expand All @@ -489,8 +502,11 @@ mod tests {
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
.is_err());
let _listener = transport
.listen_on("/memory/810172461024613".parse().unwrap())
transport
.listen_on(
ListenerId::next(),
"/memory/810172461024613".parse().unwrap(),
)
.unwrap();
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
Expand All @@ -504,7 +520,8 @@ mod tests {

let mut transport = MemoryTransport::default().boxed();
futures::executor::block_on(async {
let listener_id = transport.listen_on(addr.clone()).unwrap();
let listener_id = ListenerId::next();
transport.listen_on(listener_id, addr.clone()).unwrap();
let reported_addr = transport
.select_next_some()
.await
Expand Down Expand Up @@ -539,7 +556,7 @@ mod tests {
let mut t1 = MemoryTransport::default().boxed();

let listener = async move {
t1.listen_on(t1_addr.clone()).unwrap();
t1.listen_on(ListenerId::next(), t1_addr.clone()).unwrap();
let upgrade = loop {
let event = t1.select_next_some().await;
if let Some(upgrade) = event.into_incoming() {
Expand Down Expand Up @@ -577,7 +594,9 @@ mod tests {
let mut listener_transport = MemoryTransport::default().boxed();

let listener = async move {
listener_transport.listen_on(listener_addr.clone()).unwrap();
listener_transport
.listen_on(ListenerId::next(), listener_addr.clone())
.unwrap();
loop {
if let TransportEvent::Incoming { send_back_addr, .. } =
listener_transport.select_next_some().await
Expand Down Expand Up @@ -614,7 +633,9 @@ mod tests {
let mut listener_transport = MemoryTransport::default().boxed();

let listener = async move {
listener_transport.listen_on(listener_addr.clone()).unwrap();
listener_transport
.listen_on(ListenerId::next(), listener_addr.clone())
.unwrap();
loop {
if let TransportEvent::Incoming { send_back_addr, .. } =
listener_transport.select_next_some().await
Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ where
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
if let Some(inner) = self.0.as_mut() {
inner.listen_on(addr)
inner.listen_on(id, addr)
} else {
Err(TransportError::MultiaddrNotSupported(addr))
}
Expand Down
8 changes: 6 additions & 2 deletions core/src/transport/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ where
type ListenerUpgrade = Timeout<InnerTrans::ListenerUpgrade>;
type Dial = Timeout<InnerTrans::Dial>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
fn listen_on(
&mut self,
id: ListenerId,
addr: Multiaddr,
) -> Result<(), TransportError<Self::Error>> {
self.inner
.listen_on(addr)
.listen_on(id, addr)
.map_err(|err| err.map(TransportTimeoutError::Other))
}

Expand Down
Loading