Skip to content

Commit

Permalink
🐞 fix closure in PeekLockRenewer that was causing context cancelation…
Browse files Browse the repository at this point in the history
… to impact another message (#141)

* fix closure in lock renewer
* add a concurrency unit test on the lock renewer
  • Loading branch information
serbrech authored Jul 19, 2023
1 parent 83d561e commit 1925126
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 8 deletions.
14 changes: 7 additions & 7 deletions v2/lockrenewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions,
cancelMessageContextOnStop = *options.CancelMessageContextOnStop
}
}
plr := &peekLockRenewer{
next: handler,
lockRenewer: lockRenewer,
renewalInterval: &interval,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
}
return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
plr := &peekLockRenewer{
next: handler,
lockRenewer: lockRenewer,
renewalInterval: &interval,
cancelMessageCtxOnStop: cancelMessageContextOnStop,
stopped: make(chan struct{}, 1), // buffered channel to ensure we are not blocking
}
renewalCtx, cancel := context.WithCancel(ctx)
plr.cancelMessageCtx = cancel
go plr.startPeriodicRenewal(renewalCtx, message)
Expand Down
62 changes: 61 additions & 1 deletion v2/lockrenewer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package shuttle_test
import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -16,12 +17,27 @@ import (

type fakeSBLockRenewer struct {
RenewCount atomic.Int32
PerMessage map[*azservicebus.ReceivedMessage]*atomic.Int32
mapLock sync.Mutex
Err error
}

func (r *fakeSBLockRenewer) RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage,
options *azservicebus.RenewMessageLockOptions) error {
_ *azservicebus.RenewMessageLockOptions) error {
r.RenewCount.Add(1)
r.mapLock.Lock()
defer r.mapLock.Unlock()
if r.PerMessage == nil {
r.PerMessage = map[*azservicebus.ReceivedMessage]*atomic.Int32{
message: {},
}
}
perMessageCount := r.PerMessage[message]
if perMessageCount == nil {
r.PerMessage[message] = &atomic.Int32{}
}
perMessageCount = r.PerMessage[message]
perMessageCount.Add(1)
return r.Err
}

Expand All @@ -47,6 +63,50 @@ func Test_StopRenewingOnHandlerCompletion(t *testing.T) {
20*time.Millisecond).Should(Succeed())
}

func Test_RenewalHandlerStayIndependentPerMessage(t *testing.T) {
renewer := &fakeSBLockRenewer{}
settler := &fakeSettler{}
g := NewWithT(t)
interval := 50 * time.Millisecond
lr := shuttle.NewLockRenewalHandler(renewer, &shuttle.LockRenewalOptions{Interval: &interval},
shuttle.HandlerFunc(func(ctx context.Context, settler shuttle.MessageSettler,
message *azservicebus.ReceivedMessage) {
// Sleep > 100ms to allow 2 renewal to happen
time.Sleep(120 * time.Millisecond)
err := settler.CompleteMessage(ctx, message, nil)
g.Expect(err).To(Not(HaveOccurred()))
}))
// send 2 message with different context, cancel the 2nd context right away.
// The 2nd message should not be renewed.
// The 1st message should be renewed exactly twice
msg1 := &azservicebus.ReceivedMessage{}
msg2 := &azservicebus.ReceivedMessage{}
ctx := context.Background()
ctx1, cancel1 := context.WithCancel(ctx)
ctx2, cancel2 := context.WithCancel(ctx)
defer cancel1()
defer cancel2()
go lr.Handle(ctx1, settler, msg1)
go lr.Handle(ctx2, settler, msg2)
// cancel 2nd message context right away
cancel2()
g.Eventually(
func(g Gomega) {
g.Expect(renewer.PerMessage[msg2]).To(BeNil(), "msg2 should not be in the map")
g.Expect(renewer.PerMessage[msg1]).ToNot(BeNil(), "msg1 should be in the map")
g.Expect(renewer.PerMessage[msg1].Load()).To(Equal(int32(2)))
},
200*time.Millisecond,
10*time.Millisecond).Should(Succeed())
g.Eventually(
func(g Gomega) {
g.Expect(settler.CompleteCalled.Load()).To(Equal(int32(2)))
},
100*time.Millisecond,
10*time.Millisecond).Should(Succeed())

}

func Test_RenewPeriodically(t *testing.T) {
renewer := &fakeSBLockRenewer{}
interval := 50 * time.Millisecond
Expand Down

0 comments on commit 1925126

Please sign in to comment.