Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

TelemetryEndpoints must be valid MutliAddr URL #5069

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/node/cli/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ pub fn staging_testnet_config() -> ChainSpec {
"staging_testnet",
staging_testnet_config_genesis,
boot_nodes,
Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])),
Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])
.expect("Staging telemetry url is valid; qed")),
None,
None,
Default::default(),
Expand Down
5 changes: 3 additions & 2 deletions client/cli/src/commands/runcmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ impl RunCmd {
config.telemetry_endpoints = None;
} else if !self.telemetry_endpoints.is_empty() {
config.telemetry_endpoints = Some(
TelemetryEndpoints::new(self.telemetry_endpoints.clone())
TelemetryEndpoints::new(self.telemetry_endpoints.clone()).map_err(|e| e.to_string())?
);
}

Expand Down Expand Up @@ -689,7 +689,8 @@ mod tests {
"test-id",
|| (),
vec!["boo".to_string()],
Some(TelemetryEndpoints::new(vec![("foo".to_string(), 42)])),
Some(TelemetryEndpoints::new(vec![("wss://foo/bar".to_string(), 42)])
.expect("provided url should be valid")),
None,
None,
None::<()>,
Expand Down
107 changes: 75 additions & 32 deletions client/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! `slog_scope::with_logger` followed with `slog_log!`.
//!
//! Note that you are supposed to only ever use `telemetry!` and not `slog_scope::with_logger` at
//! the moment. Substate may eventually be reworked to get proper `slog` support, including sending
//! the moment. Substrate may eventually be reworked to get proper `slog` support, including sending
//! information to the telemetry.
//!
//! The [`Telemetry`] struct implements `Stream` and must be polled regularly (or sent to a
Expand All @@ -41,7 +41,7 @@
//! endpoints: sc_telemetry::TelemetryEndpoints::new(vec![
//! // The `0` is the maximum verbosity level of messages to send to this endpoint.
//! ("wss://example.com".into(), 0)
//! ]),
//! ]).expect("Invalid URL or multiaddr provided"),
//! // Can be used to pass an external implementation of WebSockets.
//! wasm_external_transport: None,
//! });
Expand All @@ -62,7 +62,7 @@ use futures::{prelude::*, channel::mpsc};
use libp2p::{Multiaddr, wasm_ext};
use log::{error, warn};
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use serde::{Serialize, Deserialize, Deserializer};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;

Expand Down Expand Up @@ -96,12 +96,47 @@ pub struct TelemetryConfig {
///
/// The URL string can be either a URL or a multiaddress.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TelemetryEndpoints(Vec<(String, u8)>);
pub struct TelemetryEndpoints(
#[serde(deserialize_with = "url_or_multiaddr_deser")]
Vec<(Multiaddr, u8)>
);

/// Custom deserializer for TelemetryEndpoints, used to convert urls or multiaddr to multiaddr.
fn url_or_multiaddr_deser<'de, D>(deserializer: D) -> Result<Vec<(Multiaddr, u8)>, D::Error>
where D: Deserializer<'de>
{
Vec::<(String, u8)>::deserialize(deserializer)?
.iter()
.map(|e| Ok((url_to_multiaddr(&e.0)
.map_err(serde::de::Error::custom)?, e.1)))
.collect()
}

impl TelemetryEndpoints {
pub fn new(endpoints: Vec<(String, u8)>) -> Self {
TelemetryEndpoints(endpoints)
pub fn new(endpoints: Vec<(String, u8)>) -> Result<Self, libp2p::multiaddr::Error> {
let endpoints: Result<Vec<(Multiaddr, u8)>, libp2p::multiaddr::Error> = endpoints.iter()
.map(|e| Ok((url_to_multiaddr(&e.0)?, e.1)))
.collect();
endpoints.map(Self)
}
}

/// Parses a WebSocket URL into a libp2p `Multiaddr`.
fn url_to_multiaddr(url: &str) -> Result<Multiaddr, libp2p::multiaddr::Error> {
// First, assume that we have a `Multiaddr`.
let parse_error = match url.parse() {
Ok(ma) => return Ok(ma),
Err(err) => err,
};

// If not, try the `ws://path/url` format.
if let Ok(ma) = libp2p::multiaddr::from_url(url) {
return Ok(ma)
}

// If we have no clue about the format of that string, assume that we were expecting a
// `Multiaddr`.
Err(parse_error)
}

/// Log levels.
Expand Down Expand Up @@ -149,13 +184,7 @@ struct TelemetryDrain {
/// doesn't provide any way of knowing whether a global logger has already been registered.
pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
// Build the list of telemetry endpoints.
let mut endpoints = Vec::new();
for &(ref url, verbosity) in &config.endpoints.0 {
match url_to_multiaddr(url) {
Ok(addr) => endpoints.push((addr, verbosity)),
Err(err) => warn!(target: "telemetry", "Invalid telemetry URL {}: {}", url, err),
}
}
let (endpoints, wasm_external_transport) = (config.endpoints.0, config.wasm_external_transport);

let (sender, receiver) = mpsc::channel(16);
let guard = {
Expand All @@ -164,7 +193,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
slog_scope::set_global_logger(root)
};

let worker = match worker::TelemetryWorker::new(endpoints, config.wasm_external_transport) {
let worker = match worker::TelemetryWorker::new(endpoints, wasm_external_transport) {
Ok(w) => Some(w),
Err(err) => {
error!(target: "telemetry", "Failed to initialize telemetry worker: {:?}", err);
Expand Down Expand Up @@ -271,24 +300,6 @@ impl slog::Drain for TelemetryDrain {
}
}

/// Parses a WebSocket URL into a libp2p `Multiaddr`.
fn url_to_multiaddr(url: &str) -> Result<Multiaddr, libp2p::multiaddr::Error> {
// First, assume that we have a `Multiaddr`.
let parse_error = match url.parse() {
Ok(ma) => return Ok(ma),
Err(err) => err,
};

// If not, try the `ws://path/url` format.
if let Ok(ma) = libp2p::multiaddr::from_url(url) {
return Ok(ma)
}

// If we have no clue about the format of that string, assume that we were expecting a
// `Multiaddr`.
Err(parse_error)
}

/// Translates to `slog_scope::info`, but contains an additional verbosity
/// parameter which the log record is tagged with. Additionally the verbosity
/// parameter is added to the record as a key-value pair.
Expand All @@ -300,3 +311,35 @@ macro_rules! telemetry {
})
}
}

#[cfg(test)]
mod telemetry_endpoints_tests {
use libp2p::Multiaddr;
use super::TelemetryEndpoints;
use super::url_to_multiaddr;

#[test]
fn valid_endpoints() {
let endp = vec![("wss://telemetry.polkadot.io/submit/".into(), 3), ("/ip4/80.123.90.4/tcp/5432".into(), 4)];
let telem = TelemetryEndpoints::new(endp.clone()).expect("Telemetry endpoint should be valid");
let mut res: Vec<(Multiaddr, u8)> = vec![];
for (a, b) in endp.iter() {
res.push((url_to_multiaddr(a).expect("provided url should be valid"), *b))
}
assert_eq!(telem.0, res);
}

#[test]
fn invalid_endpoints() {
let endp = vec![("/ip4/...80.123.90.4/tcp/5432".into(), 3), ("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4)];
let telem = TelemetryEndpoints::new(endp);
assert!(telem.is_err());
}

#[test]
fn valid_and_invalid_endpoints() {
let endp = vec![("/ip4/80.123.90.4/tcp/5432".into(), 3), ("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4)];
let telem = TelemetryEndpoints::new(endp);
assert!(telem.is_err());
}
}