event: fix Resubscribe deadlock when unsubscribing after inner sub ends #28359
A goroutine manages the lifetime of each subscription created by a resubscription. When the inner subscription ends with no error, the resub goroutine ends as well. However, the resub goroutine needs to live long enough to read from the unsub channel. Otherwise, an Unsubscribe call deadlocks when writing to the unsub channel.
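The blocking behavior described above can be reproduced without the event package. The following is a minimal sketch, not go-ethereum's code: `runManager`, `trySend`, and `unsubscribeAfterExit` are hypothetical names, and the timeout exists only so the demo terminates instead of hanging the way a real blocked send would.

```go
package main

import (
	"fmt"
	"time"
)

// trySend reports whether a send on ch completes within timeout.
// A real blocked send would hang forever; the timeout keeps the demo finite.
func trySend(ch chan struct{}, timeout time.Duration) bool {
	select {
	case ch <- struct{}{}:
		return true
	case <-time.After(timeout):
		return false
	}
}

// runManager mimics a manager goroutine (hypothetical stand-in for the
// resub goroutine). It exits as soon as the inner work ends, even if
// nobody has sent on unsub yet. The returned channel is closed on exit.
func runManager(unsub chan struct{}, innerDone <-chan struct{}) <-chan struct{} {
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		select {
		case <-innerDone: // inner subscription ended without an error
		case <-unsub: // caller asked to stop
		}
	}()
	return exited
}

// unsubscribeAfterExit lets the manager exit first, then attempts the
// send that an Unsubscribe call would perform.
func unsubscribeAfterExit(unsub chan struct{}) bool {
	innerDone := make(chan struct{})
	exited := runManager(unsub, innerDone)
	close(innerDone)
	<-exited // the only reader of unsub is gone now
	return trySend(unsub, 50*time.Millisecond)
}

func main() {
	// With an unbuffered channel and no reader left, the send cannot complete.
	fmt.Println("send completed:", unsubscribeAfterExit(make(chan struct{})))
	// prints "send completed: false"
}
```

Once the manager goroutine has returned, nothing can ever receive from the unbuffered channel, so the sender would wait indefinitely.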
event/subscription_test.go
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
	return NewSubscription(func(unsubscribed <-chan struct{}) error {
		select {
		case <-time.After(2 * time.Second):
			innerSubDone <- struct{}{}
			return nil
		case <-unsubscribed:
			return nil
		}
	}), nil
})
I'm not deeply familiar with the semantics of subscriptions, but I've been trying to figure out whether this is 'correct' or not. The docs for NewSubscription say:
NewSubscription runs a producer function as a subscription in a new goroutine. The
channel given to the producer is closed when Unsubscribe is called. If fn returns an
error, it is sent on the subscription's error channel.
In other words, unsubscribed will be closed when an external caller wants the producer to stop. In this testcase, however, the producer just stops producing and exits without returning any error or closing any channel. And yeah, that intuitively seems like something that could lead to a deadlock.
If the select is changed into
select {
case <-time.After(2 * time.Second):
	innerSubDone <- struct{}{}
	return errors.New("time to go")
case <-unsubscribed:
	return nil
}
Then the deadlock disappears.
So my thinking goes: this testcase is based on a flawed producer. But also, the NewSubscription documentation should mention something like "the producer must either exit with an error or keep listening to the given channel".
I see. Clarifying the behavior of producers would be a better fix in that case. Will do that instead. Perhaps it would also be good to add a defensive check against nil producer errors by panicking in that case?
Note, though: I'm no authority here. @fjl would know
I see that this documentation already clarifies the intended behavior with "If fn returns an error", implying that producers are permitted to return nil errors. And Subscription explicitly checks for a nil error here. While this behavior doesn't compose well with Resubscriptions, we should maintain the current API contract.
Maybe an easier fix would be adding a 1 buffer to
@fjl Simplified the fix by making unsub buffered. Thanks for the suggestion.
event/subscription_test.go
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
	return NewSubscription(func(unsubscribed <-chan struct{}) error {
		select {
		case <-time.After(2 * time.Second):
It's weird to have a timeout here. You can achieve the necessary synchronization with another channel, like so:
quitInner := make(chan struct{})
...
select {
case <-quitInner:
...
close(quitInner)
<-innerSubDone
sub.Unsubscribe()
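A standalone sketch of that channel-based handshake, independent of the event package, might look like the following. The names `producer` and `runOnce` are hypothetical, and `innerSubDone` is given a buffer of one here as an assumption so the producer can signal and return without waiting for the reader.

```go
package main

import "fmt"

// producer waits for either an explicit quit signal or an unsubscribe.
// On quit it reports completion through innerSubDone and returns nil,
// mirroring a producer that ends without an error.
func producer(quitInner, unsubscribed <-chan struct{}, innerSubDone chan<- struct{}) error {
	select {
	case <-quitInner:
		innerSubDone <- struct{}{}
		return nil
	case <-unsubscribed:
		return nil
	}
}

// runOnce drives the producer deterministically: no sleeps or timeouts,
// only channel operations decide the ordering.
func runOnce() error {
	quitInner := make(chan struct{})
	unsubscribed := make(chan struct{})
	innerSubDone := make(chan struct{}, 1) // assumption: buffered by one
	result := make(chan error, 1)

	go func() { result <- producer(quitInner, unsubscribed, innerSubDone) }()

	close(quitInner) // tell the producer to finish
	<-innerSubDone   // wait until it has taken the quit branch
	return <-result
}

func main() {
	fmt.Println("producer returned:", runOnce())
	// prints "producer returned: <nil>"
}
```

Because the test thread blocks on `innerSubDone` instead of a timer, it proceeds exactly when the producer has ended, which removes the two-second wait and any timing sensitivity.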
Good suggestion. Made the change.
…ds (ethereum#28359) A goroutine is used to manage the lifetime of subscriptions managed by resubscriptions. When the subscription ends with no error, the resub goroutine ends as well. However, the resub goroutine needs to live long enough to read from the unsub channel. Otherwise, an Unsubscribe call deadlocks when writing to the unsub channel. This is fixed by adding a buffer to the unsub channel.
…r sub ends (ethereum#28359)" This reverts commit 3a1adc8.
Cherry-pick changes from ethereum/go-ethereum#28359
A goroutine oversees the lifetime of subscriptions handled by resubscriptions. This goroutine terminates when the subscription ends without any errors. However, the resub goroutine needs to live long enough to read from the unsub channel. Otherwise, an Unsubscribe call deadlocks when writing to the unsub channel.