Skip to content

Commit

Permalink
[core/swarm] Emit events for active connection close and fix `disconn…
Browse files Browse the repository at this point in the history
…ect()`. (#1619)

* Emit events for active connection close and fix `disconnect()`.

The `Network` does currently not emit events for actively
closed connections, e.g. via `EstablishedConnection::close`
or `ConnectedPeer::disconnect()`. As a result, when actively
closing connections, there will be `ConnectionEstablished`
events emitted without eventually a matching `ConnectionClosed`
event. This seems undesirable and has the consequence that
the `Swarm::ban_peer_id` feature in `libp2p-swarm` does not
result in appropriate calls to `NetworkBehaviour::inject_connection_closed`
and `NetworkBehaviour::inject_disconnected`. Furthermore,
the `disconnect()` functionality in `libp2p-core` is currently
broken as it leaves the `Pool` in an inconsistent state.

This commit does the following:

  1. When connection background tasks are dropped
     (i.e. removed from the `Manager`), they
     always terminate immediately, without attempting
     an orderly close of the connection.
  2. An orderly close is sent to the background task
     of a connection as a regular command. The
     background task emits a `Closed` event
     before terminating.
  3. `Pool::disconnect()` removes all connection
     tasks for the affected peer from the `Manager`,
     i.e. without an orderly close, thereby also
     fixing the discovered state inconsistency
     due to not removing the corresponding entries
     in the `Pool` itself after removing them from
     the `Manager`.
  4. A new test is added to `libp2p-swarm` that
     exercises the ban/unban functionality and
     places assertions on the number and order
     of calls to the `NetworkBehaviour`. In that
     context some new testing utilities have
     been added to `libp2p-swarm`.

This addresses libp2p/rust-libp2p#1584.

* Update swarm/src/lib.rs

Co-authored-by: Toralf Wittner <[email protected]>

* Incorporate some review feedback.

* Adapt to changes in master.

* More verbose panic messages.

* Simplify

There is no need for a `StartClose` future.

* Fix doc links.

* Further small cleanup.

* Update CHANGELOGs and versions.

Co-authored-by: Toralf Wittner <[email protected]>
  • Loading branch information
romanb and twittner authored Aug 4, 2020
1 parent 9444ac7 commit 474d2c0
Show file tree
Hide file tree
Showing 51 changed files with 884 additions and 306 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
- [`parity-multiaddr` CHANGELOG](misc/multiaddr/CHANGELOG.md)
- [`libp2p-core-derive` CHANGELOG](misc/core-derive/CHANGELOG.md)

# Version 0.24.0 [unreleased]

- Update `libp2p-core`, `libp2p-swarm` and dependent crates.

# Version 0.23.0 (2020-08-03)

**NOTE**: For a smooth upgrade path from `0.21` to `> 0.22`
Expand Down
42 changes: 21 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ atomic = "0.4.6"
bytes = "0.5"
futures = "0.3.1"
lazy_static = "1.2"
libp2p-core = { version = "0.20.0", path = "core" }
libp2p-core-derive = { version = "0.20.0", path = "misc/core-derive" }
libp2p-floodsub = { version = "0.20.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.20.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.20.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.21.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.20.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.22.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.20.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.20.0", path = "protocols/plaintext", optional = true }
libp2p-core = { version = "0.21.0", path = "core" }
libp2p-core-derive = { version = "0.20.2", path = "misc/core-derive" }
libp2p-floodsub = { version = "0.21.0", path = "protocols/floodsub", optional = true }
libp2p-gossipsub = { version = "0.21.0", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.21.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.22.0", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.21.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.23.0", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.21.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.21.0", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true }
libp2p-secio = { version = "0.20.0", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.20.0", path = "swarm" }
libp2p-uds = { version = "0.20.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.20.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.20.0", path = "muxers/yamux", optional = true }
libp2p-request-response = { version = "0.2.0", path = "protocols/request-response", optional = true }
libp2p-secio = { version = "0.21.0", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.21.0", path = "swarm" }
libp2p-uds = { version = "0.21.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.21.0", path = "transports/wasm-ext", optional = true }
libp2p-yamux = { version = "0.21.0", path = "muxers/yamux", optional = true }
multiaddr = { package = "parity-multiaddr", version = "0.9.1", path = "misc/multiaddr" }
multihash = "0.11.0"
parking_lot = "0.10.0"
Expand All @@ -87,11 +87,11 @@ smallvec = "1.0"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.20.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.20.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.20.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.20.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.21.0", path = "transports/websocket", optional = true }
libp2p-deflate = { version = "0.21.0", path = "protocols/deflate", optional = true }
libp2p-dns = { version = "0.21.0", path = "transports/dns", optional = true }
libp2p-mdns = { version = "0.21.0", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.21.0", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.22.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.6.2"
Expand Down
13 changes: 13 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
# 0.21.0 [unreleased]

- Refactoring of connection close and disconnect behaviour. In particular, the former
`NetworkEvent::ConnectionError` is now `NetworkEvent::ConnectionClosed` with the `error`
field being an `Option` and `None` indicating an active (but not necessarily orderly) close.
This guarantees that `ConnectionEstablished` events are always eventually paired
with `ConnectionClosed` events, regardless of how connections are closed.
Correspondingly, `EstablishedConnection::close` is now `EstablishedConnection::start_close`
to reflect that an orderly close completes asynchronously in the background, with the
outcome observed by continued polling of the `Network`. In contrast, `disconnect`ing
a peer takes effect immediately without an orderly connection shutdown.
See [PR 1619](https://github.com/libp2p/rust-libp2p/pull/1619) for further details.

# 0.20.1 [2020-17-17]

- Update ed25519-dalek dependency.
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-core"
edition = "2018"
description = "Core traits and structs of libp2p"
version = "0.20.1"
version = "0.21.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
62 changes: 41 additions & 21 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,18 +196,19 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
handler: H
},

/// An established connection has encountered an error.
ConnectionError {
/// An established connection has been closed.
ConnectionClosed {
/// The connection ID.
///
/// As a result of the error, the connection has been removed
/// from the `Manager` and is being closed. Hence this ID will
/// no longer resolve to a valid entry in the manager.
/// > **Note**: Closed connections are removed from the `Manager`.
/// > Hence this ID will no longer resolve to a valid entry in
/// > the manager.
id: ConnectionId,
/// Information about the connection that encountered the error.
/// Information about the closed connection.
connected: Connected<C>,
/// The error that occurred.
error: ConnectionError<HE>,
/// The error that occurred, if any. If `None`, the connection
/// has been actively closed.
error: Option<ConnectionError<HE>>,
},

/// A connection has been established.
Expand Down Expand Up @@ -348,11 +349,11 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
/// Polls the manager for events relating to the managed connections.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE, C>> {
// Advance the content of `local_spawns`.
while let Poll::Ready(Some(_)) = Stream::poll_next(Pin::new(&mut self.local_spawns), cx) {}
while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}

// Poll for the first event for which the manager still has a registered task, if any.
let event = loop {
match Stream::poll_next(Pin::new(&mut self.events_rx), cx) {
match self.events_rx.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
if self.tasks.contains_key(event.id()) { // (1)
break event
Expand Down Expand Up @@ -397,19 +398,18 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
old_endpoint: old,
new_endpoint: new,
}
},
task::Event::Error { id, error } => {
}
task::Event::Closed { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
match task.state {
TaskState::Established(connected) =>
Event::ConnectionError { id, connected, error },
Event::ConnectionClosed { id, connected, error },
TaskState::Pending => unreachable!(
"`Event::Error` implies (2) occurred on that task and thus (3)."
"`Event::Closed` implies (2) occurred on that task and thus (3)."
),
}
}

})
} else {
unreachable!("By (1)")
Expand Down Expand Up @@ -455,10 +455,11 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
/// > task _may not be notified_ if sending the event fails due to
/// > the connection handler not being ready at this time.
pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
let cmd = task::Command::NotifyHandler(event);
let cmd = task::Command::NotifyHandler(event); // (*)
self.task.get_mut().sender.try_send(cmd)
.map_err(|e| match e.into_inner() {
task::Command::NotifyHandler(event) => event
task::Command::NotifyHandler(event) => event,
_ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*)
})
}

Expand All @@ -472,6 +473,22 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
}

/// Sends a close command to the associated background task,
/// thus initiating a graceful active close of the connection.
///
/// Has no effect if the connection is already closing.
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self.task.get_mut().sender.clone().try_send(task::Command::Close) {
Ok(()) => {},
Err(e) => assert!(e.is_disconnected(), "No capacity for close command.")
}
}

/// Obtains information about the established connection.
pub fn connected(&self) -> &Connected<C> {
match &self.task.get().state {
Expand All @@ -480,16 +497,18 @@ impl<'a, I, C> EstablishedEntry<'a, I, C> {
}
}

/// Closes the connection represented by this entry,
/// returning the connection information.
pub fn close(self) -> Connected<C> {
/// Instantly removes the entry from the manager, dropping
/// the command channel to the background task of the connection,
/// which will thus drop the connection asap without an orderly
/// close or emitting another event.
pub fn remove(self) -> Connected<C> {
match self.task.remove().state {
TaskState::Established(c) => c,
TaskState::Pending => unreachable!("By Entry::new()")
}
}

/// Returns the connection id.
/// Returns the connection ID.
pub fn id(&self) -> ConnectionId {
ConnectionId(*self.task.key())
}
Expand All @@ -513,3 +532,4 @@ impl<'a, I, C> PendingEntry<'a, I, C> {
self.task.remove();
}
}

Loading

0 comments on commit 474d2c0

Please sign in to comment.