Skip to content

Commit

Permalink
Revert kafka client changes until rdkafka can be updated
Browse files Browse the repository at this point in the history
  • Loading branch information
rjobanp committed Apr 15, 2024
1 parent 789c8d1 commit a6e4449
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 143 deletions.
112 changes: 10 additions & 102 deletions src/kafka-util/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crossbeam::channel::{unbounded, Receiver, Sender};
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::netio::resolve_address;
use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus};
use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken};
Expand Down Expand Up @@ -296,9 +295,6 @@ enum BrokerRewriteHandle {
/// For _default_ ssh tunnels, we store an error if _creation_
/// of the tunnel failed, so that `tunnel_status` can return it.
FailedDefaultSshTunnel(String),
/// We store an error if DNS resolution fails when resolving
/// a new broker host.
FailedDnsResolution(String),
}

/// Tunneling clients
Expand All @@ -313,24 +309,6 @@ pub enum TunnelConfig {
None,
}

/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`.
#[derive(Clone)]
pub struct TunnelingClientStatus {
/// Status of all active ssh tunnels.
pub ssh_status: SshTunnelStatus,
/// Status of direct broker connections.
pub broker_status: BrokerStatus,
}

/// Status of direct broker connections for a `TunnelingClientContext`.
#[derive(Clone)]
pub enum BrokerStatus {
/// The broker connections are nominal.
Nominal,
/// At least one broker connection has failed.
Failed(String),
}

/// A client context that supports rewriting broker addresses.
#[derive(Clone)]
pub struct TunnelingClientContext<C> {
Expand All @@ -341,7 +319,6 @@ pub struct TunnelingClientContext<C> {
ssh_tunnel_manager: SshTunnelManager,
ssh_timeout_config: SshTimeoutConfig,
runtime: Handle,
enforce_external_addresses: bool,
}

impl<C> TunnelingClientContext<C> {
Expand All @@ -352,7 +329,6 @@ impl<C> TunnelingClientContext<C> {
ssh_tunnel_manager: SshTunnelManager,
ssh_timeout_config: SshTimeoutConfig,
in_task: InTask,
enforce_external_addresses: bool,
) -> TunnelingClientContext<C> {
TunnelingClientContext {
inner,
Expand All @@ -362,7 +338,6 @@ impl<C> TunnelingClientContext<C> {
ssh_tunnel_manager,
ssh_timeout_config,
runtime,
enforce_external_addresses,
}
}

Expand Down Expand Up @@ -417,22 +392,19 @@ impl<C> TunnelingClientContext<C> {
&self.inner
}

/// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus` to
/// communicates the status of all active ssh tunnels `self` knows about, and a `BrokerStatus`
/// that contains a _consolidated_ status of all direct broker connections.
pub fn tunnel_status(&self) -> TunnelingClientStatus {
let rewrites = self.rewrites.lock().expect("poisoned");

let ssh_status = rewrites
/// Returns a _consolidated_ `SshTunnelStatus` that communicates the status
/// of all active ssh tunnels `self` knows about.
pub fn tunnel_status(&self) -> SshTunnelStatus {
self.rewrites
.lock()
.expect("poisoned")
.values()
.map(|handle| match handle {
BrokerRewriteHandle::SshTunnel(s) => s.check_status(),
BrokerRewriteHandle::FailedDefaultSshTunnel(e) => {
SshTunnelStatus::Errored(e.clone())
}
BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDnsResolution(_) => {
SshTunnelStatus::Running
}
BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running,
})
.fold(SshTunnelStatus::Running, |acc, status| {
match (acc, status) {
Expand All @@ -447,27 +419,7 @@ impl<C> TunnelingClientContext<C> {
SshTunnelStatus::Running
}
}
});

let broker_status = rewrites
.values()
.map(|handle| match handle {
BrokerRewriteHandle::FailedDnsResolution(e) => BrokerStatus::Failed(e.clone()),
_ => BrokerStatus::Nominal,
})
.fold(BrokerStatus::Nominal, |acc, status| match (acc, status) {
(BrokerStatus::Nominal, BrokerStatus::Failed(e))
| (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e),
(BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => {
BrokerStatus::Failed(format!("{}, {}", err, e))
}
(BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal,
});

TunnelingClientStatus {
ssh_status,
broker_status,
}
}
}

Expand All @@ -490,8 +442,7 @@ where
port: Some(addr.port()),
}
}
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
| BrokerRewriteHandle::FailedDnsResolution(_) => {
BrokerRewriteHandle::FailedDefaultSshTunnel(_) => {
unreachable!()
}
};
Expand All @@ -513,9 +464,7 @@ where
let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();

