From 453adf5a64e0e63c9a36b08938b3e378a1ccd933 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 30 Oct 2024 07:10:31 +1100 Subject: [PATCH] fix tests with down shotover node --- shotover-proxy/tests/kafka_int_tests/mod.rs | 4 +- .../tests/kafka_int_tests/test_cases.rs | 37 ++++++++++++++++--- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index e046aad48..813962821 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -485,7 +485,7 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf // create a new connection and produce and consume messages let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193"); - test_cases::cluster_test_suite(&new_connection_builder).await; + test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await; let mut expected_events = multi_shotover_events(); // The UP shotover node should detect the killed nodes at least once @@ -537,7 +537,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver: // Send some produce and consume requests let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192"); - test_cases::cluster_test_suite(&connection_builder).await; + test_cases::cluster_test_suite_with_lost_shotover_node(&connection_builder).await; let mut expected_events = multi_shotover_events(); // Other shotover nodes should detect the missing node at least once diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index e96746b5a..72a7f95fb 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1326,7 +1326,7 @@ async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: & } } -pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { +async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) { admin_setup(connection_builder).await; produce_consume_partitions1(connection_builder, "partitions1").await; produce_consume_partitions1(connection_builder, "unknown_topic").await; @@ -1363,9 +1363,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { .await; produce_consume_partitions1(connection_builder, "partitions1").await; - // rdkafka-rs doesnt support these methods list_offsets(&admin).await; - list_groups(connection_builder, &admin).await; } produce_consume_acks0(connection_builder).await; @@ -1439,7 +1437,8 @@ async fn list_offsets(admin: &KafkaAdmin) { assert_eq!(results, expected); } -async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaAdmin) { +async fn list_groups(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; let mut consumer = connection_builder .connect_consumer( ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()]) @@ -1461,8 +1460,7 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaA } } -pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { - standard_test_suite(connection_builder).await; +async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin .create_topics_and_wait(&[ @@ -1482,6 +1480,33 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await; } +pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnectionBuilder) { + // rdkafka-rs doesnt support these methods + #[allow(irrefutable_let_patterns)] + if let KafkaConnectionBuilder::Java(_) = connection_builder { + list_groups(connection_builder).await; + } +} + +pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { + standard_test_suite_base(connection_builder).await; + cluster_test_suite_base(connection_builder).await; + tests_requiring_all_shotover_nodes(connection_builder).await; +} + +pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { + standard_test_suite_base(connection_builder).await; + cluster_test_suite_base(connection_builder).await; + tests_requiring_all_shotover_nodes(connection_builder).await; +} + +pub async fn cluster_test_suite_with_lost_shotover_node( + connection_builder: &KafkaConnectionBuilder, +) { + standard_test_suite_base(connection_builder).await; + cluster_test_suite_base(connection_builder).await; +} + pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) { let admin = connection.connect_admin().await; admin