Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
41307: sql: fix various problematic uses of the txn in DistSQL flows r=andreimatei a=andreimatei

See the individual commits for details.

Release justification: Fixes bad bugs.

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Oct 5, 2019
2 parents e6e9161 + eb51c4f commit 1c99165
Show file tree
Hide file tree
Showing 18 changed files with 455 additions and 117 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func newSSTWriterProcessor(
settings: flowCtx.Cfg.Settings,
registry: flowCtx.Cfg.JobRegistry,
progress: spec.Progress,
db: flowCtx.EvalCtx.Txn.DB(),
}
if err := sp.out.Init(&execinfrapb.PostProcessSpec{}, sstOutputTypes, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
Expand Down Expand Up @@ -97,6 +96,7 @@ func (sp *sstWriter) OutputTypes() []types.T {
}

func (sp *sstWriter) Run(ctx context.Context) {
sp.db = sp.flowCtx.EvalCtx.Txn.DB()
sp.input.Start(ctx)

ctx, span := tracing.ChildSpan(ctx, "sstWriter")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ type unorderedSynchronizerMsg struct {
b coldata.Batch
}

var _ Operator = &UnorderedSynchronizer{}
var _ Operator = &ParallelUnorderedSynchronizer{}
var _ execinfra.OpNode = &ParallelUnorderedSynchronizer{}

// UnorderedSynchronizer is an Operator that combines multiple Operator streams
// ParallelUnorderedSynchronizer is an Operator that combines multiple Operator streams
// into one.
type UnorderedSynchronizer struct {
type ParallelUnorderedSynchronizer struct {
inputs []Operator
// readNextBatch is a slice of channels, where each channel corresponds to the
// input at the same index in inputs. It is used as a barrier for input
Expand All @@ -59,38 +60,39 @@ type UnorderedSynchronizer struct {
done bool
zeroBatch coldata.Batch
// externalWaitGroup refers to the WaitGroup passed in externally. Since the
// UnorderedSynchronizer spawns goroutines, this allows callers to wait for
// the completion of these goroutines.
// ParallelUnorderedSynchronizer spawns goroutines, this allows callers to
// wait for the completion of these goroutines.
externalWaitGroup *sync.WaitGroup
// internalWaitGroup refers to the WaitGroup internally managed by the
// UnorderedSynchronizer. This will only ever be incremented by the
// UnorderedSynchronizer and decremented by the input goroutines. This allows
// the UnorderedSynchronizer to wait only on internal goroutines.
// ParallelUnorderedSynchronizer. This will only ever be incremented by the
// ParallelUnorderedSynchronizer and decremented by the input goroutines. This
// allows the ParallelUnorderedSynchronizer to wait only on internal
// goroutines.
internalWaitGroup *sync.WaitGroup
cancelFn context.CancelFunc
batchCh chan *unorderedSynchronizerMsg
errCh chan error
}

// ChildCount implements the execinfra.OpNode interface.
func (s *UnorderedSynchronizer) ChildCount() int {
func (s *ParallelUnorderedSynchronizer) ChildCount() int {
return len(s.inputs)
}

// Child implements the execinfra.OpNode interface.
func (s *UnorderedSynchronizer) Child(nth int) execinfra.OpNode {
func (s *ParallelUnorderedSynchronizer) Child(nth int) execinfra.OpNode {
return s.inputs[nth]
}

// NewUnorderedSynchronizer creates a new UnorderedSynchronizer. On the first
// call to Next, len(inputs) goroutines are spawned to read each input
// asynchronously (to not be limited by a slow input). These will increment
// the passed-in WaitGroup and decrement when done. It is also guaranteed that
// these spawned goroutines will have completed on any error or zero-length
// batch received from Next.
func NewUnorderedSynchronizer(
// NewParallelUnorderedSynchronizer creates a new ParallelUnorderedSynchronizer.
// On the first call to Next, len(inputs) goroutines are spawned to read each
// input asynchronously (to not be limited by a slow input). These will
// increment the passed-in WaitGroup and decrement when done. It is also
// guaranteed that these spawned goroutines will have completed on any error or
// zero-length batch received from Next.
func NewParallelUnorderedSynchronizer(
inputs []Operator, typs []coltypes.T, wg *sync.WaitGroup,
) *UnorderedSynchronizer {
) *ParallelUnorderedSynchronizer {
readNextBatch := make([]chan struct{}, len(inputs))
for i := range readNextBatch {
// Buffer readNextBatch chans to allow for non-blocking writes. There will
Expand All @@ -99,7 +101,7 @@ func NewUnorderedSynchronizer(
}
zeroBatch := coldata.NewMemBatchWithSize(typs, 0)
zeroBatch.SetLength(0)
return &UnorderedSynchronizer{
return &ParallelUnorderedSynchronizer{
inputs: inputs,
readNextBatch: readNextBatch,
batches: make([]coldata.Batch, len(inputs)),
Expand All @@ -116,7 +118,7 @@ func NewUnorderedSynchronizer(
}

// Init is part of the Operator interface.
func (s *UnorderedSynchronizer) Init() {
func (s *ParallelUnorderedSynchronizer) Init() {
for _, input := range s.inputs {
input.Init()
}
Expand All @@ -130,7 +132,7 @@ func (s *UnorderedSynchronizer) Init() {
// error on s.errCh, resulting in the first error pushed to be observed by the
// Next goroutine. Inputs are asynchronous so that the synchronizer is minimally
// affected by slow inputs.
func (s *UnorderedSynchronizer) init(ctx context.Context) {
func (s *ParallelUnorderedSynchronizer) init(ctx context.Context) {
ctx, s.cancelFn = contextutil.WithCancel(ctx)
for i, input := range s.inputs {
s.nextBatch[i] = func(input Operator, inputIdx int) func() {
Expand Down Expand Up @@ -199,7 +201,7 @@ func (s *UnorderedSynchronizer) init(ctx context.Context) {
}

// Next is part of the Operator interface.
func (s *UnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
func (s *ParallelUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
if s.done {
// TODO(yuzefovich): do we want to be on the safe side and explicitly set
// the length here (and below) to 0?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestUnorderedSynchronizer(t *testing.T) {
func TestParallelUnorderedSynchronizer(t *testing.T) {
defer leaktest.AfterTest(t)()

const (
Expand All @@ -49,7 +49,7 @@ func TestUnorderedSynchronizer(t *testing.T) {
}

var wg sync.WaitGroup
s := NewUnorderedSynchronizer(inputs, typs, &wg)
s := NewParallelUnorderedSynchronizer(inputs, typs, &wg)

ctx, cancelFn := context.WithCancel(context.Background())
var cancel bool
Expand Down Expand Up @@ -117,14 +117,14 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
ctx = context.Background()
wg sync.WaitGroup
)
s := NewUnorderedSynchronizer(inputs, []coltypes.T{coltypes.Int64}, &wg)
s := NewParallelUnorderedSynchronizer(inputs, []coltypes.T{coltypes.Int64}, &wg)
err := execerror.CatchVectorizedRuntimeError(func() { _ = s.Next(ctx) })
// This is the crux of the test: assert that all inputs have finished.
require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs)))
require.True(t, testutils.IsError(err, expectedErr), err)
}

func BenchmarkUnorderedSynchronizer(b *testing.B) {
func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
const numInputs = 6

typs := []coltypes.T{coltypes.Int64}
Expand All @@ -136,7 +136,7 @@ func BenchmarkUnorderedSynchronizer(b *testing.B) {
}
var wg sync.WaitGroup
ctx, cancelFn := context.WithCancel(context.Background())
s := NewUnorderedSynchronizer(inputs, typs, &wg)
s := NewParallelUnorderedSynchronizer(inputs, typs, &wg)
b.SetBytes(8 * int64(coldata.BatchSize()))
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
78 changes: 78 additions & 0 deletions pkg/sql/colexec/serial_unordered_synchronizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"context"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
)

// SerialUnorderedSynchronizer is an Operator that combines multiple Operator
// streams into one. It reads its inputs one by one until each one is exhausted,
// at which point it moves to the next input. See ParallelUnorderedSynchronizer
// for a parallel implementation. The serial one is used when concurrency is
// undesirable - for example when the whole query is planned on the gateway and
// we want to run it in the RootTxn.
//
// Because inputs are drained strictly one after another, batches come out
// grouped by input, in the order the inputs were supplied, and no goroutines
// are spawned by this operator.
type SerialUnorderedSynchronizer struct {
// inputs are the Operators being merged; Next drains them in slice order.
inputs []Operator
// curSerialInputIdx indicates the index of the current input being consumed.
// It only ever advances, and equals len(inputs) once every input has
// returned a zero-length batch.
curSerialInputIdx int
// zeroBatch is the batch returned by Next once all inputs are exhausted.
zeroBatch coldata.Batch
}

// Compile-time checks that *SerialUnorderedSynchronizer satisfies the
// interfaces it is expected to implement.
var (
	_ Operator         = (*SerialUnorderedSynchronizer)(nil)
	_ execinfra.OpNode = (*SerialUnorderedSynchronizer)(nil)
)

// ChildCount implements the execinfra.OpNode interface. It reports the number
// of input operators feeding this synchronizer.
func (s *SerialUnorderedSynchronizer) ChildCount() int {
	numInputs := len(s.inputs)
	return numInputs
}

// Child implements the execinfra.OpNode interface. It returns the nth input
// operator; nth is used as a direct index into the inputs slice, so an
// out-of-range value panics.
func (s *SerialUnorderedSynchronizer) Child(nth int) execinfra.OpNode {
	var child execinfra.OpNode = s.inputs[nth]
	return child
}

// NewSerialUnorderedSynchronizer creates a new SerialUnorderedSynchronizer that
// drains inputs in slice order, starting from the first one.
//
// typs describes the column types of the batches flowing through the
// synchronizer; it is used only to build the zero-length batch that Next
// returns once every input has been exhausted.
func NewSerialUnorderedSynchronizer(
	inputs []Operator, typs []coltypes.T,
) *SerialUnorderedSynchronizer {
	// Consumers detect end-of-stream by checking for a zero-length batch, so
	// pin the sentinel's length explicitly instead of relying on the batch
	// constructor's default.
	zeroBatch := coldata.NewMemBatchWithSize(typs, 0)
	zeroBatch.SetLength(0)
	return &SerialUnorderedSynchronizer{
		inputs:            inputs,
		curSerialInputIdx: 0,
		zeroBatch:         zeroBatch,
	}
}

// Init is part of the Operator interface. It initializes every input, in
// slice order.
func (s *SerialUnorderedSynchronizer) Init() {
	for i := range s.inputs {
		s.inputs[i].Init()
	}
}

// Next is part of the Operator interface. It returns the next non-empty batch
// from the input currently being consumed, advancing to the following input
// each time the current one yields a zero-length batch. Once all inputs are
// exhausted, the synchronizer's zero batch is returned (on this and every
// later call).
func (s *SerialUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch {
	for s.curSerialInputIdx != len(s.inputs) {
		batch := s.inputs[s.curSerialInputIdx].Next(ctx)
		if batch.Length() != 0 {
			return batch
		}
		// The current input is drained; move on to the next one.
		s.curSerialInputIdx++
	}
	return s.zeroBatch
}
52 changes: 52 additions & 0 deletions pkg/sql/colexec/serial_unordered_synchronizer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colexec

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

// TestSerialUnorderedSynchronizer feeds numInputs sources, each configured to
// return numBatches batches, into a SerialUnorderedSynchronizer and checks
// that exactly numInputs*numBatches non-empty batches come out before the
// first zero-length batch.
func TestSerialUnorderedSynchronizer(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	rng, _ := randutil.NewPseudoRand()
	const numInputs = 3
	const numBatches = 4

	typs := []coltypes.T{coltypes.Int64}
	inputs := make([]Operator, numInputs)
	for i := range inputs {
		// NOTE(review): a length argument of 0 presumably makes RandomBatch
		// fill the batch to capacity; the final assertion relies on these
		// batches being non-empty. Confirm against RandomBatch.
		source := NewRepeatableBatchSource(
			RandomBatch(rng, typs, int(coldata.BatchSize()), 0 /* length */, rng.Float64()))
		source.ResetBatchesToReturn(numBatches)
		inputs[i] = source
	}
	s := NewSerialUnorderedSynchronizer(inputs, typs)
	resultBatches := 0
	for {
		b := s.Next(ctx)
		if b.Length() == 0 {
			// A zero-length batch signals that every input is exhausted.
			break
		}
		resultBatches++
	}
	require.Equal(t, numInputs*numBatches, resultBatches)
}
Loading

0 comments on commit 1c99165

Please sign in to comment.