diff --git a/js.go b/js.go index 97ce71d26..7448a24c3 100644 --- a/js.go +++ b/js.go @@ -2867,6 +2867,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { if subClosed { err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed) } + hbLock := sync.Mutex{} if err == nil && len(msgs) < batch && !subClosed { // For batch real size of 1, it does not make sense to set no_wait in // the request. @@ -2909,7 +2910,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { if o.hb > 0 { if hbTimer == nil { hbTimer = time.AfterFunc(2*o.hb, func() { + hbLock.Lock() hbErr = ErrNoHeartbeat + hbLock.Unlock() cancel() }) } else { @@ -2951,6 +2954,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { } // If there is at least a message added to msgs, then need to return OK and no error if err != nil && len(msgs) == 0 { + hbLock.Lock() + defer hbLock.Unlock() if hbErr != nil { return nil, hbErr } @@ -3181,9 +3186,12 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e } var hbTimer *time.Timer var hbErr error + hbLock := sync.Mutex{} if o.hb > 0 { hbTimer = time.AfterFunc(2*o.hb, func() { + hbLock.Lock() hbErr = ErrNoHeartbeat + hbLock.Unlock() cancel() }) } @@ -3219,11 +3227,13 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e } } if err != nil { + hbLock.Lock() if hbErr != nil { result.err = hbErr } else { result.err = o.checkCtxErr(err) } + hbLock.Unlock() } close(result.msgs) result.done <- struct{}{}