Fix other error checks to use dns error response; properly report errors from kafka client dns resolution
rjobanp committed Mar 22, 2024
1 parent ca6c539 commit 8a4deb0
Showing 12 changed files with 157 additions and 54 deletions.
121 changes: 102 additions & 19 deletions src/kafka-util/src/client.rs
@@ -295,6 +295,9 @@ enum BrokerRewriteHandle {
/// For _default_ ssh tunnels, we store an error if _creation_
/// of the tunnel failed, so that `tunnel_status` can return it.
FailedDefaultSshTunnel(String),
/// We store an error if DNS resolution fails when resolving
/// a new broker host.
FailedDNSResolution(String),
}

/// Tunneling clients
@@ -309,6 +312,24 @@ pub enum TunnelConfig {
None,
}

/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`.
#[derive(Clone)]
pub struct TunnelingClientStatus {
/// Status of all active ssh tunnels.
pub ssh_status: SshTunnelStatus,
/// Status of direct broker connections.
pub broker_status: BrokerStatus,
}

/// Status of direct broker connections for a `TunnelingClientContext`.
#[derive(Clone)]
pub enum BrokerStatus {
/// The broker connections are nominal.
Nominal,
/// At least one broker connection has failed.
Failed(String),
}

/// A client context that supports rewriting broker addresses.
#[derive(Clone)]
pub struct TunnelingClientContext<C> {
@@ -391,19 +412,22 @@ impl<C> TunnelingClientContext<C> {
&self.inner
}

/// Returns a _consolidated_ `SshTunnelStatus` that communicates the status
/// of all active ssh tunnels `self` knows about.
pub fn tunnel_status(&self) -> SshTunnelStatus {
self.rewrites
.lock()
.expect("poisoned")
/// Returns a `TunnelingClientStatus` containing a _consolidated_ `SshTunnelStatus` that
/// communicates the status of all active ssh tunnels `self` knows about, and a `BrokerStatus`
/// that contains a _consolidated_ status of all direct broker connections.
pub fn tunnel_status(&self) -> TunnelingClientStatus {
let rewrites = self.rewrites.lock().expect("poisoned");

let ssh_status = rewrites
.values()
.map(|handle| match handle {
BrokerRewriteHandle::SshTunnel(s) => s.check_status(),
BrokerRewriteHandle::FailedDefaultSshTunnel(e) => {
SshTunnelStatus::Errored(e.clone())
}
BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running,
BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => {
SshTunnelStatus::Running
}
})
.fold(SshTunnelStatus::Running, |acc, status| {
match (acc, status) {
@@ -418,7 +442,27 @@ impl<C> TunnelingClientContext<C> {
SshTunnelStatus::Running
}
}
});

let broker_status = rewrites
.values()
.map(|handle| match handle {
BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()),
_ => BrokerStatus::Nominal,
})
.fold(BrokerStatus::Nominal, |acc, status| match (acc, status) {
(BrokerStatus::Nominal, BrokerStatus::Failed(e))
| (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e),
(BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => {
BrokerStatus::Failed(format!("{}, {}", err, e))
}
(BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal,
});

TunnelingClientStatus {
ssh_status,
broker_status,
}
}
}
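
As a usage illustration: a minimal sketch of how a caller might consume the consolidated status returned by `tunnel_status`. The `report_ssh_error` and `report_broker_error` hooks are hypothetical; the types follow the definitions above.

    // A minimal sketch, assuming `ctx: &TunnelingClientContext<C>` is in scope.
    // `report_ssh_error` / `report_broker_error` are hypothetical reporting hooks.
    let status = ctx.tunnel_status();
    match status.ssh_status {
        SshTunnelStatus::Errored(e) => report_ssh_error(&e),
        SshTunnelStatus::Running => {}
    }
    match status.broker_status {
        BrokerStatus::Failed(e) => report_broker_error(&e),
        BrokerStatus::Nominal => {}
    }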

@@ -441,7 +485,8 @@ where
port: Some(addr.port()),
}
}
BrokerRewriteHandle::FailedDefaultSshTunnel(_) => {
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
| BrokerRewriteHandle::FailedDNSResolution(_) => {
unreachable!()
}
};
@@ -463,16 +508,30 @@ where
let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned();

match rewrite {
None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => {
None
| Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_))
| Some(BrokerRewriteHandle::FailedDNSResolution(_)) => {
match &self.default_tunnel {
TunnelConfig::Ssh(default_tunnel) => {
// Multiple users could all run `connect` at the same time; only one ssh
// tunnel will ever be connected, and only one will be inserted into the
// map.
let ssh_tunnel = self.runtime.block_on(async {
// Ensure the default tunnel host is resolved to an external address.
let resolved_tunnel_addr = resolve_external_address(
&default_tunnel.host,
self.enforce_external_addresses,
)
.await?;
let tunnel_config = SshTunnelConfig {
host: resolved_tunnel_addr.to_string(),
port: default_tunnel.port,
user: default_tunnel.user.clone(),
key_pair: default_tunnel.key_pair.clone(),
};
self.ssh_tunnel_manager
.connect(
default_tunnel.clone(),
tunnel_config,
&addr.host,
addr.port.parse().unwrap(),
self.ssh_timeout_config,
@@ -487,6 +546,7 @@ where
if matches!(
o.get(),
BrokerRewriteHandle::FailedDefaultSshTunnel(_)
| BrokerRewriteHandle::FailedDNSResolution(_)
) =>
{
o.insert(BrokerRewriteHandle::SshTunnel(
@@ -533,19 +593,42 @@ where
TunnelConfig::None => {
// If no rewrite is specified, we still should check that this potentially
// new broker address is a global address.
let rewrite = self.runtime.block_on(async {
let resolved = resolve_external_address(
self.runtime.block_on(async {
match resolve_external_address(
&addr.host,
self.enforce_external_addresses,
)
.await
.unwrap();
BrokerRewriteHandle::Simple(BrokerRewrite {
host: resolved.to_string(),
port: addr.port.parse().ok(),
})
});
return_rewrite(&rewrite)
{
Ok(resolved) => {
let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite {
host: resolved.to_string(),
port: addr.port.parse().ok(),
});
return_rewrite(&rewrite)
}
Err(e) => {
warn!(
"failed to resolve external address for {:?}: {}",
addr,
e.display_with_causes()
);
// Write an error if no one else has already written one.
let mut rewrites = self.rewrites.lock().expect("poisoned");
rewrites.entry(addr.clone()).or_insert_with(|| {
BrokerRewriteHandle::FailedDNSResolution(
e.to_string_with_causes(),
)
});
// We have to give rdkafka an address, as this callback can't fail.
BrokerAddr {
host: "failed-dns-resolution.dev.materialize.com"
.to_string(),
port: 1337.to_string(),
}
}
}
})
}
}
}
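
rdkafka's broker-address rewrite callback cannot return an error, which is why the hunk above records the DNS failure in the shared rewrite map and hands back a deliberately unroutable placeholder address. A self-contained sketch of that pattern, using simplified stand-ins for `BrokerAddr` and `BrokerRewriteHandle`:

    use std::collections::BTreeMap;
    use std::sync::Mutex;

    // Simplified stand-ins for the `BrokerRewriteHandle` variants above.
    enum Handle {
        Resolved(String),
        FailedDnsResolution(String),
    }

    // The callback must always produce an address: on resolution failure,
    // record the error for `tunnel_status` to surface later and return a
    // placeholder.
    fn rewrite_addr(
        rewrites: &Mutex<BTreeMap<String, Handle>>,
        addr: &str,
        resolve: impl Fn(&str) -> Result<String, String>,
    ) -> String {
        match resolve(addr) {
            Ok(resolved) => {
                let mut map = rewrites.lock().expect("poisoned");
                map.insert(addr.to_string(), Handle::Resolved(resolved.clone()));
                resolved
            }
            Err(e) => {
                // Only write an error if no one else has already written one.
                rewrites
                    .lock()
                    .expect("poisoned")
                    .entry(addr.to_string())
                    .or_insert_with(|| Handle::FailedDnsResolution(e));
                // Deliberately unroutable; the connection will fail and the
                // recorded error explains why.
                "failed-dns-resolution.invalid:1337".to_string()
            }
        }
    }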
2 changes: 1 addition & 1 deletion src/mysql-util/src/tunnel.rs
@@ -239,7 +239,7 @@ impl Config {
// the TLS hostname back to the actual upstream host and not the
// TCP hostname.
opts_builder = opts_builder.ssl_opts(Some(
ssl_opts.clone().with_tls_hostname_override(Some(
ssl_opts.clone().with_danger_tls_hostname_override(Some(
self.inner.ip_or_hostname().to_string(),
)),
));
32 changes: 17 additions & 15 deletions src/storage-types/src/connections.rs
@@ -547,8 +547,14 @@ impl KafkaConnection {
.await?;
let key_pair = SshKeyPair::from_bytes(&secret)?;

// Ensure any ssh-bastion address we connect to is resolved to an external address.
let resolved = resolve_external_address(
&ssh_tunnel.connection.host,
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
)
.await?;
context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
host: ssh_tunnel.connection.host.clone(),
host: resolved.to_string(),
port: ssh_tunnel.connection.port,
user: ssh_tunnel.connection.user.clone(),
key_pair,
@@ -567,25 +573,21 @@ impl KafkaConnection {
};
match &broker.tunnel {
Tunnel::Direct => {
// By default, don't override broker address lookup.
//
// N.B.
//
// We _could_ pre-setup the default ssh tunnel for all known brokers here, but
// we avoid doing so because:
// - It's not necessary.
// - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
// in the `TunnelingClientContext`.

// Ensure any broker address we connect to is resolved to an external address.
let resolved = resolve_external_address(
&addr.host,
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
)
.await?;
context.add_broker_rewrite(
addr,
BrokerRewrite {
host: resolved.to_string(),
port: None,
},
)
//
// NOTE that we do not need to use the `resolve_external_address` method to
// validate the broker address here since it will be validated when the
// connection is established in `src/kafka-util/src/client.rs`, and we do not
// want to specify any BrokerRewrite that would override any default-tunnel
// settings.
}
Tunnel::AwsPrivatelink(aws_privatelink) => {
let host = mz_cloud_resources::vpc_endpoint_host(
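
`resolve_external_address` is defined elsewhere in the tree and is not shown in this diff. Purely as an illustration of the kind of check it performs, here is a hypothetical resolver built on `tokio::net::lookup_host` that can reject non-global addresses; the real signature, error type, and globality rules may differ:

    use std::io;
    use std::net::IpAddr;
    use tokio::net::lookup_host;

    // Hypothetical sketch: resolve `host` and, if enforcement is on, reject
    // addresses that are not globally routable (loopback, private, link-local).
    async fn resolve_external_address(host: &str, enforce: bool) -> io::Result<IpAddr> {
        // `lookup_host` requires a port; a throwaway one suffices for resolution.
        let mut addrs = lookup_host((host, 0)).await?;
        let addr = addrs
            .next()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no addresses"))?
            .ip();
        let is_global = match addr {
            IpAddr::V4(v4) => !(v4.is_loopback() || v4.is_private() || v4.is_link_local()),
            IpAddr::V6(v6) => !v6.is_loopback(),
        };
        if enforce && !is_global {
            return Err(io::Error::new(io::ErrorKind::Other, "address is not global"));
        }
        Ok(addr)
    }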
2 changes: 1 addition & 1 deletion src/storage-types/src/errors.rs
@@ -1026,7 +1026,7 @@ where
cx: &TunnelingClientContext<C>,
) -> Result<T, ContextCreationError> {
self.map_err(|e| {
if let SshTunnelStatus::Errored(e) = cx.tunnel_status() {
if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status {
ContextCreationError::Ssh(anyhow!(e))
} else {
ContextCreationError::from(e)
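
The helper above substitutes a pending ssh tunnel error for the client error it wraps. A hypothetical call site, assuming the helper is exposed as an extension method named `check_ssh_status` on `Result` (the method name is not visible in this hunk):

    // Hypothetical call site; `check_ssh_status` is an assumed name for the
    // extension method whose body appears in the hunk above.
    let metadata = consumer
        .client()
        .fetch_metadata(None, Duration::from_secs(10))
        .check_ssh_status(consumer.client().context())?;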
37 changes: 28 additions & 9 deletions src/storage/src/source/kafka.rs
@@ -20,7 +20,9 @@ use chrono::NaiveDateTime;
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
use maplit::btreemap;
use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext};
use mz_kafka_util::client::{
get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext,
};
use mz_ore::error::ErrorExt;
use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle};
use mz_repr::adt::timestamp::CheckedTimestamp;
@@ -384,21 +386,38 @@ impl SourceRender for KafkaSourceConnection {
"kafka metadata thread: updated partition metadata info",
);

// Clear all the health namespaces we know about.
// Note that many kafka sources don't have an ssh tunnel, but
// the `health_operator` handles this fine.
*status_report.lock().unwrap() = HealthStatus {
kafka: Some(HealthStatusUpdate::running()),
ssh: Some(HealthStatusUpdate::running()),
};
// Check to see if any broker errors have been hit
match consumer.client().context().tunnel_status().broker_status
{
BrokerStatus::Failed(err) => {
let status = HealthStatusUpdate::stalled(
format!("broker error: {}", err),
None,
);
*status_report.lock().unwrap() = HealthStatus {
kafka: Some(status),
ssh: None,
};
}
BrokerStatus::Nominal => {
// Clear all the health namespaces we know about.
// Note that many kafka sources don't have an ssh tunnel, but
// the `health_operator` handles this fine.
*status_report.lock().unwrap() = HealthStatus {
kafka: Some(HealthStatusUpdate::running()),
ssh: Some(HealthStatusUpdate::running()),
};
}
}
}
Err(e) => {
let kafka_status = Some(HealthStatusUpdate::stalled(
format!("{}", e.display_with_causes()),
None,
));

let ssh_status = consumer.client().context().tunnel_status();
let ssh_status =
consumer.client().context().tunnel_status().ssh_status;
let ssh_status = match ssh_status {
SshTunnelStatus::Running => {
Some(HealthStatusUpdate::running())
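
A condensed sketch of the reporting decision above: a broker failure stalls the kafka namespace while leaving the ssh namespace untouched, and a nominal status resets both. The free function is illustrative; `HealthStatus`, `HealthStatusUpdate`, and `BrokerStatus` follow the diff:

    // Illustrative helper mirroring the match above.
    fn health_for(broker_status: BrokerStatus) -> HealthStatus {
        match broker_status {
            BrokerStatus::Failed(err) => HealthStatus {
                kafka: Some(HealthStatusUpdate::stalled(
                    format!("broker error: {}", err),
                    None,
                )),
                // Leave the ssh namespace alone; an ssh error may still be pending.
                ssh: None,
            },
            BrokerStatus::Nominal => HealthStatus {
                kafka: Some(HealthStatusUpdate::running()),
                ssh: Some(HealthStatusUpdate::running()),
            },
        }
    }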
2 changes: 1 addition & 1 deletion test/kafka-auth/test-kafka-ssl.td
@@ -121,7 +121,7 @@ running
# ALTER CONNECTION for Kafka + SSH

! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true);
contains:Could not resolve hostname abcd
contains:failed to lookup address information: Name or service not known

! ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST);
contains:HOST option is required
@@ -14,7 +14,6 @@

> SELECT status FROM mz_internal.mz_source_statuses st
JOIN mz_sources s ON st.id = s.id
WHERE error LIKE 'ssh:%'
AND s.name in ('dynamic_text', 'fixed_text', 'fixed_plus_csr', 'dynamic_plus_csr')
stalled
stalled
2 changes: 1 addition & 1 deletion test/ssh-connection/pg-source-after-ssh-failure.td
Original file line number Diff line number Diff line change
@@ -11,5 +11,5 @@

> SELECT status FROM mz_internal.mz_source_statuses st
JOIN mz_sources s ON st.id = s.id
WHERE s.name = 'mz_source' AND error LIKE 'ssh:%'
WHERE s.name = 'mz_source'
stalled
2 changes: 1 addition & 1 deletion test/testdrive/connection-alter.td
@@ -48,7 +48,7 @@ first second
contains:invalid ALTER CONNECTION: invalid CONNECTION: must set one of BROKER, BROKERS, or AWS PRIVATELINK

! ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = true);
contains:Failed to resolve hostname
contains:failed to lookup address information: Name or service not known

> ALTER CONNECTION conn SET (broker = 'abcd') WITH (validate = false);

Expand Down
2 changes: 1 addition & 1 deletion test/testdrive/connection-create-external.td
@@ -18,7 +18,7 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true
ALTER SYSTEM SET storage_enforce_external_addresses = true

! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true)
contains:address is not global
contains:Failed to resolve hostname

# Setup kafka topic with schema
# must be a subset of the keys in the rows
4 changes: 2 additions & 2 deletions test/testdrive/connection-validation.td
@@ -37,10 +37,10 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true
> CREATE CONNECTION invalid_tunnel TO SSH TUNNEL (HOST 'invalid', USER 'invalid', PORT 22)

! CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT)
contains:failed to connect to the remote host
contains:failed to lookup address information

# Create the connection without validation and validate later
> CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = false)

! VALIDATE CONNECTION invalid_kafka_conn
contains:failed to connect to the remote host
contains:failed to lookup address information
4 changes: 2 additions & 2 deletions test/testdrive/kafka-source-errors.td
@@ -25,7 +25,7 @@ contains:Meta data fetch error: BrokerTransportFailure (Local: Broker transport

! CREATE CONNECTION fawlty_kafka_conn
TO KAFKA (BROKER 'non-existent-broker:9092');
contains:Failed to resolve hostname
contains:failed to lookup address information: Name or service not known

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL '${testdrive.schema-registry-url}'
@@ -34,7 +34,7 @@ contains:Failed to resolve hostname
! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY (
URL 'http://non-existent-csr:8081'
);
contains:failed to lookup address information
contains:failed to lookup address information: Name or service not known

# Check that for all tables clause is rejected
! CREATE SOURCE bad_definition1