match rewrite {
None
| Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_))
| Some(BrokerRewriteHandle::FailedDnsResolution(_)) => {
None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
match &self.default_tunnel {
TunnelConfig::Ssh(default_tunnel) => {
// Multiple users could all run `connect` at the same time; only one ssh
Expand All @@ -524,8 +473,6 @@ where
let ssh_tunnel = self.runtime.block_on(async {
self.ssh_tunnel_manager
.connect(
// We know the default_tunnel has already been validated by the `resolve_address`
// method when it was provided to the client, so we don't need to check it again.
default_tunnel.clone(),
&addr.host,
addr.port.parse().unwrap(),
Expand All @@ -542,7 +489,6 @@ where
if matches!(
o.get(),
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
| BrokerRewriteHandle::FailedDnsResolution(_)
) =>
{
o.insert(BrokerRewriteHandle::SshTunnel(
Expand Down Expand Up @@ -586,45 +532,7 @@ where
host: host.to_owned(),
port: addr.port,
},
TunnelConfig::None => {
// If no rewrite is specified, we still should check that this potentially
// new broker address is a global address.
self.runtime.block_on(async {
match resolve_address(&addr.host, self.enforce_external_addresses).await
{
Ok(resolved) => {
let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite {
// `resolve_address` will always return at least one address.
// TODO: Once we have a way to provide multiple hosts to rdkafka, we should
// return all resolved addresses here.
host: resolved.first().unwrap().to_string(),
port: addr.port.parse().ok(),
});
return_rewrite(&rewrite)
}
Err(e) => {
warn!(
"failed to resolve external address for {:?}: {}",
addr,
e.display_with_causes()
);
// Write an error if no one else has already written one.
let mut rewrites = self.rewrites.lock().expect("poisoned");
rewrites.entry(addr.clone()).or_insert_with(|| {
BrokerRewriteHandle::FailedDnsResolution(
e.to_string_with_causes(),
)
});
// We have to give rdkafka an address, as this callback can't fail.
BrokerAddr {
host: "failed-dns-resolution.dev.materialize.com"
.to_string(),
port: 1337.to_string(),
}
}
}
})
}
TunnelConfig::None => addr,
}
}
Some(rewrite) => return_rewrite(&rewrite),
Expand Down
12 changes: 4 additions & 8 deletions src/storage-types/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use url::Url;
use crate::configuration::StorageConfiguration;
use crate::connections::aws::{AwsConnection, AwsConnectionValidationError};
use crate::controller::AlterError;
use crate::dyncfgs::{KAFKA_CLIENT_ID_ENRICHMENT_RULES, ENFORCE_EXTERNAL_ADDRESSES};
use crate::dyncfgs::{ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES};
use crate::errors::{ContextCreationError, CsrConnectError};
use crate::AlterCompatible;

Expand Down Expand Up @@ -662,6 +662,9 @@ impl KafkaConnection {
config.set(*k, v);
}

// TODO(roshan): Implement enforcement of external address validation once
// rdkafka client has been updated to support providing multiple resolved
// addresses for brokers
let mut context = TunnelingClientContext::new(
context,
Handle::current(),
Expand All @@ -671,7 +674,6 @@ impl KafkaConnection {
.clone(),
storage_configuration.parameters.ssh_timeout_config,
in_task,
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
);

match &self.default_tunnel {
Expand Down Expand Up @@ -727,12 +729,6 @@ impl KafkaConnection {
// - Its not necessary.
// - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
// in the `TunnelingClientContext`.
//
// NOTE that we do not need to use the `resolve_address` method to
// validate the broker address here since it will be validated when the
// connection is established in `src/kafka-util/src/client.rs`, and we do not
// want to specify any BrokerRewrite that would override any default-tunnel
// settings.
}
Tunnel::AwsPrivatelink(aws_privatelink) => {
let host = mz_cloud_resources::vpc_endpoint_host(
Expand Down
2 changes: 1 addition & 1 deletion src/storage-types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ where
cx: &TunnelingClientContext<C>,
) -> Result<T, ContextCreationError> {
self.map_err(|e| {
if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status {
if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
ContextCreationError::Ssh(anyhow!(e))
} else {
ContextCreationError::from(e)
Expand Down
37 changes: 9 additions & 28 deletions src/storage/src/source/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
use maplit::btreemap;
use mz_kafka_util::client::{
get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext,
};
use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext};
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle};
Expand Down Expand Up @@ -392,38 +390,21 @@ impl SourceRender for KafkaSourceConnection {
"kafka metadata thread: updated partition metadata info",
);

// Check to see if any broker errors have been hit
match consumer.client().context().tunnel_status().broker_status
{
BrokerStatus::Failed(err) => {
let status = HealthStatusUpdate::stalled(
format!("broker error: {}", err),
None,
);
*status_report.lock().unwrap() = HealthStatus {
kafka: Some(status),
ssh: None,
};
}
BrokerStatus::Nominal => {
// Clear all the health namespaces we know about.
// Note that many kafka sources's don't have an ssh tunnel, but
// the `health_operator` handles this fine.
*status_report.lock().unwrap() = HealthStatus {
kafka: Some(HealthStatusUpdate::running()),
ssh: Some(HealthStatusUpdate::running()),
};
}
}
// Clear all the health namespaces we know about.
// Note that many kafka sources's don't have an ssh tunnel, but
// the `health_operator` handles this fine.
*status_report.lock().unwrap() = HealthStatus {
kafka: Some(HealthStatusUpdate::running()),
ssh: Some(HealthStatusUpdate::running()),
};
}
Err(e) => {
let kafka_status = Some(HealthStatusUpdate::stalled(
format!("{}", e.display_with_causes()),
None,
));

let ssh_status =
consumer.client().context().tunnel_status().ssh_status;
let ssh_status = consumer.client().context().tunnel_status();
let ssh_status = match ssh_status {
SshTunnelStatus::Running => {
Some(HealthStatusUpdate::running())
Expand Down
3 changes: 0 additions & 3 deletions test/testdrive/connection-create-external.td
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.mater
ALTER SYSTEM SET enable_connection_validation_syntax = true
ALTER SYSTEM SET storage_enforce_external_addresses = true

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true)
contains:Failed to resolve hostname

# Setup kafka topic with schema
# must be a subset of the keys in the rows
$ set keyschema={
Expand Down
2 changes: 1 addition & 1 deletion test/testdrive/kafka-source-errors.td
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ contains:Failed to resolve hostname
! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL 'http://non-existent-csr:8081'
);
contains:failed to lookup address information: Name or service not known
contains:failed to lookup address information

# Check that for all tables clause is rejected
! CREATE SOURCE bad_definition1
Expand Down

0 comments on commit a6e4449

Please sign in to comment.