Skip to content

Commit

Permalink
distsqlrun: add test infra to compare results of processors and opera…
Browse files Browse the repository at this point in the history
…tors

Adds test infrastructure that sets up a processor and the corresponding
columnar operator (as well as necessary columnarizers and materializers),
runs both paths, and checks whether the output matches.

Also, adds tests for general sorter and sort chunks using the introduced
infrastructure.

Release note: None
  • Loading branch information
yuzefovich committed Mar 28, 2019
1 parent a7be8dd commit 73eaf9f
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 0 deletions.
130 changes: 130 additions & 0 deletions pkg/sql/distsqlrun/columnar_operators_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package distsqlrun

import (
"context"
"math/rand"
"sort"
"testing"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func TestSorterAgainstProcessor(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
evalCtx := tree.MakeTestingEvalContext(st)
defer evalCtx.Stop(context.Background())
rng := rand.New(rand.NewSource(timeutil.Now().UnixNano()))

nRows := 100
maxCols := 5
maxNum := 10
types := make([]sqlbase.ColumnType, maxCols)
for i := range types {
types[i] = sqlbase.IntType
}
for nCols := 1; nCols <= maxCols; nCols++ {
types := types[:nCols]
rows := sqlbase.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum)
// Note: we're only generating column orderings on all nCols columns since
// if there are columns not in the ordering, the results are not fully
// deterministic.
orderingCols := generateColumnOrdering(rng, nCols, nCols)
sorterSpec := &distsqlpb.SorterSpec{
OutputOrdering: distsqlpb.Ordering{Columns: orderingCols},
}
pspec := &distsqlpb.ProcessorSpec{
Input: []distsqlpb.InputSyncSpec{{ColumnTypes: types}},
Core: distsqlpb.ProcessorCoreUnion{Sorter: sorterSpec},
}
if err := verifyColOperator(false, [][]sqlbase.ColumnType{types}, []sqlbase.EncDatumRows{rows}, types, pspec); err != nil {
t.Fatal(err)
}
}
}

// TestSortChunksAgainstProcessor is the partially-ordered-input counterpart of
// the general sorter test: for every column count and every ordering match
// length it presorts random integer rows on the first matchLen ordering
// columns, sets OrderingMatchLen accordingly, and uses verifyColOperator to
// check that the processor and the columnar operator agree row by row.
func TestSortChunksAgainstProcessor(t *testing.T) {
	var da sqlbase.DatumAlloc
	st := cluster.MakeTestingClusterSettings()
	evalCtx := tree.MakeTestingEvalContext(st)
	defer evalCtx.Stop(context.Background())

	// The input is random, so report the seed: without it a failure cannot be
	// reproduced.
	seed := timeutil.Now().UnixNano()
	t.Logf("random seed: %d", seed)
	rng := rand.New(rand.NewSource(seed))

	const (
		nRows   = 100
		maxCols = 5
		maxNum  = 10
	)
	types := make([]sqlbase.ColumnType, maxCols)
	for i := range types {
		types[i] = sqlbase.IntType
	}
	for nCols := 1; nCols <= maxCols; nCols++ {
		types := types[:nCols]
		// Note: we're only generating column orderings on all nCols columns since
		// if there are columns not in the ordering, the results are not fully
		// deterministic.
		orderingCols := generateColumnOrdering(rng, nCols, nCols)
		for matchLen := 1; matchLen <= nCols; matchLen++ {
			rows := sqlbase.MakeRandIntRowsInRange(rng, nRows, nCols, maxNum)
			matchedCols := distsqlpb.ConvertToColumnOrdering(
				distsqlpb.Ordering{Columns: orderingCols[:matchLen]},
			)
			// Presort the input on first matchLen columns.
			sort.Slice(rows, func(i, j int) bool {
				cmp, err := rows[i].Compare(types, &da, matchedCols, &evalCtx, rows[j])
				if err != nil {
					t.Fatalf("nCols=%d matchLen=%d: %v", nCols, matchLen, err)
				}
				return cmp < 0
			})

			sorterSpec := &distsqlpb.SorterSpec{
				OutputOrdering:   distsqlpb.Ordering{Columns: orderingCols},
				OrderingMatchLen: uint32(matchLen),
			}
			pspec := &distsqlpb.ProcessorSpec{
				Input: []distsqlpb.InputSyncSpec{{ColumnTypes: types}},
				Core:  distsqlpb.ProcessorCoreUnion{Sorter: sorterSpec},
			}
			if err := verifyColOperator(
				false, /* anyOrder */
				[][]sqlbase.ColumnType{types},
				[]sqlbase.EncDatumRows{rows},
				types,
				pspec,
			); err != nil {
				t.Fatalf("nCols=%d matchLen=%d: %v", nCols, matchLen, err)
			}
		}
	}
}

