Skip to content

Commit

Permalink
feat: metrics, and backport fixes from sync-gossip-bytes branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jul 28, 2023
1 parent cbdf850 commit 2ccfba6
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rand = { version = "0.8.5", features = ["std_rng"] }
rand_core = "0.6.4"
serde = { version = "1.0.164", features = ["derive"] }
tracing = "0.1.37"
iroh-metrics = { path = "../iroh-metrics", version = "0.5.0" }

# net dependencies (optional)
futures = { version = "0.3.25", optional = true }
Expand Down
1 change: 1 addition & 0 deletions iroh-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#![deny(missing_docs, rustdoc::broken_intra_doc_links)]

pub mod metrics;
#[cfg(feature = "net")]
pub mod net;
pub mod proto;
49 changes: 49 additions & 0 deletions iroh-gossip/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//! Metrics for iroh-gossip

use iroh_metrics::{
core::{Counter, Metric},
struct_iterable::Iterable,
};

/// Counters collected by the iroh-gossip module.
///
/// Every field is a [`Counter`]. The human-readable description attached to each
/// counter is set in the [`Default`] implementation of this struct.
#[allow(missing_docs)]
#[derive(Debug, Clone, Iterable)]
pub struct Metrics {
/// Number of control messages sent
pub msgs_ctrl_sent: Counter,
/// Number of control messages received
pub msgs_ctrl_recv: Counter,
/// Number of data messages sent
pub msgs_data_sent: Counter,
/// Number of data messages received
pub msgs_data_recv: Counter,
/// Total size of all data messages sent
pub msgs_data_sent_size: Counter,
/// Total size of all data messages received
pub msgs_data_recv_size: Counter,
/// Total size of all control messages sent
pub msgs_ctrl_sent_size: Counter,
/// Total size of all control messages received
pub msgs_ctrl_recv_size: Counter,
/// Number of times we connected to a peer
pub neighbor_up: Counter,
/// Number of times we disconnected from a peer
pub neighbor_down: Counter,
// Topic join/leave counters are currently disabled:
// pub topics_joined: Counter,
// pub topics_left: Counter,
}

impl Default for Metrics {
    /// Create the metrics set, attaching a description to every counter.
    fn default() -> Self {
        // Message counts, split by control vs. data traffic.
        let msgs_ctrl_sent = Counter::new("Number of control messages sent");
        let msgs_ctrl_recv = Counter::new("Number of control messages received");
        let msgs_data_sent = Counter::new("Number of data messages sent");
        let msgs_data_recv = Counter::new("Number of data messages received");

        // Accumulated message sizes, split the same way.
        let msgs_data_sent_size = Counter::new("Total size of all data messages sent");
        let msgs_data_recv_size = Counter::new("Total size of all data messages received");
        let msgs_ctrl_sent_size = Counter::new("Total size of all control messages sent");
        let msgs_ctrl_recv_size = Counter::new("Total size of all control messages received");

        // Neighbor connect / disconnect events.
        let neighbor_up = Counter::new("Number of times we connected to a peer");
        let neighbor_down = Counter::new("Number of times we disconnected from a peer");

        Self {
            msgs_ctrl_sent,
            msgs_ctrl_recv,
            msgs_data_sent,
            msgs_data_recv,
            msgs_data_sent_size,
            msgs_data_recv_size,
            msgs_ctrl_sent_size,
            msgs_ctrl_recv_size,
            neighbor_up,
            neighbor_down,
            // Disabled together with the matching fields on `Metrics`:
            // topics_joined: Counter::new("Number of times we joined a topic"),
            // topics_left: Counter::new("Number of times we left a topic"),
        }
    }
}

impl Metric for Metrics {
/// Returns the static name of this metrics group, `"Iroh Gossip"`.
// NOTE(review): if `iroh-metrics` uses this name as a Prometheus metric
// prefix, the embedded space may yield invalid metric names — confirm
// against the `Metric` trait's consumers before relying on it.
fn name() -> &'static str {
"Iroh Gossip"
}
}
18 changes: 16 additions & 2 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tracing::{debug, warn};
use self::util::{read_message, write_message, Dialer, Timers};
use crate::proto::{self, TopicId};

mod util;
pub mod util;

/// ALPN protocol name
pub const GOSSIP_ALPN: &[u8] = b"n0/iroh-gossip/0";
Expand Down Expand Up @@ -133,6 +133,14 @@ impl GossipHandle {
rx.await?
}

