Fix sarama consumer deadlock #2587
Codecov Report
@@           Coverage Diff           @@
##           master    #2587   +/-   ##
=======================================
  Coverage   95.06%   95.07%
=======================================
  Files         209      209
  Lines        9370     9364     -6
=======================================
- Hits         8908     8903     -5
+ Misses        387      385     -2
- Partials       75       76     +1
I am concerned that this makes the logic even more complex by introducing additional signaling. My suggestion was instead to simplify it by reducing the number of goroutines and handling more events in a single select{}, i.e. messages, errors, and deadlock events. The error handling is especially trivial (just a counter bump). With this refactoring, perhaps the WaitGroup won't be necessary at all.
Yeah, good call, agree it's more complex than necessary. I think we can do away completely with the additional goroutines and channels/signalling that this PR introduced, though I think a WaitGroup (at consumer scope) is still necessary as we still want some mechanism to block If we use a single
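To make the suggestion above concrete, here is a minimal sketch of one goroutine per partition serving messages, errors, and a deadlock signal from a single select, with a consumer-scoped WaitGroup so the caller can block until the handler exits. It is not the PR's code: `partitionStream`, `loopStats`, `handlePartition`, `runDemo`, and the `deadlock` channel are hypothetical names invented for illustration, and plain channels stand in for the sarama partition consumer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// partitionStream is a hypothetical stand-in for a partition consumer;
// only its two channels matter for this sketch.
type partitionStream struct {
	messages chan string
	errs     chan error
}

// loopStats records what the loop saw; a real consumer would bump metrics instead.
type loopStats struct {
	processed  int
	errCount   int
	deadlocked bool
}

// handlePartition serves messages, errors and a deadlock signal from a single
// select. It returns once both input channels are closed or the signal fires.
func handlePartition(ps partitionStream, deadlock <-chan struct{}, wg *sync.WaitGroup) loopStats {
	defer wg.Done()
	var s loopStats
	msgs, errs := ps.messages, ps.errs
	for msgs != nil || errs != nil {
		select {
		case _, ok := <-msgs:
			if !ok {
				msgs = nil // a nil channel is never selected again
				continue
			}
			s.processed++
		case _, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			s.errCount++ // error handling is just a counter bump
		case <-deadlock:
			s.deadlocked = true
			return s
		}
	}
	return s
}

// runDemo feeds three messages and two errors, closes both channels and
// waits on a consumer-scoped WaitGroup for the handler to finish.
func runDemo() loopStats {
	ps := partitionStream{messages: make(chan string, 3), errs: make(chan error, 2)}
	for _, m := range []string{"a", "b", "c"} {
		ps.messages <- m
	}
	ps.errs <- errors.New("e1")
	ps.errs <- errors.New("e2")
	close(ps.messages)
	close(ps.errs)

	var wg sync.WaitGroup
	wg.Add(1)
	result := make(chan loopStats, 1)
	go func() { result <- handlePartition(ps, nil, &wg) }()
	wg.Wait() // blocks until the handler has returned
	return <-result
}

func main() {
	fmt.Printf("%+v\n", runDemo())
}
```

Setting a drained channel to nil keeps the loop from spinning on a closed channel, and the WaitGroup is only touched by the caller and by the handler's deferred Done, so no extra signalling goroutines are needed.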
Signed-off-by: albertteoh <[email protected]>
592e1d0 to 409c72a
@@ -78,17 +79,11 @@ func (c *Consumer) Start() {
	c.logger.Info("Starting main loop")
	for pc := range c.internalConsumer.Partitions() {
		c.partitionMapLock.Lock()
		if p, ok := c.partitionIDToState[pc.Partition()]; ok {
Are you sure this is safe to remove? From the comment it seems like a case where a partition gets re-assigned to the same service instance, so there's potentially another partition consumer for this partition ID already running (or more likely shutting down).
@vprithvi do you have any more color to add to this condition?
We noticed the case that you described a couple of times, but weren't able to reproduce it in tests.
Since then, many things have changed with both Kafka and Sarama. I think we can remove this now in the interest of simplifying, and revisit if we see problems.
How did it manifest when it happened? Committed offsets going backwards?
IIUC, this happens when new messages arrive on the partition (because the Messages() channel isn't closed yet) after the partition consumer was closed (via the c.closePartition(p.partitionConsumer) call), causing a new partition consumer to be created.
This PR proposes to solve this by calling c.internalConsumer.Close() first within Close(), which closes the Messages() channel, preventing new messages from arriving, and then closes the Partitions() channel, preventing new PartitionConsumers from being created.
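The ordering described above can be sketched roughly as follows: close the upstream source first so that no new messages or partitions can arrive, then wait for the handlers to drain. This is only an illustration under stated assumptions. `fakeSource`, `fakePartition`, `consumer`, and `newDemoConsumer` are hypothetical types and helpers, with plain Go channels standing in for the real Messages() and Partitions() channels.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakePartition is a hypothetical stand-in for a partition consumer.
type fakePartition struct {
	id       int32
	messages chan string
}

// fakeSource is a hypothetical stand-in for the underlying consumer.
type fakeSource struct {
	partitions chan *fakePartition
	open       []*fakePartition
}

// Close closes every message channel first, then the partitions channel,
// so neither new messages nor new partitions can appear afterwards.
func (s *fakeSource) Close() {
	for _, p := range s.open {
		close(p.messages)
	}
	close(s.partitions)
}

type consumer struct {
	src     *fakeSource
	wg      sync.WaitGroup // consumer-scoped: main loop plus every handler
	handled int64
}

// Start launches the main loop, which spawns one handler per partition.
func (c *consumer) Start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for p := range c.src.partitions {
			c.wg.Add(1) // safe: the main loop still holds a count
			go c.handleMessages(p)
		}
	}()
}

// handleMessages exits on its own once its message channel is closed.
func (c *consumer) handleMessages(p *fakePartition) {
	defer c.wg.Done()
	for range p.messages {
		atomic.AddInt64(&c.handled, 1)
	}
}

// Close shuts the source first, then blocks until all goroutines return.
func (c *consumer) Close() int64 {
	c.src.Close()
	c.wg.Wait()
	return atomic.LoadInt64(&c.handled)
}

// newDemoConsumer builds a consumer with two partitions and three buffered messages.
func newDemoConsumer() *consumer {
	p0 := &fakePartition{id: 0, messages: make(chan string, 2)}
	p1 := &fakePartition{id: 1, messages: make(chan string, 1)}
	p0.messages <- "a"
	p0.messages <- "b"
	p1.messages <- "c"
	src := &fakeSource{partitions: make(chan *fakePartition, 2), open: []*fakePartition{p0, p1}}
	src.partitions <- p0
	src.partitions <- p1
	return &consumer{src: src}
}

func main() {
	c := newDemoConsumer()
	c.Start()
	fmt.Println("handled:", c.Close())
}
```

Because the handlers only range over channels that the source owns and closes, Close() never needs to close a partition itself, and no handler can write to a channel that has already been closed.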
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
	c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
	c.partitionMapLock.Lock()
	c.partitionsHeld++
	c.partitionsHeldGauge.Update(c.partitionsHeld)
	wg := &c.partitionIDToState[pc.Partition()].wg
	c.partitionMapLock.Unlock()
	defer func() {
		c.closePartition(pc)
Is this closePartition still necessary provided internalConsumer.Close() closes all partitions?
Yup, I think you're right, @vprithvi. 👍
However, closePartition() also increments a metrics counter, which is used in tests to detect whether the partition is closed; neither the Consumer nor the PartitionConsumer interface appears to offer an "OnClosed" callback or channel to signal successful closure of a partition.
We could instead replace c.closePartition(pc) with c.partitionMetrics(pc.Partition()).closeCounter.Inc(1) to bump up the metric and remove the closePartition() function, if that sounds reasonable to you?
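As a rough sketch of that idea, the handler below bumps a close counter in a defer when its message channel is closed, and a test-style helper polls the counter to detect closure. The `counter` type, `handleMessages` signature, `waitForCounter`, and `runDemo` are all hypothetical and only mimic the shape of the `closeCounter.Inc(1)` call mentioned above; they are not the project's metrics API.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// counter is a hypothetical, minimal stand-in for a metrics counter.
type counter struct{ n int64 }

func (c *counter) Inc(delta int64) { atomic.AddInt64(&c.n, delta) }
func (c *counter) Value() int64    { return atomic.LoadInt64(&c.n) }

// handleMessages drains the partition and bumps closeCounter on the way out,
// instead of calling a separate closePartition function.
func handleMessages(messages <-chan string, closeCounter *counter) {
	defer closeCounter.Inc(1)
	for range messages {
		// process the message here
	}
}

// waitForCounter polls until the counter reaches want or the timeout expires.
// A test can use it because there is no "OnClosed" callback to hook into.
func waitForCounter(c *counter, want int64, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if c.Value() >= want {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return c.Value() >= want
}

// runDemo sends one message, closes the channel and waits for the close metric.
func runDemo() bool {
	messages := make(chan string, 1)
	closeCounter := &counter{}
	go handleMessages(messages, closeCounter)
	messages <- "span"
	close(messages)
	return waitForCounter(closeCounter, 1, time.Second)
}

func main() {
	fmt.Println("partition closed:", runDemo())
}
```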
Thanks!
Thanks for the review, @yurishkuro & @vprithvi!
Signed-off-by: albertteoh [email protected]
Which problem is this PR solving?
Short description of the changes
- Prevent deadlock between the handleErrors() and Close() functions trying to acquire the partitionLock and access the partition WaitGroup by:
  - partitionLock within Close()
Additional improvements:
- close calls: Ensure the underlying sarama consumer is closed first to prevent repartitioning, which could cause new partition consumers to be created that attempt to write to already closed channels.
- Close() will rely on Consumer-scoped WaitGroups to determine graceful closure of message and error handlers. These WaitGroups will be managed through goroutines monitoring messagesDone and errorsDone channels, which will be produced to by the respective handler functions.
- New Consumer constructor to DRY up duplicated construction logic.
Testing
- ./cmd/ingester/ingester-darwin-amd64 --log-level debug
- : rate(jaeger_kafka_spans_written_total[5m]) and rate(jaeger_ingester_sarama_consumer_messages_total[5m]) more or less line up.
- ./cmd/ingester/ingester-darwin-amd64 --log-level debug & pid1="$\!"; sleep 0.1; kill $pid1
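The description above can be illustrated with a small sketch, under the assumption that the deadlock comes from Close() waiting on a WaitGroup while holding a lock that the handlers also need before they can finish. Here the handlers report on messagesDone and errorsDone, monitor goroutines turn those reports into WaitGroup decrements, and Close() takes the lock only after the wait has completed. The `consumer` struct, `newConsumer`, `startPartition`, `closeUpstream`, and `runDemo` are hypothetical and do not reproduce the PR's code.

```go
package main

import (
	"fmt"
	"sync"
)

// consumer is a hypothetical, stripped-down model of the ingester consumer.
type consumer struct {
	partitionLock  sync.Mutex // guards partitionsHeld
	partitionsHeld int

	messagesDone chan struct{}
	errorsDone   chan struct{}
	doneWg       sync.WaitGroup // consumer-scoped, one count per handler
}

// newConsumer starts the two monitor goroutines that translate "done"
// signals from handlers into WaitGroup decrements.
func newConsumer() *consumer {
	c := &consumer{messagesDone: make(chan struct{}), errorsDone: make(chan struct{})}
	go func() {
		for range c.messagesDone {
			c.doneWg.Done()
		}
	}()
	go func() {
		for range c.errorsDone {
			c.doneWg.Done()
		}
	}()
	return c
}

// startPartition launches a message handler and an error handler.
func (c *consumer) startPartition(messages <-chan string, errs <-chan error) {
	c.doneWg.Add(2)
	c.partitionLock.Lock()
	c.partitionsHeld++
	c.partitionLock.Unlock()

	go func() {
		for range messages {
		}
		// The handler needs the lock to finish; Close must not hold it while waiting.
		c.partitionLock.Lock()
		c.partitionsHeld--
		c.partitionLock.Unlock()
		c.messagesDone <- struct{}{}
	}()
	go func() {
		for range errs {
		}
		c.errorsDone <- struct{}{}
	}()
}

// Close stops the upstream, waits WITHOUT holding partitionLock, and only
// then takes the lock to read the final state.
func (c *consumer) Close(closeUpstream func()) int {
	closeUpstream()
	c.doneWg.Wait()
	close(c.messagesDone)
	close(c.errorsDone)
	c.partitionLock.Lock()
	defer c.partitionLock.Unlock()
	return c.partitionsHeld
}

// runDemo starts one partition and closes the consumer, returning partitionsHeld.
func runDemo() int {
	c := newConsumer()
	messages := make(chan string)
	errs := make(chan error)
	c.startPartition(messages, errs)
	return c.Close(func() { close(messages); close(errs) })
}

func main() {
	fmt.Println("partitions held after Close:", runDemo())
}
```

If Close() instead acquired partitionLock before calling Wait(), the message handler would block on the same lock and never signal completion, which is the shape of deadlock the description refers to.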