Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Update libp2p #3039

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
401 changes: 255 additions & 146 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ prometheus-client = "0.15.0"
unused_port = { path = "../../common/unused_port" }

[dependencies.libp2p]
git = "https://github.com/sigp/rust-libp2p"
# branch libp2p-gossipsub-interval-hotfix
rev = "e213703e616eaba3c482d7714775e0d37c4ae8e5"
version = "0.43.0"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext", "secp256k1"]

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<
NBAction<BehaviourEvent<TSpec>, <Behaviour<TSpec> as NetworkBehaviour>::ProtocolsHandler>,
NBAction<BehaviourEvent<TSpec>, <Behaviour<TSpec> as NetworkBehaviour>::ConnectionHandler>,
> {
if let Some(waker) = &self.waker {
if waker.will_wake(cx.waker()) {
Expand Down
14 changes: 7 additions & 7 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::stream::FuturesUnordered;
pub use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId},
swarm::{
protocols_handler::ProtocolsHandler, DialError, NetworkBehaviour,
handler::ConnectionHandler, DialError, NetworkBehaviour,
NetworkBehaviourAction as NBAction, NotifyHandler, PollParameters, SubstreamProtocol,
},
};
Expand Down Expand Up @@ -908,11 +908,11 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// Discovery is not a real NetworkBehaviour...
type ProtocolsHandler = libp2p::swarm::protocols_handler::DummyProtocolsHandler;
type ConnectionHandler = libp2p::swarm::handler::DummyConnectionHandler;
type OutEvent = DiscoveryEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
libp2p::swarm::protocols_handler::DummyProtocolsHandler::default()
fn new_handler(&mut self) -> Self::ConnectionHandler {
libp2p::swarm::handler::DummyConnectionHandler::default()
}

// Handles the libp2p request to obtain multiaddrs for peer_id's in order to dial them.
Expand All @@ -932,14 +932,14 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
&mut self,
_: PeerId,
_: ConnectionId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
_: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
}

fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: Self::ProtocolsHandler,
_handler: Self::ConnectionHandler,
error: &DialError,
) {
if let Some(peer_id) = peer_id {
Expand Down Expand Up @@ -967,7 +967,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NBAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NBAction<Self::OutEvent, Self::ConnectionHandler>> {
if !self.started {
return Poll::Pending;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::task::{Context, Poll};
use futures::StreamExt;
use libp2p::core::connection::ConnectionId;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::protocols_handler::DummyProtocolsHandler;
use libp2p::swarm::handler::DummyConnectionHandler;
use libp2p::swarm::{
DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
ConnectionHandler, DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p::{Multiaddr, PeerId};
use slog::{debug, error};
Expand All @@ -19,21 +19,21 @@ use super::peerdb::BanResult;
use super::{PeerManager, PeerManagerEvent, ReportSource};

impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
type ProtocolsHandler = DummyProtocolsHandler;
type ConnectionHandler = DummyConnectionHandler;

type OutEvent = PeerManagerEvent;

/* Required trait members */

fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
fn new_handler(&mut self) -> Self::ConnectionHandler {
DummyConnectionHandler::default()
}

fn inject_event(
&mut self,
_: PeerId,
_: ConnectionId,
_: <DummyProtocolsHandler as ProtocolsHandler>::OutEvent,
_: <DummyConnectionHandler as ConnectionHandler>::OutEvent,
) {
unreachable!("Dummy handler does not emit events")
}
Expand All @@ -42,7 +42,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// perform the heartbeat when necessary
while self.heartbeat.poll_tick(cx).is_ready() {
self.heartbeat();
Expand Down Expand Up @@ -178,7 +178,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
peer_id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: DummyProtocolsHandler,
_: DummyConnectionHandler,
remaining_established: usize,
) {
if remaining_established > 0 {
Expand Down Expand Up @@ -243,7 +243,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
_handler: DummyProtocolsHandler,
_handler: DummyConnectionHandler,
_error: &DialError,
) {
if let Some(peer_id) = peer_id {
Expand Down
50 changes: 26 additions & 24 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use futures::{Sink, SinkExt};
use libp2p::core::upgrade::{
InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError,
};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
use libp2p::swarm::handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
SubstreamProtocol,
};
use libp2p::swarm::NegotiatedSubstream;
use slog::{crit, debug, trace, warn};
Expand Down Expand Up @@ -76,7 +77,7 @@ pub enum HandlerErr {
},
}

/// Implementation of `ProtocolsHandler` for the RPC protocol.
/// Implementation of `ConnectionHandler` for the RPC protocol.
pub struct RPCHandler<TSpec>
where
TSpec: EthSpec,
Expand Down Expand Up @@ -309,7 +310,7 @@ where
}
}

impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
impl<TSpec> ConnectionHandler for RPCHandler<TSpec>
where
TSpec: EthSpec,
{
Expand Down Expand Up @@ -442,12 +443,13 @@ where
fn inject_dial_upgrade_error(
&mut self,
request_info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
error: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
let (id, req) = request_info;
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
if let ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error
{
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(id, req);
Expand All @@ -461,13 +463,13 @@ where
self.outbound_io_error_retries = 0;
// map the error
let error = match error {
ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
ConnectionHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ConnectionHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
RPCError::UnsupportedProtocol
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()),
Expand Down Expand Up @@ -517,7 +519,7 @@ where
&mut self,
cx: &mut Context<'_>,
) -> Poll<
ProtocolsHandlerEvent<
ConnectionHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
Expand All @@ -533,7 +535,7 @@ where
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
return Poll::Ready(ConnectionHandlerEvent::Custom(self.events_out.remove(0)));
} else {
self.events_out.shrink_to_fit();
}
Expand All @@ -543,7 +545,7 @@ where
if delay.is_elapsed() {
self.state = HandlerState::Deactivated;
debug!(self.log, "Handler deactivated");
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError(
"Shutdown timeout",
)));
}
Expand Down Expand Up @@ -575,7 +577,7 @@ where
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => ?e);
// drops the peer if we cannot read the delay queue
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError(
"Could not poll inbound stream timer",
)));
}
Expand All @@ -596,14 +598,14 @@ where
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => ?e);
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::InternalError(
"Could not poll outbound stream timer",
)));
}
Expand Down Expand Up @@ -856,7 +858,7 @@ where
}),
};

