-
Notifications
You must be signed in to change notification settings - Fork 138
Use Subscription#resetCursor for cumulative ACK #515
Use Subscription#resetCursor for cumulative ACK #515
Conversation
@hangc0276 The CPU usage problem may be related to the openmessaging-benchmark's bug, the
KoP has reset the cursor for about 33 to 47 times in every second. There may be something wrong with my benchmark client. (openmessaging-benchmark) However, for the same workloads, after this PR, KoP has a significant better performance. See the following compare. Before this PR, broker could be easily broken when lots of OFFSET_COMMIT requests arrived. |
@dockerzhang I removed the consumer side stats in this PR. Reusing broker side consumer's stats is convenient, however, KoP will manage N broker consumers and N client consumers for N topics/partitions. When N is too large, the performance could be impacted significantly. I think we can add these metrics to prometheus metrics later. |
I have a question, have you tried adjusting different commit frequencies to observe performance changes? Or in other words, how to determine that the performance loss is caused by changing the way of commit(ackOffset -> resetCursor) ? |
@wuzhanpeng I'm going to do this today because I just found the commit frequency is too last after this change (lots of INFO logs from pulsar side). BTW, for the previous test (#513), you can see all errors in |
There're 2 tests that may fail.
I'll try to fix them or remove them. |
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/OffsetAcker.java
Show resolved
Hide resolved
We're going to remove |
Motivation
Currently KoP uses
OffsetAcker
to manage a list of Pulsar consumers for cumulative ACK. However, it wastes a lot of resources. For example, a topic with N partitions will create N consumers across all brokers. Besides, for each client side consumer, broker will create an associated broker side consumer (in packageorg.apache.pulsar.broker.service
). In addition, the broker side consumer also updates two fields.See #513, it could easily makes a network channel broken when
OFFSET_COMMIT
request are frequent.What's worse is, when a bundle ownership changes, the consumers in
OffsetAcker
cannot detect the change. Because it needs to callBrokerService#getTopic
to get thePersistentTopic
, then find the offset's position by looking up in theManagedLedger
. However, even it failed, no error code will be sent to Kafka client.Modifications
Subscription#resetCursor
for cumulative ACK, for the first time when the cursor doesn't exist, create thePersistentSubscription
.OffsetAcker#ackOffset
will we write the data to__consumer_offsets
.CommitOffsetBacklogTest
because it relies on the update ofSubscription
.OffsetAcker
's internal consumers and the associated broker side consumers.