From 9e612d545bfbccd548cd43c87eeec6dcd2bc9410 Mon Sep 17 00:00:00 2001
From: Andrey Yegorov <8622884+dlg99@users.noreply.github.com>
Date: Mon, 28 Feb 2022 12:59:40 -0800
Subject: [PATCH] KCA: Option to sanitize topic name for the connectors that
 cannot handle pulsar topic names (#14475)

Some Kafka connectors do not sanitize topic names at all, or sanitize them
only partially, to match what the downstream system supports. In Kafka this
mostly works, assuming topics are named appropriately. It does not work well
with the Kafka Connect Adaptor, because the connector receives the full
Pulsar topic name (e.g. persistent://a/b/topic), URI separators included.

* Added a flag to sanitize the topic name (disabled by default) and the
  corresponding functionality: every character other than an ASCII letter,
  digit, or underscore is replaced with an underscore, and an underscore is
  prepended if the name does not start with a letter or underscore (see the
  sketch below).
* Added a test.
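To make the rule concrete, here is a small standalone sketch. This is not code
from the patch itself; the real implementation is
`KafkaConnectSink.sanitizeNameIfNeeded` in the diff below, which additionally
caches results in a Guava cache.

```java
// Standalone restatement of the sanitization rule, mirroring the patch's regexes.
public class TopicNameSanitizerSketch {
    static String sanitize(String name) {
        // Replace everything that is not an ASCII letter, digit, or underscore.
        String sanitized = name.replaceAll("[^a-zA-Z0-9_]", "_");
        // Prepend an underscore if the first character is not a letter or underscore.
        if (sanitized.matches("^[^a-zA-Z_].*")) {
            sanitized = "_" + sanitized;
        }
        return sanitized;
    }

    public static void main(String[] args) {
        // Prints "persistent___a_b_c_d_fake_topic_a", as asserted in the new test.
        System.out.println(sanitize("persistent://a-b/c-d/fake-topic.a"));
        // Collision case called out in the FieldDoc help: both print "topic_a".
        System.out.println(sanitize("topic_a"));
        System.out.println(sanitize("topic.a"));
        // A leading digit gets an underscore prefix: prints "_1topic".
        System.out.println(sanitize("1topic"));
    }
}
```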
Verifying this change:

- [ ] Make sure that the change passes the CI checks.
- Added a unit test.
- Verified with a specific connector that did not work without this change.

This change adds a new, disabled-by-default config option; it does not touch
dependencies, the public API, the schema, the wire protocol, the REST
endpoints, the admin CLI options, or deployment.

Need to update docs?

- [x] `doc` The config parameter is documented in its FieldDoc.
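For reference, a hedged sketch (a fragment, not part of the patch) of switching
the option on through the sink's config map, mirroring the setup of the added
`sanitizeTest`. The property keys match `PulsarKafkaConnectSinkConfig` field
names from this patch; the connector class value is illustrative, and `context`
is assumed to be a `SinkContext` such as the test harness provides.

```java
// Sketch only: "context" is an assumed SinkContext, as in KafkaConnectSinkTest.
Map<String, Object> props = new HashMap<>();
props.put("topic", "persistent://a-b/c-d/fake-topic.a");
props.put("kafkaConnectorSinkClass",
        "org.apache.kafka.connect.file.FileStreamSinkConnector"); // illustrative
props.put("sanitizeTopicName", "true"); // the new flag; false by default

KafkaConnectSink sink = new KafkaConnectSink();
sink.open(props, context);
// The wrapped Kafka connector now sees "persistent___a_b_c_d_fake_topic_a"
// rather than the raw Pulsar topic name.
```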
\n" + + "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)") + private boolean sanitizeTopicName = false; + public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 2431331f945f8..c353209ba5c94 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -47,6 +47,8 @@ import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData; import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -68,7 +70,9 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -77,6 +81,19 @@ @Slf4j public class KafkaConnectSinkTest extends ProducerConsumerBase { + public class ResultCaptor implements Answer { + private T result = null; + public T getResult() { + return result; + } + + @Override + public T answer(InvocationOnMock invocationOnMock) throws Throwable { + result = (T) invocationOnMock.callRealMethod(); + return result; + } + } + private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset"; private Path file; @@ -143,6 +160,43 @@ public void smokeTest() throws Exception { assertEquals("value", lines.get(0)); } + @Test + public void sanitizeTest() throws Exception { + props.put("sanitizeTopicName", "true"); + KafkaConnectSink originalSink = new KafkaConnectSink(); + KafkaConnectSink sink = spy(originalSink); + + final ResultCaptor resultCaptor = new ResultCaptor<>(); + doAnswer(resultCaptor).when(sink).toSinkRecord(any()); + + sink.open(props, context); + + final GenericRecord rec = getGenericRecord("value", Schema.STRING); + Message msg = mock(MessageImpl.class); + when(msg.getValue()).thenReturn(rec); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + final AtomicInteger status = new AtomicInteger(0); + Record record = PulsarRecord.builder() + .topicName("persistent://a-b/c-d/fake-topic.a") + .message(msg) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .schema(Schema.STRING) + .build(); + + sink.write(record); + sink.flush(); + + assertEquals(status.get(), 1); + assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a"); + + sink.close(); + + List lines = Files.readAllLines(file, StandardCharsets.US_ASCII); + assertEquals(lines.get(0), "value"); + } + @Test public void seekPauseResumeTest() throws Exception { KafkaConnectSink sink = new KafkaConnectSink();