Skip to content

Commit

Permalink
KafkaSinkCluster: Add kafka 3.9 integration test (#1821)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 18, 2024
1 parent 8045ecd commit 73ac04c
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 1 deletion.
37 changes: 37 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml");

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1
services:
kafka0:
image: &image 'bitnami/kafka:3.9.0-debian-12-r3'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093"
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_BROKER_RACK: "rack1"
ALLOW_PLAINTEXT_LISTENER: "yes"
# Required for high availability
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2

# This cfg is set to 3000 by default, which for a typical workload reduces the overhead of creating a
# new consumer group by avoiding constant rebalances as each initial consumer joins.
# See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
#
# However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka3:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.5
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092"
KAFKA_CFG_NODE_ID: 3
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka4:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.6
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092"
KAFKA_CFG_NODE_ID: 4
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka5:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.7
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092"
KAFKA_CFG_NODE_ID: 5
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
27 changes: 27 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3077,6 +3077,33 @@ The connection to the client has been closed."
self.rewrite_describe_cluster_response(describe_cluster)?;
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ApiVersions(api_versions),
..
})) => {
let original_size = api_versions.api_keys.len();

// List of keys that shotover doesnt support and so should be removed from supported keys list
let disable_keys = [
// This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION
// So its not clear at all how to implement this and its not even possible to test it.
// Instead lets just ask the client to not send it at all.
// We can consider supporting it when kafka itself starts to support it but we will need to be very
// careful to correctly implement the pagination/cursor logic.
ApiKey::DescribeTopicPartitionsKey as i16,
// This message type is part of the new consumer group API, we should implement support for it in the future.
// I've disabled it for now to keep the scope down for kafka 3.9 support.
ApiKey::ConsumerGroupDescribeKey as i16,
];
api_versions
.api_keys
.retain(|x| !disable_keys.contains(&x.api_key));

if original_size != api_versions.api_keys.len() {
// only invalidate the cache if we actually removed anything
response.invalidate_cache();
}
}
_ => {}
}

Expand Down
7 changes: 6 additions & 1 deletion test-helpers/src/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose {
docker_compose("tests/transforms/docker-compose-moto.yaml")
}

pub static IMAGE_WAITERS: [Image; 11] = [
pub static IMAGE_WAITERS: [Image; 12] = [
Image {
name: "motoserver/moto",
log_regex_to_wait_for: r"Press CTRL\+C to quit",
Expand Down Expand Up @@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.9.0-debian-12-r3",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "opensearchproject/opensearch:2.9.0",
log_regex_to_wait_for: r"Node started",
Expand Down

0 comments on commit 73ac04c

Please sign in to comment.