Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transport support for dnsaddr #1450

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions misc/multiaddr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ impl Multiaddr {
self
}

pub fn concat(&mut self, m: &Multiaddr) {
m.iter().for_each(|p| self.push(p));
}

/// Returns the components of this multiaddress.
///
/// # Example
Expand Down Expand Up @@ -268,6 +272,17 @@ impl<'a> From<Protocol<'a>> for Multiaddr {
}
}

impl<'a> From<&[Protocol<'a>]> for Multiaddr {
fn from(protocols: &[Protocol<'a>]) -> Multiaddr {
protocols.iter()
.map(|p| p.clone().into())
.fold(Multiaddr::empty(), |mut addr, next| {
addr.concat(&next);
addr
})
}
}

impl From<IpAddr> for Multiaddr {
fn from(v: IpAddr) -> Multiaddr {
match v {
Expand Down
2 changes: 2 additions & 0 deletions transports/dns/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ categories = ["network-programming", "asynchronous"]
libp2p-core = { version = "0.15.0", path = "../../core" }
log = "0.4.1"
futures = "0.3.1"
trust-dns-client = "0.19.2"
trust-dns-proto = "0.19.2"
241 changes: 197 additions & 44 deletions transports/dns/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@
//! replaced with respectively an `/ip4/` or an `/ip6/` component.
//!

use futures::{prelude::*, channel::oneshot, future::BoxFuture};
use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered};
use libp2p_core::{
Transport,
multiaddr::{Protocol, Multiaddr},
transport::{TransportError, ListenerEvent}
};
use log::{error, debug, trace};
use std::{error, fmt, io, net::ToSocketAddrs};
use std::str::FromStr;
use trust_dns_client::rr::{DNSClass, RecordType};
use trust_dns_client::udp::UdpClientConnection;
use trust_dns_client::client::{SyncClient, Client};
use trust_dns_proto::rr::domain::Name;

/// Represents the configuration for a DNS transport capability of libp2p.
///
Expand Down Expand Up @@ -122,6 +127,7 @@ where
let contains_dns = addr.iter().any(|cmp| match cmp {
Protocol::Dns4(_) => true,
Protocol::Dns6(_) => true,
Protocol::Dnsaddr(_) => true,
_ => false,
});

Expand All @@ -133,53 +139,43 @@ where
}

trace!("Dialing address with DNS: {}", addr);
let resolve_futs = addr.iter()
.map(|cmp| match cmp {
Protocol::Dns4(ref name) | Protocol::Dns6(ref name) => {
let name = name.to_string();
let to_resolve = format!("{}:0", name);
let (tx, rx) = oneshot::channel();
self.thread_pool.spawn_ok(async {
let to_resolve = to_resolve;
let _ = tx.send(match to_resolve[..].to_socket_addrs() {
Ok(list) => Ok(list.map(|s| s.ip()).collect::<Vec<_>>()),
Err(e) => Err(e),
});
});
let mut resolve_futs = FuturesOrdered::new();
let protocols = addr.iter().collect::<Vec<_>>();

let is_dns4 = if let Protocol::Dns4(_) = cmp { true } else { false };

async move {
let list = rx.await
.map_err(|_| {
error!("DNS resolver crashed");
DnsErr::ResolveFail(name.clone())
})?
.map_err(|err| DnsErr::ResolveError {
domain_name: name.clone(),
error: err,
})?;

list.into_iter()
.filter_map(|addr| {
if (is_dns4 && addr.is_ipv4()) || (!is_dns4 && addr.is_ipv6()) {
Some(Protocol::from(addr))
} else {
None
}
})
.next()
.ok_or_else(|| DnsErr::ResolveFail(name))
}.left_future()
},
cmp => future::ready(Ok(cmp.acquire())).right_future()
})
.collect::<stream::FuturesOrdered<_>>();
for (i, protocol) in addr.iter().enumerate() {
resolve_futs.push(match protocol {
Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_) => {
let is_dnsaddr = if let Protocol::Dnsaddr(_) = protocol { true } else { false };
let suffix: Multiaddr = (&protocols[(i + 1)..]).into();

resolve_dns::<T>(protocol.clone().acquire())
.and_then(move |resolved_addr| {
if is_dnsaddr {
suffix_matching::<T>(resolved_addr, suffix).left_future()
} else {
future::ok(resolved_addr).right_future()
}
})
.left_future()
}
ref p => future::ready(Ok(p.clone().into())).right_future()
});

// dnsaddr consumes the rest of the components of the multiaddr
if let Protocol::Dnsaddr(_) = protocol {
break;
}
}

let future = resolve_futs.collect::<Vec<_>>()
.then(move |outcome| async move {
let outcome = outcome.into_iter().collect::<Result<Vec<_>, _>>()?;
let outcome = outcome.into_iter().collect::<Multiaddr>();
let outcome = outcome.into_iter()
.collect::<Result<Vec<Multiaddr>, _>>()?
.iter()
.fold(Multiaddr::empty(), |mut m, next| {
m.concat(next);
m
});
debug!("DNS resolution outcome: {} => {}", addr, outcome);

match self.inner.dial(outcome) {
Expand Down Expand Up @@ -208,6 +204,8 @@ pub enum DnsErr<TErr> {
},
/// Found an IP address, but the underlying transport doesn't support the multiaddr.
MultiaddrNotSupported,
/// Found an IP address, but the suffix doesn't matched.
SuffixDoesNotMatched,
}

impl<TErr> fmt::Display for DnsErr<TErr>
Expand All @@ -221,6 +219,7 @@ where TErr: fmt::Display
write!(f, "Failed to resolve DNS address: {:?}; {:?}", domain_name, error)
},
DnsErr::MultiaddrNotSupported => write!(f, "Resolve multiaddr not supported"),
DnsErr::SuffixDoesNotMatched => write!(f, "Suffix does not matched"),
}
}
}
Expand All @@ -234,7 +233,112 @@ where TErr: error::Error + 'static
DnsErr::ResolveFail(_) => None,
DnsErr::ResolveError { error, .. } => Some(error),
DnsErr::MultiaddrNotSupported => None,
DnsErr::SuffixDoesNotMatched => None,
}
}
}

