Skip to content

Commit

Permalink
[FIXED] Race condition in Fetch and FetchBatch when using heartbeats (#…
Browse files Browse the repository at this point in the history
…1601)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Apr 2, 2024
1 parent 33316cd commit 4207658
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -2867,6 +2867,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if subClosed {
err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
}
hbLock := sync.Mutex{}
if err == nil && len(msgs) < batch && !subClosed {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
Expand Down Expand Up @@ -2909,7 +2910,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if o.hb > 0 {
if hbTimer == nil {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel()
})
} else {
Expand Down Expand Up @@ -2951,6 +2954,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
// If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 {
hbLock.Lock()
defer hbLock.Unlock()
if hbErr != nil {
return nil, hbErr
}
Expand Down Expand Up @@ -3181,9 +3186,12 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
var hbTimer *time.Timer
var hbErr error
hbLock := sync.Mutex{}
if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
cancel()
})
}
Expand Down Expand Up @@ -3219,11 +3227,13 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
}
if err != nil {
hbLock.Lock()
if hbErr != nil {
result.err = hbErr
} else {
result.err = o.checkCtxErr(err)
}
hbLock.Unlock()
}
close(result.msgs)
result.done <- struct{}{}
Expand Down

0 comments on commit 4207658

Please sign in to comment.