Merge branch 'main' into error_on_unregister_broker
rukai authored Nov 18, 2024
2 parents a905fc0 + 73ac04c commit 0f955eb
Showing 5 changed files with 249 additions and 27 deletions.
37 changes: 37 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
    }
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since the java driver blocks when consuming, preventing shotover logs from appearing
async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
    let _docker_compose =
        docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml");

    // One shotover instance per rack
    let mut shotovers = vec![];
    for i in 1..3 {
        shotovers.push(
            shotover_process(&format!(
                "tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
            ))
            .with_config(&format!(
                "tests/test-configs/shotover-config/config{i}.yaml"
            ))
            .with_log_name(&format!("shotover{i}"))
            .start()
            .await,
        );
    }

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
    test_cases::cluster_test_suite(&connection_builder).await;

    for shotover in shotovers {
        tokio::time::timeout(
            Duration::from_secs(10),
            shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
        )
        .await
        .expect("Shotover did not shutdown within 10s");
    }
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
96 changes: 96 additions & 0 deletions shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml
@@ -0,0 +1,96 @@
networks:
  cluster_subnet:
    name: cluster_subnet
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.16.1.0/24
          gateway: 172.16.1.1
services:
  kafka0:
    image: &image 'bitnami/kafka:3.9.0-debian-12-r3'
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.2
    environment: &environment
      KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093"
      KAFKA_CFG_NODE_ID: 0
      KAFKA_CFG_BROKER_RACK: "rack1"
      ALLOW_PLAINTEXT_LISTENER: "yes"
      # Required for high availability
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2

      # This config defaults to 3000 (ms), which for a typical workload reduces the overhead of creating a
      # new consumer group by avoiding constant rebalances as each initial consumer joins.
      # See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
      #
      # However, our integration tests constantly spin up single-consumer groups, so the default value makes them take twice as long to run.
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
    volumes: &volumes
      - type: tmpfs
        target: /bitnami/kafka
  kafka1:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.3
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
      KAFKA_CFG_NODE_ID: 1
      KAFKA_CFG_BROKER_RACK: "rack1"
    volumes: *volumes
  kafka2:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.4
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
      KAFKA_CFG_NODE_ID: 2
      KAFKA_CFG_BROKER_RACK: "rack1"
    volumes: *volumes
  kafka3:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.5
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092"
      KAFKA_CFG_NODE_ID: 3
      KAFKA_CFG_BROKER_RACK: "rack2"
    volumes: *volumes
  kafka4:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.6
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092"
      KAFKA_CFG_NODE_ID: 4
      KAFKA_CFG_BROKER_RACK: "rack2"
    volumes: *volumes
  kafka5:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.7
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092"
      KAFKA_CFG_NODE_ID: 5
      KAFKA_CFG_BROKER_RACK: "rack2"
    volumes: *volumes
134 changes: 109 additions & 25 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -16,6 +16,7 @@ use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic;
use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult;
+use kafka_protocol::messages::describe_cluster_response::DescribeClusterBroker;
use kafka_protocol::messages::describe_producers_request::TopicRequest;
use kafka_protocol::messages::describe_producers_response::TopicResponse;
use kafka_protocol::messages::fetch_request::FetchTopic;
@@ -32,15 +33,16 @@ use kafka_protocol::messages::produce_response::{
use kafka_protocol::messages::{
    AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
    BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
-   DeleteRecordsResponse, DescribeGroupsRequest, DescribeGroupsResponse, DescribeProducersRequest,
-   DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
-   EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
-   GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
-   ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
-   MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
-   OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
-   RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
-   SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
+   DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
+   DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest,
+   DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse,
+   FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
+   InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
+   ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
+   MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
+   OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
+   SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
+   TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -1142,6 +1144,7 @@ The connection to the client has been closed."
            Some(Frame::Kafka(KafkaFrame::Request {
                body:
                    RequestBody::Metadata(_)
+                   | RequestBody::DescribeCluster(_)
                    | RequestBody::DescribeConfigs(_)
                    | RequestBody::AlterConfigs(_)
                    | RequestBody::CreatePartitions(_)
@@ -3077,14 +3080,40 @@ The connection to the client has been closed."
                response.invalidate_cache();
            }
            Some(Frame::Kafka(KafkaFrame::Response {
-               body: ResponseBody::DescribeCluster(_),
+               body: ResponseBody::DescribeCluster(describe_cluster),
                ..
            })) => {
-               // If clients were to send this we would need to rewrite the broker information.
-               // However I dont think clients actually send this, so just error to ensure we dont break invariants.
-               return Err(anyhow!(
-                   "I think this is a raft specific message and never sent by clients"
-               ));
+               self.process_describe_cluster_response(describe_cluster)
+                   .await;
+               self.rewrite_describe_cluster_response(describe_cluster)?;
+               response.invalidate_cache();
            }
+           Some(Frame::Kafka(KafkaFrame::Response {
+               body: ResponseBody::ApiVersions(api_versions),
+               ..
+           })) => {
+               let original_size = api_versions.api_keys.len();
+
+               // List of keys that shotover doesn't support and that should therefore be removed from the supported keys list
+               let disable_keys = [
+                   // This message type has very little documentation available and Kafka responds to it with error code 35 UNSUPPORTED_VERSION.
+                   // So it's not clear at all how to implement this and it's not even possible to test it.
+                   // Instead let's just ask the client to not send it at all.
+                   // We can consider supporting it when Kafka itself starts to support it, but we will need to be very
+                   // careful to correctly implement the pagination/cursor logic.
+                   ApiKey::DescribeTopicPartitionsKey as i16,
+                   // This message type is part of the new consumer group API, we should implement support for it in the future.
+                   // I've disabled it for now to keep the scope down for Kafka 3.9 support.
+                   ApiKey::ConsumerGroupDescribeKey as i16,
+               ];
+               api_versions
+                   .api_keys
+                   .retain(|x| !disable_keys.contains(&x.api_key));
+
+               if original_size != api_versions.api_keys.len() {
+                   // Only invalidate the cache if we actually removed anything
+                   response.invalidate_cache();
+               }
+           }
            _ => {}
        }
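
For reference, the two disabled entries should correspond to wire-protocol API keys 75 and 69 — the numeric values below are recalled from the Kafka protocol tables rather than taken from this diff:

// Hypothetical sanity check, not part of the commit: DescribeTopicPartitions and
// ConsumerGroupDescribe (the latter from the KIP-848 consumer group protocol)
// should map to API keys 75 and 69 respectively.
assert_eq!(ApiKey::DescribeTopicPartitionsKey as i16, 75);
assert_eq!(ApiKey::ConsumerGroupDescribeKey as i16, 69);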
@@ -3398,6 +3427,26 @@ The connection to the client has been closed."
        }
    }

+   async fn process_describe_cluster_response(
+       &mut self,
+       describe_cluster: &DescribeClusterResponse,
+   ) {
+       for broker in &describe_cluster.brokers {
+           let node = KafkaNode::new(
+               broker.broker_id,
+               KafkaAddress::new(broker.host.clone(), broker.port),
+               broker.rack.clone(),
+           );
+           self.add_node_if_new(node).await;
+       }
+
+       tracing::debug!(
+           "Storing controller metadata, controller is now broker {}",
+           describe_cluster.controller_id.0
+       );
+       self.controller_broker.set(describe_cluster.controller_id);
+   }
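
For illustration, a DescribeCluster response from a real broker carries the broker list and controller id roughly as below. This is a sketch: the `with_brokers`/`with_controller_id` builders are assumed to exist by analogy with the `with_*` setters used elsewhere in this diff, and the values are invented.

// Hypothetical response a real broker might send (values invented for illustration).
let example = DescribeClusterResponse::default()
    .with_controller_id(BrokerId(0))
    .with_brokers(vec![DescribeClusterBroker::default()
        .with_broker_id(BrokerId(0))
        .with_host(StrBytes::from_static_str("172.16.1.2"))
        .with_port(9092)
        .with_rack(Some(StrBytes::from_static_str("rack1")))]);
// process_describe_cluster_response records each broker via add_node_if_new
// and remembers broker 0 as the current controller.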

    fn process_find_coordinator_response(
        &mut self,
        version: i16,
@@ -3604,34 +3653,69 @@
            }
        }

+       self.rewrite_controller_id(&mut metadata.controller_id, &up_shotover_nodes);

        Ok(())
    }

+   /// Rewrite DescribeCluster response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist
+   fn rewrite_describe_cluster_response(
+       &self,
+       describe_cluster: &mut DescribeClusterResponse,
+   ) -> Result<()> {
+       // This should never be empty since the local shotover node always considers itself UP
+       let up_shotover_nodes: Vec<_> = self
+           .shotover_nodes
+           .iter()
+           .filter(|shotover_node| shotover_node.is_up())
+           .collect();
+
+       // Overwrite list of brokers with the list of UP shotover nodes
+       describe_cluster.brokers = up_shotover_nodes
+           .iter()
+           .map(|shotover_node| {
+               DescribeClusterBroker::default()
+                   .with_broker_id(shotover_node.broker_id)
+                   .with_host(shotover_node.address_for_clients.host.clone())
+                   .with_port(shotover_node.address_for_clients.port)
+                   .with_rack(Some(shotover_node.rack.clone()))
+           })
+           .collect();
+
+       self.rewrite_controller_id(&mut describe_cluster.controller_id, &up_shotover_nodes);
+
+       Ok(())
+   }

+   fn rewrite_controller_id(
+       &self,
+       controller_id_field: &mut BrokerId,
+       up_shotover_nodes: &[&ShotoverNode],
+   ) {
        if let Some(controller_node_rack) = self
            .nodes
            .iter()
-           .find(|node| node.broker_id == metadata.controller_id)
-           .map(|x| x.rack.clone())
+           .find(|node| node.broker_id == *controller_id_field)
+           .map(|x| &x.rack)
        {
            // If broker has no rack - use the first UP shotover node.
            // If broker has rack - use the first UP shotover node with the same rack if available,
            // and fall back to the first UP shotover node out of the rack otherwise.
            // This is deterministic because the list of UP shotover nodes is sorted and partitioning does not change the order.
-           let shotover_nodes_by_rack = partition_shotover_nodes_by_rack(
-               &up_shotover_nodes,
-               &controller_node_rack.as_ref(),
-           );
+           let shotover_nodes_by_rack =
+               partition_shotover_nodes_by_rack(up_shotover_nodes, &controller_node_rack.as_ref());
            let shotover_node = shotover_nodes_by_rack
                .nodes_in_rack
                .first()
                .or_else(|| shotover_nodes_by_rack.nodes_out_of_rack.first())
                .expect("There will always be at least one up shotover node");

-           metadata.controller_id = shotover_node.broker_id;
+           *controller_id_field = shotover_node.broker_id;
        } else {
            // The controller is either -1 or an unknown broker.
            // In both cases it is reasonable to set it to -1 to indicate the controller is unknown.
-           metadata.controller_id = BrokerId(-1);
+           *controller_id_field = BrokerId(-1);
        }
-
-       Ok(())
    }
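
`partition_shotover_nodes_by_rack` is not shown in this diff; judging from the call site above, its shape is presumably something like the following sketch (struct and field names inferred, not confirmed by the commit):

// Sketch only: inferred from the call site above, not part of the commit.
struct PartitionedShotoverNodes<'a> {
    nodes_in_rack: Vec<&'a ShotoverNode>,
    nodes_out_of_rack: Vec<&'a ShotoverNode>,
}

fn partition_shotover_nodes_by_rack<'a>(
    up_shotover_nodes: &[&'a ShotoverNode],
    rack: &Option<&StrBytes>,
) -> PartitionedShotoverNodes<'a> {
    // Partitioning preserves the original order, which keeps the
    // fallback controller selection above deterministic.
    let (nodes_in_rack, nodes_out_of_rack) = up_shotover_nodes
        .iter()
        .copied()
        .partition(|node| Some(&node.rack) == *rack);
    PartitionedShotoverNodes {
        nodes_in_rack,
        nodes_out_of_rack,
    }
}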

    async fn add_node_if_new(&mut self, new_node: KafkaNode) {
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/java.rs
@@ -35,7 +35,7 @@ impl KafkaConnectionBuilderJava {
        // The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom
        // These are deployed to and loaded from a path like target/debug/jassets
        let jvm = Jvm::new(&[
-           "org.apache.kafka:kafka-clients:3.7.0",
+           "org.apache.kafka:kafka-clients:3.8.1",
            "org.slf4j:slf4j-api:1.7.36",
            "org.slf4j:slf4j-simple:1.7.36",
        ]);
7 changes: 6 additions & 1 deletion test-helpers/src/docker_compose.rs
@@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose {
    docker_compose("tests/transforms/docker-compose-moto.yaml")
}

-pub static IMAGE_WAITERS: [Image; 11] = [
+pub static IMAGE_WAITERS: [Image; 12] = [
    Image {
        name: "motoserver/moto",
        log_regex_to_wait_for: r"Press CTRL\+C to quit",
@@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.9.0-debian-12-r3",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "opensearchproject/opensearch:2.9.0",
log_regex_to_wait_for: r"Node started",
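
For context, the `Image` entries above populate a waiter type that presumably looks roughly like this — a sketch inferred from the fields in use; the real definition lives elsewhere in test-helpers and is not part of this diff:

use std::time::Duration;

// Sketch only: field names inferred from the entries above, not confirmed by the commit.
pub struct Image {
    /// Docker image name to match against running containers
    pub name: &'static str,
    /// The container is considered ready once its logs match this regex
    pub log_regex_to_wait_for: &'static str,
    /// How long to wait for the ready line before giving up
    pub timeout: Duration,
}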
