From cad7e28b782c8c942bb60d6b3c9ca69ecdf8ec63 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 22 Apr 2021 18:49:59 +0800 Subject: [PATCH 1/4] add inner topic write protection in kop side --- .../handlers/kop/KafkaRequestHandler.java | 21 ++- .../pulsar/handlers/kop/PendingProduce.java | 6 + .../kop/InnerTopicProtectionTest.java | 167 ++++++++++++++++++ 3 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 53460b38e7..b6e98e1224 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -78,6 +78,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -651,6 +652,13 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, for (Map.Entry entry : produceRequest.partitionRecordsOrFail().entrySet()) { TopicPartition topicPartition = entry.getKey(); try { + String fullPartitionName = KopTopic.toString(topicPartition); + if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) { + log.error("[{}] Request {}: not support produce message to inner topic. topic: {}", + ctx.channel(), produceHar.getHeader(), topicPartition); + throw new InvalidTopicException(Errors.INVALID_TOPIC_EXCEPTION.message()); + } + MemoryRecords validRecords = validateRecords(produceHar.getHeader().apiVersion(), topicPartition, (MemoryRecords) entry.getValue()); @@ -663,7 +671,6 @@ protected void handleProduceRequest(KafkaHeaderAndRequest produceHar, topicPartition.topic(), topicPartition.partition(), responsesSize); } - String fullPartitionName = KopTopic.toString(topicPartition); PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName, entryFormatter, validRecords, executor, transactionCoordinator, requestStats); PendingProduceQueue queue = @@ -1692,12 +1699,20 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { return returnFuture; } - private boolean isOffsetTopic(String topic) { + protected boolean isOffsetTopic(String topic) { String offsetsTopic = kafkaConfig.getKafkaMetadataTenant() + "/" + kafkaConfig.getKafkaMetadataNamespace() + "/" + GROUP_METADATA_TOPIC_NAME; - return topic.contains(offsetsTopic); + return topic != null && topic.contains(offsetsTopic); + } + + protected boolean isTransactionTopic(String topic) { + String transactionTopic = kafkaConfig.getKafkaMetadataNamespace() + "/" + + kafkaConfig.getKafkaMetadataNamespace() + + "/" + TRANSACTION_STATE_TOPIC_NAME; + + return topic != null && topic.contains(transactionTopic); } public CompletableFuture findBroker(TopicName topic) { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java index 8ce3db6d9a..e3d06d1a80 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java @@ -126,6 +126,12 @@ public void publishMessages() { throw new RuntimeException(e); } + if (persistentTopic.isSystemTopic()) { + log.error("Not support produce message to system topic: {}", persistentTopic); + responseFuture.complete(new PartitionResponse(Errors.INVALID_TOPIC_EXCEPTION)); + return; + } + if (log.isDebugEnabled()) { log.debug("publishMessages for topic partition: {}, records size is {}", partitionName, numMessages); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java new file mode 100644 index 0000000000..5e272585a7 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -0,0 +1,167 @@ +package io.streamnative.pulsar.handlers.kop; + +import com.google.common.collect.Sets; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; +import java.util.Properties; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +public class InnerTopicProtectionTest extends KopProtocolHandlerTestBase { + + protected int offsetsTopicNumPartitions; + + protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kafkaPort) { + KafkaServiceConfiguration kConfig = new KafkaServiceConfiguration(); + kConfig.setBrokerServicePort(Optional.ofNullable(brokerPort)); + kConfig.setWebServicePort(Optional.ofNullable(webPort)); + kConfig.setListeners(PLAINTEXT_PREFIX + "localhost:" + kafkaPort); + + kConfig.setOffsetsTopicNumPartitions(offsetsTopicNumPartitions); + kConfig.setEnableGroupCoordinator(true); + + kConfig.setAdvertisedAddress("localhost"); + kConfig.setClusterName(configClusterName); + kConfig.setManagedLedgerCacheSizeMB(8); + kConfig.setActiveConsumerFailoverDelayTimeMillis(0); + kConfig.setDefaultNumberOfNamespaceBundles(2); + kConfig.setZookeeperServers("localhost:2181"); + kConfig.setConfigurationStoreServers("localhost:3181"); + kConfig.setEnableGroupCoordinator(true); + kConfig.setAuthenticationEnabled(false); + kConfig.setAuthorizationEnabled(false); + kConfig.setAllowAutoTopicCreation(true); + kConfig.setAllowAutoTopicCreationType("partitioned"); + kConfig.setBrokerDeleteInactiveTopicsEnabled(false); + //kConfig.setSystemTopicEnabled(true); + //kConfig.setTopicLevelPoliciesEnabled(true); + + // set protocol related config + URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); + Path handlerPath; + try { + handlerPath = Paths.get(testHandlerUrl.toURI()); + } catch (Exception e) { + log.error("failed to get handler Path, handlerUrl: {}. Exception: ", testHandlerUrl, e); + return null; + } + + String protocolHandlerDir = handlerPath.toFile().getParent(); + + kConfig.setProtocolHandlerDirectory( + protocolHandlerDir + ); + kConfig.setMessagingProtocols(Sets.newHashSet("kafka")); + + return kConfig; + } + + @Override + protected void resetConfig() { + offsetsTopicNumPartitions = 16; + conf = resetConfig( + brokerPort, + brokerWebservicePort, + kafkaBrokerPort); + + log.info("Ports -- broker: {}, brokerWeb:{}, kafka: {}", + brokerPort, brokerWebservicePort, kafkaBrokerPort); + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + log.info("success internal setup"); + + if (!admin.clusters().getClusters().contains(configClusterName)) { + // so that clients can test short names + admin.clusters().createCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } else { + admin.clusters().updateCluster(configClusterName, + new ClusterData("http://127.0.0.1:" + brokerWebservicePort)); + } + + if (!admin.tenants().getTenants().contains("public")) { + admin.tenants().createTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } else { + admin.tenants().updateTenant("public", + new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + } + if (!admin.namespaces().getNamespaces("public").contains("public/default")) { + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/default", + new RetentionPolicies(60, 1000)); + } + if (!admin.namespaces().getNamespaces("public").contains("public/__kafka")) { + admin.namespaces().createNamespace("public/__kafka"); + admin.namespaces().setNamespaceReplicationClusters("public/__kafka", Sets.newHashSet("test")); + admin.namespaces().setRetention("public/__kafka", + new RetentionPolicies(-1, -1)); + } + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testInnerTopicProduce() throws Exception { + final String offsetTopic = "public/__kafka/__consumer_offsets"; + final String transactionTopic = "public/__kafka/__transaction_state"; + final String systemTopic = "__change_events"; + final String commonTopic = "normal-topic"; + + // test inner topic produce + @Cleanup + final KafkaProducer kafkaProducer = newKafkaProducer(); + final String msg = "test-inner-topic-produce-and-consume"; + assertProduceMessage(kafkaProducer, offsetTopic, msg, true); + assertProduceMessage(kafkaProducer, transactionTopic, msg, true); + assertProduceMessage(kafkaProducer, commonTopic, msg, false); + } + + private void assertProduceMessage(KafkaProducer producer, final String topic, final String value, + boolean assertException) { + try { + producer.send(new ProducerRecord<>(topic, value), ((metadata, exception) -> { + if (assertException) { + Assert.assertNotNull(exception); + } else { + Assert.assertNull(exception); + } + })).get(); + } catch (Exception e) { + + } + } + + protected KafkaProducer newKafkaProducer() { + final Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + return new KafkaProducer<>(props); + } +} From b86c9320625ca95001bf74f6072107af90da8b74 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 22 Apr 2021 19:38:06 +0800 Subject: [PATCH 2/4] format code --- .../handlers/kop/KafkaRequestHandler.java | 2 +- .../kop/InnerTopicProtectionTest.java | 20 ++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index b6e98e1224..2f0252ed07 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -1708,7 +1708,7 @@ protected boolean isOffsetTopic(String topic) { } protected boolean isTransactionTopic(String topic) { - String transactionTopic = kafkaConfig.getKafkaMetadataNamespace() + "/" + String transactionTopic = kafkaConfig.getKafkaMetadataTenant() + "/" + kafkaConfig.getKafkaMetadataNamespace() + "/" + TRANSACTION_STATE_TOPIC_NAME; diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index 5e272585a7..18501d918b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -1,3 +1,16 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.streamnative.pulsar.handlers.kop; import com.google.common.collect.Sets; @@ -13,6 +26,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -21,6 +35,9 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +/** + * Inner topic protection test. + */ @Slf4j public class InnerTopicProtectionTest extends KopProtocolHandlerTestBase { @@ -126,12 +143,13 @@ protected void cleanup() throws Exception { } @Test(timeOut = 30000) - public void testInnerTopicProduce() throws Exception { + public void testInnerTopicProduce() throws PulsarAdminException { final String offsetTopic = "public/__kafka/__consumer_offsets"; final String transactionTopic = "public/__kafka/__transaction_state"; final String systemTopic = "__change_events"; final String commonTopic = "normal-topic"; + admin.topics().createPartitionedTopic(commonTopic, 3); // test inner topic produce @Cleanup final KafkaProducer kafkaProducer = newKafkaProducer(); From 567f3196eadd6048ef425fbdd99efbdb275f9ad4 Mon Sep 17 00:00:00 2001 From: chenhang Date: Fri, 23 Apr 2021 10:22:53 +0800 Subject: [PATCH 3/4] turn off system topic check --- .../pulsar/handlers/kop/PendingProduce.java | 6 ------ .../handlers/kop/InnerTopicProtectionTest.java | 15 +++++++-------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java index e3d06d1a80..8ce3db6d9a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java @@ -126,12 +126,6 @@ public void publishMessages() { throw new RuntimeException(e); } - if (persistentTopic.isSystemTopic()) { - log.error("Not support produce message to system topic: {}", persistentTopic); - responseFuture.complete(new PartitionResponse(Errors.INVALID_TOPIC_EXCEPTION)); - return; - } - if (log.isDebugEnabled()) { log.debug("publishMessages for topic partition: {}, records size is {}", partitionName, numMessages); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index 18501d918b..5b9ad8e705 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -162,15 +162,14 @@ public void testInnerTopicProduce() throws PulsarAdminException { private void assertProduceMessage(KafkaProducer producer, final String topic, final String value, boolean assertException) { try { - producer.send(new ProducerRecord<>(topic, value), ((metadata, exception) -> { - if (assertException) { - Assert.assertNotNull(exception); - } else { - Assert.assertNull(exception); - } - })).get(); + producer.send(new ProducerRecord<>(topic, value)).get(); } catch (Exception e) { - + if (assertException) { + Assert.assertEquals(e.getCause().getMessage(), + "The request attempted to perform an operation on an invalid topic."); + } else { + Assert.fail(); + } } } From cb99264c2da2bd673c00940da7cd9cc777776905 Mon Sep 17 00:00:00 2001 From: chenhang Date: Fri, 23 Apr 2021 16:45:50 +0800 Subject: [PATCH 4/4] add system topic protection test --- .../streamnative/pulsar/handlers/kop/PendingProduce.java | 6 ++++++ pom.xml | 8 ++++++++ .../pulsar/handlers/kop/InnerTopicProtectionTest.java | 5 +++-- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java index 8ce3db6d9a..62ec8c9ac5 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/PendingProduce.java @@ -126,6 +126,12 @@ public void publishMessages() { throw new RuntimeException(e); } + if (persistentTopic.isSystemTopic()) { + log.error("Not support producing message to system topic: {}", persistentTopic); + responseFuture.complete(new PartitionResponse(Errors.INVALID_TOPIC_EXCEPTION)); + return; + } + if (log.isDebugEnabled()) { log.debug("publishMessages for topic partition: {}, records size is {}", partitionName, numMessages); } diff --git a/pom.xml b/pom.xml index 06d3eee175..a06f8f36e7 100644 --- a/pom.xml +++ b/pom.xml @@ -61,6 +61,7 @@ 1.4.1.Final 6.19 3.1.8 + 1.10.2 @@ -228,6 +229,13 @@ test + + org.apache.avro + avro + ${avro.version} + provided + + diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java index 5b9ad8e705..e600f0e005 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/InnerTopicProtectionTest.java @@ -65,8 +65,8 @@ protected KafkaServiceConfiguration resetConfig(int brokerPort, int webPort, int kConfig.setAllowAutoTopicCreation(true); kConfig.setAllowAutoTopicCreationType("partitioned"); kConfig.setBrokerDeleteInactiveTopicsEnabled(false); - //kConfig.setSystemTopicEnabled(true); - //kConfig.setTopicLevelPoliciesEnabled(true); + kConfig.setSystemTopicEnabled(true); + kConfig.setTopicLevelPoliciesEnabled(true); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); @@ -156,6 +156,7 @@ public void testInnerTopicProduce() throws PulsarAdminException { final String msg = "test-inner-topic-produce-and-consume"; assertProduceMessage(kafkaProducer, offsetTopic, msg, true); assertProduceMessage(kafkaProducer, transactionTopic, msg, true); + assertProduceMessage(kafkaProducer, systemTopic, msg, true); assertProduceMessage(kafkaProducer, commonTopic, msg, false); }