-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add topicPattern property to KafkaIO.Read to match topics using a regex #26948
Conversation
@@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr | |||
|
|||
@VisibleForTesting final @Nullable List<String> topics; | |||
|
|||
private final @Nullable Pattern topicPattern; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not annotated with @VisibleForTesting
since the property is not accessed directly in tests, but neither are the properties which do specify @VisibleForTesting
.
Assigning reviewers. If you would like to opt out of this review, comment R: @bvolpato for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
} | ||
} else { | ||
for (String topic : topics) { | ||
for (PartitionInfo p : consumer.partitionsFor(topic)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance this is null at this point? In the split below you have null checks, but not here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this referring to topics
? Both topicPartitions
and topics
are initialized as empty lists in the builder by default and replaced using .withTopics()
and .withTopicPartitions()
. The previous Preconditions.checkStateNotNull(topics)
expression in the for loop should still not be null under any circumstance. Special care should be taken to carry that property forward when we add support for this property in KafkaIO's ExternalTransformRegistrar though, since it doesn't guarantee the same object state the builder guarantees.
In regards to topicPattern
, if both topicPartitions
and topics
are empty, then topicPattern
must be non-null, since the PTransform's expansion checks that at least one of those properties is set and the .withX()
builder methods check that none are previously set.
As far as Kafka's topic metadata goes, .partitionsFor()
will throw an exception if an unauthorized topic is requested and .listTopics()
will only list all authorized topics. Both methods return initialized objects and ensure that potential null responses from the server are translated to empty collections (as far back as org.apache.kafka:kafka-clients:0.11.0.3
) or throw an exception in the case of an authorization failure. I'd say that the existing check on partitionInfoList
seems superfluous and could potentially be considered for deletion:
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
|
@pabloem @johnjcasey can you please TAL / merge? Thanks! |
Reminder, please take a look at this pr: @robertwb @chamikaramj |
@@ -723,6 +724,31 @@ public void testUnboundedSourceWithExplicitPartitions() { | |||
p.run(); | |||
} | |||
|
|||
@Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a test, or update this test, such that you don't match all the topics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added one for partial matches and one for no matches.
@johnjcasey Unit tests are failing early on a missing dependency. It has been superseded by com.github.davidmc24.gradle.plugin.avro it seems, see changelog. |
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
|
run RAT PreCommit |
Tests appear to be building fine to me |
* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the | ||
* partitions are distributed among the splits. | ||
*/ | ||
public Read<K, V> withTopicPattern(Pattern topicPattern) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can change this API to use a portable type (for example, a string regex) so that this can be supported via cross-language wrappers ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Read.External.Configuration
has fields for keyDeserializer
and valueDeserializer
which are resolved from String
to Class
during external transform construction, does it make sense to provide a mapping there instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we don't want to be using external read configuration as a pattern. It makes it so the configurations for an IO diverge for python v. java users
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Would an additional method overload with String topicPattern
help at all?
Otherwise I'll change the method signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can have both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a significant advantage to using "Pattern" over a string regex ?
If it's a perf issue, we could just build the Patter object once within "withTopicPattern" and use that ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly to make sure that pattern flags can be specified by users, but nearly all (except LITERAL
and CANON_EQ
) can be specified in the expression as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. If there is no significant advantage for specifying a Pattern object I would just support specifying a String regex to keep the API simple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. LGTM.
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
…ex (apache#26948) * Add topicPattern property to KafkaIO.Read to match topics using a regex * Add partially matched and unmatched topic pattern test * Change method signature of withTopicPattern to use String
This addresses #19217 and #21338, matching Apache Flink's KafkaSource property
topicPattern
.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.