-
Notifications
You must be signed in to change notification settings - Fork 138
kop using pulsar consumer metrics #263
kop using pulsar consumer metrics #263
Conversation
961c2b0
to
02ee835
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's the first version for kop using pulsar consumer metrics.
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/MessageRecordUtils.java
Outdated
Show resolved
Hide resolved
@jiazhai @BewareMyPower PTAL |
Please fix the license check first. I'll review this PR soon. As for now, I just have a question, |
@BewareMyPower I will try to check if |
@BewareMyPower the PR which exposed |
Got it. |
1e7fed4
to
7524f0a
Compare
@BewareMyPower apache/pulsar#8951 has been merged, how to trigger this PR using the latest method? |
@zymap I think we may need to change the pulsar dependency to sn-pulsar. Could you give some help on @dockerzhang 's question? |
@dockerzhang I'll release a self-maintained pulsar soon. Then we can use it as KoP's dependency release. Before it, I may release KoP 2.7.0 first to make KoP work with pulsar 2.7.0. |
Now the latest pulsar dependency (2.8.0-rc-202012200040) is ready, please rebase to latest master. |
f912533
to
1860bf9
Compare
Please rebase to latest and some changes are needed. Because now we use public void updateStats(List<Entry> entries, Consumer consumer) |
ok, I will add it tomorrow. |
9aea04a
to
b84e276
Compare
4cdd0d2
to
d289c2f
Compare
...pl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
Outdated
Show resolved
Hide resolved
The I think it may take some time to figure out why it failed. I'll first take a look for a while. |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java
Outdated
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/MessageFetchContext.java
Show resolved
Hide resolved
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ZooKeeperUtils.java
Show resolved
Hide resolved
I found the reason why public CompletableFuture<Consumer> getGroupConsumers(String groupId, TopicPartition kafkaPartition) {
if (groupId == null || groupId.isEmpty()) {
return CompletableFuture.completedFuture(null);
} But I think getting an empty group id is not a proper behavior. After resolving the problem, you can rebase to latest to include PR #361 . |
In addition, should we avoid public final ConcurrentHashMap<String, String> currentConnectedGroup;
public final String groupIdStoredPath; And the internal methods like |
6e93572
to
a642db5
Compare
@BewareMyPower thanks for your help about this pr. the latest commit fixed next problem |
Please rebase to master again since #368 fixes the broken kop integration tests. |
a642db5
to
9e123db
Compare
### Motivation Sometimes multiple brokers could subscribe the same topic partition and fail with `ConsumerBusyException`: ``` WARN io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker - Error when get consumer for offset ack: java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: Exclusive consumer is already connected ``` This problem was mentioned in #263 before. However, switching the subscription type to shared or failover is not correct because a partition should have only one consumer at the same time. The problem is before updating consumer metrics, it uses `OffsetAcker#getConsumer` to check if the consumer has been created. This check is wrong. Firstly, `OffsetAcker#getConsumer` may create an offset consumer, while the actual offset consumer could have been created by another broker. Secondly, it only checks if the future of consumer is done and returns immediately if not. It could cause a situation that this metrics update was skipped. ### Modifications - Modify the condition check in `KafkaTopicConsumer#getGroupConsumers`. Because the offset consumer could be created by `GroupCoordinator#handleSyncGroup` before, here we just check if the consumer existed. Then update the consumer metrics when the future is completed. - Remove the failed future of offset consumers. - Add consumer stats tests to guarantee the modification works.
Fixes #392 ### Motivation #105 introduced the `OffsetAcker` to reuse the backlog stat of Pulsar subscription. However, after #296 introduced the continuous offset, when the `OffsetAcker` want to find the `MessageId` or `Position`, which contains the ledger id and entry id of a batch, it needs to get the `PersistentTopic` using `BrokerService#getTopic`. If the current broker was not the owner broker of the partition, `getTopic` would fail. However, Kafka consumers send the `OFFSET_COMMIT` requests to the coordinator broker, which is associated with the specified group name. A coordinator broker may not own the partition after the pulsar's topic ownership changed, so the `getTopic` would fail and couldn't be recovered, which affect the performance significantly and a lot of warnings logs would be printed. ### Modifications - Remove the `OffsetAcker` with the related metrics (`CoordinatorStats`) and tests (`CommitOffsetBacklogTest`) - Remove the reused consumer stats from #263 with the related tests (`testKafkaConsumerMetrics`) - Add `testMixedConsumersWithSameSubscription` to cover the case that Kafka consumer and Pulsar consumer subscribes the same topic with same subscription name. Because before this PR, Pulsar consumer were aware of the Kafka consumer's consumed position (committed offset). - Fix the current tests: - `testDeleteTopics`: After this PR, the topic could be deleted even the Kafka consumers were still connected because no broker side Consumer were attached to the topic. - `testPulsarProduceKafkaConsume2`: It's a test for Kafka consumer with `enable.auto.commit=true`, however the validation is wrong because even a consumer hasn't committed any offset, the consumer can still consume from the latest offset. Regarding to the cases that Kafka or Pulsar producer produces and Kafka or Pulsar consumer consumes, they are covered by existed `KafkaRequestTypeTest`. And the offset commit case is covered by `DifferentNamespaceTestBase#testCommitOffset`.
fix for #248