KCA: Option to sanitize topic name for the connectors that cannot handle pulsar topic names (apache#14475)

Some Kafka connectors do not sanitize topic names (or do so only partially) to match what the downstream system supports.
This mostly works in Kafka, assuming appropriately named topics, but it does not work well with the Kafka Connect Adaptor because the full Pulsar topic URI (e.g. persistent://tenant/namespace/topic) is passed through as the topic name.
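
For illustration, here is a standalone sketch (not part of this commit; the class name is made up) of the sanitization rule the new flag enables. The two regular expressions are the ones added to KafkaConnectSink in the diff below:

// Sketch: replace every character that is not a letter, digit, or underscore
// with an underscore, and prepend an underscore if the result does not start
// with a letter or underscore.
public class TopicNameSanitizerSketch {
    static String sanitize(String name) {
        String sanitized = name.replaceAll("[^a-zA-Z0-9_]", "_");
        if (sanitized.matches("^[^a-zA-Z_].*")) {
            sanitized = "_" + sanitized;
        }
        return sanitized;
    }

    public static void main(String[] args) {
        // A Pulsar topic URI becomes a name most downstream systems can accept:
        // prints persistent___a_b_c_d_fake_topic_a (the value asserted in the unit test)
        System.out.println(sanitize("persistent://a-b/c-d/fake-topic.a"));
    }
}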

* Add a flag to sanitize the topic name (disabled by default) and the corresponding functionality; a config sketch follows this list.
* Add a unit test.
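
A minimal sketch of turning the option on when configuring the sink, mirroring how the unit test sets it up. Only sanitizeTopicName is the new key (it defaults to false); the other entries and the class name are illustrative:

import java.util.HashMap;
import java.util.Map;

public class SanitizeFlagConfigSketch {
    public static void main(String[] args) {
        // Sink configuration map as it would be passed to KafkaConnectSink.open(props, ctx);
        // connector-specific properties are omitted for brevity.
        Map<String, Object> props = new HashMap<>();
        props.put("topic", "persistent://a-b/c-d/fake-topic.a");
        props.put("sanitizeTopicName", "true"); // the new flag, disabled by default
        System.out.println(props);
    }
}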

- [ ] Make sure that the change passes the CI checks.

- Added a unit test.
- Verified with a specific connector that did not work without this change.

Does this pull request potentially affect one of the following parts?

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API: (yes / no)
  - The schema: (yes / no / don't know)
  - The default values of configurations: (yes / no)
  - The wire protocol: (yes / no)
  - The rest endpoints: (yes / no)
  - The admin cli options: (yes / no)
  - Anything that affects deployment: (yes / no / don't know)

Check the box below or label this PR directly (if you have committer privilege).

Need to update docs?

- [ ] `doc-required`

  (If you need help on updating docs, create a doc issue)

- [ ] `no-need-doc`

  (Please explain why)

- [x] `doc`

  Config parameter documented in FieldDoc.

(cherry picked from commit fd9e639)
dlg99 authored and nicoloboschi committed Mar 1, 2022
1 parent c4f65cb commit 9e612d5
Showing 3 changed files with 103 additions and 12 deletions.
KafkaConnectSink.java
@@ -21,10 +21,22 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -45,16 +57,6 @@
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG;
import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG;
@@ -86,6 +88,11 @@ public class KafkaConnectSink implements Sink<GenericObject> {

protected String topicName;

private boolean sanitizeTopicName = false;
private final Cache<String, String> sanitizedTopicCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();

@Override
public void write(Record<GenericObject> sourceRecord) {
if (log.isDebugEnabled()) {
@@ -140,6 +147,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
"Source must run with Exclusive or Failover subscription type");
topicName = kafkaSinkConfig.getTopic();
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();

String kafkaConnectorFQClassName = kafkaSinkConfig.getKafkaConnectorSinkClass();
kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(props::put);
@@ -280,7 +288,7 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
// keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
timestamp = sourceRecord.getMessage().get().getPublishTime();
}
return new SinkRecord(topic,
return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
partition,
keySchema,
key,
@@ -296,4 +304,25 @@ protected long currentOffset(String topic, int partition) {
return taskContext.currentOffset(topic, partition);
}

// Replace every character that is not a letter, digit, or underscore with an underscore.
// Prepend an underscore if the name does not begin with a letter or underscore.
protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
if (!sanitize) {
return name;
}

try {
return sanitizedTopicCache.get(name, () -> {
String sanitizedName = name.replaceAll("[^a-zA-Z0-9_]", "_");
if (sanitizedName.matches("^[^a-zA-Z_].*")) {
sanitizedName = "_" + sanitizedName;
}
return sanitizedName;
});
} catch (ExecutionException e) {
log.error("Failed to get sanitized topic name for {}", name, e);
throw new IllegalStateException("Failed to get sanitized topic name for " + name, e);
}
}

}
PulsarKafkaConnectSinkConfig.java
@@ -84,6 +84,14 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
private boolean unwrapKeyValueIfAvailable = true;

@FieldDoc(
defaultValue = "false",
help = "Some connectors cannot handle pulsar topic names like persistent://a/b/topic"
+ " and do not sanitize the topic name themselves. \n"
+ "If enabled, all non alpha-digital characters in topic name will be replaced with underscores. \n"
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
private boolean sanitizeTopicName = false;

public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
KafkaConnectSinkTest.java
@@ -47,6 +47,8 @@
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -68,7 +70,9 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -77,6 +81,19 @@
@Slf4j
public class KafkaConnectSinkTest extends ProducerConsumerBase {

public class ResultCaptor<T> implements Answer<T> {
private T result = null;
public T getResult() {
return result;
}

@Override
public T answer(InvocationOnMock invocationOnMock) throws Throwable {
result = (T) invocationOnMock.callRealMethod();
return result;
}
}

private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";

private Path file;
@@ -143,6 +160,43 @@ public void smokeTest() throws Exception {
assertEquals("value", lines.get(0));
}

@Test
public void sanitizeTest() throws Exception {
props.put("sanitizeTopicName", "true");
KafkaConnectSink originalSink = new KafkaConnectSink();
KafkaConnectSink sink = spy(originalSink);

final ResultCaptor<SinkRecord> resultCaptor = new ResultCaptor<>();
doAnswer(resultCaptor).when(sink).toSinkRecord(any());

sink.open(props, context);

final GenericRecord rec = getGenericRecord("value", Schema.STRING);
Message msg = mock(MessageImpl.class);
when(msg.getValue()).thenReturn(rec);
when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));

final AtomicInteger status = new AtomicInteger(0);
Record<GenericObject> record = PulsarRecord.<String>builder()
.topicName("persistent://a-b/c-d/fake-topic.a")
.message(msg)
.ackFunction(status::incrementAndGet)
.failFunction(status::decrementAndGet)
.schema(Schema.STRING)
.build();

sink.write(record);
sink.flush();

assertEquals(status.get(), 1);
assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");

sink.close();

List<String> lines = Files.readAllLines(file, StandardCharsets.US_ASCII);
assertEquals(lines.get(0), "value");
}

@Test
public void seekPauseResumeTest() throws Exception {
KafkaConnectSink sink = new KafkaConnectSink();
