From 474d2c02fe2946d1e3a92e50eb9e3697a4c0c78b Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Tue, 4 Aug 2020 11:30:09 +0200 Subject: [PATCH] [core/swarm] Emit events for active connection close and fix `disconnect()`. (#1619) * Emit events for active connection close and fix `disconnect()`. The `Network` does currently not emit events for actively closed connections, e.g. via `EstablishedConnection::close` or `ConnectedPeer::disconnect()`. As a result, when actively closing connections, there will be `ConnectionEstablished` events emitted without eventually a matching `ConnectionClosed` event. This seems undesirable and has the consequence that the `Swarm::ban_peer_id` feature in `libp2p-swarm` does not result in appropriate calls to `NetworkBehaviour::inject_connection_closed` and `NetworkBehaviour::inject_disconnected`. Furthermore, the `disconnect()` functionality in `libp2p-core` is currently broken as it leaves the `Pool` in an inconsistent state. This commit does the following: 1. When connection background tasks are dropped (i.e. removed from the `Manager`), they always terminate immediately, without attempting an orderly close of the connection. 2. An orderly close is sent to the background task of a connection as a regular command. The background task emits a `Closed` event before terminating. 3. `Pool::disconnect()` removes all connection tasks for the affected peer from the `Manager`, i.e. without an orderly close, thereby also fixing the discovered state inconsistency due to not removing the corresponding entries in the `Pool` itself after removing them from the `Manager`. 4. A new test is added to `libp2p-swarm` that exercises the ban/unban functionality and places assertions on the number and order of calls to the `NetworkBehaviour`. In that context some new testing utilities have been added to `libp2p-swarm`. This addresses https://github.com/libp2p/rust-libp2p/issues/1584. * Update swarm/src/lib.rs Co-authored-by: Toralf Wittner * Incorporate some review feedback. * Adapt to changes in master. * More verbose panic messages. * Simplify There is no need for a `StartClose` future. * Fix doc links. * Further small cleanup. * Update CHANGELOGs and versions. Co-authored-by: Toralf Wittner --- CHANGELOG.md | 4 + Cargo.toml | 42 ++-- core/CHANGELOG.md | 13 ++ core/Cargo.toml | 2 +- core/src/connection/manager.rs | 62 ++++-- core/src/connection/manager/task.rs | 209 ++++++++++--------- core/src/connection/pool.rs | 175 ++++++++++------ core/src/network.rs | 6 +- core/src/network/event.rs | 34 ++-- core/src/network/peer.rs | 4 +- muxers/mplex/CHANGELOG.md | 4 + muxers/mplex/Cargo.toml | 4 +- muxers/yamux/CHANGELOG.md | 4 + muxers/yamux/Cargo.toml | 4 +- protocols/deflate/CHANGELOG.md | 4 + protocols/deflate/Cargo.toml | 4 +- protocols/floodsub/CHANGELOG.md | 4 + protocols/floodsub/Cargo.toml | 6 +- protocols/gossipsub/CHANGELOG.md | 4 +- protocols/gossipsub/Cargo.toml | 6 +- protocols/identify/CHANGELOG.md | 4 + protocols/identify/Cargo.toml | 6 +- protocols/kad/CHANGELOG.md | 4 + protocols/kad/Cargo.toml | 6 +- protocols/mdns/CHANGELOG.md | 4 + protocols/mdns/Cargo.toml | 6 +- protocols/noise/CHANGELOG.md | 4 + protocols/noise/Cargo.toml | 4 +- protocols/ping/CHANGELOG.md | 4 + protocols/ping/Cargo.toml | 6 +- protocols/plaintext/CHANGELOG.md | 4 + protocols/plaintext/Cargo.toml | 4 +- protocols/request-response/CHANGELOG.md | 4 + protocols/request-response/Cargo.toml | 6 +- protocols/secio/CHANGELOG.md | 4 + protocols/secio/Cargo.toml | 4 +- swarm/CHANGELOG.md | 12 ++ swarm/Cargo.toml | 5 +- swarm/src/lib.rs | 196 +++++++++++++++--- swarm/src/protocols_handler/dummy.rs | 14 +- swarm/src/test.rs | 254 ++++++++++++++++++++++++ transports/dns/CHANGELOG.md | 4 + transports/dns/Cargo.toml | 4 +- transports/tcp/CHANGELOG.md | 4 + transports/tcp/Cargo.toml | 4 +- transports/uds/CHANGELOG.md | 4 + transports/uds/Cargo.toml | 4 +- transports/wasm-ext/CHANGELOG.md | 4 + transports/wasm-ext/Cargo.toml | 4 +- transports/websocket/CHANGELOG.md | 4 + transports/websocket/Cargo.toml | 4 +- 51 files changed, 884 insertions(+), 306 deletions(-) create mode 100644 swarm/src/test.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 91043255..ebb3882e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,10 @@ - [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md) - [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md) +# Version 0.24.0 [unreleased] + +- Update `libp2p-core`, `libp2p-swarm` and dependent crates. + # Version 0.23.0 (2020-08-03) **NOTE**: For a smooth upgrade path from `0.21` to `> 0.22` diff --git a/Cargo.toml b/Cargo.toml index 7c76671a..09af466d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,23 +62,23 @@ atomic = "0.4.6" bytes = "0.5" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.20.0", path = "core" } -libp2p-core-derive = { version = "0.20.0", path = "misc/core-derive" } -libp2p-floodsub = { version = "0.20.0", path = "protocols/floodsub", optional = true } -libp2p-gossipsub = { version = "0.20.0", path = "./protocols/gossipsub", optional = true } -libp2p-identify = { version = "0.20.0", path = "protocols/identify", optional = true } -libp2p-kad = { version = "0.21.0", path = "protocols/kad", optional = true } -libp2p-mplex = { version = "0.20.0", path = "muxers/mplex", optional = true } -libp2p-noise = { version = "0.22.0", path = "protocols/noise", optional = true } -libp2p-ping = { version = "0.20.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.20.0", path = "protocols/plaintext", optional = true } +libp2p-core = { version = "0.21.0", path = "core" } +libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" } +libp2p-floodsub = { version = "0.21.0", path = "protocols/floodsub", optional = true } +libp2p-gossipsub = { version = "0.21.0", path = "./protocols/gossipsub", optional = true } +libp2p-identify = { version = "0.21.0", path = "protocols/identify", optional = true } +libp2p-kad = { version = "0.22.0", path = "protocols/kad", optional = true } +libp2p-mplex = { version = "0.21.0", path = "muxers/mplex", optional = true } +libp2p-noise = { version = "0.23.0", path = "protocols/noise", optional = true } +libp2p-ping = { version = "0.21.0", path = "protocols/ping", optional = true } +libp2p-plaintext = { version = "0.21.0", path = "protocols/plaintext", optional = true } libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true } -libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true } -libp2p-secio = { version = "0.20.0", path = "protocols/secio", default-features = false, optional = true } -libp2p-swarm = { version = "0.20.0", path = "swarm" } -libp2p-uds = { version = "0.20.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.20.0", path = "transports/wasm-ext", optional = true } -libp2p-yamux = { version = "0.20.0", path = "muxers/yamux", optional = true } +libp2p-request-response = { version = "0.2.0", path = "protocols/request-response", optional = true } +libp2p-secio = { version = "0.21.0", path = "protocols/secio", default-features = false, optional = true } +libp2p-swarm = { version = "0.21.0", path = "swarm" } +libp2p-uds = { version = "0.21.0", path = "transports/uds", optional = true } +libp2p-wasm-ext = { version = "0.21.0", path = "transports/wasm-ext", optional = true } +libp2p-yamux = { version = "0.21.0", path = "muxers/yamux", optional = true } multiaddr = { package = "parity-multiaddr", version = "0.9.1", path = "misc/multiaddr" } multihash = "0.11.0" parking_lot = "0.10.0" @@ -87,11 +87,11 @@ smallvec = "1.0" wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.20.0", path = "protocols/deflate", optional = true } -libp2p-dns = { version = "0.20.0", path = "transports/dns", optional = true } -libp2p-mdns = { version = "0.20.0", path = "protocols/mdns", optional = true } -libp2p-tcp = { version = "0.20.0", path = "transports/tcp", optional = true } -libp2p-websocket = { version = "0.21.0", path = "transports/websocket", optional = true } +libp2p-deflate = { version = "0.21.0", path = "protocols/deflate", optional = true } +libp2p-dns = { version = "0.21.0", path = "transports/dns", optional = true } +libp2p-mdns = { version = "0.21.0", path = "protocols/mdns", optional = true } +libp2p-tcp = { version = "0.21.0", path = "transports/tcp", optional = true } +libp2p-websocket = { version = "0.22.0", path = "transports/websocket", optional = true } [dev-dependencies] async-std = "1.6.2" diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 14c8889d..67256d6a 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,3 +1,16 @@ +# 0.21.0 [unreleased] + +- Refactoring of connection close and disconnect behaviour. In particular, the former + `NetworkEvent::ConnectionError` is now `NetworkEvent::ConnectionClosed` with the `error` + field being an `Option` and `None` indicating an active (but not necessarily orderly) close. + This guarantees that `ConnectionEstablished` events are always eventually paired + with `ConnectionClosed` events, regardless of how connections are closed. + Correspondingly, `EstablishedConnection::close` is now `EstablishedConnection::start_close` + to reflect that an orderly close completes asynchronously in the background, with the + outcome observed by continued polling of the `Network`. In contrast, `disconnect`ing + a peer takes effect immediately without an orderly connection shutdown. + See [PR 1619](https://github.com/libp2p/rust-libp2p/pull/1619) for further details. + # 0.20.1 [2020-17-17] - Update ed25519-dalek dependency. diff --git a/core/Cargo.toml b/core/Cargo.toml index f242a45f..0478e7b2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-core" edition = "2018" description = "Core traits and structs of libp2p" -version = "0.20.1" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/core/src/connection/manager.rs b/core/src/connection/manager.rs index ef5d4e67..1844f6e5 100644 --- a/core/src/connection/manager.rs +++ b/core/src/connection/manager.rs @@ -196,18 +196,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> { handler: H }, - /// An established connection has encountered an error. - ConnectionError { + /// An established connection has been closed. + ConnectionClosed { /// The connection ID. /// - /// As a result of the error, the connection has been removed - /// from the `Manager` and is being closed. Hence this ID will - /// no longer resolve to a valid entry in the manager. + /// > **Note**: Closed connections are removed from the `Manager`. + /// > Hence this ID will no longer resolve to a valid entry in + /// > the manager. id: ConnectionId, - /// Information about the connection that encountered the error. + /// Information about the closed connection. connected: Connected, - /// The error that occurred. - error: ConnectionError, + /// The error that occurred, if any. If `None`, the connection + /// has been actively closed. + error: Option>, }, /// A connection has been established. @@ -348,11 +349,11 @@ impl Manager { /// Polls the manager for events relating to the managed connections. pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> { // Advance the content of `local_spawns`. - while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {} + while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {} // Poll for the first event for which the manager still has a registered task, if any. let event = loop { - match Stream::poll_next(Pin::new(&mut self.events_rx), cx) { + match self.events_rx.poll_next_unpin(cx) { Poll::Ready(Some(event)) => { if self.tasks.contains_key(event.id()) { // (1) break event @@ -397,19 +398,18 @@ impl Manager { old_endpoint: old, new_endpoint: new, } - }, - task::Event::Error { id, error } => { + } + task::Event::Closed { id, error } => { let id = ConnectionId(id); let task = task.remove(); match task.state { TaskState::Established(connected) => - Event::ConnectionError { id, connected, error }, + Event::ConnectionClosed { id, connected, error }, TaskState::Pending => unreachable!( - "`Event::Error` implies (2) occurred on that task and thus (3)." + "`Event::Closed` implies (2) occurred on that task and thus (3)." ), } } - }) } else { unreachable!("By (1)") @@ -455,10 +455,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> { /// > task _may not be notified_ if sending the event fails due to /// > the connection handler not being ready at this time. pub fn notify_handler(&mut self, event: I) -> Result<(), I> { - let cmd = task::Command::NotifyHandler(event); + let cmd = task::Command::NotifyHandler(event); // (*) self.task.get_mut().sender.try_send(cmd) .map_err(|e| match e.into_inner() { - task::Command::NotifyHandler(event) => event + task::Command::NotifyHandler(event) => event, + _ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*) }) } @@ -472,6 +473,22 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> { self.task.get_mut().sender.poll_ready(cx).map_err(|_| ()) } + /// Sends a close command to the associated background task, + /// thus initiating a graceful active close of the connection. + /// + /// Has no effect if the connection is already closing. + /// + /// When the connection is ultimately closed, [`Event::ConnectionClosed`] + /// is emitted by [`Manager::poll`]. + pub fn start_close(mut self) { + // Clone the sender so that we are guaranteed to have + // capacity for the close command (every sender gets a slot). + match self.task.get_mut().sender.clone().try_send(task::Command::Close) { + Ok(()) => {}, + Err(e) => assert!(e.is_disconnected(), "No capacity for close command.") + } + } + /// Obtains information about the established connection. pub fn connected(&self) -> &Connected { match &self.task.get().state { @@ -480,16 +497,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> { } } - /// Closes the connection represented by this entry, - /// returning the connection information. - pub fn close(self) -> Connected { + /// Instantly removes the entry from the manager, dropping + /// the command channel to the background task of the connection, + /// which will thus drop the connection asap without an orderly + /// close or emitting another event. + pub fn remove(self) -> Connected { match self.task.remove().state { TaskState::Established(c) => c, TaskState::Pending => unreachable!("By Entry::new()") } } - /// Returns the connection id. + /// Returns the connection ID. pub fn id(&self) -> ConnectionId { ConnectionId(*self.task.key()) } @@ -513,3 +532,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> { self.task.remove(); } } + diff --git a/core/src/connection/manager/task.rs b/core/src/connection/manager/task.rs index 7d715516..4b7ca0dd 100644 --- a/core/src/connection/manager/task.rs +++ b/core/src/connection/manager/task.rs @@ -46,6 +46,9 @@ pub struct TaskId(pub(super) usize); pub enum Command { /// Notify the connection handler of an event. NotifyHandler(T), + /// Gracefully close the connection (active close) before + /// terminating the task. + Close, } /// Events that a task can emit to its manager. @@ -53,24 +56,27 @@ pub enum Command { pub enum Event { /// A connection to a node has succeeded. Established { id: TaskId, info: Connected }, - /// An established connection produced an error. - Error { id: TaskId, error: ConnectionError }, /// A pending connection failed. Failed { id: TaskId, error: PendingConnectionError, handler: H }, /// A node we are connected to has changed its address. AddressChange { id: TaskId, new_address: Multiaddr }, /// Notify the manager of an event from the connection. Notify { id: TaskId, event: T }, + /// A connection closed, possibly due to an error. + /// + /// If `error` is `None`, the connection has completed + /// an active orderly close. + Closed { id: TaskId, error: Option> } } impl Event { pub fn id(&self) -> &TaskId { match self { Event::Established { id, .. } => id, - Event::Error { id, .. } => id, Event::Failed { id, .. } => id, Event::AddressChange { id, .. } => id, Event::Notify { id, .. } => id, + Event::Closed { id, .. } => id, } } } @@ -131,7 +137,7 @@ where id, events, commands: commands.fuse(), - state: State::EstablishedPending(connection), + state: State::Established { connection, event: None }, } } } @@ -143,7 +149,7 @@ where H: IntoConnectionHandler, H::Handler: ConnectionHandler> { - /// The task is waiting for the connection to be established. + /// The connection is being negotiated. Pending { /// The future that will attempt to reach the node. // TODO: don't pin this Future; this requires deeper changes though @@ -152,20 +158,22 @@ where handler: H, }, - /// The connection is established and a new event is ready to be emitted. - EstablishedReady { - /// The node, if available. - connection: Option>, - /// The actual event message to send. - event: Event::Error, C> + /// The connection is established. + Established { + connection: Connection, + /// An event to send to the `Manager`. If `None`, the `connection` + /// is polled for new events in this state, otherwise the event + /// must be sent to the `Manager` before the connection can be + /// polled again. + event: Option::Error, C>> }, - /// The connection is established and pending a new event to occur. - EstablishedPending(Connection), - - /// The task is closing the connection. + /// The connection is closing (active close). Closing(Close), + /// The task is terminating with a final event for the `Manager`. + Terminating(Event::Error, C>), + /// The task has finished. Done } @@ -197,24 +205,27 @@ where 'poll: loop { match std::mem::replace(&mut this.state, State::Done) { State::Pending { mut future, handler } => { - // Check if the manager aborted this task by dropping the `commands` - // channel sender side. - match Stream::poll_next(Pin::new(&mut this.commands), cx) { + // Check whether the task is still registered with a `Manager` + // by polling the commands channel. + match this.commands.poll_next_unpin(cx) { Poll::Pending => {}, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(Command::NotifyHandler(_))) => unreachable!( - "Manager does not allow sending commands to pending tasks.", + Poll::Ready(None) => { + // The manager has dropped the task; abort. + return Poll::Ready(()) + } + Poll::Ready(Some(_)) => panic!( + "Task received command while the connection is pending." ) } // Check if the connection succeeded. - match Future::poll(Pin::new(&mut future), cx) { + match future.poll_unpin(cx) { Poll::Ready(Ok((info, muxer))) => { - this.state = State::EstablishedReady { - connection: Some(Connection::new( + this.state = State::Established { + connection: Connection::new( muxer, handler.into_handler(&info), - )), - event: Event::Established { id, info } + ), + event: Some(Event::Established { id, info }) } } Poll::Pending => { @@ -222,120 +233,120 @@ where return Poll::Pending } Poll::Ready(Err(error)) => { + // Don't accept any further commands and terminate the + // task with a final event. + this.commands.get_mut().close(); let event = Event::Failed { id, handler, error }; - this.state = State::EstablishedReady { connection: None, event } + this.state = State::Terminating(event) } } } - State::EstablishedPending(mut connection) => { - // Start by handling commands received from the manager, if any. + State::Established { mut connection, event } => { + // Check for commands from the `Manager`. loop { - match Stream::poll_next(Pin::new(&mut this.commands), cx) { + match this.commands.poll_next_unpin(cx) { Poll::Pending => break, Poll::Ready(Some(Command::NotifyHandler(event))) => connection.inject_event(event), - Poll::Ready(None) => { - // The manager has dropped the task, thus initiate a - // graceful shutdown of the connection. + Poll::Ready(Some(Command::Close)) => { + // Don't accept any further commands. + this.commands.get_mut().close(); + // Discard the event, if any, and start a graceful close. this.state = State::Closing(connection.close()); continue 'poll } + Poll::Ready(None) => { + // The manager has dropped the task or disappeared; abort. + return Poll::Ready(()) + } } } - // Poll the connection for new events. - loop { + + if let Some(event) = event { + // Send the event to the manager. + match this.events.poll_ready(cx) { + Poll::Pending => { + this.state = State::Established { connection, event: Some(event) }; + return Poll::Pending + } + Poll::Ready(result) => { + if result.is_ok() { + if let Ok(()) = this.events.start_send(event) { + this.state = State::Established { connection, event: None }; + continue 'poll + } + } + // The manager is no longer reachable; abort. + return Poll::Ready(()) + } + } + } else { + // Poll the connection for new events. match Connection::poll(Pin::new(&mut connection), cx) { Poll::Pending => { - this.state = State::EstablishedPending(connection); + this.state = State::Established { connection, event: None }; return Poll::Pending } Poll::Ready(Ok(connection::Event::Handler(event))) => { - this.state = State::EstablishedReady { - connection: Some(connection), - event: Event::Notify { id, event } + this.state = State::Established { + connection, + event: Some(Event::Notify { id, event }) }; - continue 'poll } Poll::Ready(Ok(connection::Event::AddressChange(new_address))) => { - this.state = State::EstablishedReady { - connection: Some(connection), - event: Event::AddressChange { id, new_address } + this.state = State::Established { + connection, + event: Some(Event::AddressChange { id, new_address }) }; - continue 'poll } Poll::Ready(Err(error)) => { - // Notify the manager of the error via an event, - // dropping the connection. - let event = Event::Error { id, error }; - this.state = State::EstablishedReady { connection: None, event }; - continue 'poll + // Don't accept any further commands. + this.commands.get_mut().close(); + // Terminate the task with the error, dropping the connection. + let event = Event::Closed { id, error: Some(error) }; + this.state = State::Terminating(event); } } } } - // Deliver an event to the manager. - State::EstablishedReady { mut connection, event } => { - // Process commands received from the manager, if any. - loop { - match Stream::poll_next(Pin::new(&mut this.commands), cx) { - Poll::Pending => break, - Poll::Ready(Some(Command::NotifyHandler(event))) => - if let Some(ref mut c) = connection { - c.inject_event(event) - } - Poll::Ready(None) => - // The manager has dropped the task, thus initiate a - // graceful shutdown of the connection, if given. - if let Some(c) = connection { - this.state = State::Closing(c.close()); - continue 'poll - } else { - return Poll::Ready(()) - } + State::Closing(mut closing) => { + // Try to gracefully close the connection. + match closing.poll_unpin(cx) { + Poll::Ready(Ok(())) => { + let event = Event::Closed { id: this.id, error: None }; + this.state = State::Terminating(event); + } + Poll::Ready(Err(e)) => { + let event = Event::Closed { + id: this.id, + error: Some(ConnectionError::IO(e)) + }; + this.state = State::Terminating(event); } - } - // Send the event to the manager. - match this.events.poll_ready(cx) { Poll::Pending => { - self.state = State::EstablishedReady { connection, event }; + this.state = State::Closing(closing); return Poll::Pending } - Poll::Ready(Ok(())) => { - // We assume that if `poll_ready` has succeeded, then sending the event - // will succeed as well. If it turns out that it didn't, we will detect - // the closing at the next loop iteration. - let _ = this.events.start_send(event); - if let Some(c) = connection { - this.state = State::EstablishedPending(c) - } else { - // The connection has been dropped, thus this was the last event - // to send to the manager and the task is done. - return Poll::Ready(()) - } - }, - Poll::Ready(Err(_)) => { - // The manager is no longer reachable, maybe due to - // application shutdown. Try a graceful shutdown of the - // connection, if available, and end the task. - if let Some(c) = connection { - this.state = State::Closing(c.close()); - continue 'poll - } - return Poll::Ready(()) - } } } - State::Closing(mut closing) => - match Future::poll(Pin::new(&mut closing), cx) { - Poll::Ready(_) => return Poll::Ready(()), // end task + State::Terminating(event) => { + // Try to deliver the final event. + match this.events.poll_ready(cx) { Poll::Pending => { - this.state = State::Closing(closing); + self.state = State::Terminating(event); return Poll::Pending } + Poll::Ready(result) => { + if result.is_ok() { + let _ = this.events.start_send(event); + } + return Poll::Ready(()) + } } + } State::Done => panic!("`Task::poll()` called after completion.") } diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index d89308c5..e27fde27 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -64,6 +64,12 @@ pub struct Pool)>, + + /// Established connections that have been closed in the context of + /// a [`Pool::disconnect`] in order to emit a `ConnectionClosed` + /// event for each. Every `ConnectionEstablished` event must be + /// paired with (eventually) a `ConnectionClosed`. + disconnected: Vec>, } impl fmt::Debug @@ -84,17 +90,28 @@ for Pool { /// A new connection has been established. ConnectionEstablished { - connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, + connection: EstablishedConnection<'a, TInEvent, TConnInfo>, num_established: NonZeroU32, }, - /// An established connection has encountered an error. - ConnectionError { + /// An established connection was closed. + /// + /// A connection may close if + /// + /// * it encounters an error, which includes the connection being + /// closed by the remote. In this case `error` is `Some`. + /// * it was actively closed by [`EstablishedConnection::start_close`], + /// i.e. a successful, orderly close. + /// * it was actively closed by [`Pool::disconnect`], i.e. + /// dropped without an orderly close. + /// + ConnectionClosed { id: ConnectionId, /// Information about the connection that errored. connected: Connected, - /// The error that occurred. - error: ConnectionError, + /// The error that occurred, if any. If `None`, the connection + /// was closed by the local peer. + error: Option>, /// A reference to the pool that used to manage the connection. pool: &'a mut Pool, /// The remaining number of established connections to the same peer. @@ -121,7 +138,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC /// A node has produced an event. ConnectionEvent { /// The connection that has generated the event. - connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, + connection: EstablishedConnection<'a, TInEvent, TConnInfo>, /// The produced event. event: TOutEvent, }, @@ -129,7 +146,7 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC /// The connection to a node has changed its address. AddressChange { /// The connection that has changed address. - connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, + connection: EstablishedConnection<'a, TInEvent, TConnInfo>, /// The new endpoint. new_endpoint: ConnectedPoint, /// The old endpoint. @@ -153,8 +170,8 @@ where .field(connection) .finish() }, - PoolEvent::ConnectionError { ref id, ref connected, ref error, .. } => { - f.debug_struct("PoolEvent::ConnectionError") + PoolEvent::ConnectionClosed { ref id, ref connected, ref error, .. } => { + f.debug_struct("PoolEvent::ConnectionClosed") .field("id", id) .field("connected", connected) .field("error", error) @@ -200,6 +217,7 @@ where manager: Manager::new(manager_config), established: Default::default(), pending: Default::default(), + disconnected: Vec::new(), } } @@ -392,8 +410,7 @@ where match self.manager.entry(id) { Some(manager::Entry::Established(entry)) => Some(PoolConnection::Established(EstablishedConnection { - entry, - established: &mut self.established, + entry })), Some(manager::Entry::Pending(entry)) => Some(PoolConnection::Pending(PendingConnection { @@ -406,7 +423,7 @@ where /// Gets an established connection from the pool by ID. pub fn get_established(&mut self, id: ConnectionId) - -> Option> + -> Option> { match self.get(id) { Some(PoolConnection::Established(c)) => Some(c), @@ -445,25 +462,49 @@ where self.established.len() } - /// Close all connections to the given peer. + /// (Forcefully) close all connections to the given peer. + /// + /// All connections to the peer, whether pending or established are + /// dropped asap and no more events from these connections are emitted + /// by the pool effective immediately. + /// + /// > **Note**: Established connections are dropped without performing + /// > an orderly close. See [`EstablishedConnection::start_close`] for + /// > performing such an orderly close. pub fn disconnect(&mut self, peer: &TPeerId) { if let Some(conns) = self.established.get(peer) { - for id in conns.keys() { - match self.manager.entry(*id) { - Some(manager::Entry::Established(e)) => { e.close(); }, + // Count upwards because we push to / pop from the end. See also `Pool::poll`. + let mut num_established = 0; + for &id in conns.keys() { + match self.manager.entry(id) { + Some(manager::Entry::Established(e)) => { + let connected = e.remove(); + self.disconnected.push(Disconnected { + id, connected, num_established + }); + num_established += 1; + }, _ => {} } } } + self.established.remove(peer); - for (id, (_endpoint, peer2)) in &self.pending { + let mut aborted = Vec::new(); + for (&id, (_endpoint, peer2)) in &self.pending { if Some(peer) == peer2.as_ref() { - match self.manager.entry(*id) { - Some(manager::Entry::Pending(e)) => { e.abort(); }, + match self.manager.entry(id) { + Some(manager::Entry::Pending(e)) => { + e.abort(); + aborted.push(id); + }, _ => {} } } } + for id in aborted { + self.pending.remove(&id); + } } /// Counts the number of established connections in the pool. @@ -568,6 +609,26 @@ where TConnInfo: ConnectionInfo + Clone, TPeerId: Clone { + // Drain events resulting from forced disconnections. + // + // Note: The `Disconnected` entries in `self.disconnected` + // are inserted in ascending order of the remaining `num_established` + // connections. Thus we `pop()` them off from the end to emit the + // events in an order that properly counts down `num_established`. + // See also `Pool::disconnect`. + while let Some(Disconnected { + id, connected, num_established + }) = self.disconnected.pop() { + return Poll::Ready(PoolEvent::ConnectionClosed { + id, + connected, + num_established, + error: None, + pool: self, + }) + } + + // Poll the connection `Manager`. loop { let item = match self.manager.poll(cx) { Poll::Ready(item) => item, @@ -587,7 +648,7 @@ where }) } }, - manager::Event::ConnectionError { id, connected, error } => { + manager::Event::ConnectionClosed { id, connected, error } => { let num_established = if let Some(conns) = self.established.get_mut(connected.peer_id()) { conns.remove(&id); @@ -598,10 +659,10 @@ where if num_established == 0 { self.established.remove(connected.peer_id()); } - return Poll::Ready(PoolEvent::ConnectionError { + return Poll::Ready(PoolEvent::ConnectionClosed { id, connected, error, num_established, pool: self }) - }, + } manager::Event::ConnectionEstablished { entry } => { let id = entry.id(); if let Some((endpoint, peer)) = self.pending.remove(&id) { @@ -610,7 +671,7 @@ where let current = || established.get(entry.connected().peer_id()) .map_or(0, |conns| conns.len()); if let Err(e) = self.limits.check_established(current) { - let connected = entry.close(); + let connected = entry.remove(); return Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint: connected.endpoint, @@ -686,7 +747,7 @@ where /// A connection in a [`Pool`]. pub enum PoolConnection<'a, TInEvent, TConnInfo, TPeerId> { Pending(PendingConnection<'a, TInEvent, TConnInfo, TPeerId>), - Established(EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>), + Established(EstablishedConnection<'a, TInEvent, TConnInfo>), } /// A pending connection in a [`Pool`]. @@ -721,13 +782,12 @@ impl } /// An established connection in a [`Pool`]. -pub struct EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId> { +pub struct EstablishedConnection<'a, TInEvent, TConnInfo> { entry: manager::EstablishedEntry<'a, TInEvent, TConnInfo>, - established: &'a mut FnvHashMap>, } -impl fmt::Debug -for EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId> +impl fmt::Debug +for EstablishedConnection<'_, TInEvent, TConnInfo> where TInEvent: fmt::Debug, TConnInfo: fmt::Debug, @@ -739,9 +799,7 @@ where } } -impl - EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId> -{ +impl EstablishedConnection<'_, TInEvent, TConnInfo> { pub fn connected(&self) -> &Connected { self.entry.connected() } @@ -757,11 +815,9 @@ impl } } -impl - EstablishedConnection<'_, TInEvent, TConnInfo, TPeerId> +impl<'a, TInEvent, TConnInfo> EstablishedConnection<'a, TInEvent, TConnInfo> where - TConnInfo: ConnectionInfo, - TPeerId: Eq + Hash + Clone, + TConnInfo: ConnectionInfo, { /// Returns the local connection ID. pub fn id(&self) -> ConnectionId { @@ -769,7 +825,7 @@ where } /// Returns the identity of the connected peer. - pub fn peer_id(&self) -> &TPeerId { + pub fn peer_id(&self) -> &TConnInfo::PeerId { self.info().peer_id() } @@ -797,24 +853,11 @@ where self.entry.poll_ready_notify_handler(cx) } - /// Closes the connection, returning the connection information. - pub fn close(self) -> Connected { - let id = self.entry.id(); - let info = self.entry.close(); - - let empty = - if let Some(conns) = self.established.get_mut(info.peer_id()) { - conns.remove(&id); - conns.is_empty() - } else { - false - }; - - if empty { - self.established.remove(info.peer_id()); - } - - info + /// Initiates a graceful close of the connection. + /// + /// Has no effect if the connection is already closing. + pub fn start_close(self) { + self.entry.start_close() } } @@ -833,16 +876,15 @@ where I: Iterator { /// Obtains the next connection, if any. - pub fn next<'b>(&'b mut self) -> Option> + pub fn next<'b>(&'b mut self) -> Option> { while let Some(id) = self.ids.next() { if self.pool.manager.is_established(&id) { // (*) match self.pool.manager.entry(id) { Some(manager::Entry::Established(entry)) => { - let established = &mut self.pool.established; - return Some(EstablishedConnection { entry, established }) + return Some(EstablishedConnection { entry }) } - _ => unreachable!("by (*)") + _ => panic!("Established entry not found in manager.") // see (*) } } } @@ -856,17 +898,16 @@ where /// Returns the first connection, if any, consuming the iterator. pub fn into_first<'b>(mut self) - -> Option> + -> Option> where 'a: 'b { while let Some(id) = self.ids.next() { if self.pool.manager.is_established(&id) { // (*) match self.pool.manager.entry(id) { Some(manager::Entry::Established(entry)) => { - let established = &mut self.pool.established; - return Some(EstablishedConnection { entry, established }) + return Some(EstablishedConnection { entry }) } - _ => unreachable!("by (*)") + _ => panic!("Established entry not found in manager.") // see (*) } } } @@ -925,3 +966,13 @@ impl PoolLimits { Ok(()) } } + +/// Information about a former established connection to a peer +/// that was dropped via [`Pool::disconnect`]. +struct Disconnected { + id: ConnectionId, + connected: Connected, + /// The remaining number of established connections + /// to the same peer. + num_established: u32, +} diff --git a/core/src/network.rs b/core/src/network.rs index e1220143..36360dad 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -405,12 +405,12 @@ where } event } - Poll::Ready(PoolEvent::ConnectionError { id, connected, error, num_established, .. }) => { - NetworkEvent::ConnectionError { + Poll::Ready(PoolEvent::ConnectionClosed { id, connected, error, num_established, .. }) => { + NetworkEvent::ConnectionClosed { id, connected, - error, num_established, + error, } } Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => { diff --git a/core/src/network/event.rs b/core/src/network/event.rs index 5e06be51..c740cf0f 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -101,25 +101,34 @@ where error: PendingConnectionError, }, - /// A new connection to a peer has been opened. + /// A new connection to a peer has been established. ConnectionEstablished { /// The newly established connection. - connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, - /// The total number of established connections to the same peer, including the one that - /// has just been opened. + connection: EstablishedConnection<'a, TInEvent, TConnInfo>, + /// The total number of established connections to the same peer, + /// including the one that has just been opened. num_established: NonZeroU32, }, - /// An established connection to a peer has encountered an error. + /// An established connection to a peer has been closed. /// - /// The connection is closed as a result of the error. - ConnectionError { + /// A connection may close if + /// + /// * it encounters an error, which includes the connection being + /// closed by the remote. In this case `error` is `Some`. + /// * it was actively closed by [`EstablishedConnection::start_close`], + /// i.e. a successful, orderly close. In this case `error` is `None`. + /// * it was actively closed by [`super::peer::ConnectedPeer::disconnect`] or + /// [`super::peer::DialingPeer::disconnect`], i.e. dropped without an + /// orderly close. In this case `error` is `None`. + /// + ConnectionClosed { /// The ID of the connection that encountered an error. id: ConnectionId, /// Information about the connection that encountered the error. connected: Connected, /// The error that occurred. - error: ConnectionError<::Error>, + error: Option::Error>>, /// The remaining number of established connections to the same peer. num_established: u32, }, @@ -151,7 +160,7 @@ where /// An established connection produced an event. ConnectionEvent { /// The connection on which the event occurred. - connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, + connection: EstablishedConnection<'a, TInEvent, TConnInfo>, /// Event that was produced by the node. event: TOutEvent, }, @@ -159,7 +168,7 @@ where /// An established connection has changed its address. AddressChange { /// The connection whose address has changed. - connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, + connection: EstablishedConnection<'a, TInEvent, TConnInfo>, /// New endpoint of this connection. new_endpoint: ConnectedPoint, /// Old endpoint of this connection. @@ -224,8 +233,9 @@ where .field("connection", connection) .finish() } - NetworkEvent::ConnectionError { connected, error, .. } => { - f.debug_struct("ConnectionError") + NetworkEvent::ConnectionClosed { id, connected, error, .. } => { + f.debug_struct("ConnectionClosed") + .field("id", id) .field("connected", connected) .field("error", error) .finish() diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index 8f9dd099..29664047 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -313,7 +313,7 @@ where /// Obtains an established connection to the peer by ID. pub fn connection<'b>(&'b mut self, id: ConnectionId) - -> Option> + -> Option> { self.network.pool.get_established(id) } @@ -359,7 +359,7 @@ where /// Obtains some established connection to the peer. pub fn some_connection<'b>(&'b mut self) - -> EstablishedConnection<'b, TInEvent, TConnInfo, TPeerId> + -> EstablishedConnection<'b, TInEvent, TConnInfo> { self.connections() .into_first() diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 16b0a7ef..59db446b 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Update `libp2p-core`, i.e. `StreamMuxer::poll_inbound` has been renamed diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 936bc0bd..2c580cea 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mplex" edition = "2018" description = "Mplex multiplexing protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ bytes = "0.5" fnv = "1.0" futures = "0.3.1" futures_codec = "0.4" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4" parking_lot = "0.10" unsigned-varint = { version = "0.4", features = ["futures-codec"] } diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 422e1524..71a16f3f 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Update `libp2p-core`, i.e. `StreamMuxer::poll_inbound` has been renamed diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 5d472e38..fa6a9d93 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-yamux" edition = "2018" description = "Yamux multiplexing protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } parking_lot = "0.10" thiserror = "1.0" yamux = "0.4.5" diff --git a/protocols/deflate/CHANGELOG.md b/protocols/deflate/CHANGELOG.md index d9b606ea..7d1fdaf1 100644 --- a/protocols/deflate/CHANGELOG.md +++ b/protocols/deflate/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml index b96a7495..35526a2e 100644 --- a/protocols/deflate/Cargo.toml +++ b/protocols/deflate/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-deflate" edition = "2018" description = "Deflate encryption protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } flate2 = "1.0" [dev-dependencies] diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index c39adc29..8b5db418 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` and `libp2p-swarm` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index f293c500..974539c5 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-floodsub" edition = "2018" description = "Floodsub protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.3.2" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } prost = "0.6.1" rand = "0.7" smallvec = "1.0" diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b1a156f3..aa1b818d 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,7 +1,9 @@ -# 0.??.? [????-??-??] +# 0.21.0 [unreleased] - `Debug` instance for `Gossipsub`. [PR 1673](https://github.com/libp2p/rust-libp2p/pull/1673). +- Bump `libp2p-core` and `libp2p-swarm` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 1541d6f9..fe6395bb 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-gossipsub" edition = "2018" description = "Gossipsub protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,8 +10,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } bytes = "0.5.4" byteorder = "1.3.2" fnv = "1.0.6" diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index d9b606ea..1d8e9ad6 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` and `libp2p-swarm` dependencies. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 043a3bda..d6b79751 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-identify" edition = "2018" description = "Nodes identifcation protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } log = "0.4.1" prost = "0.6.1" smallvec = "1.0" diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 44cb3f18..57cfa801 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Update `libp2p-core` and `libp2p-swarm` dependencies. + # 0.21.0 [2020-07-01] - Remove `KademliaEvent::Discovered` diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 272a4279..fb5d47ed 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-kad" edition = "2018" description = "Kademlia protocol for libp2p" -version = "0.21.0" +version = "0.22.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,8 +17,8 @@ fnv = "1.0" futures_codec = "0.4" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.20.0", path = "../../core" } -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } multihash = "0.11.0" prost = "0.6.1" rand = "0.7.2" diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index d9b606ea..1d8e9ad6 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` and `libp2p-swarm` dependencies. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index cb7de95e..9d86bd35 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-mdns" edition = "2018" -version = "0.20.0" +version = "0.21.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" @@ -16,8 +16,8 @@ dns-parser = "0.8" either = "1.5.3" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.20.0", path = "../../core" } -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } log = "0.4" net2 = "0.2" rand = "0.7" diff --git a/protocols/noise/CHANGELOG.md b/protocols/noise/CHANGELOG.md index b30920fb..0eb2c757 100644 --- a/protocols/noise/CHANGELOG.md +++ b/protocols/noise/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.23.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.22.0 [2020-08-03] **NOTE**: For a smooth upgrade path from `0.20` to `> 0.21` diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index d3bef686..cfc9e70f 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-noise" description = "Cryptographic handshake protocol using the noise framework." -version = "0.22.0" +version = "0.23.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ bytes = "0.5" curve25519-dalek = "2.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4" prost = "0.6.1" rand = "0.7.2" diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index d7cc7e2f..6d82b843 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` and `libp2p-swarm` dependencies. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 695199b9..669267af 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-ping" edition = "2018" description = "Ping protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" diff --git a/protocols/plaintext/CHANGELOG.md b/protocols/plaintext/CHANGELOG.md index c39adc29..0cc1cfc5 100644 --- a/protocols/plaintext/CHANGELOG.md +++ b/protocols/plaintext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/plaintext/Cargo.toml b/protocols/plaintext/Cargo.toml index a1b9e6a9..37fc94df 100644 --- a/protocols/plaintext/Cargo.toml +++ b/protocols/plaintext/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-plaintext" edition = "2018" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] bytes = "0.5" futures = "0.3.1" futures_codec = "0.4.0" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4.8" prost = "0.6.1" rw-stream-sink = "0.2.0" diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index b4d45ca4..2b5df6ac 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.2.0 + +- Bump `libp2p-core` and `libp2p-swarm` dependencies. + # 0.1.1 - Always properly `close()` the substream after sending requests and diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 79ee83d6..aaf1b993 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-request-response" edition = "2018" description = "Generic Request/Response Protocols" -version = "0.1.1" +version = "0.2.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,8 +12,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] async-trait = "0.1" futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } -libp2p-swarm = { version = "0.20.0", path = "../../swarm" } +libp2p-core = { version = "0.21.0", path = "../../core" } +libp2p-swarm = { version = "0.21.0", path = "../../swarm" } smallvec = "1.4" wasm-timer = "0.2" diff --git a/protocols/secio/CHANGELOG.md b/protocols/secio/CHANGELOG.md index 8f9fce13..7e0942d7 100644 --- a/protocols/secio/CHANGELOG.md +++ b/protocols/secio/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index ae45eda0..857cc8f9 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-secio" edition = "2018" description = "Secio encryption protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -16,7 +16,7 @@ ctr = "0.3" futures = "0.3.1" hmac = "0.7.0" lazy_static = "1.2.0" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4.6" prost = "0.6.1" pin-project = "0.4.17" diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 9208db10..1efa6461 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,15 @@ +# 0.21.0 [unreleased] + +- The `cause` of `SwarmEvent::ConnectionClosed` is now an `Option`, +and `None` indicates an active connection close not caused by an +error. + +- `DialError::Banned` has been added and is returned from `Swarm::dial` +if the peer is banned, thereby also invoking the `NetworkBehaviour::inject_dial_failure` +callback. + +- Update the `libp2p-core` dependency to `0.21`, fixing [1584](https://github.com/libp2p/rust-libp2p/issues/1584). + # 0.20.1 [2020-07-08] - Documentation updates. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 99848420..72d43c38 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-swarm" edition = "2018" description = "The libp2p swarm" -version = "0.20.1" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../core" } +libp2p-core = { version = "0.21.0", path = "../core" } log = "0.4" rand = "0.7" smallvec = "1.0" @@ -20,5 +20,6 @@ void = "1" [dev-dependencies] libp2p-mplex = { path = "../muxers/mplex" } +libp2p-secio = { path = "../protocols/secio" } quickcheck = "0.9.0" rand = "0.7.2" diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 84e69183..f2ad2266 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -55,6 +55,8 @@ mod behaviour; mod registry; +#[cfg(test)] +mod test; mod upgrade; pub mod protocols_handler; @@ -156,7 +158,8 @@ pub enum SwarmEvent { /// opened. num_established: NonZeroU32, }, - /// A connection with the given peer has been closed. + /// A connection with the given peer has been closed, + /// possibly as a result of an error. ConnectionClosed { /// Identity of the peer that we have connected to. peer_id: PeerId, @@ -164,8 +167,9 @@ pub enum SwarmEvent { endpoint: ConnectedPoint, /// Number of other remaining connections to this same peer. num_established: u32, - /// Reason for the disconnection. - cause: ConnectionError>, + /// Reason for the disconnection, if it was not a successful + /// active close. + cause: Option>>, }, /// A new connection arrived on a listener and is in the process of protocol negotiation. /// @@ -366,22 +370,19 @@ where TBehaviour: NetworkBehaviour, me.network.remove_listener(id) } - /// Tries to dial the given address. - /// - /// Returns an error if the address is not supported. + /// Initiates a new dialing attempt to the given address. pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { let handler = me.behaviour.new_handler(); me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ()) } - /// Tries to initiate a dialing attempt to the given peer. - /// - /// If a new dialing attempt has been initiated, `Ok(true)` is returned. - /// - /// If no new dialing attempt has been initiated, meaning there is an ongoing - /// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)` - /// is returned. + /// Initiates a new dialing attempt to the given peer. pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<(), DialError> { + if me.banned_peers.contains(peer_id) { + me.behaviour.inject_dial_failure(peer_id); + return Err(DialError::Banned) + } + let self_listening = &me.listened_addrs; let mut addrs = me.behaviour.addresses_of_peer(peer_id) .into_iter() @@ -446,11 +447,12 @@ where TBehaviour: NetworkBehaviour, /// Bans a peer by its peer ID. /// /// Any incoming connection and any dialing attempt will immediately be rejected. - /// This function has no effect is the peer is already banned. + /// This function has no effect if the peer is already banned. pub fn ban_peer_id(me: &mut Self, peer_id: PeerId) { - me.banned_peers.insert(peer_id.clone()); - if let Some(c) = me.network.peer(peer_id).into_connected() { - c.disconnect(); + if me.banned_peers.insert(peer_id.clone()) { + if let Some(peer) = me.network.peer(peer_id).into_connected() { + peer.disconnect(); + } } } @@ -529,8 +531,12 @@ where TBehaviour: NetworkBehaviour, }); } }, - Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => { - log::debug!("Connection {:?} closed: {:?}", connected, error); + Poll::Ready(NetworkEvent::ConnectionClosed { id, connected, error, num_established }) => { + if let Some(error) = error.as_ref() { + log::debug!("Connection {:?} closed: {:?}", connected, error); + } else { + log::debug!("Connection {:?} closed (active close).", connected); + } let info = connected.info; let endpoint = connected.endpoint; this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint); @@ -776,14 +782,13 @@ enum PendingNotifyHandler { /// /// Returns `None` if the connection is closing or the event has been /// successfully sent, in either case the event is consumed. -fn notify_one<'a, TInEvent, TConnInfo, TPeerId>( - conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>, +fn notify_one<'a, TInEvent, TConnInfo>( + conn: &mut EstablishedConnection<'a, TInEvent, TConnInfo>, event: TInEvent, cx: &mut Context<'_>, ) -> Option where - TPeerId: Eq + std::hash::Hash + Clone, - TConnInfo: ConnectionInfo + TConnInfo: ConnectionInfo { match conn.poll_ready_notify_handler(cx) { Poll::Pending => Some(event), @@ -1124,6 +1129,8 @@ where TBehaviour: NetworkBehaviour, /// The possible failures of [`ExpandedSwarm::dial`]. #[derive(Debug)] pub enum DialError { + /// The peer is currently banned. + Banned, /// The configured limit for simultaneous outgoing connections /// has been reached. ConnectionLimit(ConnectionLimit), @@ -1136,7 +1143,8 @@ impl fmt::Display for DialError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), - DialError::NoAddresses => write!(f, "Dial error: no addresses for peer.") + DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), + DialError::Banned => write!(f, "Dial error: peer is banned.") } } } @@ -1145,7 +1153,8 @@ impl error::Error for DialError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { DialError::ConnectionLimit(err) => Some(err), - DialError::NoAddresses => None + DialError::NoAddresses => None, + DialError::Banned => None } } } @@ -1184,24 +1193,45 @@ impl NetworkBehaviour for DummyBehaviour { { Poll::Pending } - } #[cfg(test)] mod tests { - use crate::{DummyBehaviour, SwarmBuilder}; + use crate::protocols_handler::DummyProtocolsHandler; + use crate::test::{MockBehaviour, CallTraceBehaviour}; + use futures::{future, executor}; use libp2p_core::{ - PeerId, - PublicKey, identity, - transport::dummy::{DummyStream, DummyTransport} + upgrade, + multiaddr, + transport::{self, dummy::*} }; use libp2p_mplex::Multiplex; + use super::*; - fn get_random_id() -> PublicKey { + fn get_random_id() -> identity::PublicKey { identity::Keypair::generate_ed25519().public() } + fn new_test_swarm(handler_proto: T) -> Swarm>> + where + T: ProtocolsHandler + Clone, + T::OutEvent: Clone, + O: Send + 'static + { + let keypair1 = identity::Keypair::generate_ed25519(); + let pubkey1 = keypair1.public(); + let transport1 = transport::MemoryTransport::default() + .upgrade(upgrade::Version::V1) + .authenticate(libp2p_secio::SecioConfig::new(keypair1)) + .multiplex(libp2p_mplex::MplexConfig::new()) + .map(|(p, m), _| (p, StreamMuxerBox::new(m))) + .map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); }) + .boxed(); + let behaviour1 = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); + SwarmBuilder::new(transport1, behaviour1, pubkey1.into()).build() + } + #[test] fn test_build_swarm() { let id = get_random_id(); @@ -1220,4 +1250,108 @@ mod tests { let swarm = SwarmBuilder::new(transport, DummyBehaviour {}, id.into()).build(); assert!(swarm.network.incoming_limit().is_none()) } + + /// Establishes a number of connections between two peers, + /// after which one peer bans the other. + /// + /// The test expects both behaviours to be notified via pairs of + /// inject_connected / inject_disconnected as well as + /// inject_connection_established / inject_connection_closed calls. + #[test] + fn test_connect_disconnect_ban() { + // Since the test does not try to open any substreams, we can + // use the dummy protocols handler. + let mut handler_proto = DummyProtocolsHandler::default(); + handler_proto.keep_alive = KeepAlive::Yes; + + let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()); + let mut swarm2 = new_test_swarm::<_, ()>(handler_proto); + + let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::()).into(); + + Swarm::listen_on(&mut swarm1, addr1.clone().into()).unwrap(); + Swarm::listen_on(&mut swarm2, addr2.clone().into()).unwrap(); + + // Test execution state. Connection => Disconnecting => Connecting. + enum State { + Connecting, + Disconnecting, + } + + let swarm1_id = Swarm::local_peer_id(&swarm1).clone(); + + let mut banned = false; + let mut unbanned = false; + + let num_connections = 10; + + for _ in 0 .. num_connections { + Swarm::dial_addr(&mut swarm1, addr2.clone()).unwrap(); + } + let mut state = State::Connecting; + + executor::block_on(future::poll_fn(move |cx| { + loop { + let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx); + let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx); + match state { + State::Connecting => { + for s in &[&swarm1, &swarm2] { + if s.behaviour.inject_connection_established.len() > 0 { + assert_eq!(s.behaviour.inject_connected.len(), 1); + } else { + assert_eq!(s.behaviour.inject_connected.len(), 0); + } + assert!(s.behaviour.inject_connection_closed.len() == 0); + assert!(s.behaviour.inject_disconnected.len() == 0); + } + if [&swarm1, &swarm2].iter().all(|s| { + s.behaviour.inject_connection_established.len() == num_connections + }) { + if banned { + return Poll::Ready(()) + } + Swarm::ban_peer_id(&mut swarm2, swarm1_id.clone()); + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + banned = true; + state = State::Disconnecting; + } + } + State::Disconnecting => { + for s in &[&swarm1, &swarm2] { + if s.behaviour.inject_connection_closed.len() < num_connections { + assert_eq!(s.behaviour.inject_disconnected.len(), 0); + } else { + assert_eq!(s.behaviour.inject_disconnected.len(), 1); + } + assert_eq!(s.behaviour.inject_connection_established.len(), 0); + assert_eq!(s.behaviour.inject_connected.len(), 0); + } + if [&swarm1, &swarm2].iter().all(|s| { + s.behaviour.inject_connection_closed.len() == num_connections + }) { + if unbanned { + return Poll::Ready(()) + } + // Unban the first peer and reconnect. + Swarm::unban_peer_id(&mut swarm2, swarm1_id.clone()); + swarm1.behaviour.reset(); + swarm2.behaviour.reset(); + unbanned = true; + for _ in 0 .. num_connections { + Swarm::dial_addr(&mut swarm2, addr1.clone()).unwrap(); + } + state = State::Connecting; + } + } + } + + if poll1.is_pending() && poll2.is_pending() { + return Poll::Pending + } + } + })) + } } diff --git a/swarm/src/protocols_handler/dummy.rs b/swarm/src/protocols_handler/dummy.rs index 07047eb5..3cd9f7d3 100644 --- a/swarm/src/protocols_handler/dummy.rs +++ b/swarm/src/protocols_handler/dummy.rs @@ -31,12 +31,15 @@ use std::task::{Context, Poll}; use void::Void; /// Implementation of `ProtocolsHandler` that doesn't handle anything. +#[derive(Clone, Debug)] pub struct DummyProtocolsHandler { + pub keep_alive: KeepAlive, } impl Default for DummyProtocolsHandler { fn default() -> Self { DummyProtocolsHandler { + keep_alive: KeepAlive::No } } } @@ -49,19 +52,16 @@ impl ProtocolsHandler for DummyProtocolsHandler { type OutboundProtocol = DeniedUpgrade; type OutboundOpenInfo = Void; - #[inline] fn listen_protocol(&self) -> SubstreamProtocol { SubstreamProtocol::new(DeniedUpgrade) } - #[inline] fn inject_fully_negotiated_inbound( &mut self, _: >::Output ) { } - #[inline] fn inject_fully_negotiated_outbound( &mut self, _: >::Output, @@ -69,16 +69,14 @@ impl ProtocolsHandler for DummyProtocolsHandler { ) { } - #[inline] fn inject_event(&mut self, _: Self::InEvent) {} - #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} - #[inline] - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } - #[inline] fn poll( &mut self, _: &mut Context<'_>, diff --git a/swarm/src/test.rs b/swarm/src/test.rs new file mode 100644 index 00000000..8249c358 --- /dev/null +++ b/swarm/src/test.rs @@ -0,0 +1,254 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{ + NetworkBehaviour, + NetworkBehaviourAction, + ProtocolsHandler, + IntoProtocolsHandler, + PollParameters +}; +use libp2p_core::{ + ConnectedPoint, + PeerId, + connection::{ConnectionId, ListenerId}, + multiaddr::Multiaddr, +}; +use std::collections::HashMap; +use std::task::{Context, Poll}; + +/// A `MockBehaviour` is a `NetworkBehaviour` that allows for +/// the instrumentation of return values, without keeping +/// any further state. +pub struct MockBehaviour +where + THandler: ProtocolsHandler, +{ + /// The prototype protocols handler that is cloned for every + /// invocation of `new_handler`. + pub handler_proto: THandler, + /// The addresses to return from `addresses_of_peer`. + pub addresses: HashMap>, + /// The next action to return from `poll`. + /// + /// An action is only returned once. + pub next_action: Option>, +} + +impl MockBehaviour +where + THandler: ProtocolsHandler +{ + pub fn new(handler_proto: THandler) -> Self { + MockBehaviour { + handler_proto, + addresses: HashMap::new(), + next_action: None, + } + } +} + +impl NetworkBehaviour for MockBehaviour +where + THandler: ProtocolsHandler + Clone, + THandler::OutEvent: Clone, + TOutEvent: Send + 'static, +{ + type ProtocolsHandler = THandler; + type OutEvent = TOutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.handler_proto.clone() + } + + fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { + self.addresses.get(p).map_or(Vec::new(), |v| v.clone()) + } + + fn inject_connected(&mut self, _: &PeerId) { + } + + fn inject_disconnected(&mut self, _: &PeerId) { + } + + fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: THandler::OutEvent) { + } + + fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) -> + Poll> + { + self.next_action.take().map_or(Poll::Pending, Poll::Ready) + } +} + +/// A `CallTraceBehaviour` is a `NetworkBehaviour` that tracks +/// invocations of callback methods and their arguments, wrapping +/// around an inner behaviour. +pub struct CallTraceBehaviour +where + TInner: NetworkBehaviour, +{ + inner: TInner, + + pub addresses_of_peer: Vec, + pub inject_connected: Vec, + pub inject_disconnected: Vec, + pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>, + pub inject_event: Vec<(PeerId, ConnectionId, <::Handler as ProtocolsHandler>::OutEvent)>, + pub inject_addr_reach_failure: Vec<(Option, Multiaddr)>, + pub inject_dial_failure: Vec, + pub inject_new_listen_addr: Vec, + pub inject_new_external_addr: Vec, + pub inject_expired_listen_addr: Vec, + pub inject_listener_error: Vec, + pub inject_listener_closed: Vec<(ListenerId, bool)>, + pub poll: usize, +} + +impl CallTraceBehaviour +where + TInner: NetworkBehaviour +{ + pub fn new(inner: TInner) -> Self { + Self { + inner, + addresses_of_peer: Vec::new(), + inject_connected: Vec::new(), + inject_disconnected: Vec::new(), + inject_connection_established: Vec::new(), + inject_connection_closed: Vec::new(), + inject_event: Vec::new(), + inject_addr_reach_failure: Vec::new(), + inject_dial_failure: Vec::new(), + inject_new_listen_addr: Vec::new(), + inject_new_external_addr: Vec::new(), + inject_expired_listen_addr: Vec::new(), + inject_listener_error: Vec::new(), + inject_listener_closed: Vec::new(), + poll: 0, + } + } + + pub fn reset(&mut self) { + self.addresses_of_peer = Vec::new(); + self.inject_connected = Vec::new(); + self.inject_disconnected = Vec::new(); + self.inject_connection_established = Vec::new(); + self.inject_connection_closed = Vec::new(); + self.inject_event = Vec::new(); + self.inject_addr_reach_failure = Vec::new(); + self.inject_dial_failure = Vec::new(); + self.inject_new_listen_addr = Vec::new(); + self.inject_new_external_addr = Vec::new(); + self.inject_expired_listen_addr = Vec::new(); + self.inject_listener_error = Vec::new(); + self.inject_listener_closed = Vec::new(); + self.poll = 0; + } +} + +impl NetworkBehaviour for CallTraceBehaviour +where + TInner: NetworkBehaviour, + <::Handler as ProtocolsHandler>::OutEvent: Clone, +{ + type ProtocolsHandler = TInner::ProtocolsHandler; + type OutEvent = TInner::OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, p: &PeerId) -> Vec { + self.addresses_of_peer.push(p.clone()); + self.inner.addresses_of_peer(p) + } + + fn inject_connected(&mut self, peer: &PeerId) { + self.inject_connected.push(peer.clone()); + self.inner.inject_connected(peer); + } + + fn inject_connection_established(&mut self, p: &PeerId, c: &ConnectionId, e: &ConnectedPoint) { + self.inject_connection_established.push((p.clone(), c.clone(), e.clone())); + self.inner.inject_connection_established(p, c, e); + } + + fn inject_disconnected(&mut self, peer: &PeerId) { + self.inject_disconnected.push(peer.clone()); + self.inner.inject_disconnected(peer); + } + + fn inject_connection_closed(&mut self, p: &PeerId, c: &ConnectionId, e: &ConnectedPoint) { + self.inject_connection_closed.push((p.clone(), c.clone(), e.clone())); + self.inner.inject_connection_closed(p, c, e); + } + + fn inject_event(&mut self, p: PeerId, c: ConnectionId, e: <::Handler as ProtocolsHandler>::OutEvent) { + self.inject_event.push((p.clone(), c.clone(), e.clone())); + self.inner.inject_event(p, c, e); + } + + fn inject_addr_reach_failure(&mut self, p: Option<&PeerId>, a: &Multiaddr, e: &dyn std::error::Error) { + self.inject_addr_reach_failure.push((p.cloned(), a.clone())); + self.inner.inject_addr_reach_failure(p, a, e); + } + + fn inject_dial_failure(&mut self, p: &PeerId) { + self.inject_dial_failure.push(p.clone()); + self.inner.inject_dial_failure(p); + } + + fn inject_new_listen_addr(&mut self, a: &Multiaddr) { + self.inject_new_listen_addr.push(a.clone()); + self.inner.inject_new_listen_addr(a); + } + + fn inject_expired_listen_addr(&mut self, a: &Multiaddr) { + self.inject_expired_listen_addr.push(a.clone()); + self.inner.inject_expired_listen_addr(a); + } + + fn inject_new_external_addr(&mut self, a: &Multiaddr) { + self.inject_new_external_addr.push(a.clone()); + self.inner.inject_new_external_addr(a); + } + + fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) { + self.inject_listener_error.push(l.clone()); + self.inner.inject_listener_error(l, e); + } + + fn inject_listener_closed(&mut self, l: ListenerId, r: Result<(), &std::io::Error>) { + self.inject_listener_closed.push((l, r.is_ok())); + self.inner.inject_listener_closed(l, r); + } + + fn poll(&mut self, cx: &mut Context, args: &mut impl PollParameters) -> + Poll::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + >> + { + self.poll += 1; + self.inner.poll(cx, args) + } +} diff --git a/transports/dns/CHANGELOG.md b/transports/dns/CHANGELOG.md index 72ff0bd5..6df94406 100644 --- a/transports/dns/CHANGELOG.md +++ b/transports/dns/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Dependency and documentation updates. diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index 49c3f732..5d4c38a3 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-dns" edition = "2018" description = "DNS transport implementation for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -10,6 +10,6 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index d9b606ea..7d1fdaf1 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index e5328209..6ddceb75 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-tcp" edition = "2018" description = "TCP/IP transport protocol for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -15,7 +15,7 @@ futures = "0.3.1" futures-timer = "3.0" get_if_addrs = "0.5.3" ipnet = "2.0.0" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4.1" socket2 = "0.3.12" tokio = { version = "0.2", default-features = false, features = ["tcp"], optional = true } diff --git a/transports/uds/CHANGELOG.md b/transports/uds/CHANGELOG.md index 8f9fce13..d029bf22 100644 --- a/transports/uds/CHANGELOG.md +++ b/transports/uds/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Update `libp2p-core` dependency. + # 0.20.0 [2020-07-01] - Updated dependencies. diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 9ffe5434..d61c7392 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-uds" edition = "2018" description = "Unix domain sockets transport for libp2p" -version = "0.20.0" +version = "0.21.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] async-std = { version = "1.6.2", optional = true } -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" tokio = { version = "0.2", default-features = false, features = ["uds"], optional = true } diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index 1c87dc21..3e3b56a4 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.21.0 [unreleased] + +- Update `libp2p-core` dependency. + # 0.20.1 [2020-07-06] - Improve the code quality of the `websockets.js` binding with the browser's `WebSocket` API. diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index 324fee2a..22c18dd7 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-wasm-ext" -version = "0.20.1" +version = "0.21.0" authors = ["Pierre Krieger "] edition = "2018" description = "Allows passing in an external transport in a WASM environment" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.19" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 7c28dd5e..0ff31f62 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.22.0 [unreleased] + +- Bump `libp2p-core` dependency. + # 0.21.1 [2020-07-09] - Update `async-tls` and `rustls` dependency. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 017e263b..e937631e 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-websocket" edition = "2018" description = "WebSocket transport for libp2p" -version = "0.21.1" +version = "0.22.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] async-tls = "0.8.0" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.20.0", path = "../../core" } +libp2p-core = { version = "0.21.0", path = "../../core" } log = "0.4.8" quicksink = "0.1" rustls = "0.18.0"