// generateColumnOrdering returns an ordering over nOrderingCols distinct
// columns drawn at random from a table with nCols columns, each paired with a
// randomly chosen direction. It panics when nOrderingCols is greater than
// nCols.
func generateColumnOrdering(
	rng *rand.Rand, nCols int, nOrderingCols int,
) []distsqlpb.Ordering_Column {
	if nCols < nOrderingCols {
		panic("nOrderingCols > nCols in generateColumnOrdering")
	}
	ordering := make([]distsqlpb.Ordering_Column, nOrderingCols)
	// A prefix of a random permutation yields distinct column indices.
	chosen := rng.Perm(nCols)[:nOrderingCols]
	for pos := 0; pos < len(chosen); pos++ {
		direction := distsqlpb.Ordering_Column_Direction(rng.Intn(2))
		ordering[pos] = distsqlpb.Ordering_Column{
			ColIdx:    uint32(chosen[pos]),
			Direction: direction,
		}
	}
	return ordering
}
166 changes: 166 additions & 0 deletions pkg/sql/distsqlrun/columnar_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package distsqlrun

import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/pkg/errors"
)

// verifyColOperator passes inputs through both the processor defined by pspec
// and the corresponding columnar operator and verifies that the results match.
//
// inputTypes[i] holds the column types of inputs[i], so both slices must have
// the same length; an error is returned if they do not. outputTypes holds the
// column types of the rows produced by both execution paths.
//
// anyOrder determines whether the results should be matched in order (when
// anyOrder is false) or as multisets (when anyOrder is true), in which case
// every processor row must be paired with a distinct columnar operator row.
//
// A nil return value means both paths produced matching output; any setup
// failure, unexpected metadata, or mismatch is reported as an error.
func verifyColOperator(
	anyOrder bool,
	inputTypes [][]sqlbase.ColumnType,
	inputs []sqlbase.EncDatumRows,
	outputTypes []sqlbase.ColumnType,
	pspec *distsqlpb.ProcessorSpec,
) error {
	// inputTypes is indexed by input position below; fail cleanly instead of
	// panicking with an out-of-range index.
	if len(inputTypes) != len(inputs) {
		return errors.Errorf(
			"mismatched arguments: %d input type lists for %d inputs", len(inputTypes), len(inputs),
		)
	}

	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	tempEngine, err := engine.NewTempEngine(base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec)
	if err != nil {
		return err
	}
	defer tempEngine.Close()

	evalCtx := tree.MakeTestingEvalContext(st)
	defer evalCtx.Stop(ctx)
	diskMonitor := makeTestDiskMonitor(ctx, st)
	defer diskMonitor.Stop(ctx)
	// The flow context shares the settings object already handed to the eval
	// context and the disk monitor.
	flowCtx := &FlowCtx{
		EvalCtx:     &evalCtx,
		Settings:    st,
		TempStorage: tempEngine,
		diskMonitor: diskMonitor,
	}

	// Each path gets its own row source over the same input rows.
	inputsProc := make([]RowSource, len(inputs))
	inputsColOp := make([]RowSource, len(inputs))
	for i, input := range inputs {
		inputsProc[i] = NewRepeatableRowSource(inputTypes[i], input)
		inputsColOp[i] = NewRepeatableRowSource(inputTypes[i], input)
	}

	// Row-at-a-time path.
	outProc := &RowBuffer{}
	proc, err := newProcessor(ctx, flowCtx, 0, &pspec.Core, &pspec.Post, inputsProc, []RowReceiver{outProc}, nil)
	if err != nil {
		return err
	}

	// Columnar path: columnarizers -> operator -> materializer.
	columnarizers := make([]exec.Operator, len(inputs))
	for i, input := range inputsColOp {
		c, err := newColumnarizer(flowCtx, int32(i)+1, input)
		if err != nil {
			return err
		}
		columnarizers[i] = c
	}

	colOp, err := newColOperator(ctx, flowCtx, pspec, columnarizers)
	if err != nil {
		return err
	}

	// The materializer emits the operator's columns unchanged: output column i
	// is taken from operator column i.
	outputToInputColIdx := make([]int, len(outputTypes))
	for i := range outputTypes {
		outputToInputColIdx[i] = i
	}
	outColOp := &RowBuffer{}
	m, err := newMaterializer(flowCtx, int32(len(inputs))+2, colOp, outputTypes, outputToInputColIdx, &distsqlpb.PostProcessSpec{}, outColOp)
	if err != nil {
		return err
	}

	proc.Run(ctx)
	m.Run(ctx)

	// Drain both buffers in lockstep so that a difference in the number of rows
	// is detected as soon as one side runs out.
	var procRows, colOpRows sqlbase.EncDatumRows
	rowCount := 0
	for {
		rowProc, meta := outProc.Next()
		if meta != nil {
			return errors.Errorf("unexpected meta %+v from processor", meta)
		}
		rowColOp, meta := outColOp.Next()
		if meta != nil {
			return errors.Errorf("unexpected meta %+v from columnar operator", meta)
		}

		if rowProc != nil && rowColOp == nil {
			return errors.Errorf("different results: processor produced a row %s while columnar operator is done", rowProc.String(outputTypes))
		}
		if rowColOp != nil && rowProc == nil {
			return errors.Errorf("different results: columnar operator produced a row %s while processor is done", rowColOp.String(outputTypes))
		}
		if rowProc == nil && rowColOp == nil {
			break
		}

		if anyOrder {
			// We accumulate all the rows to be matched using multiset comparison
			// when both "producers" are done.
			procRows = append(procRows, rowProc)
			colOpRows = append(colOpRows, rowColOp)
		} else {
			// anyOrder is false, so the result rows must match in the same order.
			expStr := rowProc.String(outputTypes)
			retStr := rowColOp.String(outputTypes)
			if expStr != retStr {
				return errors.Errorf("different results on row %d;\nexpected:\n %s\ngot:\n %s", rowCount, expStr, retStr)
			}
		}
		rowCount++
	}

	if anyOrder {
		// Multiset comparison keyed on the rendered row: count every columnar
		// operator row once, then consume one occurrence per processor row. Each
		// row is rendered a single time.
		unmatched := make(map[string]int, len(colOpRows))
		for _, colOpRow := range colOpRows {
			unmatched[colOpRow.String(outputTypes)]++
		}
		for i, procRow := range procRows {
			key := procRow.String(outputTypes)
			if unmatched[key] == 0 {
				return errors.Errorf("different results: no match found for row %d of processor output\n"+
					"processor output:\n %s\ncolumnar operator output:\n %s", i, procRows.String(outputTypes), colOpRows.String(outputTypes))
			}
			unmatched[key]--
		}
		// Note: we do not check whether every columnar operator row was consumed
		// because procRows and colOpRows, at this point, must have equal number
		// of rows - if it weren't true, an error would have been returned that
		// either of the "producers" outputted a row while the other one didn't.
	}
	return nil
}
13 changes: 13 additions & 0 deletions pkg/sql/sqlbase/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,19 @@ func MakeRandIntRows(rng *rand.Rand, numRows int, numCols int) EncDatumRows {
return rows
}

// MakeRandIntRowsInRange builds a table of numRows rows with numCols columns
// each, filling every cell with an integer drawn from rng in the half-open
// range [0, maxNum). Every value comes from rng.Intn(maxNum), which panics
// unless maxNum is positive.
func MakeRandIntRowsInRange(rng *rand.Rand, numRows int, numCols int, maxNum int) EncDatumRows {
	table := make(EncDatumRows, numRows)
	for rowIdx := 0; rowIdx < numRows; rowIdx++ {
		row := make(EncDatumRow, numCols)
		for colIdx := range row {
			row[colIdx] = IntEncDatum(rng.Intn(maxNum))
		}
		table[rowIdx] = row
	}
	return table
}

// MakeRepeatedIntRows constructs a numRows x numCols table where blocks of n
// consecutive rows have the same value.
func MakeRepeatedIntRows(n int, numRows int, numCols int) EncDatumRows {
Expand Down

0 comments on commit 73eaf9f

Please sign in to comment.