Skip to content

Commit

Permalink
Add topicPattern property to KafkaIO.Read to match topics using a reg…
Browse files Browse the repository at this point in the history
…ex (#26948)

* Add topicPattern property to KafkaIO.Read to match topics using a regex

* Add partially matched and unmatched topic pattern test

* Change method signature of withTopicPattern to use String
  • Loading branch information
sjvanrossum authored Jun 30, 2023
1 parent 6247318 commit c36f0f1
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
Expand Down Expand Up @@ -350,10 +351,11 @@
* href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
* href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
* KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link
* KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
* KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
* pipeline can populate these source descriptions during runtime. For example, the pipeline can
* query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}.
* KafkaIO.Read#getTopicPattern()}, {@link KafkaIO.Read#getTopicPartitions()}, {@link
* KafkaIO.Read#getTopics()}, {@link KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline
* construction time. Instead, the pipeline can populate these source descriptions during runtime.
* For example, the pipeline can query Kafka topics from a BigQuery table and read these topics via
* {@link ReadSourceDescriptors}.
*
* <h3>Common Kafka Consumer Configurations</h3>
*
Expand Down Expand Up @@ -633,6 +635,9 @@ public abstract static class Read<K, V>
@Pure
abstract @Nullable List<TopicPartition> getTopicPartitions();

@Pure
abstract @Nullable Pattern getTopicPattern();

@Pure
abstract @Nullable Coder<K> getKeyCoder();

Expand Down Expand Up @@ -692,6 +697,8 @@ abstract static class Builder<K, V> {

abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);

abstract Builder<K, V> setTopicPattern(Pattern topicPattern);

abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);

abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
Expand Down Expand Up @@ -922,8 +929,9 @@ public Read<K, V> withTopic(String topic) {
*/
public Read<K, V> withTopics(List<String> topics) {
checkState(
getTopicPartitions() == null || getTopicPartitions().isEmpty(),
"Only topics or topicPartitions can be set, not both");
(getTopicPartitions() == null || getTopicPartitions().isEmpty())
&& getTopicPattern() == null,
"Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
}

Expand All @@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
*/
public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
checkState(
getTopics() == null || getTopics().isEmpty(),
"Only topics or topicPartitions can be set, not both");
(getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
"Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
*
* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
* partitions are distributed among the splits.
*/
public Read<K, V> withTopicPattern(String topicPattern) {
checkState(
(getTopics() == null || getTopics().isEmpty())
&& (getTopicPartitions() == null || getTopicPartitions().isEmpty()),
"Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopicPattern(Pattern.compile(topicPattern)).build();
}

/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
Expand Down Expand Up @@ -1274,8 +1297,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
if (!isDynamicRead()) {
checkArgument(
(getTopics() != null && getTopics().size() > 0)
|| (getTopicPartitions() != null && getTopicPartitions().size() > 0),
"Either withTopic(), withTopics() or withTopicPartitions() is required");
|| (getTopicPartitions() != null && getTopicPartitions().size() > 0)
|| getTopicPattern() != null,
"Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required");
} else {
checkArgument(
ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
Expand Down Expand Up @@ -1537,6 +1561,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
kafkaRead.getConsumerConfig(),
kafkaRead.getCheckStopReadingFn(),
topics,
kafkaRead.getTopicPattern(),
kafkaRead.getStartReadTime(),
kafkaRead.getStopReadTime()));
} else {
Expand All @@ -1561,6 +1586,7 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
this.consumerFactoryFn = read.getConsumerFactoryFn();
this.topics = read.getTopics();
this.topicPartitions = read.getTopicPartitions();
this.topicPattern = read.getTopicPattern();
this.startReadTime = read.getStartReadTime();
this.stopReadTime = read.getStopReadTime();
}
Expand All @@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr

@VisibleForTesting final @Nullable List<String> topics;

private final @Nullable Pattern topicPattern;

@ProcessElement
public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
List<TopicPartition> partitions =
new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
for (String topic : Preconditions.checkStateNotNull(topics)) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
List<String> topics = Preconditions.checkStateNotNull(this.topics);
if (topics.isEmpty()) {
Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
for (Map.Entry<String, List<PartitionInfo>> entry :
consumer.listTopics().entrySet()) {
if (pattern.matcher(entry.getKey()).matches()) {
for (PartitionInfo p : entry.getValue()) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
} else {
for (String topic : topics) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
}
Expand Down Expand Up @@ -1634,12 +1675,16 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
List<String> topics = Preconditions.checkStateNotNull(getTopics());
List<TopicPartition> topicPartitions = Preconditions.checkStateNotNull(getTopicPartitions());
Pattern topicPattern = getTopicPattern();
if (topics.size() > 0) {
builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
} else if (topicPartitions.size() > 0) {
builder.add(
DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
.withLabel("Topic Partition/s"));
} else if (topicPattern != null) {
builder.add(
DisplayData.item("topicPattern", topicPattern.pattern()).withLabel("Topic Pattern"));
}
Set<String> disallowedConsumerPropertiesKeys =
KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ enum KafkaIOReadProperties {
CONSUMER_CONFIG,
TOPICS,
TOPIC_PARTITIONS,
TOPIC_PATTERN,
KEY_CODER,
VALUE_CODER,
CONSUMER_FACTORY_FN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -65,14 +67,26 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti

if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
for (String topic : Preconditions.checkStateNotNull(spec.getTopics())) {
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
List<String> topics = Preconditions.checkStateNotNull(spec.getTopics());
if (topics.isEmpty()) {
Pattern pattern = Preconditions.checkStateNotNull(spec.getTopicPattern());
for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
if (pattern.matcher(entry.getKey()).matches()) {
for (PartitionInfo p : entry.getValue()) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
} else {
for (String topic : topics) {
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -64,6 +65,7 @@ class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaS
private final Map<String, Object> kafkaConsumerConfig;
private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
private final Set<String> topics;
private final @Nullable Pattern topicPattern;
private final @Nullable Instant startReadTime;
private final @Nullable Instant stopReadTime;

Expand All @@ -73,13 +75,15 @@ public WatchForKafkaTopicPartitions(
Map<String, Object> kafkaConsumerConfig,
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
Set<String> topics,
@Nullable Pattern topicPattern,
@Nullable Instant startReadTime,
@Nullable Instant stopReadTime) {
this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.checkStopReadingFn = checkStopReadingFn;
this.topics = topics;
this.topicPattern = topicPattern;
this.startReadTime = startReadTime;
this.stopReadTime = stopReadTime;
}
Expand All @@ -91,7 +95,8 @@ public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
.apply(
"Match new TopicPartitions",
Watch.growthOf(
new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
new WatchPartitionFn(
kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern))
.withPollInterval(checkDuration))
.apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
}
Expand Down Expand Up @@ -134,22 +139,27 @@ private static class WatchPartitionFn extends PollFn<byte[], TopicPartition> {
kafkaConsumerFactoryFn;
private final Map<String, Object> kafkaConsumerConfig;
private final Set<String> topics;
private final @Nullable Pattern topicPattern;

private WatchPartitionFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
Map<String, Object> kafkaConsumerConfig,
Set<String> topics) {
Set<String> topics,
@Nullable Pattern topicPattern) {
this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.topics = topics;
this.topicPattern = topicPattern;
}

@Override
public Watch.Growth.PollResult<TopicPartition> apply(byte[] element, Context c)
throws Exception {
Instant now = Instant.now();
return Watch.Growth.PollResult.incomplete(
now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
now,
getAllTopicPartitions(
kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern))
.withWatermark(now);
}
}
Expand All @@ -158,7 +168,8 @@ now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
static List<TopicPartition> getAllTopicPartitions(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
Map<String, Object> kafkaConsumerConfig,
Set<String> topics) {
Set<String> topics,
@Nullable Pattern topicPattern) {
List<TopicPartition> current = new ArrayList<>();
try (Consumer<byte[], byte[]> kafkaConsumer =
kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
Expand All @@ -168,12 +179,13 @@ static List<TopicPartition> getAllTopicPartitions(
current.add(new TopicPartition(topic, partition.partition()));
}
}

} else {
for (Map.Entry<String, List<PartitionInfo>> topicInfo :
kafkaConsumer.listTopics().entrySet()) {
for (PartitionInfo partition : topicInfo.getValue()) {
current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
if (topicPattern == null || topicPattern.matcher(topicInfo.getKey()).matches()) {
for (PartitionInfo partition : topicInfo.getValue()) {
current.add(new TopicPartition(partition.topic(), partition.partition()));
}
}
}
}
Expand Down
Loading

0 comments on commit c36f0f1

Please sign in to comment.