-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. #3989
[Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. #3989
Conversation
Is topic-pattern a required parameter? |
It's not required parameter. |
if (StringUtils.isNotBlank(topic)) { | ||
this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ","))); | ||
} | ||
String topicPattern = config.getString(TOPIC_PATTERN.key()); | ||
if (StringUtils.isNotBlank(topicPattern)) { | ||
if (this.partitionDiscoverer != null) { | ||
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key())); | ||
if (config.hasPath(TOPIC_PATTERN.key())) { | ||
String topicPattern = config.getString(TOPIC_PATTERN.key()); | ||
if (StringUtils.isNotBlank(topicPattern)) { | ||
if (this.partitionDiscoverer != null) { | ||
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key())); | ||
} | ||
this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If these two parameters are mutually exclusive, we should prompt the user during the check phase, not here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what you mean. I just judge whether the topic-pattern parameter is configured.
if (this.partitionDiscoverer != null) { | ||
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it needs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, if these two parameters are mutually exclusive, then they should be verified in the check phase, not here. so it is redundant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the judgment here also belongs to the check phase.
if (this.partitionDiscoverer != null) { | ||
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, if these two parameters are mutually exclusive, then they should be verified in the check phase, not here. so it is redundant.
if (StringUtils.isNotBlank(topicPattern)) { | ||
if (this.partitionDiscoverer != null) { | ||
throw new PulsarConnectorException(SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' and '%s' is exclusive.", TOPIC.key(), TOPIC_PATTERN.key())); | ||
if (config.hasPath(TOPIC_PATTERN.key())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If TOPIC_PATTERN
and TOPIC
are exclusive, you need add exclusive(TOPIC_PATTERN, TOPIC)
in OptionRule. And you can reference https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
back to this question. #3989 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, redundancy judgment deleted.
@EricJoy2048 @Hisoka-X PTAL |
@@ -213,12 +213,11 @@ private void setPartitionDiscoverer(Config config) { | |||
if (StringUtils.isNotBlank(topic)) { | |||
this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ","))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topic
should use config.hasPath(TOPIC_PATTERN.key()
judge whether there is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Purpose of this pull request
When the user does not set the topic-pattern option, it should be judged whether there is a setting, otherwise the following exception will appear.
![image](https://user-images.githubusercontent.com/40714172/214010793-f7cf5f57-f963-48fc-8cc2-f9ae0674cd1d.png)
Check list
New License Guide
release-note
.