diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 7299c01a0..0c096b37d 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -6,7 +6,7 @@ use test_helpers::{ ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicPartition, + ResourceSpecifier, ResourceType, TopicPartition, TransactionDescription, }, docker_compose::DockerCompose, }; @@ -1687,15 +1687,29 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) { } } -async fn list_transactions(connection_builder: &KafkaConnectionBuilder) { +async fn list_and_describe_transactions(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; - let _transaction_producer = connection_builder - .connect_producer_with_transactions("some_transaction_id".to_owned()) + let _transaction_producer1 = connection_builder + .connect_producer_with_transactions("some_transaction_id1".to_owned()) + .await; + let _transaction_producer2 = connection_builder + .connect_producer_with_transactions("some_transaction_id2".to_owned()) .await; let actual_results = admin.list_transactions().await; - let expected_results = ["some_transaction_id".to_owned()]; - assert_eq!(actual_results, expected_results); + assert!(actual_results.contains(&"some_transaction_id1".to_owned())); + assert!(actual_results.contains(&"some_transaction_id2".to_owned())); + + let result = admin + .describe_transactions(&["some_transaction_id1", "some_transaction_id2"]) + .await; + assert_eq!( + result, + HashMap::from([ + ("some_transaction_id1".to_owned(), TransactionDescription {}), + ("some_transaction_id2".to_owned(), TransactionDescription {}), + ]) + ); } async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) { @@ -1771,7 +1785,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec #[allow(irrefutable_let_patterns)] if let KafkaConnectionBuilder::Java(_) = connection_builder { list_groups(connection_builder).await; - list_transactions(connection_builder).await; + list_and_describe_transactions(connection_builder).await; create_and_list_partition_reassignments(connection_builder).await; } } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 1b19754a8..a985d370f 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -32,14 +32,15 @@ use kafka_protocol::messages::produce_response::{ use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse, EndTxnRequest, - FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, - HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, - ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, - MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, - OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, - RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, - SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse, + DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest, + FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, + InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, + ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, + MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, + OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, + SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, + TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -56,9 +57,9 @@ use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter, - ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, - OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, - ProduceRequestSplitAndRouter, RequestSplitAndRouter, + DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, + ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, + OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::hash::Hasher; @@ -770,6 +771,14 @@ impl KafkaSinkCluster { })) => { self.store_transaction(&mut transactions, transactional_id.clone()); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DescribeTransactions(describe_transaction), + .. + })) => { + for transactional_id in &describe_transaction.transactional_ids { + self.store_transaction(&mut transactions, transactional_id.clone()); + } + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request), header, @@ -1090,6 +1099,10 @@ The connection to the client has been closed." body: RequestBody::ListGroups(_), .. })) => self.split_and_route_request::(request)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DescribeTransactions(_), + .. + })) => self.split_and_route_request::(request)?, Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::ListTransactions(_), .. @@ -1560,6 +1573,29 @@ The connection to the client has been closed." result } + /// This method removes all transactions from the DescribeTransactions request and returns them split up by their destination. + /// If any transactions are unroutable they will have their BrokerId set to -1 + fn split_describe_transactions_request_by_destination( + &mut self, + body: &mut DescribeTransactionsRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for transaction in body.transactional_ids.drain(..) { + if let Some(destination) = self.transaction_to_coordinator_broker.get(&transaction) { + let dest_transactions = result.entry(*destination).or_default(); + dest_transactions.push(transaction); + } else { + tracing::warn!("no known coordinator for transactions {transaction:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"); + let destination = BrokerId(-1); + let dest_transactions = result.entry(destination).or_default(); + dest_transactions.push(transaction); + } + } + + result + } + /// Route broadcasted requests to all brokers split across all shotover nodes. /// That is, each shotover node in a rack will deterministically be assigned a portion of the rack to route the request to. /// If a shotover node is the only node in its rack it will route to all kafka brokers in the rack. @@ -2243,6 +2279,10 @@ The connection to the client has been closed." body: ResponseBody::ListGroups(base), .. })) => Self::combine_list_groups(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DescribeTransactions(base), + .. + })) => Self::combine_describe_transactions(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::ListTransactions(base), .. @@ -2572,6 +2612,24 @@ The connection to the client has been closed." Ok(()) } + fn combine_describe_transactions( + base: &mut DescribeTransactionsResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DescribeTransactions(next), + .. + })) = next.frame() + { + base.transaction_states + .extend(std::mem::take(&mut next.transaction_states)); + } + } + + Ok(()) + } + fn combine_list_transactions( base_list_transactions: &mut ListTransactionsResponse, drain: impl Iterator, diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index a9b888853..19a84efa2 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -12,9 +12,9 @@ use kafka_protocol::messages::{ list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, - DescribeProducersRequest, GroupId, ListGroupsRequest, ListOffsetsRequest, - ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, - TopicName, + DescribeProducersRequest, DescribeTransactionsRequest, GroupId, ListGroupsRequest, + ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, + ProduceRequest, TopicName, TransactionalId, }; use std::collections::HashMap; @@ -281,6 +281,34 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter { } } +pub struct DescribeTransactionsSplitAndRouter; + +impl RequestSplitAndRouter for DescribeTransactionsSplitAndRouter { + type Request = DescribeTransactionsRequest; + type SubRequests = Vec; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap { + transform.split_describe_transactions_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> &mut Self::Request { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DescribeTransactions(request), + .. + })) => request, + _ => unreachable!(), + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.transactional_ids = item; + } +} + pub struct OffsetFetchSplitAndRouter; impl RequestSplitAndRouter for OffsetFetchSplitAndRouter { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 70c912a2c..5c26d08e1 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -3,7 +3,7 @@ use super::{ ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, - TopicPartitionInfo, + TopicPartitionInfo, TransactionDescription, }; use crate::connection::java::{map_iterator, Jvm, Value}; use anyhow::Result; @@ -768,6 +768,28 @@ impl KafkaAdminJava { results } + pub async fn desscribe_transactions( + &self, + transaction_ids: &[&str], + ) -> HashMap { + let transaction_ids = transaction_ids + .iter() + .map(|x| self.jvm.new_string(x)) + .collect(); + let transaction_ids = self.jvm.new_list("java.lang.String", transaction_ids); + let java_results = self + .admin + .call("describeTransactions", vec![transaction_ids]) + .call_async("all", vec![]) + .await; + + map_iterator(java_results) + .map(|(transaction_id, _transaction_description)| { + (transaction_id.into_rust(), TransactionDescription {}) + }) + .collect() + } + pub async fn list_consumer_group_offsets( &self, group_id: String, diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index f2f7e3fef..d208c6140 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -489,6 +489,17 @@ impl KafkaAdmin { } } + pub async fn describe_transactions( + &self, + transaction_ids: &[&str], + ) -> HashMap { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => panic!("rdkafka-rs driver does not support desscribe_transactions"), + Self::Java(java) => java.desscribe_transactions(transaction_ids).await, + } + } + pub async fn list_consumer_group_offsets( &self, group_id: String, @@ -750,6 +761,9 @@ pub struct RecordsToDelete { pub delete_before_offset: i64, } +#[derive(PartialEq, Debug)] +pub struct TransactionDescription {} + #[derive(PartialEq, Debug)] pub struct PartitionReassignment { pub adding_replica_broker_ids: Vec,