diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 3059529c7..7675093b2 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { } } +#[rstest] +#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) { + let _docker_compose = + docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml"); + + // One shotover instance per rack + let mut shotovers = vec![]; + for i in 1..3 { + shotovers.push( + shotover_process(&format!( + "tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml" + )) + .with_config(&format!( + "tests/test-configs/shotover-config/config{i}.yaml" + )) + .with_log_name(&format!("shotover{i}")) + .start() + .await, + ); + } + + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); + test_cases::cluster_test_suite(&connection_builder).await; + + for shotover in shotovers { + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&multi_shotover_events()), + ) + .await + .expect("Shotover did not shutdown within 10s"); + } +} + #[rstest] //#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram #[case::java(KafkaDriver::Java)] diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml new file mode 100644 index 000000000..430d8e1c0 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml @@ -0,0 +1,96 @@ +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 +services: + kafka0: + image: &image 'bitnami/kafka:3.9.0-debian-12-r3' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093" + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093" + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_BROKER_RACK: "rack1" + ALLOW_PLAINTEXT_LISTENER: "yes" + # Required for high availability + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2 + + # This cfg is set to 3000 by default, which for a typical workload reduces the overhead of creating a + # new consumer group by avoiding constant rebalances as each initial consumer joins. + # See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance + # + # However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run. + KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092" + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_RACK: "rack1" + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092" + KAFKA_CFG_NODE_ID: 2 + KAFKA_CFG_BROKER_RACK: "rack1" + volumes: *volumes + kafka3: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.5 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092" + KAFKA_CFG_NODE_ID: 3 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes + kafka4: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.6 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092" + KAFKA_CFG_NODE_ID: 4 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes + kafka5: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.7 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092" + KAFKA_CFG_NODE_ID: 5 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 9acfa8f58..6a4c72a71 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -16,6 +16,7 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}; use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction; use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic; use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult; +use kafka_protocol::messages::describe_cluster_response::DescribeClusterBroker; use kafka_protocol::messages::describe_producers_request::TopicRequest; use kafka_protocol::messages::describe_producers_response::TopicResponse; use kafka_protocol::messages::fetch_request::FetchTopic; @@ -32,15 +33,16 @@ use kafka_protocol::messages::produce_response::{ use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, DescribeGroupsRequest, DescribeGroupsResponse, DescribeProducersRequest, - DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse, - EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, - GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, - ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, - MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, - OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, - RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, - SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse, + DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest, + DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse, + FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, + InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, + ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, + MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, + OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, + SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, + TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -1142,6 +1144,7 @@ The connection to the client has been closed." Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Metadata(_) + | RequestBody::DescribeCluster(_) | RequestBody::DescribeConfigs(_) | RequestBody::AlterConfigs(_) | RequestBody::CreatePartitions(_) @@ -3077,14 +3080,40 @@ The connection to the client has been closed." response.invalidate_cache(); } Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::DescribeCluster(_), + body: ResponseBody::DescribeCluster(describe_cluster), .. })) => { - // If clients were to send this we would need to rewrite the broker information. - // However I dont think clients actually send this, so just error to ensure we dont break invariants. - return Err(anyhow!( - "I think this is a raft specific message and never sent by clients" - )); + self.process_describe_cluster_response(describe_cluster) + .await; + self.rewrite_describe_cluster_response(describe_cluster)?; + response.invalidate_cache(); + } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ApiVersions(api_versions), + .. + })) => { + let original_size = api_versions.api_keys.len(); + + // List of keys that shotover doesnt support and so should be removed from supported keys list + let disable_keys = [ + // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION + // So its not clear at all how to implement this and its not even possible to test it. + // Instead lets just ask the client to not send it at all. + // We can consider supporting it when kafka itself starts to support it but we will need to be very + // careful to correctly implement the pagination/cursor logic. + ApiKey::DescribeTopicPartitionsKey as i16, + // This message type is part of the new consumer group API, we should implement support for it in the future. + // I've disabled it for now to keep the scope down for kafka 3.9 support. + ApiKey::ConsumerGroupDescribeKey as i16, + ]; + api_versions + .api_keys + .retain(|x| !disable_keys.contains(&x.api_key)); + + if original_size != api_versions.api_keys.len() { + // only invalidate the cache if we actually removed anything + response.invalidate_cache(); + } } _ => {} } @@ -3398,6 +3427,26 @@ The connection to the client has been closed." } } + async fn process_describe_cluster_response( + &mut self, + describe_cluster: &DescribeClusterResponse, + ) { + for broker in &describe_cluster.brokers { + let node = KafkaNode::new( + broker.broker_id, + KafkaAddress::new(broker.host.clone(), broker.port), + broker.rack.clone(), + ); + self.add_node_if_new(node).await; + } + + tracing::debug!( + "Storing controller metadata, controller is now broker {}", + describe_cluster.controller_id.0 + ); + self.controller_broker.set(describe_cluster.controller_id); + } + fn process_find_coordinator_response( &mut self, version: i16, @@ -3604,34 +3653,69 @@ The connection to the client has been closed." } } + self.rewrite_controller_id(&mut metadata.controller_id, &up_shotover_nodes); + + Ok(()) + } + + /// Rewrite DescribeCluster response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist + fn rewrite_describe_cluster_response( + &self, + describe_cluster: &mut DescribeClusterResponse, + ) -> Result<()> { + // This should never be empty since the local shotover node always considers itself UP + let up_shotover_nodes: Vec<_> = self + .shotover_nodes + .iter() + .filter(|shotover_node| shotover_node.is_up()) + .collect(); + + // Overwrite list of brokers with the list of UP shotover nodes + describe_cluster.brokers = up_shotover_nodes + .iter() + .map(|shotover_node| { + DescribeClusterBroker::default() + .with_broker_id(shotover_node.broker_id) + .with_host(shotover_node.address_for_clients.host.clone()) + .with_port(shotover_node.address_for_clients.port) + .with_rack(Some(shotover_node.rack.clone())) + }) + .collect(); + + self.rewrite_controller_id(&mut describe_cluster.controller_id, &up_shotover_nodes); + + Ok(()) + } + + fn rewrite_controller_id( + &self, + controller_id_field: &mut BrokerId, + up_shotover_nodes: &[&ShotoverNode], + ) { if let Some(controller_node_rack) = self .nodes .iter() - .find(|node| node.broker_id == metadata.controller_id) - .map(|x| x.rack.clone()) + .find(|node| node.broker_id == *controller_id_field) + .map(|x| &x.rack) { // If broker has no rack - use the first UP shotover node. // If broker has rack - use the first UP shotover node with the same rack if available, // and fall back to use the first UP shotover node out of the rack otherwise. // This is deterministic because the list of UP shotover nodes is sorted and partitioning does not change the order. - let shotover_nodes_by_rack = partition_shotover_nodes_by_rack( - &up_shotover_nodes, - &controller_node_rack.as_ref(), - ); + let shotover_nodes_by_rack = + partition_shotover_nodes_by_rack(up_shotover_nodes, &controller_node_rack.as_ref()); let shotover_node = shotover_nodes_by_rack .nodes_in_rack .first() .or_else(|| shotover_nodes_by_rack.nodes_out_of_rack.first()) .expect("There will always be at least one up shotover node"); - metadata.controller_id = shotover_node.broker_id; + *controller_id_field = shotover_node.broker_id; } else { // controller is either -1 or an unknown broker // In both cases it is reasonable to set to -1 to indicate the controller is unknown. - metadata.controller_id = BrokerId(-1); + *controller_id_field = BrokerId(-1); } - - Ok(()) } async fn add_node_if_new(&mut self, new_node: KafkaNode) { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 59b36cb0c..996010b92 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -35,7 +35,7 @@ impl KafkaConnectionBuilderJava { // The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom // These are deployed to and loaded from a path like target/debug/jassets let jvm = Jvm::new(&[ - "org.apache.kafka:kafka-clients:3.7.0", + "org.apache.kafka:kafka-clients:3.8.1", "org.slf4j:slf4j-api:1.7.36", "org.slf4j:slf4j-simple:1.7.36", ]); diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index e8c2e9378..adb99308b 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose { docker_compose("tests/transforms/docker-compose-moto.yaml") } -pub static IMAGE_WAITERS: [Image; 11] = [ +pub static IMAGE_WAITERS: [Image; 12] = [ Image { name: "motoserver/moto", log_regex_to_wait_for: r"Press CTRL\+C to quit", @@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [ log_regex_to_wait_for: r"Kafka Server started", timeout: Duration::from_secs(120), }, + Image { + name: "bitnami/kafka:3.9.0-debian-12-r3", + log_regex_to_wait_for: r"Kafka Server started", + timeout: Duration::from_secs(120), + }, Image { name: "opensearchproject/opensearch:2.9.0", log_regex_to_wait_for: r"Node started",