diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index 9e7a6b575..54d70c0eb 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -988,7 +988,7 @@ impl PoolRefiller { // `last_error` must not be `None` if there is a possibility of the pool // being empty. fn update_shared_conns(&mut self, last_error: Option) { - let new_conns = if !self.has_connections() { + let new_conns = if self.is_empty() { Arc::new(MaybePoolConnections::Broken(last_error.unwrap())) } else { let new_conns = if let Some(sharder) = self.sharder.as_ref() { @@ -1046,7 +1046,7 @@ impl PoolRefiller { self.conns[shard_id].len(), self.active_connection_count(), ); - if !self.has_connections() { + if self.is_empty() { let _ = self.pool_empty_notifier.send(()); } self.update_shared_conns(Some(last_error)); @@ -1152,10 +1152,6 @@ impl PoolRefiller { ); } - fn has_connections(&self) -> bool { - self.conns.iter().any(|v| !v.is_empty()) - } - fn active_connection_count(&self) -> usize { self.conns.iter().map(Vec::len).sum::() } diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index c7f438362..ba2b3d9f4 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -1,3 +1,4 @@ +use itertools::Itertools; use tokio::net::lookup_host; use tracing::warn; use uuid::Uuid; @@ -270,27 +271,23 @@ pub(crate) struct ResolvedContactPoint { // The resolution may return multiple IPs and the function returns one of them. // It prefers to return IPv4s first, and only if there are none, IPv6s. pub(crate) async fn resolve_hostname(hostname: &str) -> Result { - let mut ret = None; - let addrs: Vec = match lookup_host(hostname).await { - Ok(addrs) => addrs.collect(), + let addrs = match lookup_host(hostname).await { + Ok(addrs) => itertools::Either::Left(addrs), // Use a default port in case of error, but propagate the original error on failure - Err(e) => lookup_host((hostname, 9042)).await.or(Err(e))?.collect(), - }; - for a in addrs { - match a { - SocketAddr::V4(_) => return Ok(a), - _ => { - ret = Some(a); - } + Err(e) => { + let addrs = lookup_host((hostname, 9042)).await.or(Err(e))?; + itertools::Either::Right(addrs) } - } + }; - ret.ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - format!("Empty address list returned by DNS for {}", hostname), - ) - }) + addrs + .find_or_last(|addr| matches!(addr, SocketAddr::V4(_))) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!("Empty address list returned by DNS for {}", hostname), + ) + }) } /// Transforms the given [`InternalKnownNode`]s into [`ContactPoint`]s. @@ -323,18 +320,20 @@ pub(crate) async fn resolve_contact_points( }) => to_resolve.push((hostname, Some(datacenter.clone()))), }; } - let resolve_futures = to_resolve.iter().map(|(hostname, datacenter)| async move { - match resolve_hostname(hostname).await { - Ok(address) => Some(ResolvedContactPoint { - address, - datacenter: datacenter.clone(), - }), - Err(e) => { - warn!("Hostname resolution failed for {}: {}", hostname, &e); - None + let resolve_futures = to_resolve + .into_iter() + .map(|(hostname, datacenter)| async move { + match resolve_hostname(hostname).await { + Ok(address) => Some(ResolvedContactPoint { + address, + datacenter, + }), + Err(e) => { + warn!("Hostname resolution failed for {}: {}", hostname, &e); + None + } } - } - }); + }); let resolved: Vec<_> = futures::future::join_all(resolve_futures).await; initial_peers.extend(resolved.into_iter().flatten());