Skip to content

Commit

Permalink
fix #639 fix semantics of topicOfInterest (#642)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhanhui Li <[email protected]>
  • Loading branch information
lizhanhui authored Dec 1, 2023
1 parent e2c0b97 commit c38f7d5
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 9 deletions.
1 change: 1 addition & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ bazel-rocketmq-client-cpp
/bazel-*
/compile_commands.json
/.cache/
.clangd
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration)
}

ProducerBuilder& ProducerBuilder::withTopics(const std::vector<std::string>& topics) {
impl_->topicsOfInterest(topics);
impl_->withTopics(topics);
return *this;
}

Expand Down
18 changes: 16 additions & 2 deletions cpp/source/rocketmq/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#include "ProducerImpl.h"

#include <algorithm>
#include <apache/rocketmq/v2/definition.pb.h>

#include <atomic>
Expand Down Expand Up @@ -575,9 +576,22 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
}
}

void ProducerImpl::topicsOfInterest(std::vector<std::string> topics) {
void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
topics_.swap(topics);
for (auto& topic : topics_) {
if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
topics.push_back(topic);
}
}
}

void ProducerImpl::withTopics(const std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
for (auto &topic: topics) {
if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) {
topics_.push_back(topic);
}
}
}

void ProducerImpl::buildClientSettings(rmq::Settings& settings) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/PushConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ PushConsumerImpl::~PushConsumerImpl() {
shutdown();
}

void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
void PushConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topic_filter_expression_table_mtx_);
for (const auto& entry : topic_filter_expression_table_) {
topics.push_back(entry.first);
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/SimpleConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) {
}
}

void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&subscriptions_mtx_);
for (const auto& entry : subscriptions_) {
if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/include/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class ClientImpl : virtual public Client {
absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_ GUARDED_BY(session_map_mtx_);
absl::Mutex session_map_mtx_;

virtual void topicsOfInterest(std::vector<std::string> topics) {
virtual void topicsOfInterest(std::vector<std::string> &topics) {
}

void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);
Expand Down
4 changes: 3 additions & 1 deletion cpp/source/rocketmq/include/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t

void buildClientSettings(rmq::Settings& settings) override;

void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_);
void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topics_mtx_);

void withTopics(const std::vector<std::string> &topics) LOCKS_EXCLUDED(topics_mtx_);

const PublishStats& stats() const {
return stats_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/include/PushConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PushConsumerImpl : virtual public ClientImpl, public std::enable_shared_fr

void prepareHeartbeatData(HeartbeatRequest& request) override;

void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);

void start() override;

Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/include/SimpleConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_thi
}

protected:
void topicsOfInterest(std::vector<std::string> topics) override;
void topicsOfInterest(std::vector<std::string> &topics) override;

private:
absl::flat_hash_map<std::string, FilterExpression> subscriptions_ GUARDED_BY(subscriptions_mtx_);
Expand Down

0 comments on commit c38f7d5

Please sign in to comment.