Retry messages on shutdown #420
Conversation
Force-pushed from 0d90bd3 to 1ae385a (Compare)
A recent benchmarking and profiling push says: maybe this has a tiny impact on performance, but it's still swamped out by stupid stuff like CRC calculations, so not a concern.
Force-pushed from 18c94ac to 91534c6 (Compare)
CI failing because of kisielk/errcheck#70
CI fixed. @Shopify/kafka this is ready for review.
Force-pushed from bd75081 to 644af59 (Compare)
@@ -355,6 +347,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
if msg.flags&chaser == chaser {
	retryState[msg.retries].expectChaser = false
	p.inFlight.Done() // this chaser is now useless and will be garbage collected
s/is now useless/is now handled/ ?
I think the accounting is correct.
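To make the accounting concrete, here is a minimal, self-contained sketch (hypothetical types, not Sarama's actual code) of the invariant under discussion: every chaser counted into the in-flight WaitGroup is matched by exactly one Done() when the dispatcher consumes it, so shutdown can neither hang nor drive the counter negative.

```go
package main

import (
	"fmt"
	"sync"
)

type message struct {
	chaser bool
}

func main() {
	var inFlight sync.WaitGroup
	input := make(chan *message, 2)

	// Producer side: the chaser is counted when it is injected.
	inFlight.Add(1)
	input <- &message{chaser: true}
	close(input)

	// Dispatcher side: the chaser is consumed and the counter released,
	// mirroring the p.inFlight.Done() call in the diff above.
	for msg := range input {
		if msg.chaser {
			inFlight.Done()
			continue
		}
	}

	inFlight.Wait()
	fmt.Println("all in-flight messages accounted for")
}
```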
👍, this is a nice simplification and it's much easier to understand now.
Force-pushed from 644af59 to 54eb5af (Compare)
Your description of the accounting matches my understanding exactly. Once CI is 🍏 and I've had a quick pair of 👀 on the third commit, I think this is good to go.
	continue
} else if msg.retries == 0 {
	if shuttingDown {
		p.returnError(msg, ErrShuttingDown)
This reduces the in-flight counter, but it was never incremented for this message. We should probably move the p.inFlight.Add(1) up so it always gets executed.
Oooh, really nice catch. I will fix and see if I can add or adjust a test for this case too.
OK, this path is fixed and tested now.
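For illustration, a simplified, hypothetical sketch of the ordering fix (the names producer, dispatch, and returnError are invented here, not Sarama's actual API): the in-flight counter is incremented before any branch that might hand the message to returnError, so every Done() is balanced by an earlier Add(1) and the WaitGroup can never go negative.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errShuttingDown = errors.New("producer is shutting down")

type producer struct {
	inFlight sync.WaitGroup
	errs     []error
}

func (p *producer) returnError(err error) {
	p.errs = append(p.errs, err)
	p.inFlight.Done() // always balanced by an earlier Add(1)
}

func (p *producer) dispatch(shuttingDown bool) {
	// Add(1) happens unconditionally, before the shutdown check, so the
	// Done() inside returnError never fires without a matching Add.
	p.inFlight.Add(1)
	if shuttingDown {
		p.returnError(errShuttingDown)
		return
	}
	// ... normal dispatch path; Done() is called once the message is
	// acknowledged or rejected by the broker ...
	p.inFlight.Done()
}

func main() {
	p := &producer{}
	p.dispatch(true)
	p.inFlight.Wait()
	fmt.Println("errors returned:", p.errs)
}
```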
Force-pushed from 2bb62c1 to 08ccf5e (Compare)
I think this looks good. We should do some stress testing of this though.
I am comfortable enough to push this to master now. I will do some stressing before releasing the next stable version.
@Shopify/kafka fixes #419.
Rather than use the old complicated system of reference-counting flags to shut down cleanly, do the much simpler thing: keep a sync.WaitGroup counting the number of messages "in flight" (aka owned by the producer). When shutdown is requested, spawn a goroutine that waits for this counter to hit 0, then closes everything in one go.

We add messages to the in-flight set in the topicDispatcher (only new messages with retries == 0, though). We remove messages from the in-flight set in returnError and returnSuccesses; even if the Producer.Return.* values are false, they are still guaranteed to see every message. We also add/remove chaser messages in leaderDispatcher.

This still needs tests.

I'm not sure what performance impact the waitgroup will have. An alternative might be an atomic counter, and have the shutdown goroutine just poll it every 10ms or something.