return Poll::Ready(ProtocolsHandlerEvent::Custom(received));
return Poll::Ready(ConnectionHandlerEvent::Custom(received));
}
Poll::Ready(None) => {
// stream closed
Expand All @@ -871,7 +873,7 @@ where
// notify the application error
if request.expected_responses() > 1 {
// return an end of stream result
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(
RPCReceived::EndOfStream(request_id, request.stream_termination()),
)));
}
Expand All @@ -882,7 +884,7 @@ where
proto: request.protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
}
Poll::Pending => {
entry.get_mut().state =
Expand All @@ -898,7 +900,7 @@ where
error: e,
};
entry.remove_entry();
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
}
},
OutboundSubstreamState::Closing(mut substream) => {
Expand All @@ -924,7 +926,7 @@ where
};

if let Some(termination) = termination {
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
return Poll::Ready(ConnectionHandlerEvent::Custom(Ok(
RPCReceived::EndOfStream(request_id, termination),
)));
}
Expand All @@ -946,7 +948,7 @@ where
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
OutboundRequestContainer {
req: req.clone(),
Expand All @@ -967,7 +969,7 @@ where
&& self.events_out.is_empty()
&& self.dial_negotiated == 0
{
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::Disconnected));
return Poll::Ready(ConnectionHandlerEvent::Close(RPCError::Disconnected));
}
}

Expand Down
12 changes: 6 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::core::{connection::ConnectionId, ConnectedPoint};
use libp2p::swarm::{
protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
handler::ConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler,
PollParameters, SubstreamProtocol,
};
use libp2p::{Multiaddr, PeerId};
Expand Down Expand Up @@ -92,7 +92,7 @@ pub struct RPCMessage<TSpec: EthSpec> {
/// Handler managing this message.
pub conn_id: ConnectionId,
/// The message that was sent.
pub event: <RPCHandler<TSpec> as ProtocolsHandler>::OutEvent,
pub event: <RPCHandler<TSpec> as ConnectionHandler>::OutEvent,
}

/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
Expand Down Expand Up @@ -178,10 +178,10 @@ impl<TSpec> NetworkBehaviour for RPC<TSpec>
where
TSpec: EthSpec,
{
type ProtocolsHandler = RPCHandler<TSpec>;
type ConnectionHandler = RPCHandler<TSpec>;
type OutEvent = RPCMessage<TSpec>;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
fn new_handler(&mut self) -> Self::ConnectionHandler {
RPCHandler::new(
SubstreamProtocol::new(
RPCProtocol {
Expand Down Expand Up @@ -227,7 +227,7 @@ where
&mut self,
peer_id: PeerId,
conn_id: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event {
// check if the request is conformant to the quota
Expand Down Expand Up @@ -289,7 +289,7 @@ where
&mut self,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// let the rate limiter prune
let _ = self.limiter.poll_unpin(cx);
if !self.events.is_empty() {
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/lighthouse_network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
use futures::prelude::*;
use libp2p::core::{
connection::ConnectionLimits, identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox,
transport::Boxed,
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed,
};
use libp2p::{
bandwidth::{BandwidthLogging, BandwidthSinks},
core, noise,
swarm::{SwarmBuilder, SwarmEvent},
swarm::{ConnectionLimits, SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use prometheus_client::registry::Registry;
Expand Down
Loading