Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc/admin): missing enode/enr in admin_peers endpoint #9043

Merged
merged 10 commits into from
Jul 1, 2024
6 changes: 6 additions & 0 deletions crates/net/network-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ pub struct PeerInfo {
pub remote_id: PeerId,
/// The client's name and version
pub client_version: Arc<str>,
/// The peer's enode
pub enode: String,
/// The peer's enr
pub enr: Option<String>,
/// The peer's address we're connected to
pub remote_addr: SocketAddr,
/// The local address of the connection
Expand All @@ -207,6 +211,8 @@ pub struct PeerInfo {
pub status: Arc<Status>,
/// The timestamp when the session to that peer has been established.
pub session_established: Instant,
/// The peer's connection kind
pub kind: PeerKind,
}

/// The direction of the connection.
Expand Down
44 changes: 38 additions & 6 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use reth_eth_wire::{
DisconnectReason, EthVersion, Status,
};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{EthProtocolInfo, NetworkStatus, ReputationChangeKind};
use reth_network_api::{EthProtocolInfo, NetworkStatus, PeerInfo, ReputationChangeKind};
use reth_network_peers::{NodeRecord, PeerId};
use reth_primitives::ForkId;
use reth_provider::{BlockNumReader, BlockReader};
Expand Down Expand Up @@ -604,17 +604,17 @@ where
}
}
NetworkHandleMessage::GetPeerInfos(tx) => {
let _ = tx.send(self.swarm.sessions_mut().get_peer_info());
let _ = tx.send(self.get_peer_infos());
}
NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
let _ = tx.send(self.swarm.sessions_mut().get_peer_info_by_id(peer_id));
let _ = tx.send(self.get_peer_info_by_id(peer_id));
}
NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
let _ = tx.send(self.swarm.sessions().get_peer_infos_by_ids(peer_ids));
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
}
NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
let peers = self.swarm.state().peers().peers_by_kind(kind);
let _ = tx.send(self.swarm.sessions().get_peer_infos_by_ids(peers));
let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
}
NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
NetworkHandleMessage::GetTransactionsHandle(tx) => {
Expand Down Expand Up @@ -865,6 +865,38 @@ where
}
}

/// Returns [`PeerInfo`] for all connected peers
fn get_peer_infos(&self) -> Vec<PeerInfo> {
let peer_manager = self.swarm.state().peers();
let mut peers = Vec::with_capacity(peer_manager.num_known_peers());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this returns the number of peers in the peer set wich is larger than connected peers. this should allocated based on active sessions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we only return the peer who have peer record, so I think the num_known_peers is sufficient

if let Some((record, kind)) = peer_manager.peer_by_id(*peer_id) {
peers.push(session.peer_info(&record, kind));
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will allocate more capacity then necessary because known_peers > active

you can use

        self.swarm
            .sessions()
            .active_sessions()
            .into_iter()
            .filter_map(|(peer_id, session)| {
                let (record, kind) = self.swarm.state().peers().peer_by_id(*peer_id)?;
                Some(session.peer_info(&record, kind))
            })
            .collect()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the advice, I previously thought of know_peers < active, don't expect know_peers > active, but in which case is known_peers > active, the connection is established, but the session not starting?

for (peer_id, session) in self.swarm.sessions().active_sessions() {
if let Some((record, kind)) = peer_manager.peer_by_id(*peer_id) {
peers.push(session.peer_info(&record, kind));
}
}
peers
}

/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
self.swarm
.state()
.peers()
.peer_by_id(peer_id)
.map(|(record, kind)| session.peer_info(&record, kind))
})
}

/// Returns [`PeerInfo`] for a given peers.
///
/// Ignore the non-active peer.
fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
}

/// Updates the metrics for active,established connections
#[inline]
fn update_active_connection_metrics(&self) {
Expand Down
22 changes: 12 additions & 10 deletions crates/net/network/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,17 @@ impl PeersManager {
})
}

/// Returns the [`NodeRecord`] for the given peer id
#[allow(dead_code)]
fn peer_by_id(&self, peer_id: PeerId) -> Option<NodeRecord> {
/// Returns the `NodeRecord` and `PeerKind` for the given peer id
pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
self.peers.get(&peer_id).map(|v| {
NodeRecord::new_with_ports(
v.addr.tcp.ip(),
v.addr.tcp.port(),
v.addr.udp.map(|addr| addr.port()),
peer_id,
(
NodeRecord::new_with_ports(
v.addr.tcp.ip(),
v.addr.tcp.port(),
v.addr.udp.map(|addr| addr.port()),
peer_id,
),
v.kind,
)
})
}
Expand Down Expand Up @@ -1378,7 +1380,7 @@ mod tests {
_ => unreachable!(),
}

let record = peers.peer_by_id(peer).unwrap();
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
}
Expand All @@ -1405,7 +1407,7 @@ mod tests {
_ => unreachable!(),
}