async fn resolve_dns<T>(p: Protocol<'_>) -> Result<Multiaddr, DnsErr<T::Error>>
where
T: Transport
{
match p {
Protocol::Dns4(ref name) | Protocol::Dns6(ref name) => {
let to_resolve = format!("{}:0", name);
let list = match to_resolve[..].to_socket_addrs() {
Ok(list) => Ok(list.map(|s| s.ip()).collect::<Vec<_>>()),
Err(e) => Err(e)
}.map_err(|_| {
error!("DNS resolver crashed");
DnsErr::ResolveFail(name.to_string())
})?;

let is_dns4 = if let Protocol::Dns4(_) = p { true } else { false };

list.into_iter()
.filter_map(|addr| {
if (is_dns4 && addr.is_ipv4()) || (!is_dns4 && addr.is_ipv6()) {
Some(Multiaddr::from(addr))
} else {
None
}
})
.next()
.ok_or_else(|| DnsErr::ResolveFail(name.to_string()))
}
Protocol::Dnsaddr(ref n) => {
let conn = UdpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap(); // TODO: error handling
let client = SyncClient::new(conn); // TODO: should use async client?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TODO: should use async client?

Yes!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...

I've added trust-dns in order to query TXT records. I'm trying to switch the SyncClient to AsyncClient but I think AsyncClient can not be used for rust-libp2p because a background future is required for the client. 🤔

https://docs.rs/trust-dns-client/0.19.3/trust_dns_client/#async-usage

// Create a new client, the bg is a background future which handles
//   the multiplexing of the DNS requests to the server.
//   the client is a handle to an unbounded queue for sending requests via the
//   background. The background must be scheduled to run before the client can
//   send any dns requests
let client = AsyncClient::connect(stream);

// await the connection to be established
let (mut client, bg) = runtime.block_on(client).expect("connection failed");

// make sure to run the background task
runtime.spawn(bg);

======

Do you know other library what can query TXT records asynchronously? 💦

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think AsyncClient can not be used for rust-libp2p because a background future is required for the client.

That makes it more difficult to implement, but normally not impossible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is not to run the background future in the background, but to run it alongside with the "front" one. I don't have more precise guidelines without looking deeply into details (which I don't really have the time right now to do) though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've thought for a while about this but I can not come up anything to address the issue... hmm... 🤔💦


let name = Name::from_str(&format!("_dnsaddr.{}", n)).unwrap(); // TODO: error handling
let response = client.query(&name, DNSClass::IN, RecordType::TXT).unwrap(); // TODO: error handling
let resolved_addrs = response.answers().iter()
.filter_map(|record| {
if let trust_dns_client::proto::rr::RData::TXT(ref txt) = record.rdata() {
Some(txt.iter())
} else {
None
}
})
.flatten()
.filter_map(|bytes| {
let str = std::str::from_utf8(bytes).unwrap(); // TODO: error handling
let addr = Multiaddr::from_str(str.trim_start_matches("dnsaddr=")).unwrap(); // TODO: error handling

match addr.iter().next() {
Some(Protocol::Ip4(_)) | Some(Protocol::Ip6(_)) => {
Some(addr)
}
Some(Protocol::Dnsaddr(ref name)) => {
// TODO: recursive resolution
error!("Resolved to the dnsaddr {:?} but recursive resolution is not supported for now.", name);
None
}
protocol => {
// TODO: error handling
error!("{:?}", protocol);
None
}
}
})
.collect::<Vec<_>>();

// if resolved_addrs.len() == 0 {} // TODO

Ok(resolved_addrs[0].clone())
}
_ => unreachable!()
}
}

