Skip to content

Commit

Permalink
client/requestbatcher: fix panic when only batch is sent due to size
Browse files Browse the repository at this point in the history
This PR fixes a bug discovered during the implementation of #34803 whereby a
nil batch may be send due to a stale timer that corresponds to the sole batch
being sent due to size and no logic to cancel its timer. This condition occurs
when there is one outstanding batch for a single range and it gets filled
according to size constraints. Hopefully this case is rare because in cases
where a batch is filled based on size limits hopefully there is traffic to
another range. This is a pretty egregious bug. While fixing it I encountered
another gotcha with regards to the timeutil.Timer.Stop method for which I
added additional comments.

Release note: None
  • Loading branch information
ajwerner committed Feb 12, 2019
1 parent 011c8f7 commit 4d82429
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (b *RequestBatcher) cleanup(err error) {

func (b *RequestBatcher) run(ctx context.Context) {
var deadline time.Time
var timer timeutil.Timer
timer := timeutil.NewTimer()
maybeSetTimer := func() {
var nextDeadline time.Time
if next := b.batches.peekFront(); next != nil {
Expand All @@ -242,6 +242,11 @@ func (b *RequestBatcher) run(ctx context.Context) {
deadline = nextDeadline
if !deadline.IsZero() {
timer.Reset(time.Until(deadline))
} else {
// Clear the current timer due to a sole batch already sent before
// the timer fired.
timer.Stop()
timer = timeutil.NewTimer()
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,39 @@ func (c chanSender) Send(
return resp.br, resp.pe
}

func TestBatcherSendOnSizeWithReset(t *testing.T) {
// This test ensures that when a single batch ends up sending due to size
// constrains its timer is successfully canceled and does not lead to a
// nil panic due to an attempt to send a batch due to the old timer.
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
sc := make(chanSender)
const wait = 5 * time.Millisecond
b := New(Config{
MaxIdle: wait,
MaxWait: wait,
MaxMsgsPerBatch: 2,
Sender: sc,
Stopper: stopper,
})
var g errgroup.Group
sendRequest := func(rangeID roachpb.RangeID, request roachpb.Request) {
g.Go(func() error {
_, err := b.Send(context.Background(), rangeID, request)
return err
})
}
sendRequest(1, &roachpb.GetRequest{})
sendRequest(1, &roachpb.GetRequest{})
s := <-sc
s.respChan <- batchResp{}
time.Sleep(wait)
if err := g.Wait(); err != nil {
t.Fatalf("Failed to send: %v", err)
}
}

func TestBatcherSend(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/timeutil/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (t *Timer) Reset(d time.Duration) {
// the timer, false if the timer has already expired, been stopped previously,
// or had never been initialized with a call to Timer.Reset. Stop does not
// close the channel, to prevent a read from succeeding incorrectly.
// Note that a Timer must never be used again after calls to Stop as the timer
// object will be put into an object pool for reuse.
func (t *Timer) Stop() bool {
var res bool
if t.timer != nil {
Expand Down

0 comments on commit 4d82429

Please sign in to comment.