let record = peers.peer_by_id(peer).unwrap();
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), tcp_addr);
assert_eq!(record.udp_addr(), udp_addr);
}
Expand Down
9 changes: 6 additions & 3 deletions crates/net/network/src/session/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use reth_eth_wire::{
errors::EthStreamError,
DisconnectReason, EthVersion, Status,
};
use reth_network_api::PeerInfo;
use reth_network_peers::PeerId;
use reth_network_api::{PeerInfo, PeerKind};
use reth_network_peers::{NodeRecord, PeerId};
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::sync::{
mpsc::{self, error::SendError},
Expand Down Expand Up @@ -136,17 +136,20 @@ impl ActiveSessionHandle {
}

/// Extracts the [`PeerInfo`] from the session handle.
pub(crate) fn peer_info(&self) -> PeerInfo {
pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
PeerInfo {
remote_id: self.remote_id,
direction: self.direction,
enode: record.to_string(),
enr: None,
remote_addr: self.remote_addr,
local_addr: self.local_addr,
capabilities: self.capabilities.clone(),
client_version: self.client_version.clone(),
eth_version: self.version,
status: self.status.clone(),
session_established: self.established,
kind,
}
}
}
Expand Down
34 changes: 5 additions & 29 deletions crates/net/network/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ impl SessionManager {
self.secret_key
}

/// Returns a borrowed reference to the active sessions.
pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle> {
&self.active_sessions
}

/// Returns the session hello message.
pub fn hello_message(&self) -> HelloMessageWithProtocols {
self.hello_message.clone()
Expand Down Expand Up @@ -587,35 +592,6 @@ impl SessionManager {
}
}
}

/// Returns [`PeerInfo`] for all connected peers
pub(crate) fn get_peer_info(&self) -> Vec<PeerInfo> {
self.active_sessions.values().map(ActiveSessionHandle::peer_info).collect()
}

/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
pub(crate) fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
self.active_sessions.get(&peer_id).map(ActiveSessionHandle::peer_info)
}
/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
pub(crate) fn get_peer_infos_by_ids(
&self,
peer_ids: impl IntoIterator<Item = PeerId>,
) -> Vec<PeerInfo> {
let mut infos = Vec::new();
for peer_id in peer_ids {
if let Some(info) =
self.active_sessions.get(&peer_id).map(ActiveSessionHandle::peer_info)
{
infos.push(info);
}
}
infos
}
}

/// Events produced by the [`SessionManager`]
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-api/src/admin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_network_peers::{AnyNode, NodeRecord};
use reth_rpc_types::{admin::NodeInfo, PeerInfo};
use reth_rpc_types::admin::{NodeInfo, PeerInfo};

/// Admin namespace rpc interface that gives access to several non-standard RPC methods.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "admin"))]
Expand Down
66 changes: 38 additions & 28 deletions crates/rpc/rpc/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_chainspec::ChainSpec;
use reth_network_api::{NetworkInfo, PeerKind, Peers};
use reth_network_peers::{AnyNode, NodeRecord};
use reth_network_peers::{id2pk, AnyNode, NodeRecord};
use reth_rpc_api::AdminApiServer;
use reth_rpc_server_types::ToRpcResult;
use reth_rpc_types::{
admin::{EthProtocolInfo, NodeInfo, Ports, ProtocolInfo},
PeerEthProtocolInfo, PeerInfo, PeerNetworkInfo, PeerProtocolsInfo,
use reth_rpc_types::admin::{
EthInfo, EthPeerInfo, EthProtocolInfo, NodeInfo, PeerInfo, PeerNetworkInfo, PeerProtocolInfo,
Ports, ProtocolInfo,
};

/// `admin` API implementation.
Expand Down Expand Up @@ -63,33 +63,43 @@ where
Ok(true)
}

/// Handler for `admin_peers`
async fn peers(&self) -> RpcResult<Vec<PeerInfo>> {
let peers = self.network.get_all_peers().await.to_rpc_result()?;
let peers = peers
.into_iter()
.map(|peer| PeerInfo {
id: Some(peer.remote_id.to_string()),
name: peer.client_version.to_string(),
caps: peer.capabilities.capabilities().iter().map(|cap| cap.to_string()).collect(),
network: PeerNetworkInfo {
remote_address: peer.remote_addr.to_string(),
local_address: peer
.local_addr
.unwrap_or_else(|| self.network.local_addr())
.to_string(),
},
protocols: PeerProtocolsInfo {
eth: Some(PeerEthProtocolInfo {
difficulty: Some(peer.status.total_difficulty),
head: peer.status.blockhash.to_string(),
version: peer.status.version as u32,
}),
pip: None,
},
})
.collect();
let mut infos = Vec::with_capacity(peers.len());

Ok(peers)
for peer in peers {
if let Ok(pk) = id2pk(peer.remote_id) {
infos.push(PeerInfo {
id: pk.to_string(),
jsvisa marked this conversation as resolved.
Show resolved Hide resolved
name: peer.client_version.to_string(),
enode: peer.enode,
enr: peer.enr,
caps: peer
.capabilities
.capabilities()
.iter()
.map(|cap| cap.to_string())
.collect(),
network: PeerNetworkInfo {
remote_address: peer.remote_addr,
local_address: peer.local_addr.unwrap_or_else(|| self.network.local_addr()),
inbound: peer.direction.is_incoming(),
trusted: peer.kind.is_trusted(),
static_node: peer.kind.is_static(),
},
protocols: PeerProtocolInfo {
eth: Some(EthPeerInfo::Info(EthInfo {
version: peer.status.version as u64,
})),
snap: None,
other: Default::default(),
},
})
}
}

Ok(infos)
}

/// Handler for `admin_nodeInfo`
Expand Down
Loading