-
Notifications
You must be signed in to change notification settings - Fork 138
Avoid creating offset consumer before updating consumer metrics #426
Avoid creating offset consumer before updating consumer metrics #426
Conversation
4336570
to
1f1dc01
Compare
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java
Outdated
Show resolved
Hide resolved
I've also changed the log level of empty group check to debug. public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
if (StringUtils.isEmpty(groupId)) {
if (log.isDebugEnabled()) {
log.debug("Try to get group consumers with an empty group id");
}
return CompletableFuture.completedFuture(null);
} Because in my test environment, there're a lot of logs like
It looks like there's something wrong with kop/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java Lines 359 to 368 in 0116024
Could you help take a look? @dockerzhang |
@BewareMyPower I think there is no |
@dockerzhang It's the problem in the real environment not UT. The ZK node exists like:
The logs occurred in another broker whose IP is |
BTW, the |
Wait for #427 merged. |
Motivation
Sometimes multiple brokers could subscribe the same topic partition and fail with
ConsumerBusyException
:This problem was mentioned in #263 before. However, switching the subscription type to shared or failover is not correct because a partition should have only one consumer at the same time. The problem is before updating consumer metrics, it uses
OffsetAcker#getConsumer
to check if the consumer has been created. This check is wrong.Firstly,
OffsetAcker#getConsumer
may create an offset consumer, while the actual offset consumer could have been created by another broker.Secondly, it only checks if the future of consumer is done and returns immediately if not. It could cause a situation that this metrics update was skipped.
Modifications
KafkaTopicConsumer#getGroupConsumers
. Because the offset consumer could be created byGroupCoordinator#handleSyncGroup
before, here we just check if the consumer existed. Then update the consumer metrics when the future is completed.