Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #199] Kafka Connector: Initial Implementation of Consumer & Producer #1014

Merged
merged 20 commits into from
Aug 26, 2022
Merged

Conversation

Markliniubility
Copy link
Member

@Markliniubility Markliniubility commented Jul 12, 2022

Impl ISSUE #199

Motivation

The goal is to expand the event storage component that EventMesh can support, Kafka, based on the existing API. This PR is the first step of this project, which implemented a consumer.

This PR, as well as all the following PR on Kafka Connectors, will not be merged into the master. It will be merged into a Kafka branch instead

Modifications

A basic/mimimal consumer of Kafka.

Documentation

  • Does this pull request introduces a new feature? (yes / no)
  • Yes
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • Not yet documented
  • If a feature is not applicable for documentation, explain why?
  • It will be documented after the completion of the producer and a functional Kafka connector
  • If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
  • The follow-up will be creating the producer, as well as implementing more features of Kafka

//import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
//import org.apache.eventmesh.api.exception.OnExceptionContext;
//import org.apache.eventmesh.api.producer.Producer;
//import org.apache.eventmesh.common.Constants;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the invalid code.

@Markliniubility Markliniubility changed the title Kafka Connector: Initial Implementation of Consumer [ISSUE #1018] Kafka Connector: Initial Implementation of Consumer Jul 12, 2022
@codecov
Copy link

codecov bot commented Jul 12, 2022

Codecov Report

Merging #1014 (6b53e20) into kafka-connector (dfe58f5) will increase coverage by 0.01%.
The diff coverage is 0.00%.

@@                 Coverage Diff                  @@
##             kafka-connector   #1014      +/-   ##
====================================================
+ Coverage               8.37%   8.39%   +0.01%     
  Complexity               529     529              
====================================================
  Files                    362     362              
  Lines                  23228   23245      +17     
  Branches                2546    2546              
====================================================
+ Hits                    1946    1951       +5     
- Misses                 21132   21144      +12     
  Partials                 150     150              
Impacted Files Coverage Δ
.../connector/rocketmq/consumer/PushConsumerImpl.java 24.64% <ø> (ø)
...nector/rocketmq/producer/RocketMQProducerImpl.java 0.00% <0.00%> (ø)
...pl/eventmeshmessage/EventMeshMessageTCPClient.java 10.81% <0.00%> (-2.99%) ⬇️
...pache/eventmesh/runtime/boot/EventMeshStartup.java 0.00% <0.00%> (ø)
.../http/processor/RemoteSubscribeEventProcessor.java 0.00% <0.00%> (ø)
.../http/processor/SendAsyncRemoteEventProcessor.java 0.00% <0.00%> (ø)
.../eventmesh/common/config/ConfigurationWrapper.java 50.00% <0.00%> (+6.75%) ⬆️

📣 Codecov can now indicate which changes are the most critical in Pull Requests. Learn more

@Alonexc
Copy link
Contributor

Alonexc commented Jul 13, 2022

License check did not pass, please check the license format or did you forget to add a license?

@mytang0
Copy link
Member

mytang0 commented Jul 13, 2022

Hello, please fix the license check problem.

@LIU-WEI-git
Copy link
Contributor

LIU-WEI-git commented Jul 25, 2022

Hi, thanks for your initial implementation. Here are some suggestions.

This pr just implements consumer without enough tests. I recommend you implement producer first and verify it through local kafka server. If you could write some UT with kafka testcontainer, it will be much better. After that, we could have a base to implement consumer correctly.

What's more, please replace rocketmq with kafka in your code and comment.

@Markliniubility
Copy link
Member Author

Hi, thanks for your initial implementation. Here are some suggestions.

This pr just implements consumer without enough tests. I recommend you implement producer first and verify it through local kafka server. If you could write some UT with kafka testcontainer, it will be much better. After that, we could have a base to implement consumer correctly.

What's more, please replace rocketmq with kafka in your code and comment.

Thank you for reviewing! I agree with you on implementing the producer first (which I should have done first) and providing unit tests, and I am working on implementing the producer. However, the branch was only going to merge into a development branch, kafka-connector, instead of the master branch. I was going to add the unit tests and producer in separate PRs and make sure everything works and is fully tested before rolling out to the master branch. I prefer splitting the project into small PRs since it can be more easily reviewed and understood.

# under the License.
#

org.apache.eventmesh.connector.kafka.producer.ProducerImpl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, these test resource files is never used. Maybe you should commit them when you really need.

loadProperties();
}

public String getProp(String key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is never used. Maybe you need write another Configration class like RocketMQ Connector to enable your .properties file.

Map<String, List<PartitionInfo>> topicsAndPartition = this.kafkaConsumer.listTopics();
Set<String> topicsSet = topicsAndPartition.keySet();
List<String> topics = new ArrayList<>();
topics.addAll(topicsSet);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, you get all topics but not do any change of them. It seems that you should remove the special topic from the already subscribed topics.

@Markliniubility Markliniubility changed the title [ISSUE #1018] Kafka Connector: Initial Implementation of Consumer [ISSUE #1018 #200] [WIP: do not merge] Kafka Connector: Initial Implementation of Consumer Jul 28, 2022
@Markliniubility Markliniubility changed the title [ISSUE #1018 #200] [WIP: do not merge] Kafka Connector: Initial Implementation of Consumer [ISSUE #199] [WIP: do not merge] Kafka Connector: Initial Implementation of Consumer Jul 28, 2022
@Markliniubility Markliniubility changed the title [ISSUE #199] [WIP: do not merge] Kafka Connector: Initial Implementation of Consumer [ISSUE #199] [WIP: do not merge] Kafka Connector: Initial Implementation of Consumer & Producer Jul 28, 2022
@xwm1992 xwm1992 marked this pull request as draft August 2, 2022 02:46
@xwm1992 xwm1992 changed the title [ISSUE #199] [WIP: do not merge] Kafka Connector: Initial Implementation of Consumer & Producer [ISSUE #199] Kafka Connector: Initial Implementation of Consumer & Producer Aug 2, 2022
@qqeasonchen qqeasonchen marked this pull request as ready for review August 26, 2022 10:20
@qqeasonchen qqeasonchen merged commit 817dbd0 into apache:kafka-connector Aug 26, 2022
xwm1992 pushed a commit that referenced this pull request Sep 22, 2022
[ISSUE #199] Kafka Connector: Initial Implementation of Consumer & Producer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants