diff --git a/misc/multiaddr/src/lib.rs b/misc/multiaddr/src/lib.rs index 56b46743525..84b1ee19677 100644 --- a/misc/multiaddr/src/lib.rs +++ b/misc/multiaddr/src/lib.rs @@ -113,6 +113,10 @@ impl Multiaddr { self } + pub fn concat(&mut self, m: &Multiaddr) { + m.iter().for_each(|p| self.push(p)); + } + /// Returns the components of this multiaddress. /// /// # Example @@ -268,6 +272,17 @@ impl<'a> From> for Multiaddr { } } +impl<'a> From<&[Protocol<'a>]> for Multiaddr { + fn from(protocols: &[Protocol<'a>]) -> Multiaddr { + protocols.iter() + .map(|p| p.clone().into()) + .fold(Multiaddr::empty(), |mut addr, next| { + addr.concat(&next); + addr + }) + } +} + impl From for Multiaddr { fn from(v: IpAddr) -> Multiaddr { match v { diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index 40045b45b23..a251660499b 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -13,3 +13,5 @@ categories = ["network-programming", "asynchronous"] libp2p-core = { version = "0.15.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" +trust-dns-client = "0.19.2" +trust-dns-proto = "0.19.2" diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 6e03b99ab73..7ee81874763 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -33,7 +33,7 @@ //! replaced with respectively an `/ip4/` or an `/ip6/` component. //! -use futures::{prelude::*, channel::oneshot, future::BoxFuture}; +use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered}; use libp2p_core::{ Transport, multiaddr::{Protocol, Multiaddr}, @@ -41,6 +41,11 @@ use libp2p_core::{ }; use log::{error, debug, trace}; use std::{error, fmt, io, net::ToSocketAddrs}; +use std::str::FromStr; +use trust_dns_client::rr::{DNSClass, RecordType}; +use trust_dns_client::udp::UdpClientConnection; +use trust_dns_client::client::{SyncClient, Client}; +use trust_dns_proto::rr::domain::Name; /// Represents the configuration for a DNS transport capability of libp2p. /// @@ -122,6 +127,7 @@ where let contains_dns = addr.iter().any(|cmp| match cmp { Protocol::Dns4(_) => true, Protocol::Dns6(_) => true, + Protocol::Dnsaddr(_) => true, _ => false, }); @@ -133,53 +139,43 @@ where } trace!("Dialing address with DNS: {}", addr); - let resolve_futs = addr.iter() - .map(|cmp| match cmp { - Protocol::Dns4(ref name) | Protocol::Dns6(ref name) => { - let name = name.to_string(); - let to_resolve = format!("{}:0", name); - let (tx, rx) = oneshot::channel(); - self.thread_pool.spawn_ok(async { - let to_resolve = to_resolve; - let _ = tx.send(match to_resolve[..].to_socket_addrs() { - Ok(list) => Ok(list.map(|s| s.ip()).collect::>()), - Err(e) => Err(e), - }); - }); + let mut resolve_futs = FuturesOrdered::new(); + let protocols = addr.iter().collect::>(); - let is_dns4 = if let Protocol::Dns4(_) = cmp { true } else { false }; - - async move { - let list = rx.await - .map_err(|_| { - error!("DNS resolver crashed"); - DnsErr::ResolveFail(name.clone()) - })? - .map_err(|err| DnsErr::ResolveError { - domain_name: name.clone(), - error: err, - })?; - - list.into_iter() - .filter_map(|addr| { - if (is_dns4 && addr.is_ipv4()) || (!is_dns4 && addr.is_ipv6()) { - Some(Protocol::from(addr)) - } else { - None - } - }) - .next() - .ok_or_else(|| DnsErr::ResolveFail(name)) - }.left_future() - }, - cmp => future::ready(Ok(cmp.acquire())).right_future() - }) - .collect::>(); + for (i, protocol) in addr.iter().enumerate() { + resolve_futs.push(match protocol { + Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_) => { + let is_dnsaddr = if let Protocol::Dnsaddr(_) = protocol { true } else { false }; + let suffix: Multiaddr = (&protocols[(i + 1)..]).into(); + + resolve_dns::(protocol.clone().acquire()) + .and_then(move |resolved_addr| { + if is_dnsaddr { + suffix_matching::(resolved_addr, suffix).left_future() + } else { + future::ok(resolved_addr).right_future() + } + }) + .left_future() + } + ref p => future::ready(Ok(p.clone().into())).right_future() + }); + + // dnsaddr consumes the rest of the components of the multiaddr + if let Protocol::Dnsaddr(_) = protocol { + break; + } + } let future = resolve_futs.collect::>() .then(move |outcome| async move { - let outcome = outcome.into_iter().collect::, _>>()?; - let outcome = outcome.into_iter().collect::(); + let outcome = outcome.into_iter() + .collect::, _>>()? + .iter() + .fold(Multiaddr::empty(), |mut m, next| { + m.concat(next); + m + }); debug!("DNS resolution outcome: {} => {}", addr, outcome); match self.inner.dial(outcome) { @@ -208,6 +204,8 @@ pub enum DnsErr { }, /// Found an IP address, but the underlying transport doesn't support the multiaddr. MultiaddrNotSupported, + /// Found an IP address, but the suffix doesn't matched. + SuffixDoesNotMatched, } impl fmt::Display for DnsErr @@ -221,6 +219,7 @@ where TErr: fmt::Display write!(f, "Failed to resolve DNS address: {:?}; {:?}", domain_name, error) }, DnsErr::MultiaddrNotSupported => write!(f, "Resolve multiaddr not supported"), + DnsErr::SuffixDoesNotMatched => write!(f, "Suffix does not matched"), } } } @@ -234,7 +233,112 @@ where TErr: error::Error + 'static DnsErr::ResolveFail(_) => None, DnsErr::ResolveError { error, .. } => Some(error), DnsErr::MultiaddrNotSupported => None, + DnsErr::SuffixDoesNotMatched => None, + } + } +} + +async fn resolve_dns(p: Protocol<'_>) -> Result> +where + T: Transport +{ + match p { + Protocol::Dns4(ref name) | Protocol::Dns6(ref name) => { + let to_resolve = format!("{}:0", name); + let list = match to_resolve[..].to_socket_addrs() { + Ok(list) => Ok(list.map(|s| s.ip()).collect::>()), + Err(e) => Err(e) + }.map_err(|_| { + error!("DNS resolver crashed"); + DnsErr::ResolveFail(name.to_string()) + })?; + + let is_dns4 = if let Protocol::Dns4(_) = p { true } else { false }; + + list.into_iter() + .filter_map(|addr| { + if (is_dns4 && addr.is_ipv4()) || (!is_dns4 && addr.is_ipv6()) { + Some(Multiaddr::from(addr)) + } else { + None + } + }) + .next() + .ok_or_else(|| DnsErr::ResolveFail(name.to_string())) + } + Protocol::Dnsaddr(ref n) => { + let conn = UdpClientConnection::new("8.8.8.8:53".parse().unwrap()).unwrap(); // TODO: error handling + let client = SyncClient::new(conn); // TODO: should use async client? + + let name = Name::from_str(&format!("_dnsaddr.{}", n)).unwrap(); // TODO: error handling + let response = client.query(&name, DNSClass::IN, RecordType::TXT).unwrap(); // TODO: error handling + let resolved_addrs = response.answers().iter() + .filter_map(|record| { + if let trust_dns_client::proto::rr::RData::TXT(ref txt) = record.rdata() { + Some(txt.iter()) + } else { + None + } + }) + .flatten() + .filter_map(|bytes| { + let str = std::str::from_utf8(bytes).unwrap(); // TODO: error handling + let addr = Multiaddr::from_str(str.trim_start_matches("dnsaddr=")).unwrap(); // TODO: error handling + + match addr.iter().next() { + Some(Protocol::Ip4(_)) | Some(Protocol::Ip6(_)) => { + Some(addr) + } + Some(Protocol::Dnsaddr(ref name)) => { + // TODO: recursive resolution + error!("Resolved to the dnsaddr {:?} but recursive resolution is not supported for now.", name); + None + } + protocol => { + // TODO: error handling + error!("{:?}", protocol); + None + } + } + }) + .collect::>(); + + // if resolved_addrs.len() == 0 {} // TODO + + Ok(resolved_addrs[0].clone()) } + _ => unreachable!() + } +} + +async fn suffix_matching(resolved_addr: Multiaddr, suffix: Multiaddr) -> Result> +where + T: Transport +{ + let resolved_addr_len = resolved_addr.iter().count(); + let suffix_len = suffix.iter().count(); + + // Make sure the addr is at least as long as the suffix we're looking for. + if resolved_addr_len < suffix_len { + // not long enough. + return Err(DnsErr::SuffixDoesNotMatched); + } + + // Matches everything after the /dnsaddr/... with the end of the dnsaddr record. + // + // v-------------------resolved_addr_len---------------------v + // /ip6/2001:db8:20:3:1000:100:20:3/tcp/1234/p2p/Qmxxxxxxxxxxx + // /p2p/Qmxxxxxxxxxxx + // ^----(resolved_addr_len - suffix_len)----^---suffix_len---^ + let resolved_addr_suffix: Multiaddr = { + let protocols = resolved_addr.iter().collect::>(); + protocols[(resolved_addr_len - suffix_len)..].into() + }; + + if resolved_addr_suffix == suffix { + Ok(resolved_addr) + } else { + Err(DnsErr::SuffixDoesNotMatched) } } @@ -299,10 +403,59 @@ mod tests { .unwrap(); let _ = transport + .clone() .dial("/ip4/1.2.3.4/tcp/20000".parse().unwrap()) .unwrap() .await .unwrap(); }); } + + #[test] + fn dnsaddr() { + #[derive(Clone)] + struct CustomTransport; + + impl Transport for CustomTransport { + type Output = (); + type Error = std::io::Error; + type Listener = BoxStream<'static, Result, Self::Error>>; + type ListenerUpgrade = BoxFuture<'static, Result>; + type Dial = BoxFuture<'static, Result>; + + fn listen_on(self, _: Multiaddr) -> Result> { + unreachable!() + } + + fn dial(self, addr: Multiaddr) -> Result> { + let addr = addr.iter().collect::>(); + assert_eq!(addr.len(), 3); + // TODO: ipfs +// match addr[2] { +// Protocol::Ipfs(_) => (), +// _ => panic!(), +// }; + match addr[1] { + Protocol::Tcp(_) => (), + _ => panic!(), + }; + match addr[0] { + Protocol::Ip4(_) => (), + Protocol::Ip6(_) => (), + _ => panic!(), + }; + Ok(Box::pin(future::ready(Ok(())))) + } + } + + futures::executor::block_on(async move { + let transport = DnsConfig::new(CustomTransport).unwrap(); + let _ = transport + .clone() + .dial("/dnsaddr/sjc-1.bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap()) + .unwrap() + .await + .unwrap(); + }); + } }