Merge pull request #64219 from yuzefovich/backport21.1-63772
release-21.1: colflow: cancel flow on ungraceful stream shutdown in outbox
yuzefovich authored Apr 29, 2021
2 parents e887ee6 + e92fee3 commit 65bd373
Showing 15 changed files with 150 additions and 80 deletions.
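In broad strokes, this backport makes the outbox distinguish two shutdown modes: when the consumer simply needs no more data (reader-side cancellation), the stream winds down gracefully, but when the stream dies ungracefully (e.g. the transport breaks), the outbox now cancels the whole flow so that producer goroutines blocked on the flow context unwind promptly. Below is a minimal sketch of that pattern; `runStream`, `sendBatches`, and the error classification are illustrative stand-ins, not the actual CockroachDB API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// runStream sketches the outbox-side idea: a graceful end of the stream
// (the consumer needs no more data) leaves the rest of the flow alone,
// while an ungraceful shutdown cancels the flow context so that any
// producer goroutines blocked on it exit promptly.
func runStream(flowCtxCancel context.CancelFunc, sendBatches func() error) {
	if err := sendBatches(); err != nil && !errors.Is(err, io.EOF) {
		// Ungraceful shutdown: tear down the whole flow.
		if flowCtxCancel != nil {
			flowCtxCancel()
		}
	}
}

func main() {
	flowCtx, flowCtxCancel := context.WithCancel(context.Background())
	runStream(flowCtxCancel, func() error {
		return errors.New("transport is closing") // simulated broken transport
	})
	<-flowCtx.Done() // producers waiting on flowCtx are now unblocked
	fmt.Println("flow context:", flowCtx.Err())
}
```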
10 changes: 5 additions & 5 deletions pkg/col/coldatatestutils/random_testutils.go
@@ -304,14 +304,14 @@ type RandomDataOpArgs struct {
 	Nulls            bool
 	// BatchAccumulator, if set, will be called before returning a coldata.Batch
 	// from Next.
-	BatchAccumulator func(b coldata.Batch, typs []*types.T)
+	BatchAccumulator func(ctx context.Context, b coldata.Batch, typs []*types.T)
 }
 
 // RandomDataOp is an operator that generates random data according to
 // RandomDataOpArgs. Call GetBuffer to get all data that was returned.
 type RandomDataOp struct {
 	allocator        *colmem.Allocator
-	batchAccumulator func(b coldata.Batch, typs []*types.T)
+	batchAccumulator func(ctx context.Context, b coldata.Batch, typs []*types.T)
 	typs             []*types.T
 	rng              *rand.Rand
 	batchSize        int
@@ -366,12 +366,12 @@ func NewRandomDataOp(
 func (o *RandomDataOp) Init() {}
 
 // Next is part of the colexec.Operator interface.
-func (o *RandomDataOp) Next(context.Context) coldata.Batch {
+func (o *RandomDataOp) Next(ctx context.Context) coldata.Batch {
 	if o.numReturned == o.numBatches {
 		// Done.
 		b := coldata.ZeroBatch
 		if o.batchAccumulator != nil {
-			o.batchAccumulator(b, o.typs)
+			o.batchAccumulator(ctx, b, o.typs)
 		}
 		return b
 	}
@@ -398,7 +398,7 @@ func (o *RandomDataOp) Next(context.Context) coldata.Batch {
 	}
 	o.numReturned++
 	if o.batchAccumulator != nil {
-		o.batchAccumulator(b, o.typs)
+		o.batchAccumulator(ctx, b, o.typs)
 	}
 	return b
 }
2 changes: 1 addition & 1 deletion pkg/sql/colcontainer/diskqueue_test.go
@@ -71,7 +71,7 @@ func TestDiskQueue(t *testing.T) {
 		NumBatches: cap(batches),
 		BatchSize:  1 + rng.Intn(coldata.BatchSize()),
 		Nulls:      true,
-		BatchAccumulator: func(b coldata.Batch, typs []*types.T) {
+		BatchAccumulator: func(_ context.Context, b coldata.Batch, typs []*types.T) {
 			batches = append(batches, coldatatestutils.CopyBatch(b, typs, testColumnFactory))
 		},
 	})
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecutils/spilling_queue_test.go
@@ -92,7 +92,7 @@ func TestSpillingQueue(t *testing.T) {
 		NumBatches: numBatches,
 		BatchSize:  inputBatchSize,
 		Nulls:      true,
-		BatchAccumulator: func(b coldata.Batch, typs []*types.T) {
+		BatchAccumulator: func(_ context.Context, b coldata.Batch, typs []*types.T) {
 			if b.Length() == 0 {
 				return
 			}
1 change: 1 addition & 0 deletions pkg/sql/colflow/colrpc/BUILD.bazel
@@ -58,6 +58,7 @@ go_test(
         "//pkg/util/mon",
         "//pkg/util/randutil",
         "//pkg/util/stop",
+        "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_logtags//:logtags",
56 changes: 44 additions & 12 deletions pkg/sql/colflow/colrpc/colrpc_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
@@ -145,7 +146,7 @@ func TestOutboxInbox(t *testing.T) {
 	// flow.
 	streamCtxCancel
 	// readerCtxCancel models a scenario in which the Inbox host cancels the
-	// flow.
+	// flow. This is considered a graceful termination.
 	readerCtxCancel
 	// transportBreaks models a scenario in which the transport breaks.
 	transportBreaks
@@ -197,6 +198,10 @@ func TestOutboxInbox(t *testing.T) {
 		// probability before cancellation.
 		sleepBeforeCancellation = rng.Float64() <= 0.25
 		sleepTime               = time.Microsecond * time.Duration(rng.Intn(500))
+		// stopwatch is used to measure how long it takes for the outbox to
+		// exit once the transport broke.
+		stopwatch                    = timeutil.NewStopWatch()
+		transportBreaksProducerSleep = 4 * time.Second
 	)
 
 	// Test random selection as the Outbox should be deselecting before sending
@@ -206,15 +211,28 @@
 		DeterministicTyps: typs,
 		NumBatches:        64,
 		Selection:         true,
-		BatchAccumulator:  inputBuffer.Add,
+		BatchAccumulator: func(_ context.Context, b coldata.Batch, typs []*types.T) {
+			inputBuffer.Add(b, typs)
+		},
 	}
 
 	if cancellationScenario != noCancel {
 		// Crank up the number of batches so cancellation always happens in the
 		// middle of execution (or before).
 		args.NumBatches = math.MaxInt64
-		// Disable accumulation to avoid memory blowups.
-		args.BatchAccumulator = nil
+		if cancellationScenario == transportBreaks {
+			// Insert an artificial sleep in order to simulate that the
+			// input to the outbox takes a while to produce each batch.
+			args.BatchAccumulator = func(ctx context.Context, b coldata.Batch, typs []*types.T) {
+				select {
+				case <-ctx.Done():
+				case <-time.After(transportBreaksProducerSleep):
+				}
+			}
+		} else {
+			// Disable accumulation to avoid memory blowups.
+			args.BatchAccumulator = nil
+		}
 	}
 	inputMemAcc := testMemMonitor.MakeBoundAccount()
 	defer inputMemAcc.Close(ctx)
@@ -243,7 +261,15 @@ func TestOutboxInbox(t *testing.T) {
 	)
 	wg.Add(1)
 	go func() {
-		outbox.runWithStream(streamCtx, clientStream, func() { atomic.StoreUint32(&canceled, 1) })
+		// There is a bit of trickery going on here with the context
+		// management caused by the fact that we're using an internal
+		// runWithStream method rather than exported Run method. The goal is
+		// to create a context of the node on which the outbox runs and keep
+		// it different from the streamCtx. This matters in
+		// 'transportBreaks' scenario.
+		var flowCtxCancel context.CancelFunc
+		outbox.runnerCtx, flowCtxCancel = context.WithCancel(ctx)
+		outbox.runWithStream(streamCtx, clientStream, flowCtxCancel, func() { atomic.StoreUint32(&canceled, 1) })
 		wg.Done()
 	}()
 
@@ -262,6 +288,7 @@ func TestOutboxInbox(t *testing.T) {
 		case transportBreaks:
 			err := conn.Close() // nolint:grpcconnclose
 			require.NoError(t, err)
+			stopwatch.Start()
 		}
 		wg.Done()
 	}()
@@ -358,10 +385,11 @@ func TestOutboxInbox(t *testing.T) {
 			// cancellation (which is redundant) in the Outbox.
 			require.True(t, atomic.LoadUint32(&canceled) == 1)
 		case readerCtxCancel:
-			// If the reader context gets canceled, the Inbox should have returned
-			// from the stream handler.
-			require.Regexp(t, "context canceled", streamHandlerErr)
-			// The Inbox should propagate this error upwards.
+			// If the reader context gets canceled, it is treated as a graceful
+			// termination of the stream, so we expect no error from the stream
+			// handler.
+			require.Nil(t, streamHandlerErr)
+			// The Inbox should still propagate this error upwards.
 			require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
 
 			// The cancellation should have been communicated to the Outbox, resulting
@@ -370,6 +398,10 @@ func TestOutboxInbox(t *testing.T) {
 		case transportBreaks:
 			// If the transport breaks, the scenario is very similar to
 			// streamCtxCancel. GRPC will cancel the stream handler's context.
+			stopwatch.Stop()
+			// We expect that the outbox exits much sooner than it receives the
+			// next batch from its input in this scenario.
+			require.Less(t, int64(stopwatch.Elapsed()), int64(transportBreaksProducerSleep/2), "Outbox took too long to exit on transport breakage")
 			require.True(t, testutils.IsError(streamHandlerErr, "context canceled"), streamHandlerErr)
 			require.True(t, testutils.IsError(readerErr, "context canceled"), readerErr)
 
@@ -523,7 +555,7 @@ func TestOutboxInboxMetadataPropagation(t *testing.T) {
 	)
 	wg.Add(1)
 	go func() {
-		outbox.runWithStream(ctx, clientStream, func() { atomic.StoreUint32(&canceled, 1) })
+		outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */, func() { atomic.StoreUint32(&canceled, 1) })
 		wg.Done()
 	}()
 
@@ -595,7 +627,7 @@ func BenchmarkOutboxInbox(b *testing.B) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		outbox.runWithStream(ctx, clientStream, nil /* cancelFn */)
+		outbox.runWithStream(ctx, clientStream, nil /* flowCtxCancel */, nil /* outboxCtxCancel */)
 		wg.Done()
 	}()
 
@@ -659,7 +691,7 @@ func TestOutboxStreamIDPropagation(t *testing.T) {
 			roachpb.NodeID(0),
 			execinfrapb.FlowID{UUID: uuid.MakeV4()},
 			outboxStreamID,
-			nil, /* cancelFn */
+			nil, /* flowCtxCancel */
 			0, /* connectionTimeout */
 		)
 		outboxDone <- struct{}{}
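The artificial producer sleep in the transportBreaks scenario above relies on the standard cancellable-wait idiom: selecting on ctx.Done() alongside a timer so the producer unblocks the moment the flow context is canceled. Here is a self-contained sketch of that idiom; `sleepOrCancel` is a hypothetical helper (the test itself uses `time.After` inline).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepOrCancel blocks for d but returns early if ctx is canceled. This is
// the idiom behind the test's slow BatchAccumulator: without it, a producer
// sleeping 4s per batch would keep the outbox goroutine alive long after
// the transport broke.
func sleepOrCancel(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err() // flow canceled: stop producing immediately
	case <-timer.C:
		return nil // slept the full duration
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel() // simulate the outbox canceling the flow on a broken transport
	}()
	fmt.Println(sleepOrCancel(ctx, 4*time.Second)) // prints "context canceled" quickly
}
```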
25 changes: 19 additions & 6 deletions pkg/sql/colflow/colrpc/inbox.go
@@ -185,7 +185,11 @@ func (i *Inbox) init(ctx context.Context) error {
 		i.errCh <- fmt.Errorf("%s: remote stream arrived too late", err)
 		return err
 	case <-ctx.Done():
-		i.errCh <- fmt.Errorf("%s: Inbox while waiting for stream", ctx.Err())
+		// Our reader canceled the context meaning that it no longer needs
+		// any more data from the outbox. This is a graceful termination, so
+		// we don't send any error on errCh and only return an error. This
+		// will close the inbox (making the stream handler exit gracefully)
+		// and will stop the current goroutine from proceeding further.
 		return ctx.Err()
 	}
 
@@ -196,8 +200,11 @@ func (i *Inbox) init(ctx context.Context) error {
 	return nil
 }
 
-// close closes the inbox, ensuring that any call to RunWithStream will
-// return immediately. close is idempotent.
+// close closes the inbox, ensuring that any call to RunWithStream will return
+// immediately. close is idempotent.
+// NOTE: it is very important to close the Inbox only when execution terminates
+// in one way or another. DrainMeta will use the stream to read any remaining
+// metadata after Next returns a zero-length batch during normal execution.
 func (i *Inbox) close() {
 	if !i.done {
 		i.done = true
@@ -218,13 +225,17 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
 	var readerCtx context.Context
 	select {
 	case err := <-i.errCh:
+		// nil will be read from errCh when the channel is closed.
 		return err
 	case readerCtx = <-i.contextCh:
 		log.VEvent(streamCtx, 2, "Inbox reader arrived")
 	case <-streamCtx.Done():
 		return fmt.Errorf("%s: streamCtx while waiting for reader (remote client canceled)", streamCtx.Err())
 	case <-i.flowCtx.Done():
-		return fmt.Errorf("%s: flowCtx while waiting for reader (local server canceled)", i.flowCtx.Err())
+		// The inbox host canceled the stream meaning that it no longer needs
+		// any more data from the outbox. This is a graceful termination, so we
+		// return nil.
+		return nil
 	}
 
 	// Now wait for one of the events described in the method comment. If a
@@ -235,8 +246,10 @@ func (i *Inbox) RunWithStream(streamCtx context.Context, stream flowStreamServer
 		// nil will be read from errCh when the channel is closed.
 		return err
 	case <-readerCtx.Done():
-		// The reader canceled the stream.
-		return fmt.Errorf("%s: readerCtx in Inbox stream handler (local reader canceled)", readerCtx.Err())
+		// The reader canceled the stream meaning that it no longer needs any
+		// more data from the outbox. This is a graceful termination, so we
+		// return nil.
+		return nil
 	case <-streamCtx.Done():
 		// The client canceled the stream.
 		return fmt.Errorf("%s: streamCtx in Inbox stream handler (remote client canceled)", streamCtx.Err())
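The net effect of the inbox changes is that the stream handler's select now distinguishes who canceled: local cancellation (the reader or the inbox host needs no more data) returns nil, while remote cancellation still surfaces as an error. A compilable sketch of that decision follows, with illustrative names rather than the real Inbox fields.

```go
package main

import (
	"context"
	"fmt"
)

// waitForShutdown mirrors the shape of the stream handler's final select
// after this change: local (reader) cancellation is a graceful termination
// and returns nil, while remote (stream) cancellation is reported as an
// error. errCh, readerCtx, and streamCtx stand in for the real Inbox state.
func waitForShutdown(errCh <-chan error, readerCtx, streamCtx context.Context) error {
	select {
	case err := <-errCh:
		// nil is read here once errCh is closed (normal completion).
		return err
	case <-readerCtx.Done():
		// The local reader needs no more data: graceful, no error.
		return nil
	case <-streamCtx.Done():
		// The remote client canceled the stream: ungraceful.
		return fmt.Errorf("%s: stream handler (remote client canceled)", streamCtx.Err())
	}
}

func main() {
	readerCtx, cancel := context.WithCancel(context.Background())
	cancel() // the reader is done with the stream
	fmt.Println(waitForShutdown(make(chan error), readerCtx, context.Background())) // <nil>
}
```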
8 changes: 6 additions & 2 deletions pkg/sql/colflow/colrpc/inbox_test.go
@@ -71,7 +71,9 @@ func TestInboxCancellation(t *testing.T) {
 		require.True(t, testutils.IsError(err, "context canceled"), err)
 		// Now, the remote stream arrives.
 		err = inbox.RunWithStream(context.Background(), mockFlowStreamServer{})
-		require.True(t, testutils.IsError(err, "while waiting for stream"), err)
+		// We expect no error from the stream handler since we canceled it
+		// ourselves (a graceful termination).
+		require.Nil(t, err)
 	})
 
 	t.Run("DuringRecv", func(t *testing.T) {
@@ -98,7 +100,9 @@
 		// Cancel the context.
 		cancelFn()
 		err = <-streamHandlerErrCh
-		require.True(t, testutils.IsError(err, "readerCtx in Inbox stream handler"), err)
+		// Reader context cancellation is a graceful termination, so no error
+		// should be returned.
+		require.Nil(t, err)
 
 		// The mock RPC layer does not unblock the Recv for us on the server side,
 		// so manually send an io.EOF to the reader goroutine.