From c05ce92f2e34145915d7b1034e702e7ff16ceb89 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 12 Feb 2019 14:52:05 -0500 Subject: [PATCH] client/requestbatcher: fix panic when only batch is sent due to size This PR fixes a bug discovered during the implementation of #34803 whereby a nil batch may be sent due to a stale timer that corresponds to the sole batch being sent due to size and no logic to cancel its timer. This condition occurs when there is one outstanding batch for a single range and it gets filled according to size constraints. Hopefully this case is rare because in cases where a batch is filled based on size limits hopefully there is traffic to another range. This is a pretty egregious bug. While fixing it, I encountered another gotcha with regard to the timeutil.Timer.Stop method for which I added additional comments. Release note: None --- pkg/internal/client/requestbatcher/batcher.go | 7 +++- .../client/requestbatcher/batcher_test.go | 33 +++++++++++++++++++ pkg/util/timeutil/timer.go | 2 ++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/pkg/internal/client/requestbatcher/batcher.go b/pkg/internal/client/requestbatcher/batcher.go index d37145e03685..897c1950346d 100644 --- a/pkg/internal/client/requestbatcher/batcher.go +++ b/pkg/internal/client/requestbatcher/batcher.go @@ -232,7 +232,7 @@ func (b *RequestBatcher) cleanup(err error) { func (b *RequestBatcher) run(ctx context.Context) { var deadline time.Time - var timer timeutil.Timer + timer := timeutil.NewTimer() maybeSetTimer := func() { var nextDeadline time.Time if next := b.batches.peekFront(); next != nil { @@ -242,6 +242,11 @@ func (b *RequestBatcher) run(ctx context.Context) { deadline = nextDeadline if !deadline.IsZero() { timer.Reset(time.Until(deadline)) + } else { + // Clear the current timer due to a sole batch already sent due to + // size constraints. 
+ timer.Stop() + timer = timeutil.NewTimer() } } } diff --git a/pkg/internal/client/requestbatcher/batcher_test.go b/pkg/internal/client/requestbatcher/batcher_test.go index 550809d383f0..bee99e8f1097 100644 --- a/pkg/internal/client/requestbatcher/batcher_test.go +++ b/pkg/internal/client/requestbatcher/batcher_test.go @@ -51,6 +51,39 @@ func (c chanSender) Send( return resp.br, resp.pe } +func TestBatcherSendOnSizeWithReset(t *testing.T) { + // This test ensures that when a single batch ends up sending due to size + // constrains its timer is successfully canceled and does not lead to a + // nil panic due to an attempt to send a batch due to the old timer. + defer leaktest.AfterTest(t)() + stopper := stop.NewStopper() + defer stopper.Stop(context.Background()) + sc := make(chanSender) + const wait = 5 * time.Millisecond + b := New(Config{ + MaxIdle: wait, + MaxWait: wait, + MaxMsgsPerBatch: 2, + Sender: sc, + Stopper: stopper, + }) + var g errgroup.Group + sendRequest := func(rangeID roachpb.RangeID, request roachpb.Request) { + g.Go(func() error { + _, err := b.Send(context.Background(), rangeID, request) + return err + }) + } + sendRequest(1, &roachpb.GetRequest{}) + sendRequest(1, &roachpb.GetRequest{}) + s := <-sc + s.respChan <- batchResp{} + time.Sleep(wait) + if err := g.Wait(); err != nil { + t.Fatalf("Failed to send: %v", err) + } +} + func TestBatcherSend(t *testing.T) { defer leaktest.AfterTest(t)() stopper := stop.NewStopper() diff --git a/pkg/util/timeutil/timer.go b/pkg/util/timeutil/timer.go index 1b975e228542..77aa74046912 100644 --- a/pkg/util/timeutil/timer.go +++ b/pkg/util/timeutil/timer.go @@ -94,6 +94,8 @@ func (t *Timer) Reset(d time.Duration) { // the timer, false if the timer has already expired, been stopped previously, // or had never been initialized with a call to Timer.Reset. Stop does not // close the channel, to prevent a read from succeeding incorrectly. 
+// Note that a Timer must never be used again after calls to Stop as the timer +// object will be put into an object pool for reuse. func (t *Timer) Stop() bool { var res bool if t.timer != nil {