From 8a4deb01d8adee6e0d26592d2f53068a85b642b1 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Thu, 21 Mar 2024 13:42:13 -0400 Subject: [PATCH] Fix other error checks to use dns error response; properly report errors from kafka client dns resolution --- src/kafka-util/src/client.rs | 121 +++++++++++++++--- src/mysql-util/src/tunnel.rs | 2 +- src/storage-types/src/connections.rs | 32 ++--- src/storage-types/src/errors.rs | 2 +- src/storage/src/source/kafka.rs | 37 ++++-- test/kafka-auth/test-kafka-ssl.td | 2 +- ...ource-after-ssh-failure-restart-replica.td | 1 - .../pg-source-after-ssh-failure.td | 2 +- test/testdrive/connection-alter.td | 2 +- test/testdrive/connection-create-external.td | 2 +- test/testdrive/connection-validation.td | 4 +- test/testdrive/kafka-source-errors.td | 4 +- 12 files changed, 157 insertions(+), 54 deletions(-) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 1f1c43e05eba3..73971272e1d76 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -295,6 +295,9 @@ enum BrokerRewriteHandle { /// For _default_ ssh tunnels, we store an error if _creation_ /// of the tunnel failed, so that `tunnel_status` can return it. FailedDefaultSshTunnel(String), + /// We store an error if DNS resolution fails when resolving + /// a new broker host. + FailedDNSResolution(String), } /// Tunneling clients @@ -309,6 +312,24 @@ pub enum TunnelConfig { None, } +/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`. +#[derive(Clone)] +pub struct TunnelingClientStatus { + /// Status of all active ssh tunnels. + pub ssh_status: SshTunnelStatus, + /// Status of direct broker connections. + pub broker_status: BrokerStatus, +} + +/// Status of direct broker connections for a `TunnelingClientContext`. +#[derive(Clone)] +pub enum BrokerStatus { + /// The broker connections are nominal. + Nominal, + /// At least one broker connection has failed. + Failed(String), +} + /// A client context that supports rewriting broker addresses. #[derive(Clone)] pub struct TunnelingClientContext { @@ -391,19 +412,22 @@ impl TunnelingClientContext { &self.inner } - /// Returns a _consolidated_ `SshTunnelStatus` that communicates the status - /// of all active ssh tunnels `self` knows about. - pub fn tunnel_status(&self) -> SshTunnelStatus { - self.rewrites - .lock() - .expect("poisoned") + /// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus` to + /// communicates the status of all active ssh tunnels `self` knows about, and a `BrokerStatus` + /// that contains a _consolidated_ status of all direct broker connections. + pub fn tunnel_status(&self) -> TunnelingClientStatus { + let rewrites = self.rewrites.lock().expect("poisoned"); + + let ssh_status = rewrites .values() .map(|handle| match handle { BrokerRewriteHandle::SshTunnel(s) => s.check_status(), BrokerRewriteHandle::FailedDefaultSshTunnel(e) => { SshTunnelStatus::Errored(e.clone()) } - BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running, + BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => { + SshTunnelStatus::Running + } }) .fold(SshTunnelStatus::Running, |acc, status| { match (acc, status) { @@ -418,7 +442,27 @@ impl TunnelingClientContext { SshTunnelStatus::Running } } + }); + + let broker_status = rewrites + .values() + .map(|handle| match handle { + BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()), + _ => BrokerStatus::Nominal, }) + .fold(BrokerStatus::Nominal, |acc, status| match (acc, status) { + (BrokerStatus::Nominal, BrokerStatus::Failed(e)) + | (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e), + (BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => { + BrokerStatus::Failed(format!("{}, {}", err, e)) + } + (BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal, + }); + + TunnelingClientStatus { + ssh_status, + broker_status, + } } } @@ -441,7 +485,8 @@ where port: Some(addr.port()), } } - BrokerRewriteHandle::FailedDefaultSshTunnel(_) => { + BrokerRewriteHandle::FailedDefaultSshTunnel(_) + | BrokerRewriteHandle::FailedDNSResolution(_) => { unreachable!() } }; @@ -463,16 +508,30 @@ where let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned(); match rewrite { - None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => { + None + | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) + | Some(BrokerRewriteHandle::FailedDNSResolution(_)) => { match &self.default_tunnel { TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh // tunnel will ever be connected, and only one will be inserted into the // map. let ssh_tunnel = self.runtime.block_on(async { + // Ensure the default tunnel host is resolved to an external address. + let resolved_tunnel_addr = resolve_external_address( + &default_tunnel.host, + self.enforce_external_addresses, + ) + .await?; + let tunnel_config = SshTunnelConfig { + host: resolved_tunnel_addr.to_string(), + port: default_tunnel.port, + user: default_tunnel.user.clone(), + key_pair: default_tunnel.key_pair.clone(), + }; self.ssh_tunnel_manager .connect( - default_tunnel.clone(), + tunnel_config, &addr.host, addr.port.parse().unwrap(), self.ssh_timeout_config, @@ -487,6 +546,7 @@ where if matches!( o.get(), BrokerRewriteHandle::FailedDefaultSshTunnel(_) + | BrokerRewriteHandle::FailedDNSResolution(_) ) => { o.insert(BrokerRewriteHandle::SshTunnel( @@ -533,19 +593,42 @@ where TunnelConfig::None => { // If no rewrite is specified, we still should check that this potentially // new broker address is a global address. - let rewrite = self.runtime.block_on(async { - let resolved = resolve_external_address( + self.runtime.block_on(async { + match resolve_external_address( &addr.host, self.enforce_external_addresses, ) .await - .unwrap(); - BrokerRewriteHandle::Simple(BrokerRewrite { - host: resolved.to_string(), - port: addr.port.parse().ok(), - }) - }); - return_rewrite(&rewrite) + { + Ok(resolved) => { + let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite { + host: resolved.to_string(), + port: addr.port.parse().ok(), + }); + return_rewrite(&rewrite) + } + Err(e) => { + warn!( + "failed to resolve external address for {:?}: {}", + addr, + e.display_with_causes() + ); + // Write an error if no one else has already written one. + let mut rewrites = self.rewrites.lock().expect("poisoned"); + rewrites.entry(addr.clone()).or_insert_with(|| { + BrokerRewriteHandle::FailedDNSResolution( + e.to_string_with_causes(), + ) + }); + // We have to give rdkafka an address, as this callback can't fail. + BrokerAddr { + host: "failed-dns-resolution.dev.materialize.com" + .to_string(), + port: 1337.to_string(), + } + } + } + }) } } } diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs index 468bba64ac138..ecbb1830b2c42 100644 --- a/src/mysql-util/src/tunnel.rs +++ b/src/mysql-util/src/tunnel.rs @@ -239,7 +239,7 @@ impl Config { // the TLS hostname back to the actual upstream host and not the // TCP hostname. opts_builder = opts_builder.ssl_opts(Some( - ssl_opts.clone().with_tls_hostname_override(Some( + ssl_opts.clone().with_danger_tls_hostname_override(Some( self.inner.ip_or_hostname().to_string(), )), )); diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index f7ee310db0a09..275fce03f4e38 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -547,8 +547,14 @@ impl KafkaConnection { .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + // Ensure any ssh-bastion address we connect to is resolved to an external address. + let resolved = resolve_external_address( + &ssh_tunnel.connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig { - host: ssh_tunnel.connection.host.clone(), + host: resolved.to_string(), port: ssh_tunnel.connection.port, user: ssh_tunnel.connection.user.clone(), key_pair, @@ -567,25 +573,21 @@ impl KafkaConnection { }; match &broker.tunnel { Tunnel::Direct => { + // By default, don't override broker address lookup. + // + // N.B. + // // We _could_ pre-setup the default ssh tunnel for all known brokers here, but // we avoid doing because: // - Its not necessary. // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path // in the `TunnelingClientContext`. - - // Ensure any broker address we connect to is resolved to an external address. - let resolved = resolve_external_address( - &addr.host, - ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), - ) - .await?; - context.add_broker_rewrite( - addr, - BrokerRewrite { - host: resolved.to_string(), - port: None, - }, - ) + // + // NOTE that we do not need to use the `resolve_external_address` method to + // validate the broker address here since it will be validated when the + // connection is established in `src/kafka-util/src/client.rs`, and we do not + // want to specify any BrokerRewrite that would override any default-tunnel + // settings. } Tunnel::AwsPrivatelink(aws_privatelink) => { let host = mz_cloud_resources::vpc_endpoint_host( diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs index d21932a623258..a4270d6eac799 100644 --- a/src/storage-types/src/errors.rs +++ b/src/storage-types/src/errors.rs @@ -1026,7 +1026,7 @@ where cx: &TunnelingClientContext, ) -> Result { self.map_err(|e| { - if let SshTunnelStatus::Errored(e) = cx.tunnel_status() { + if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status { ContextCreationError::Ssh(anyhow!(e)) } else { ContextCreationError::from(e) diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 4654757d1c00d..daea60f14768d 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -20,7 +20,9 @@ use chrono::NaiveDateTime; use differential_dataflow::{AsCollection, Collection}; use futures::StreamExt; use maplit::btreemap; -use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext}; +use mz_kafka_util::client::{ + get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext, +}; use mz_ore::error::ErrorExt; use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle}; use mz_repr::adt::timestamp::CheckedTimestamp; @@ -384,13 +386,29 @@ impl SourceRender for KafkaSourceConnection { "kafka metadata thread: updated partition metadata info", ); - // Clear all the health namespaces we know about. - // Note that many kafka sources's don't have an ssh tunnel, but - // the `health_operator` handles this fine. - *status_report.lock().unwrap() = HealthStatus { - kafka: Some(HealthStatusUpdate::running()), - ssh: Some(HealthStatusUpdate::running()), - }; + // Check to see if any broker errors have been hit + match consumer.client().context().tunnel_status().broker_status + { + BrokerStatus::Failed(err) => { + let status = HealthStatusUpdate::stalled( + format!("broker error: {}", err), + None, + ); + *status_report.lock().unwrap() = HealthStatus { + kafka: Some(status), + ssh: None, + }; + } + BrokerStatus::Nominal => { + // Clear all the health namespaces we know about. + // Note that many kafka sources's don't have an ssh tunnel, but + // the `health_operator` handles this fine. + *status_report.lock().unwrap() = HealthStatus { + kafka: Some(HealthStatusUpdate::running()), + ssh: Some(HealthStatusUpdate::running()), + }; + } + } } Err(e) => { let kafka_status = Some(HealthStatusUpdate::stalled( @@ -398,7 +416,8 @@ impl SourceRender for KafkaSourceConnection { None, )); - let ssh_status = consumer.client().context().tunnel_status(); + let ssh_status = + consumer.client().context().tunnel_status().ssh_status; let ssh_status = match ssh_status { SshTunnelStatus::Running => { Some(HealthStatusUpdate::running()) diff --git a/test/kafka-auth/test-kafka-ssl.td b/test/kafka-auth/test-kafka-ssl.td index 886471c34d08c..e0fd6c7d49f85 100644 --- a/test/kafka-auth/test-kafka-ssl.td +++ b/test/kafka-auth/test-kafka-ssl.td @@ -121,7 +121,7 @@ running # ALTER CONNECTION for Kafka + SSH ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true); -contains:Could not resolve hostname abcd +contains:failed to lookup address information: Name or service not known ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST); contains:HOST option is required diff --git a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td index fb8f327289bb6..8a1aa0b5c6223 100644 --- a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td +++ b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td @@ -14,7 +14,6 @@ > SELECT status FROM mz_internal.mz_source_statuses st JOIN mz_sources s ON st.id = s.id - WHERE error LIKE 'ssh:%' AND s.name in ('dynamic_text', 'fixed_text', 'fixed_plus_csr', 'dynamic_plus_csr') stalled stalled diff --git a/test/ssh-connection/pg-source-after-ssh-failure.td b/test/ssh-connection/pg-source-after-ssh-failure.td index 67442f72e93d8..1a9da9cd86538 100644 --- a/test/ssh-connection/pg-source-after-ssh-failure.td +++ b/test/ssh-connection/pg-source-after-ssh-failure.td @@ -11,5 +11,5 @@ > SELECT status FROM mz_internal.mz_source_statuses st JOIN mz_sources s ON st.id = s.id - WHERE s.name = 'mz_source' AND error LIKE 'ssh:%' + WHERE s.name = 'mz_source' stalled diff --git a/test/testdrive/connection-alter.td b/test/testdrive/connection-alter.td index ecbea8a113221..93771b59bb6ff 100644 --- a/test/testdrive/connection-alter.td +++ b/test/testdrive/connection-alter.td @@ -48,7 +48,7 @@ first second contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK ! ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = true); -contains:Failed to resolve hostname +contains:failed to lookup address information: Name or service not known > ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = false); diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td index f13bf2e66ad29..e1b98520cdbed 100644 --- a/test/testdrive/connection-create-external.td +++ b/test/testdrive/connection-create-external.td @@ -18,7 +18,7 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true ALTER SYSTEM SET storage_enforce_external_addresses = true ! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true) -contains:address is not global +contains:Failed to resolve hostname # Setup kafka topic with schema # must be a subset of the keys in the rows diff --git a/test/testdrive/connection-validation.td b/test/testdrive/connection-validation.td index 36db1e9876dbc..7692368ac1116 100644 --- a/test/testdrive/connection-validation.td +++ b/test/testdrive/connection-validation.td @@ -37,10 +37,10 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true > CREATE CONNECTION invalid_tunnel TO SSH TUNNEL (HOST 'invalid', USER 'invalid', PORT 22) ! CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) -contains:failed to connect to the remote host +contains:failed to lookup address information # Create the connection without validation and validate later > CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = false) ! VALIDATE CONNECTION invalid_kafka_conn -contains:failed to connect to the remote host +contains:failed to lookup address information diff --git a/test/testdrive/kafka-source-errors.td b/test/testdrive/kafka-source-errors.td index 9a848f89d44fd..adf328375e2f8 100644 --- a/test/testdrive/kafka-source-errors.td +++ b/test/testdrive/kafka-source-errors.td @@ -25,7 +25,7 @@ contains:Meta data fetch error: BrokerTransportFailure (Local: Broker transport ! CREATE CONNECTION fawlty_kafka_conn TO KAFKA (BROKER 'non-existent-broker:9092'); -contains:Failed to resolve hostname +contains:failed to lookup address information: Name or service not known > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL '${testdrive.schema-registry-url}' @@ -34,7 +34,7 @@ contains:Failed to resolve hostname ! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL 'http://non-existent-csr:8081' ); -contains:failed to lookup address information +contains:failed to lookup address information: Name or service not known # Check that for all tables clause is rejected ! CREATE SOURCE bad_definition1