async fn suffix_matching<T>(resolved_addr: Multiaddr, suffix: Multiaddr) -> Result<Multiaddr, DnsErr<T::Error>>
where
T: Transport
{
let resolved_addr_len = resolved_addr.iter().count();
let suffix_len = suffix.iter().count();

// Make sure the addr is at least as long as the suffix we're looking for.
if resolved_addr_len < suffix_len {
// not long enough.
return Err(DnsErr::SuffixDoesNotMatched);
}

// Matches everything after the /dnsaddr/... with the end of the dnsaddr record.
//
// v-------------------resolved_addr_len---------------------v
// /ip6/2001:db8:20:3:1000:100:20:3/tcp/1234/p2p/Qmxxxxxxxxxxx
// /p2p/Qmxxxxxxxxxxx
// ^----(resolved_addr_len - suffix_len)----^---suffix_len---^
let resolved_addr_suffix: Multiaddr = {
let protocols = resolved_addr.iter().collect::<Vec<Protocol>>();
protocols[(resolved_addr_len - suffix_len)..].into()
};

if resolved_addr_suffix == suffix {
Ok(resolved_addr)
} else {
Err(DnsErr::SuffixDoesNotMatched)
}
}

Expand Down Expand Up @@ -299,10 +403,59 @@ mod tests {
.unwrap();

let _ = transport
.clone()
.dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap())
.unwrap()
.await
.unwrap();
});
}

#[test]
fn dnsaddr() {
#[derive(Clone)]
struct CustomTransport;

impl Transport for CustomTransport {
type Output = ();
type Error = std::io::Error;
type Listener = BoxStream<'static, Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>>;
type ListenerUpgrade = BoxFuture<'static, Result<Self::Output, Self::Error>>;
type Dial = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
unreachable!()
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = addr.iter().collect::<Vec<_>>();
assert_eq!(addr.len(), 3);
// TODO: ipfs
// match addr[2] {
// Protocol::Ipfs(_) => (),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll file another PR to add the ipfs support.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// _ => panic!(),
// };
match addr[1] {
Protocol::Tcp(_) => (),
_ => panic!(),
};
match addr[0] {
Protocol::Ip4(_) => (),
Protocol::Ip6(_) => (),
_ => panic!(),
};
Ok(Box::pin(future::ready(Ok(()))))
}
}

futures::executor::block_on(async move {
let transport = DnsConfig::new(CustomTransport).unwrap();
let _ = transport
.clone()
.dial("/dnsaddr/sjc-1.bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap())
.unwrap()
.await
.unwrap();
});
}
}