Sometimes a consumer has no partitions assigned to it after rebalance #366
Would you be able to capture the info level logs of the consumer leader during rebalance? It currently logs the assignment it is giving to all members, so if the assignment looks correct, then something about the members is broken. |
I added debug logging to OnAssigned; the logs from the leader contain these messages:
As you can see, many consumers do not have any assigned partitions. |
I could create a workaround for such a case, e.g. in OnAssigned check how many partitions are assigned to the consumer and restart the pod if there are none, but that looks very ugly (see the sketch below). |
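For illustration only, a minimal sketch of that workaround, assuming franz-go's kgo.OnPartitionsAssigned hook; the broker, group, and topic names are placeholders and restartPod is a hypothetical application-side function:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/twmb/franz-go/pkg/kgo"
)

// restartPod is hypothetical: exiting lets an orchestrator such as Kubernetes restart the pod.
func restartPod() { os.Exit(1) }

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("my-group"),     // placeholder group
		kgo.ConsumeTopics("my-topic"),     // placeholder topic
		kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, assigned map[string][]int32) {
			// Count how many partitions this rebalance handed us.
			total := 0
			for _, parts := range assigned {
				total += len(parts)
			}
			if total == 0 {
				log.Println("no partitions assigned after rebalance, restarting")
				restartPod()
			}
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// ... normal poll loop would go here ...
}
```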
What is the full log from the leader? |
And the few logs before that -- particularly, log lines that contain "interests". |
I don't have any lines containing the word "interests".
|
Those are all the logs from the pod.
I'm using franz-go in a framework; the top-level code is here: https://github.com/unistack-org/micro-broker-kgo/blob/v3/util.go
There are multiple things confusing here. First, the log itself includes an "interests" field (see franz-go/pkg/kgo/group_balancer.go, line 427, at commit 3322f31); this is key to seeing what all members are interested in consuming -- without these logs, I can't recreate the inputs to the balance plan. In your logs, I see this:
Also, in your code, I see that you're using the balancers
Something in your group is different -- if everything is cooperative, then that log wouldn't exist. |
Can you suggest next steps for me? |
Does the "interests" string exist in the proper place in 1.4.2?
That "interests" string exists since the log line was introduced. Please upgrade to 1.12.1, but be aware of this (the only) behavior change in 1.5.1: https://github.com/twmb/franz-go/blob/master/CHANGELOG.md#v151 |
I tried this version and the behaviour is the same. I'll try to reproduce it with logs and send the results.
I'm using 1.12.1 and a slightly modified goroutine-per-partition example. What does this mean in the logs:
and this
My test topic has 48 partitions with about 200000 messages per partition, and I run 8 instances of the service to handle the partitions. When I start a single instance, everything works fine. If I start one instance that runs 4 different consumers, all partitions are also handled fine.
and this
|
What's the rest of the "unable to commit" message?
Both of these are missing the error field. It's impossible to help if the output is truncated. |
Sorry, my logger was eating parts of the messages:
and
|
But I did not restart the old consumer; why is the coordinator not aware of it?
So, it looks like you're taking so long to process records (or finish the rebalance callbacks) that the broker is kicking your member from the group. |
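For context, a minimal sketch of the two group timeouts involved in that kick, assuming franz-go's kgo.SessionTimeout and kgo.RebalanceTimeout options with illustrative values and placeholder names; raising them only buys time, so the underlying slow processing or callbacks still need fixing:

```go
package main

import (
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Sketch only: the broker removes a member that misses heartbeats for
	// SessionTimeout, or that does not rejoin within RebalanceTimeout while
	// a rebalance is in progress. The durations below are illustrative.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder
		kgo.ConsumerGroup("my-group"),     // placeholder
		kgo.ConsumeTopics("my-topic"),     // placeholder
		kgo.SessionTimeout(45*time.Second),
		kgo.RebalanceTimeout(2*time.Minute),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```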
This is an empty handler that simply acks the message. But the message volume is very high, so in one fetch I get 10000 messages.
|
Could you add a prefix func to the basic formatter, func() string { return time.Now().Format(time.StampMilli) }, and then post some of the logs? That may help see if something is taking too long (not sure what, yet) |
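A minimal sketch of wiring that prefix func into kgo.BasicLogger (broker, group, and topic names are placeholders):

```go
package main

import (
	"log"
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Prefix every client log line with a millisecond timestamp so gaps
	// between rebalance steps are visible in the output.
	logger := kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
		return time.Now().Format(time.StampMilli) + " "
	})

	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder
		kgo.ConsumerGroup("my-group"),     // placeholder
		kgo.ConsumeTopics("my-topic"),     // placeholder
		kgo.WithLogger(logger),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```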
|
What is the recommended size of num for PollRecords?
Also, what channel size should I specify? (Right now I set the channel size to the number of assigned partitions.)
|
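For reference, a minimal sketch of how PollRecords and per-partition channels are commonly wired, loosely following the goroutine-per-partition example; the 10000 poll cap and the channel buffer of 5 are illustrative values, not recommendations, and the names are placeholders:

```go
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder
		kgo.ConsumerGroup("my-group"),     // placeholder
		kgo.ConsumeTopics("my-topic"),     // placeholder
		kgo.AutoCommitMarks(),             // commit only records marked below
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	// One worker goroutine and one buffered channel per partition; the poll
	// cap bounds a single poll, the channel buffer bounds how far a worker
	// can fall behind. A real setup also drains workers on partition revoke.
	workers := make(map[int32]chan []*kgo.Record)

	for {
		fetches := cl.PollRecords(context.Background(), 10000)
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachError(func(topic string, partition int32, err error) {
			log.Printf("fetch error on %s/%d: %v", topic, partition, err)
		})
		fetches.EachPartition(func(p kgo.FetchTopicPartition) {
			if len(p.Records) == 0 {
				return
			}
			ch, ok := workers[p.Partition]
			if !ok {
				ch = make(chan []*kgo.Record, 5)
				workers[p.Partition] = ch
				go func(ch chan []*kgo.Record) {
					for recs := range ch {
						// ... process recs, then mark them for autocommit ...
						cl.MarkCommitRecords(recs...)
					}
				}(ch)
			}
			ch <- p.Records
		})
	}
}
```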
After some time I start another consumer, and on assign it receives no partitions.
But the admin command says that this member id is not present in the group!
|
The first group of logs doesn't start early enough, but I'm assuming currently that your revoke callback is taking too long. The first log lines already indicate a rebalance is in progress at 9:17:05; then at 9:17:11 you start getting UNKNOWN_MEMBER_ID, which is due to your revoke callback not being done yet. In fact, your revoke still isn't done in that entire set of logs. Maybe it's a deadlock on your side (a timing sketch follows this comment).
Up to you
Not sure, but given the other issues above, it's not worth looking into this issue because it could just be a fallout of the other problems. |
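To check the revoke-callback theory, a minimal sketch (placeholder broker/group/topic names) that simply times how long the revoke callback takes, so a blocked or deadlocked revoke shows up in the logs:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder
		kgo.ConsumerGroup("my-group"),     // placeholder
		kgo.ConsumeTopics("my-topic"),     // placeholder
		kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, revoked map[string][]int32) {
			start := time.Now()
			defer func() {
				// If this duration approaches the rebalance timeout, the
				// broker will kick the member before the revoke finishes.
				log.Printf("revoke of %v took %s", revoked, time.Since(start))
			}()
			// ... existing revoke logic (stop per-partition workers, etc.) ...
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```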
OK, if I run the goroutine-per-partition example with mark commit offsets, is that enough to reproduce the problems? |
Not sure, this looks like an application side problem. |
consumer1.log
Logs from the consumer1 and consumer2 instances are in the attached files. |
Those full logs help -- I see a bug -- can you remove Sticky() from your balancers, to make it just CooperativeSticky? On this line: https://github.com/unistack-org/micro-broker-kgo/blob/adb5c9bfc9fe9c100e6f090a0e5ea73344ed2ece/kgo.go#L385 The bug is that some logic in the client is thinking the group is eager, because there is an eager balancer option. The group is rebalancing as cooperative (i.e. only getting partial assignments) while the client is revoking all partitions. |
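For clarity, a minimal sketch of the suggested change, assuming the original passed both balancers and using placeholder broker/group/topic names: configure only the cooperative-sticky balancer instead of mixing it with the eager sticky balancer.

```go
package main

import (
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Before (assumed), mixing a cooperative and an eager balancer:
	//   kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer())
	// After, cooperative only, as suggested above:
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder
		kgo.ConsumerGroup("my-group"),     // placeholder
		kgo.ConsumeTopics("my-topic"),     // placeholder
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```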
Thanks, your suggestion works fine for me. Now all consumers always get an assignment and process messages.
The previous cooperative check was wrong: if you used multiple group balancers and any were eager, the client would think you were eager consuming. However, this bug did not affect all areas of group balancing, which actually made the bug worse. If everything in the code went eager, then the bug would just be "you saw more stop-the-world than was intended". Instead, if the chosen balancer was cooperative, then cooperative balancing would occur for eager consumers. On rebalance, partitions would be lost, and then a cooperative rebalance would not occur, meaning the partitions would be stuck until another rebalance.

We fix this by saving whether we are cooperative based on the balancer that is actually chosen. This is an upgrade path that should happen once -- once to cooperative -- but we can downgrade if the user adds a random eager balancer. This is not supported per KIP-429, so we just warn-log and continue as best we can (likely with duplicate consumption). Closes #366.
I tried the latest 1.12.1 and the older 1.4.2; both have the same effect.
Kafka is 2.4 and I can't upgrade it.