diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs
index 1f1c43e05eba3..73971272e1d76 100644
--- a/src/kafka-util/src/client.rs
+++ b/src/kafka-util/src/client.rs
@@ -295,6 +295,9 @@ enum BrokerRewriteHandle {
     /// For _default_ ssh tunnels, we store an error if _creation_
     /// of the tunnel failed, so that `tunnel_status` can return it.
     FailedDefaultSshTunnel(String),
+    /// We store an error if DNS resolution fails when resolving a new
+    /// broker host, so that `tunnel_status` can report it.
+    FailedDNSResolution(String),
 }
 
 /// Tunneling clients
@@ -309,6 +312,24 @@ pub enum TunnelConfig {
     None,
 }
 
+/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`.
+#[derive(Clone)]
+pub struct TunnelingClientStatus {
+    /// Status of all active ssh tunnels.
+    pub ssh_status: SshTunnelStatus,
+    /// Status of direct broker connections.
+    pub broker_status: BrokerStatus,
+}
+
+/// Status of direct broker connections for a `TunnelingClientContext`.
+#[derive(Clone)]
+pub enum BrokerStatus {
+    /// The broker connections are nominal.
+    Nominal,
+    /// At least one broker connection has failed.
+    Failed(String),
+}
+
 /// A client context that supports rewriting broker addresses.
 #[derive(Clone)]
 pub struct TunnelingClientContext<C> {
@@ -391,19 +412,22 @@ impl<C> TunnelingClientContext<C> {
         &self.inner
     }
 
-    /// Returns a _consolidated_ `SshTunnelStatus` that communicates the status
-    /// of all active ssh tunnels `self` knows about.
-    pub fn tunnel_status(&self) -> SshTunnelStatus {
-        self.rewrites
-            .lock()
-            .expect("poisoned")
+    /// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus`
+    /// that communicates the status of all active ssh tunnels `self` knows about, and a
+    /// _consolidated_ `BrokerStatus` for all direct broker connections.
+    pub fn tunnel_status(&self) -> TunnelingClientStatus {
+        let rewrites = self.rewrites.lock().expect("poisoned");
+
+        let ssh_status = rewrites
             .values()
             .map(|handle| match handle {
                 BrokerRewriteHandle::SshTunnel(s) => s.check_status(),
                 BrokerRewriteHandle::FailedDefaultSshTunnel(e) => {
                     SshTunnelStatus::Errored(e.clone())
                 }
-                BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running,
+                BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => {
+                    SshTunnelStatus::Running
+                }
             })
             .fold(SshTunnelStatus::Running, |acc, status| {
                 match (acc, status) {
@@ -418,7 +442,27 @@ impl<C> TunnelingClientContext<C> {
                         SshTunnelStatus::Running
                     }
                 }
+            });
+
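+        // Consolidate the broker statuses: any `FailedDNSResolution` entry marks
+        // the overall broker status as failed, and multiple failures are joined
+        // into a single error message.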
+        let broker_status = rewrites
+            .values()
+            .map(|handle| match handle {
+                BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()),
+                _ => BrokerStatus::Nominal,
             })
+            .fold(BrokerStatus::Nominal, |acc, status| match (acc, status) {
+                (BrokerStatus::Nominal, BrokerStatus::Failed(e))
+                | (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e),
+                (BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => {
+                    BrokerStatus::Failed(format!("{}, {}", err, e))
+                }
+                (BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal,
+            });
+
+        TunnelingClientStatus {
+            ssh_status,
+            broker_status,
+        }
     }
 }
 
@@ -441,7 +485,8 @@ where
                         port: Some(addr.port()),
                     }
                 }
-                BrokerRewriteHandle::FailedDefaultSshTunnel(_) => {
+                BrokerRewriteHandle::FailedDefaultSshTunnel(_)
+                | BrokerRewriteHandle::FailedDNSResolution(_) => {
                     unreachable!()
                 }
             };
@@ -463,16 +508,30 @@ where
         let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();
 
         match rewrite {
-            None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
+            None
+            | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_))
+            | Some(BrokerRewriteHandle::FailedDNSResolution(_)) => {
                 match &self.default_tunnel {
                     TunnelConfig::Ssh(default_tunnel) => {
                         // Multiple users could all run `connect` at the same time; only one ssh
                         // tunnel will ever be connected, and only one will be inserted into the
                         // map.
                         let ssh_tunnel = self.runtime.block_on(async {
+                            // Ensure the default tunnel host is resolved to an external address.
+                            let resolved_tunnel_addr = resolve_external_address(
+                                &default_tunnel.host,
+                                self.enforce_external_addresses,
+                            )
+                            .await?;
+                            let tunnel_config = SshTunnelConfig {
+                                host: resolved_tunnel_addr.to_string(),
+                                port: default_tunnel.port,
+                                user: default_tunnel.user.clone(),
+                                key_pair: default_tunnel.key_pair.clone(),
+                            };
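+                            // The tunnel now connects through the resolved bastion
+                            // address; the broker host and port below are still the
+                            // tunnel's target.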
                             self.ssh_tunnel_manager
                                 .connect(
-                                    default_tunnel.clone(),
+                                    tunnel_config,
                                     &addr.host,
                                     addr.port.parse().unwrap(),
                                     self.ssh_timeout_config,
@@ -487,6 +546,7 @@ where
                                         if matches!(
                                             o.get(),
                                             BrokerRewriteHandle::FailedDefaultSshTunnel(_)
+                                                | BrokerRewriteHandle::FailedDNSResolution(_)
                                         ) =>
                                     {
                                         o.insert(BrokerRewriteHandle::SshTunnel(
@@ -533,19 +593,42 @@ where
                     TunnelConfig::None => {
                         // If no rewrite is specified, we still should check that this potentially
                         // new broker address is a global address.
-                        let rewrite = self.runtime.block_on(async {
-                            let resolved = resolve_external_address(
+                        self.runtime.block_on(async {
+                            match resolve_external_address(
                                 &addr.host,
                                 self.enforce_external_addresses,
                             )
                             .await
-                            .unwrap();
-                            BrokerRewriteHandle::Simple(BrokerRewrite {
-                                host: resolved.to_string(),
-                                port: addr.port.parse().ok(),
-                            })
-                        });
-                        return_rewrite(&rewrite)
+                            {
+                                Ok(resolved) => {
+                                    let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite {
+                                        host: resolved.to_string(),
+                                        port: addr.port.parse().ok(),
+                                    });
+                                    return_rewrite(&rewrite)
+                                }
+                                Err(e) => {
+                                    warn!(
+                                        "failed to resolve external address for {:?}: {}",
+                                        addr,
+                                        e.display_with_causes()
+                                    );
+                                    // Write an error if no one else has already written one.
+                                    let mut rewrites = self.rewrites.lock().expect("poisoned");
+                                    rewrites.entry(addr.clone()).or_insert_with(|| {
+                                        BrokerRewriteHandle::FailedDNSResolution(
+                                            e.to_string_with_causes(),
+                                        )
+                                    });
+                                    // We have to give rdkafka an address, as this callback can't fail.
+                                    BrokerAddr {
+                                        host: "failed-dns-resolution.dev.materialize.com"
+                                            .to_string(),
+                                        port: 1337.to_string(),
+                                    }
+                                }
+                            }
+                        })
                     }
                 }
             }
diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs
index 468bba64ac138..ecbb1830b2c42 100644
--- a/src/mysql-util/src/tunnel.rs
+++ b/src/mysql-util/src/tunnel.rs
@@ -239,7 +239,7 @@ impl Config {
                             // the TLS hostname back to the actual upstream host and not the
                             // TCP hostname.
                             opts_builder = opts_builder.ssl_opts(Some(
-                                ssl_opts.clone().with_tls_hostname_override(Some(
+                                ssl_opts.clone().with_danger_tls_hostname_override(Some(
                                     self.inner.ip_or_hostname().to_string(),
                                 )),
                             ));
diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs
index f7ee310db0a09..275fce03f4e38 100644
--- a/src/storage-types/src/connections.rs
+++ b/src/storage-types/src/connections.rs
@@ -547,8 +547,14 @@ impl KafkaConnection {
                     .await?;
                 let key_pair = SshKeyPair::from_bytes(&secret)?;
 
+                // Ensure any ssh-bastion address we connect to is resolved to an external address.
+                let resolved = resolve_external_address(
+                    &ssh_tunnel.connection.host,
+                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
+                )
+                .await?;
                 context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
-                    host: ssh_tunnel.connection.host.clone(),
+                    host: resolved.to_string(),
                     port: ssh_tunnel.connection.port,
                     user: ssh_tunnel.connection.user.clone(),
                     key_pair,
@@ -567,25 +573,21 @@ impl KafkaConnection {
             };
             match &broker.tunnel {
                 Tunnel::Direct => {
+                    // By default, don't override broker address lookup.
+                    //
+                    // N.B.
+                    //
                     // We _could_ pre-setup the default ssh tunnel for all known brokers here, but
                     // we avoid doing so because:
                     // - It's not necessary.
                     // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
                     // in the `TunnelingClientContext`.
-
-                    // Ensure any broker address we connect to is resolved to an external address.
-                    let resolved = resolve_external_address(
-                        &addr.host,
-                        ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
-                    )
-                    .await?;
-                    context.add_broker_rewrite(
-                        addr,
-                        BrokerRewrite {
-                            host: resolved.to_string(),
-                            port: None,
-                        },
-                    )
+                    //
+                    // NOTE that we do not need to call `resolve_external_address` to
+                    // validate the broker address here: it is validated when the
+                    // connection is established in `src/kafka-util/src/client.rs`, and we
+                    // do not want to install a `BrokerRewrite` that would override any
+                    // default-tunnel settings.
                 }
                 Tunnel::AwsPrivatelink(aws_privatelink) => {
                     let host = mz_cloud_resources::vpc_endpoint_host(
diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs
index d21932a623258..a4270d6eac799 100644
--- a/src/storage-types/src/errors.rs
+++ b/src/storage-types/src/errors.rs
@@ -1026,7 +1026,7 @@ where
         cx: &TunnelingClientContext<C>,
     ) -> Result<T, ContextCreationError> {
         self.map_err(|e| {
-            if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
+            if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status {
                 ContextCreationError::Ssh(anyhow!(e))
             } else {
                 ContextCreationError::from(e)
diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs
index 4654757d1c00d..daea60f14768d 100644
--- a/src/storage/src/source/kafka.rs
+++ b/src/storage/src/source/kafka.rs
@@ -20,7 +20,9 @@ use chrono::NaiveDateTime;
 use differential_dataflow::{AsCollection, Collection};
 use futures::StreamExt;
 use maplit::btreemap;
-use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext};
+use mz_kafka_util::client::{
+    get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext,
+};
 use mz_ore::error::ErrorExt;
 use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle};
 use mz_repr::adt::timestamp::CheckedTimestamp;
@@ -384,13 +386,29 @@ impl SourceRender for KafkaSourceConnection {
                                         "kafka metadata thread: updated partition metadata info",
                                     );
 
-                                    // Clear all the health namespaces we know about.
-                                    // Note that many kafka sources's don't have an ssh tunnel, but
-                                    // the `health_operator` handles this fine.
-                                    *status_report.lock().unwrap() = HealthStatus {
-                                        kafka: Some(HealthStatusUpdate::running()),
-                                        ssh: Some(HealthStatusUpdate::running()),
-                                    };
+                                    // Check to see if any broker errors have been hit
+                                    match consumer.client().context().tunnel_status().broker_status
+                                    {
+                                        BrokerStatus::Failed(err) => {
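+                                            // Surface the broker error in the kafka
+                                            // namespace; the ssh namespace is left
+                                            // unset here.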
+                                            let status = HealthStatusUpdate::stalled(
+                                                format!("broker error: {}", err),
+                                                None,
+                                            );
+                                            *status_report.lock().unwrap() = HealthStatus {
+                                                kafka: Some(status),
+                                                ssh: None,
+                                            };
+                                        }
+                                        BrokerStatus::Nominal => {
+                                            // Clear all the health namespaces we know about.
+                                            // Note that many kafka sources don't have an ssh tunnel, but
+                                            // the `health_operator` handles this fine.
+                                            *status_report.lock().unwrap() = HealthStatus {
+                                                kafka: Some(HealthStatusUpdate::running()),
+                                                ssh: Some(HealthStatusUpdate::running()),
+                                            };
+                                        }
+                                    }
                                 }
                                 Err(e) => {
                                     let kafka_status = Some(HealthStatusUpdate::stalled(
@@ -398,7 +416,8 @@ impl SourceRender for KafkaSourceConnection {
                                         None,
                                     ));
 
-                                    let ssh_status = consumer.client().context().tunnel_status();
+                                    let ssh_status =
+                                        consumer.client().context().tunnel_status().ssh_status;
                                     let ssh_status = match ssh_status {
                                         SshTunnelStatus::Running => {
                                             Some(HealthStatusUpdate::running())
diff --git a/test/kafka-auth/test-kafka-ssl.td b/test/kafka-auth/test-kafka-ssl.td
index 886471c34d08c..e0fd6c7d49f85 100644
--- a/test/kafka-auth/test-kafka-ssl.td
+++ b/test/kafka-auth/test-kafka-ssl.td
@@ -121,7 +121,7 @@ running
 # ALTER CONNECTION for Kafka + SSH
 
 ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true);
-contains:Could not resolve hostname abcd
+contains:failed to lookup address information: Name or service not known
 
 ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST);
 contains:HOST option is required
diff --git a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td
index fb8f327289bb6..8a1aa0b5c6223 100644
--- a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td
+++ b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td
@@ -14,7 +14,6 @@
 
 > SELECT status FROM mz_internal.mz_source_statuses st
   JOIN mz_sources s ON st.id = s.id
-  WHERE error LIKE 'ssh:%'
   AND s.name in ('dynamic_text', 'fixed_text', 'fixed_plus_csr', 'dynamic_plus_csr')
 stalled
 stalled
diff --git a/test/ssh-connection/pg-source-after-ssh-failure.td b/test/ssh-connection/pg-source-after-ssh-failure.td
index 67442f72e93d8..1a9da9cd86538 100644
--- a/test/ssh-connection/pg-source-after-ssh-failure.td
+++ b/test/ssh-connection/pg-source-after-ssh-failure.td
@@ -11,5 +11,5 @@
 
 > SELECT status FROM mz_internal.mz_source_statuses st
   JOIN mz_sources s ON st.id = s.id
-  WHERE s.name = 'mz_source' AND error LIKE 'ssh:%'
+  WHERE s.name = 'mz_source'
 stalled
diff --git a/test/testdrive/connection-alter.td b/test/testdrive/connection-alter.td
index ecbea8a113221..93771b59bb6ff 100644
--- a/test/testdrive/connection-alter.td
+++ b/test/testdrive/connection-alter.td
@@ -48,7 +48,7 @@ first second
 contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK
 
 ! ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = true);
-contains:Failed to resolve hostname
+contains:failed to lookup address information: Name or service not known
 
 > ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = false);
 
diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td
index f13bf2e66ad29..e1b98520cdbed 100644
--- a/test/testdrive/connection-create-external.td
+++ b/test/testdrive/connection-create-external.td
@@ -18,7 +18,7 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true
 ALTER SYSTEM SET storage_enforce_external_addresses = true
 
 ! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true)
-contains:address is not global
+contains:Failed to resolve hostname
 
 # Setup kafka topic with schema
 # must be a subset of the keys in the rows
diff --git a/test/testdrive/connection-validation.td b/test/testdrive/connection-validation.td
index 36db1e9876dbc..7692368ac1116 100644
--- a/test/testdrive/connection-validation.td
+++ b/test/testdrive/connection-validation.td
@@ -37,10 +37,10 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true
 > CREATE CONNECTION invalid_tunnel TO SSH TUNNEL (HOST 'invalid', USER 'invalid', PORT 22)
 
 ! CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT)
-contains:failed to connect to the remote host
+contains:failed to lookup address information
 
 # Create the connection without validation and validate later
 > CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = false)
 
 ! VALIDATE CONNECTION invalid_kafka_conn
-contains:failed to connect to the remote host
+contains:failed to lookup address information
diff --git a/test/testdrive/kafka-source-errors.td b/test/testdrive/kafka-source-errors.td
index 9a848f89d44fd..adf328375e2f8 100644
--- a/test/testdrive/kafka-source-errors.td
+++ b/test/testdrive/kafka-source-errors.td
@@ -25,7 +25,7 @@ contains:Meta data fetch error: BrokerTransportFailure (Local: Broker transport
 
 ! CREATE CONNECTION fawlty_kafka_conn
   TO KAFKA (BROKER 'non-existent-broker:9092');
-contains:Failed to resolve hostname
+contains:failed to lookup address information: Name or service not known
 
 > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
     URL '${testdrive.schema-registry-url}'
@@ -34,7 +34,7 @@ contains:Failed to resolve hostname
 ! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY (
     URL 'http://non-existent-csr:8081'
   );
-contains:failed to lookup address information
+contains:failed to lookup address information: Name or service not known
 
 # Check that for all tables clause is rejected
 ! CREATE SOURCE bad_definition1