/// Quit a topic
///
/// This leaves the swarm for a topic, notifying other peers.
///
/// The request is forwarded to the actor as a `ToActor::Quit` command. That
/// command carries no reply channel, so `Ok(())` only means that `send`
/// succeeded, not that the actor has finished processing the request.
///
/// # Errors
///
/// Returns the error produced by `send` if the command could not be delivered.
pub async fn quit(&self, topic: TopicId) -> anyhow::Result<()> {
    let command = ToActor::Quit(topic);
    self.send(command).await?;
    Ok(())
}

/// Broadcast a message on a topic
///
/// Does not join the topic automatically, so you have to call [Self::join] yourself
Expand Down Expand Up @@ -231,6 +239,7 @@ enum ConnOrigin {
enum ToActor {
ConnIncoming(PeerId, ConnOrigin, quinn::Connection),
Join(TopicId, Vec<PeerId>, oneshot::Sender<anyhow::Result<()>>),
Quit(TopicId),
Broadcast(TopicId, Bytes, oneshot::Sender<anyhow::Result<()>>),
Subscribe(
TopicId,
Expand All @@ -246,6 +255,7 @@ impl fmt::Debug for ToActor {
write!(f, "ConnIncoming({peer_id:?}, {origin:?})")
}
ToActor::Join(topic, peers, _reply) => write!(f, "Join({topic:?}, {peers:?})"),
ToActor::Quit(topic) => write!(f, "Quit({topic:?})"),
ToActor::Broadcast(topic, message, _reply) => {
write!(f, "Broadcast({topic:?}, bytes<{}>)", message.len())
}
Expand Down Expand Up @@ -332,7 +342,6 @@ impl GossipActor {
for (_instant, timer) in drain {
self.handle_in_event(InEvent::TimerExpired(timer), now).await.context("timers.drain_expired -> handle_in_event")?;
}
self.timers.reset();
}

}
Expand Down Expand Up @@ -392,6 +401,11 @@ impl GossipActor {
});
}
}
ToActor::Quit(topic_id) => {
self.handle_in_event(InEvent::Command(topic_id, Command::Quit), now)
.await?;
self.subscribers_topic.remove(&topic_id);
}
ToActor::Broadcast(topic_id, message, reply) => {
self.handle_in_event(InEvent::Command(topic_id, Command::Broadcast(message)), now)
.await?;
Expand Down
44 changes: 38 additions & 6 deletions iroh-gossip/src/net/util.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Utilities for iroh-gossip networking

use std::{collections::HashMap, io, pin::Pin, time::Instant};

use anyhow::{anyhow, bail, ensure, Context, Result};
Expand All @@ -14,7 +16,7 @@ use crate::proto::util::TimerMap;

use super::{ProtoMessage, MAX_MESSAGE_SIZE};

/// Write a [ProtoMessage] as a length-prefixed, postcard-encoded message.
/// Write a `ProtoMessage` as a length-prefixed, postcard-encoded message.
pub async fn write_message<W: AsyncWrite + Unpin>(
writer: &mut W,
buffer: &mut BytesMut,
Expand All @@ -30,7 +32,7 @@ pub async fn write_message<W: AsyncWrite + Unpin>(
Ok(())
}

/// Read a length-prefixed message and decode as [[ProtoMessage]];
/// Read a length-prefixed message and decode it as a `ProtoMessage`.
pub async fn read_message(
reader: impl AsyncRead + Unpin,
buffer: &mut BytesMut,
Expand Down Expand Up @@ -74,25 +76,35 @@ pub async fn read_lp(
Ok(Some(buffer.split_to(size).freeze()))
}

/// Future for a pending dial operation
///
/// Resolves to the dialed [`PeerId`] together with the outcome of the dial:
/// either the established [`quinn::Connection`] or the error that occurred.
pub type DialFuture = BoxFuture<'static, (PeerId, anyhow::Result<quinn::Connection>)>;

/// Dial peers and maintain a queue of pending dials
///
/// This wraps a [`MagicEndpoint`], connects to peers through the endpoint, stores
/// the pending connect futures and emits finished connect results.
///
/// TODO: Move to iroh-net
#[derive(Debug)]
pub struct Dialer {
/// The endpoint handed to [`Dialer::new`].
endpoint: MagicEndpoint,
/// Dial futures that have been queued; `queue_dial` pushes onto this set.
pending: FuturesUnordered<DialFuture>,
/// One cancellation token per peer with a pending dial. `is_pending` checks
/// membership here, and `abort_dial` removes the token and cancels it.
pending_peers: HashMap<PeerId, CancellationToken>,
}
impl Dialer {
/// Create a new dialer for a [`MagicEndpoint`]
///
/// The dialer takes ownership of `endpoint`; both the set of pending dial
/// futures and the per-peer cancellation map start from their defaults.
pub fn new(endpoint: MagicEndpoint) -> Self {
    let pending = Default::default();
    let pending_peers = Default::default();
    Self {
        endpoint,
        pending,
        pending_peers,
    }
}

/// Start to dial a peer
///
/// Note that the peer's addresses and/or derp region must be added to the endpoint's
/// addressbook for a dial to succeed, see [`MagicEndpoint::add_known_addrs`].
pub fn queue_dial(&mut self, peer_id: PeerId, alpn_protocol: &'static [u8]) {
if self.is_pending(&peer_id) {
return;
Expand All @@ -112,16 +124,19 @@ impl Dialer {
self.pending.push(fut.boxed());
}

/// Abort a pending dial
///
/// Removes the cancellation token stored for `peer_id` and triggers it.
/// Does nothing if no token is stored for that peer.
pub fn abort_dial(&mut self, peer_id: &PeerId) {
    let Some(token) = self.pending_peers.remove(peer_id) else {
        // No dial is pending for this peer.
        return;
    };
    token.cancel();
}

/// Check if a peer is currently being dialed
///
/// Returns `true` when a cancellation token is stored for `peer` in
/// `pending_peers`.
pub fn is_pending(&self, peer: &PeerId) -> bool {
self.pending_peers.contains_key(peer)
}

/// Wait for the next dial operation to complete
pub async fn next(&mut self) -> (PeerId, anyhow::Result<quinn::Connection>) {
match self.pending_peers.is_empty() {
false => {
Expand All @@ -134,31 +149,48 @@ impl Dialer {
}
}

/// A [`TimerMap`] with an async method to wait for the next timer expiration.
#[derive(Debug)]
pub struct Timers<T> {
/// The instant of the map's first entry (presumably the earliest — confirm
/// against `TimerMap::first`) paired with a sleep future targeting that
/// instant. `None` when the map had no first entry at the last `reset`.
next: Option<(Instant, Pin<Box<Sleep>>)>,
/// The underlying map of timer entries.
map: TimerMap<T>,
}
impl<T> Timers<T> {
pub fn new() -> Self {

// Implemented by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound, which this impl does not need.
impl<T> Default for Timers<T> {
    /// Create a `Timers` with a default `TimerMap` and no armed sleep.
    fn default() -> Self {
        let map = TimerMap::default();
        Self { next: None, map }
    }
}

impl<T> Timers<T> {
/// Create a new timer map
///
/// Equivalent to [`Timers::default`].
pub fn new() -> Self {
Self::default()
}

/// Insert a new entry at the specified instant
///
/// After the entry is stored, the pending sleep is re-armed via `reset`.
pub fn insert(&mut self, instant: Instant, item: T) {
self.map.insert(instant, item);
// Recompute `next` so that a newly inserted first entry is picked up.
self.reset();
}

pub fn reset(&mut self) {
/// Re-arm `next` from the first entry of the map.
///
/// Sets `next` to that entry's instant together with a fresh sleep future
/// targeting it, or to `None` if the map yields no first entry.
fn reset(&mut self) {
self.next = self
.map
.first()
.map(|(instant, _)| (*instant, Box::pin(sleep_until((*instant).into()))))
}

/// Wait for the next timer to expire and return an iterator of all expired timers
///
/// If the [TimerMap] is empty, this will return a future that is pending forever.
/// After inserting a new entry, prior futures returned from this method will not become ready.
/// They should be dropped after calling [Self::insert], and a new future as returned from
/// this method should be awaited instead.
pub async fn wait_and_drain(&mut self) -> impl Iterator<Item = (Instant, T)> {
self.reset();
match self.next.as_mut() {
Some((instant, sleep)) => {
sleep.await;
Expand Down
1 change: 1 addition & 0 deletions iroh-gossip/src/proto/hyparview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub enum InEvent<PA> {
}

/// Output event for HyParView
#[derive(Debug)]
pub enum OutEvent<PA> {
/// Ask the IO layer to send a [`Message`] to peer `PA`.
SendMessage(PA, Message<PA>),
Expand Down
1 change: 0 additions & 1 deletion iroh-gossip/src/proto/plumtree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::{

use bytes::Bytes;
use derive_more::{Add, From, Sub};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tracing::warn;

Expand Down
Loading

0 comments on commit 2ccfba6

Please sign in to comment.