KAFKA-15004: Fix configuration dual-write during migration (#13767)
This patch fixes several small bugs in configuration dual-write during migration:

* Topic configs were not written back to ZK while handling a snapshot.
* New broker/topic configs in KRaft that did not exist in ZK were not written to ZK.
* Sensitive configs were not encoded before being written to ZooKeeper.
* Topic configs are now handled in ConfigMigrationClient and KRaftMigrationZkWriter#handleConfigsSnapshot.

Tests were added to ensure the above issues no longer occur.
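To make the sensitive-config fix concrete, the write and read paths now round-trip password-type values through the broker's password encoder. The sketch below is a simplified illustration only, not the patch itself: isPasswordConfig, encode, and decode stand in for DynamicBrokerConfig.isPasswordConfig and the PasswordEncoder used by ZkConfigMigrationClient (which encrypts with a configured secret rather than Base64-encoding), and the actual ZooKeeper reads and writes are omitted.

import java.util.Properties
import scala.jdk.CollectionConverters._

// Stand-ins for DynamicBrokerConfig.isPasswordConfig and PasswordEncoder (illustration only;
// the real encoder encrypts with a configured secret, it does not just Base64-encode).
def isPasswordConfig(name: String): Boolean = name.toLowerCase.contains("password")
def encode(clearText: String): String =
  java.util.Base64.getEncoder.encodeToString(clearText.getBytes("UTF-8"))
def decode(encoded: String): String =
  new String(java.util.Base64.getDecoder.decode(encoded), "UTF-8")

// Write path (cf. ZkConfigMigrationClient.writeConfigs): encode sensitive values before
// building the Properties blob that is persisted to ZooKeeper.
def toZkProps(configs: Map[String, String]): Properties = {
  val props = new Properties()
  configs.foreach { case (key, value) =>
    if (isPasswordConfig(key)) props.put(key, encode(value)) else props.put(key, value)
  }
  props
}

// Read path (cf. readTopicConfigs): decode sensitive values before they are turned into
// KRaft ConfigRecords.
def fromZkProps(props: Properties): Map[String, String] =
  props.asScala.map { case (key, value) =>
    if (isPasswordConfig(key)) key -> decode(value) else key -> value
  }.toMap

The diff below applies the same pattern using passwordEncoder.encode(new Password(value)) on the write path and passwordEncoder.decode(value).value on the read path.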

Co-authored-by: Akhilesh Chaganti <[email protected]>
Reviewers: Colin P. McCabe <[email protected]>
mumrah and akhileshchg committed May 31, 2023
1 parent a485c24 commit ddff9d5
Showing 11 changed files with 257 additions and 72 deletions.
72 changes: 37 additions & 35 deletions core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -37,7 +37,6 @@ import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.{AuthFailedException, NoAuthException, SessionClosedRequireAuthException}

import java.{lang, util}
import java.util.Properties
import java.util.function.Consumer
import scala.collection.Seq
import scala.jdk.CollectionConverters._
@@ -145,44 +144,47 @@ class ZkMigrationClient(
topicClient.iterateTopics(
util.EnumSet.allOf(classOf[TopicVisitorInterest]),
new TopicVisitor() {
override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
if (!topicBatch.isEmpty) {
recordConsumer.accept(topicBatch)
topicBatch = new util.ArrayList[ApiMessageAndVersion]()
}
override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = {
if (!topicBatch.isEmpty) {
recordConsumer.accept(topicBatch)
topicBatch = new util.ArrayList[ApiMessageAndVersion]()
}

topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
.setName(topicName)
.setTopicId(topicId), 0.toShort))
}
topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
.setName(topicName)
.setTopicId(topicId), 0.toShort))

override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
val record = new PartitionRecord()
.setTopicId(topicIdPartition.topicId())
.setPartitionId(topicIdPartition.partition())
.setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
.setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
.setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
.setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
.setLeader(partitionRegistration.leader)
.setLeaderEpoch(partitionRegistration.leaderEpoch)
.setPartitionEpoch(partitionRegistration.partitionEpoch)
.setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
}
// This breaks the abstraction a bit, but the topic configs belong in the topic batch
// when migrating topics and the logic for reading configs lives elsewhere
configClient.readTopicConfigs(topicName, (topicConfigs: util.Map[String, String]) => {
topicConfigs.forEach((key: Any, value: Any) => {
topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
.setResourceType(ConfigResource.Type.TOPIC.id)
.setResourceName(topicName)
.setName(key.toString)
.setValue(value.toString), 0.toShort))
})
})
}

override def visitConfigs(topicName: String, topicProps: Properties): Unit = {
topicProps.forEach((key: Any, value: Any) => {
topicBatch.add(new ApiMessageAndVersion(new ConfigRecord()
.setResourceType(ConfigResource.Type.TOPIC.id)
.setResourceName(topicName)
.setName(key.toString)
.setValue(value.toString), 0.toShort))
})
override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = {
val record = new PartitionRecord()
.setTopicId(topicIdPartition.topicId())
.setPartitionId(topicIdPartition.partition())
.setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
.setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
.setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
.setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
.setLeader(partitionRegistration.leader)
.setLeaderEpoch(partitionRegistration.leaderEpoch)
.setPartitionEpoch(partitionRegistration.partitionEpoch)
.setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
}
}
})
)

if (!topicBatch.isEmpty) {
recordConsumer.accept(topicBatch)
ZkConfigMigrationClient.scala
@@ -23,6 +23,7 @@ import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
import kafka.zk._
import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
@@ -35,7 +36,7 @@ import org.apache.zookeeper.{CreateMode, KeeperException}

import java.{lang, util}
import java.util.Properties
import java.util.function.BiConsumer
import java.util.function.{BiConsumer, Consumer}
import scala.collection.Seq
import scala.jdk.CollectionConverters._

@@ -145,6 +146,28 @@ class ZkConfigMigrationClient(
}
}

override def iterateTopicConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
val topicEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Topic)
topicEntities.foreach { topic =>
readTopicConfigs(topic, props => configConsumer.accept(topic, props))
}
}

override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = {
val topicResource = fromZkEntityName(topicName)
val props = zkClient.getEntityConfigs(ConfigType.Topic, topicResource)
val decodedProps = props.asScala.map { case (key, value) =>
if (DynamicBrokerConfig.isPasswordConfig(key))
key -> passwordEncoder.decode(value).value
else
key -> value
}.toMap.asJava

logAndRethrow(this, s"Error in topic config consumer. Topic was $topicResource.") {
configConsumer.accept(decodedProps)
}
}

override def writeConfigs(
configResource: ConfigResource,
configMap: util.Map[String, String],
@@ -159,7 +182,12 @@
val configName = toZkEntityName(configResource.name())
if (configType.isDefined) {
val props = new Properties()
configMap.forEach { case (key, value) => props.put(key, value) }
configMap.forEach { case (key, value) =>
if (DynamicBrokerConfig.isPasswordConfig(key)) {
props.put(key, passwordEncoder.encode(new Password(value)))
} else
props.put(key, value)
}
tryWriteEntityConfig(configType.get, configName, props, create = false, state) match {
case Some(newState) =>
newState
ZkTopicMigrationClient.scala
@@ -48,7 +48,6 @@ class ZkTopicMigrationClient(zkClient: KafkaZkClient) extends TopicMigrationClie
throw new IllegalArgumentException("Must specify at least TOPICS in topic visitor interests.")
}
val topics = zkClient.getAllTopicsInCluster()
val topicConfigs = zkClient.getEntitiesConfigs(ConfigType.Topic, topics)
val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(topics)
replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(topic, topicIdOpt, partitionAssignments) =>
val topicAssignment = partitionAssignments.map { case (partition, assignment) =>
@@ -91,12 +90,6 @@ }
}
}
}
if (interests.contains(TopicVisitorInterest.CONFIGS)) {
val props = topicConfigs(topic)
logAndRethrow(this, s"Error in topic config consumer. Topic was $topic.") {
visitor.visitConfigs(topic, props)
}
}
}
}

ZkConfigMigrationClientTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.server.util.MockRandom
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test

import java.util
import java.util.Properties
import scala.collection.Map
import scala.jdk.CollectionConverters._
@@ -68,6 +69,23 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
}
})

// Update the sensitive config value from the config client and check that the value
// persisted in Zookeeper is encrypted.
val newProps = new util.HashMap[String, String]()
newProps.put(KafkaConfig.DefaultReplicationFactorProp, "2") // normal config
newProps.put(KafkaConfig.SslKeystorePasswordProp, NEW_SECRET) // sensitive config
migrationState = migrationClient.configClient().writeConfigs(
new ConfigResource(ConfigResource.Type.BROKER, "1"), newProps, migrationState)
val actualPropsInZk = zkClient.getEntityConfigs(ConfigType.Broker, "1")
assertEquals(2, actualPropsInZk.size())
actualPropsInZk.forEach { case (key, value) =>
if (key == KafkaConfig.SslKeystorePasswordProp) {
assertEquals(NEW_SECRET, encoder.decode(value.toString).value)
} else {
assertEquals(newProps.get(key), value)
}
}

migrationState = migrationClient.configClient().deleteConfigs(
new ConfigResource(ConfigResource.Type.BROKER, "1"), migrationState)
assertEquals(0, zkClient.getEntityConfigs(ConfigType.Broker, "1").size())
125 changes: 115 additions & 10 deletions core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala
@@ -17,14 +17,15 @@
package kafka.zk.migration

import kafka.api.LeaderAndIsr
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.controller.{LeaderIsrAndControllerEpoch, ReplicaAssignment}
import kafka.coordinator.transaction.ProducerIdManager
import kafka.zk.migration.ZkMigrationTestHarness
import org.apache.kafka.common.config.TopicConfig
import kafka.server.{ConfigType, KafkaConfig}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, ProducerIdsRecord}
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType, PartitionRecord, ProducerIdsRecord, TopicRecord}
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.migration.{KRaftMigrationZkWriter, ZkMigrationLeadershipState}
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
@@ -252,11 +253,115 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness {
.map {_.message() }
.filter(message => MetadataRecordType.fromId(message.apiKey()).equals(MetadataRecordType.CONFIG_RECORD))
.map { _.asInstanceOf[ConfigRecord] }
.toSeq
.map { record => record.name() -> record.value()}
.toMap
assertEquals(2, configs.size)
assertEquals(TopicConfig.FLUSH_MS_CONFIG, configs.head.name())
assertEquals("60000", configs.head.value())
assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
assertEquals("300000", configs.last.value())
assertTrue(configs.contains(TopicConfig.FLUSH_MS_CONFIG))
assertEquals("60000", configs(TopicConfig.FLUSH_MS_CONFIG))
assertTrue(configs.contains(TopicConfig.RETENTION_MS_CONFIG))
assertEquals("300000", configs(TopicConfig.RETENTION_MS_CONFIG))
}

@Test
def testTopicAndBrokerConfigsMigrationWithSnapshots(): Unit = {
val kraftWriter = new KRaftMigrationZkWriter(migrationClient, (_, operation) => {
migrationState = operation.apply(migrationState)
})

// Add some topics and broker configs and create a new image.
val topicName = "testTopic"
val partition = 0
val tp = new TopicPartition(topicName, partition)
val leaderPartition = 1
val leaderEpoch = 100
val partitionEpoch = 10
val brokerId = "1"
val replicas = List(1, 2, 3).map(int2Integer).asJava
val topicId = Uuid.randomUuid()
val props = new Properties()
props.put(KafkaConfig.DefaultReplicationFactorProp, "1") // normal config
props.put(KafkaConfig.SslKeystorePasswordProp, SECRET) // sensitive config

// // Leave Zk in an incomplete state.
// zkClient.createTopicAssignment(topicName, Some(topicId), Map(tp -> Seq(1)))

val delta = new MetadataDelta(MetadataImage.EMPTY)
delta.replay(new TopicRecord()
.setTopicId(topicId)
.setName(topicName)
)
delta.replay(new PartitionRecord()
.setTopicId(topicId)
.setIsr(replicas)
.setLeader(leaderPartition)
.setReplicas(replicas)
.setAddingReplicas(List.empty.asJava)
.setRemovingReplicas(List.empty.asJava)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(partitionEpoch)
.setPartitionId(partition)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
)
// Use same props for the broker and topic.
props.asScala.foreach { case (key, value) =>
delta.replay(new ConfigRecord()
.setName(key)
.setValue(value)
.setResourceName(topicName)
.setResourceType(ConfigResource.Type.TOPIC.id())
)
delta.replay(new ConfigRecord()
.setName(key)
.setValue(value)
.setResourceName(brokerId)
.setResourceType(ConfigResource.Type.BROKER.id())
)
}
val image = delta.apply(MetadataProvenance.EMPTY)

// Handle migration using the generated snapshot.
kraftWriter.handleSnapshot(image)

// Verify topic state.
val topicIdReplicaAssignment =
zkClient.getReplicaAssignmentAndTopicIdForTopics(Set(topicName))
assertEquals(1, topicIdReplicaAssignment.size)
topicIdReplicaAssignment.foreach { assignment =>
assertEquals(topicName, assignment.topic)
assertEquals(Some(topicId), assignment.topicId)
assertEquals(Map(tp -> ReplicaAssignment(replicas.asScala.map(Integer2int).toSeq)),
assignment.assignment)
}

// Verify the topic partition states.
val topicPartitionState = zkClient.getTopicPartitionState(tp)
assertTrue(topicPartitionState.isDefined)
topicPartitionState.foreach { state =>
assertEquals(leaderPartition, state.leaderAndIsr.leader)
assertEquals(leaderEpoch, state.leaderAndIsr.leaderEpoch)
assertEquals(LeaderRecoveryState.RECOVERED, state.leaderAndIsr.leaderRecoveryState)
assertEquals(replicas.asScala.map(Integer2int).toList, state.leaderAndIsr.isr)
}

// Verify the broker and topic configs (including sensitive configs).
val brokerProps = zkClient.getEntityConfigs(ConfigType.Broker, brokerId)
val topicProps = zkClient.getEntityConfigs(ConfigType.Topic, topicName)
assertEquals(2, brokerProps.size())

brokerProps.asScala.foreach { case (key, value) =>
if (key == KafkaConfig.SslKeystorePasswordProp) {
assertEquals(SECRET, encoder.decode(value).value)
} else {
assertEquals(props.getProperty(key), value)
}
}

topicProps.asScala.foreach { case (key, value) =>
if (key == KafkaConfig.SslKeystorePasswordProp) {
assertEquals(SECRET, encoder.decode(value).value)
} else {
assertEquals(props.getProperty(key), value)
}
}
}
}
ZkMigrationTestHarness.scala
@@ -36,6 +36,8 @@ class ZkMigrationTestHarness extends QuorumTestHarness {

val SECRET = "secret"

val NEW_SECRET = "newSecret"

val encoder: PasswordEncoder = {
val encoderProps = new Properties()
encoderProps.put(KafkaConfig.ZkConnectProp, "localhost:1234") // Get around the config validation
ConfigMigrationClient.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public interface ConfigMigrationClient {

@@ -38,6 +39,10 @@ interface ClientQuotaVisitor {

void iterateBrokerConfigs(BiConsumer<String, Map<String, String>> configConsumer);

void iterateTopicConfigs(BiConsumer<String, Map<String, String>> configConsumer);

void readTopicConfigs(String topicName, Consumer<Map<String, String>> configConsumer);

ZkMigrationLeadershipState writeConfigs(
ConfigResource configResource,
Map<String, String> configMap,
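The two methods added to ConfigMigrationClient give migration code a uniform way to enumerate per-topic config overrides from ZK with sensitive values already decoded. A hypothetical caller sketch follows; the helper name and its use are assumptions for illustration, not part of the patch:

import java.util
import scala.jdk.CollectionConverters._
import org.apache.kafka.metadata.migration.ConfigMigrationClient

// Hypothetical helper: collect every topic's config overrides from ZK into a Scala map,
// using the new iterateTopicConfigs callback API.
def collectTopicConfigs(configClient: ConfigMigrationClient): Map[String, Map[String, String]] = {
  val collected = scala.collection.mutable.Map.empty[String, Map[String, String]]
  configClient.iterateTopicConfigs { (topic: String, configs: util.Map[String, String]) =>
    collected += topic -> configs.asScala.toMap
    ()
  }
  collected.toMap
}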
KRaftMigrationDriver.java
@@ -154,7 +154,7 @@ private void recoverMigrationStateFromZK() {
String maybeDone = migrationLeadershipState.zkMigrationComplete() ? "done" : "not done";
log.info("Recovered migration state {}. ZK migration is {}.", migrationLeadershipState, maybeDone);

// Once we've recovered the migration state from ZK, install this class as a metadata published
// Once we've recovered the migration state from ZK, install this class as a metadata publisher
// by calling the initialZkLoadHandler.
initialZkLoadHandler.accept(this);
