From a6e44492574529bd37bd495a92fa7acaaef2f909 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Fri, 12 Apr 2024 11:12:27 -0400 Subject: [PATCH] Revert kafka client changes until rdkafka can be updated --- src/kafka-util/src/client.rs | 112 ++----------------- src/storage-types/src/connections.rs | 12 +- src/storage-types/src/errors.rs | 2 +- src/storage/src/source/kafka.rs | 37 ++---- test/testdrive/connection-create-external.td | 3 - test/testdrive/kafka-source-errors.td | 2 +- 6 files changed, 25 insertions(+), 143 deletions(-) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index f44ae6c04ea6d..621bddebffd76 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -23,7 +23,6 @@ use crossbeam::channel::{unbounded, Receiver, Sender}; use mz_ore::collections::CollectionExt; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; -use mz_ore::netio::resolve_address; use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus}; use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager}; use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken}; @@ -296,9 +295,6 @@ enum BrokerRewriteHandle { /// For _default_ ssh tunnels, we store an error if _creation_ /// of the tunnel failed, so that `tunnel_status` can return it. FailedDefaultSshTunnel(String), - /// We store an error if DNS resolution fails when resolving - /// a new broker host. - FailedDnsResolution(String), } /// Tunneling clients @@ -313,24 +309,6 @@ pub enum TunnelConfig { None, } -/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`. -#[derive(Clone)] -pub struct TunnelingClientStatus { - /// Status of all active ssh tunnels. - pub ssh_status: SshTunnelStatus, - /// Status of direct broker connections. - pub broker_status: BrokerStatus, -} - -/// Status of direct broker connections for a `TunnelingClientContext`. -#[derive(Clone)] -pub enum BrokerStatus { - /// The broker connections are nominal. - Nominal, - /// At least one broker connection has failed. - Failed(String), -} - /// A client context that supports rewriting broker addresses. #[derive(Clone)] pub struct TunnelingClientContext { @@ -341,7 +319,6 @@ pub struct TunnelingClientContext { ssh_tunnel_manager: SshTunnelManager, ssh_timeout_config: SshTimeoutConfig, runtime: Handle, - enforce_external_addresses: bool, } impl TunnelingClientContext { @@ -352,7 +329,6 @@ impl TunnelingClientContext { ssh_tunnel_manager: SshTunnelManager, ssh_timeout_config: SshTimeoutConfig, in_task: InTask, - enforce_external_addresses: bool, ) -> TunnelingClientContext { TunnelingClientContext { inner, @@ -362,7 +338,6 @@ impl TunnelingClientContext { ssh_tunnel_manager, ssh_timeout_config, runtime, - enforce_external_addresses, } } @@ -417,22 +392,19 @@ impl TunnelingClientContext { &self.inner } - /// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus` to - /// communicates the status of all active ssh tunnels `self` knows about, and a `BrokerStatus` - /// that contains a _consolidated_ status of all direct broker connections. - pub fn tunnel_status(&self) -> TunnelingClientStatus { - let rewrites = self.rewrites.lock().expect("poisoned"); - - let ssh_status = rewrites + /// Returns a _consolidated_ `SshTunnelStatus` that communicates the status + /// of all active ssh tunnels `self` knows about. + pub fn tunnel_status(&self) -> SshTunnelStatus { + self.rewrites + .lock() + .expect("poisoned") .values() .map(|handle| match handle { BrokerRewriteHandle::SshTunnel(s) => s.check_status(), BrokerRewriteHandle::FailedDefaultSshTunnel(e) => { SshTunnelStatus::Errored(e.clone()) } - BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDnsResolution(_) => { - SshTunnelStatus::Running - } + BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running, }) .fold(SshTunnelStatus::Running, |acc, status| { match (acc, status) { @@ -447,27 +419,7 @@ impl TunnelingClientContext { SshTunnelStatus::Running } } - }); - - let broker_status = rewrites - .values() - .map(|handle| match handle { - BrokerRewriteHandle::FailedDnsResolution(e) => BrokerStatus::Failed(e.clone()), - _ => BrokerStatus::Nominal, }) - .fold(BrokerStatus::Nominal, |acc, status| match (acc, status) { - (BrokerStatus::Nominal, BrokerStatus::Failed(e)) - | (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e), - (BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => { - BrokerStatus::Failed(format!("{}, {}", err, e)) - } - (BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal, - }); - - TunnelingClientStatus { - ssh_status, - broker_status, - } } } @@ -490,8 +442,7 @@ where port: Some(addr.port()), } } - BrokerRewriteHandle::FailedDefaultSshTunnel(_) - | BrokerRewriteHandle::FailedDnsResolution(_) => { + BrokerRewriteHandle::FailedDefaultSshTunnel(_) => { unreachable!() } }; @@ -513,9 +464,7 @@ where let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned(); match rewrite { - None - | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) - | Some(BrokerRewriteHandle::FailedDnsResolution(_)) => { + None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => { match &self.default_tunnel { TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh @@ -524,8 +473,6 @@ where let ssh_tunnel = self.runtime.block_on(async { self.ssh_tunnel_manager .connect( - // We know the default_tunnel has already been validated by the `resolve_address` - // method when it was provided to the client, so we don't need to check it again. default_tunnel.clone(), &addr.host, addr.port.parse().unwrap(), @@ -542,7 +489,6 @@ where if matches!( o.get(), BrokerRewriteHandle::FailedDefaultSshTunnel(_) - | BrokerRewriteHandle::FailedDnsResolution(_) ) => { o.insert(BrokerRewriteHandle::SshTunnel( @@ -586,45 +532,7 @@ where host: host.to_owned(), port: addr.port, }, - TunnelConfig::None => { - // If no rewrite is specified, we still should check that this potentially - // new broker address is a global address. - self.runtime.block_on(async { - match resolve_address(&addr.host, self.enforce_external_addresses).await - { - Ok(resolved) => { - let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite { - // `resolve_address` will always return at least one address. - // TODO: Once we have a way to provide multiple hosts to rdkafka, we should - // return all resolved addresses here. - host: resolved.first().unwrap().to_string(), - port: addr.port.parse().ok(), - }); - return_rewrite(&rewrite) - } - Err(e) => { - warn!( - "failed to resolve external address for {:?}: {}", - addr, - e.display_with_causes() - ); - // Write an error if no one else has already written one. - let mut rewrites = self.rewrites.lock().expect("poisoned"); - rewrites.entry(addr.clone()).or_insert_with(|| { - BrokerRewriteHandle::FailedDnsResolution( - e.to_string_with_causes(), - ) - }); - // We have to give rdkafka an address, as this callback can't fail. - BrokerAddr { - host: "failed-dns-resolution.dev.materialize.com" - .to_string(), - port: 1337.to_string(), - } - } - } - }) - } + TunnelConfig::None => addr, } } Some(rewrite) => return_rewrite(&rewrite), diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index f221c6d486811..e9dcf5cdbd973 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -52,7 +52,7 @@ use url::Url; use crate::configuration::StorageConfiguration; use crate::connections::aws::{AwsConnection, AwsConnectionValidationError}; use crate::controller::AlterError; -use crate::dyncfgs::{KAFKA_CLIENT_ID_ENRICHMENT_RULES, ENFORCE_EXTERNAL_ADDRESSES}; +use crate::dyncfgs::{ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES}; use crate::errors::{ContextCreationError, CsrConnectError}; use crate::AlterCompatible; @@ -662,6 +662,9 @@ impl KafkaConnection { config.set(*k, v); } + // TODO(roshan): Implement enforcement of external address validation once + // rdkafka client has been updated to support providing multiple resolved + // addresses for brokers let mut context = TunnelingClientContext::new( context, Handle::current(), @@ -671,7 +674,6 @@ impl KafkaConnection { .clone(), storage_configuration.parameters.ssh_timeout_config, in_task, - ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), ); match &self.default_tunnel { @@ -727,12 +729,6 @@ impl KafkaConnection { // - Its not necessary. // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path // in the `TunnelingClientContext`. - // - // NOTE that we do not need to use the `resolve_address` method to - // validate the broker address here since it will be validated when the - // connection is established in `src/kafka-util/src/client.rs`, and we do not - // want to specify any BrokerRewrite that would override any default-tunnel - // settings. } Tunnel::AwsPrivatelink(aws_privatelink) => { let host = mz_cloud_resources::vpc_endpoint_host( diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs index a4270d6eac799..d21932a623258 100644 --- a/src/storage-types/src/errors.rs +++ b/src/storage-types/src/errors.rs @@ -1026,7 +1026,7 @@ where cx: &TunnelingClientContext, ) -> Result { self.map_err(|e| { - if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status { + if let SshTunnelStatus::Errored(e) = cx.tunnel_status() { ContextCreationError::Ssh(anyhow!(e)) } else { ContextCreationError::from(e) diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 9afe8dd60221d..db33fd741f4bb 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -20,9 +20,7 @@ use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Collection}; use futures::StreamExt; use maplit::btreemap; -use mz_kafka_util::client::{ - get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext, -}; +use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext}; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle}; @@ -392,29 +390,13 @@ impl SourceRender for KafkaSourceConnection { "kafka metadata thread: updated partition metadata info", ); - // Check to see if any broker errors have been hit - match consumer.client().context().tunnel_status().broker_status - { - BrokerStatus::Failed(err) => { - let status = HealthStatusUpdate::stalled( - format!("broker error: {}", err), - None, - ); - *status_report.lock().unwrap() = HealthStatus { - kafka: Some(status), - ssh: None, - }; - } - BrokerStatus::Nominal => { - // Clear all the health namespaces we know about. - // Note that many kafka sources's don't have an ssh tunnel, but - // the `health_operator` handles this fine. - *status_report.lock().unwrap() = HealthStatus { - kafka: Some(HealthStatusUpdate::running()), - ssh: Some(HealthStatusUpdate::running()), - }; - } - } + // Clear all the health namespaces we know about. + // Note that many kafka sources's don't have an ssh tunnel, but + // the `health_operator` handles this fine. + *status_report.lock().unwrap() = HealthStatus { + kafka: Some(HealthStatusUpdate::running()), + ssh: Some(HealthStatusUpdate::running()), + }; } Err(e) => { let kafka_status = Some(HealthStatusUpdate::stalled( @@ -422,8 +404,7 @@ impl SourceRender for KafkaSourceConnection { None, )); - let ssh_status = - consumer.client().context().tunnel_status().ssh_status; + let ssh_status = consumer.client().context().tunnel_status(); let ssh_status = match ssh_status { SshTunnelStatus::Running => { Some(HealthStatusUpdate::running()) diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td index e1b98520cdbed..7b1cebfbe61d3 100644 --- a/test/testdrive/connection-create-external.td +++ b/test/testdrive/connection-create-external.td @@ -17,9 +17,6 @@ $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.mater ALTER SYSTEM SET enable_connection_validation_syntax = true ALTER SYSTEM SET storage_enforce_external_addresses = true -! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true) -contains:Failed to resolve hostname - # Setup kafka topic with schema # must be a subset of the keys in the rows $ set keyschema={ diff --git a/test/testdrive/kafka-source-errors.td b/test/testdrive/kafka-source-errors.td index fd89a44104efa..9a848f89d44fd 100644 --- a/test/testdrive/kafka-source-errors.td +++ b/test/testdrive/kafka-source-errors.td @@ -34,7 +34,7 @@ contains:Failed to resolve hostname ! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL 'http://non-existent-csr:8081' ); -contains:failed to lookup address information: Name or service not known +contains:failed to lookup address information # Check that for all tables clause is rejected ! CREATE SOURCE bad_definition1