Skip to content

Commit

Permalink
exec: add null handling to default sorter
Browse files Browse the repository at this point in the history
Addresses part of #36880.

Release note: None
  • Loading branch information
rohany committed Aug 8, 2019
1 parent 3c69b46 commit 3efa0ee
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 44 deletions.
3 changes: 1 addition & 2 deletions pkg/sql/distsqlrun/columnar_operators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ func TestSorterAgainstProcessor(t *testing.T) {
nRows := 100
maxCols := 5
maxNum := 10
// TODO (yuzefovich): change nullProbability to non 0 value.
nullProbability := 0.0
nullProbability := 0.2
typs := make([]types.T, maxCols)
for i := range typs {
typs[i] = *types.Int
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/exec/coldata/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,21 @@ func (n *Nulls) UnsetNull64(i uint64) {
n.nulls[i/8] |= bitMask[i%8]
}

// SwapNulls swaps the null values at the argument indices.
// We implement the logic directly on the byte array
// rather than case on the result of NullAt to avoid
// having to take some branches.
func (n *Nulls) SwapNulls(i, j uint64) {
// Get original null values.
ni := (n.nulls[i/8] >> (i % 8)) & 0x1
nj := (n.nulls[j/8] >> (j % 8)) & 0x1
// Write into the correct positions
iMask := bitMask[i%8]
jMask := bitMask[j%8]
n.nulls[i/8] = (n.nulls[i/8] & ^iMask) | (nj << (i % 8))
n.nulls[j/8] = (n.nulls[j/8] & ^jMask) | (ni << (j % 8))
}

// Extend extends the nulls vector with the next toAppend values from src,
// starting at srcStartIdx.
func (n *Nulls) Extend(src *Nulls, destStartIdx uint64, srcStartIdx uint16, toAppend uint16) {
Expand Down
81 changes: 81 additions & 0 deletions pkg/sql/exec/coldata/nulls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,87 @@ func TestUnsetNullRange(t *testing.T) {
}
}

func TestSwapNulls(t *testing.T) {
n := NewNulls(BatchSize)
swapPos := []uint64{0, 1, 63, 64, 65, BatchSize - 1}
idxInSwapPos := func(idx uint64) bool {
for _, p := range swapPos {
if p == idx {
return true
}
}
return false
}

t.Run("TestSwapNullWithNull", func(t *testing.T) {
// Test that swapping null with null doesn't change anything.
for _, p := range swapPos {
n.SetNull64(p)
}
for _, i := range swapPos {
for _, j := range swapPos {
n.SwapNulls(i, j)
for k := uint64(0); k < BatchSize; k++ {
require.Equal(t, idxInSwapPos(k), n.NullAt64(k),
"after swapping NULLS (%d, %d), NullAt(%d) saw %t, expected %t", i, j, k, n.NullAt64(k), idxInSwapPos(k))
}
}
}
})

t.Run("TestSwapNullWithNotNull", func(t *testing.T) {
// Test that swapping null with not null changes things appropriately.
n.UnsetNulls()
swaps := map[uint64]uint64{
0: BatchSize - 1,
1: 62,
2: 3,
63: 65,
68: 120,
}
idxInSwaps := func(idx uint64) bool {
for k, v := range swaps {
if idx == k || idx == v {
return true
}
}
return false
}
for _, j := range swaps {
n.SetNull64(j)
}
for i, j := range swaps {
n.SwapNulls(i, j)
require.Truef(t, n.NullAt64(i), "after swapping not null and null (%d, %d), found null=%t at %d", i, j, n.NullAt64(i), i)
require.Truef(t, !n.NullAt64(j), "after swapping not null and null (%d, %d), found null=%t at %d", i, j, !n.NullAt64(j), j)
for k := uint64(0); k < BatchSize; k++ {
if idxInSwaps(k) {
continue
}
require.Falsef(t, n.NullAt64(k),
"after swapping NULLS (%d, %d), NullAt(%d) saw %t, expected false", i, j, k, n.NullAt64(k))
}
}
})

t.Run("TestSwapNullWithNull", func(t *testing.T) {
// Test that swapping not null with not null doesn't do anything.
n.SetNulls()
for _, p := range swapPos {
n.UnsetNull64(p)
}
for _, i := range swapPos {
for _, j := range swapPos {
n.SwapNulls(i, j)
for k := uint64(0); k < BatchSize; k++ {
require.Equal(t, idxInSwapPos(k), !n.NullAt64(k),
"after swapping NULLS (%d, %d), NullAt(%d) saw %t, expected %t", i, j, k, !n.NullAt64(k), idxInSwapPos(k))
}
}
}
})
}

func TestNullsTruncate(t *testing.T) {
for _, size := range pos {
n := NewNulls(BatchSize)
Expand Down
34 changes: 22 additions & 12 deletions pkg/sql/exec/execgen/cmd/execgen/sort_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ type sortOverload struct {
*overload
Dir string
DirString string
Nulls bool
}

type sortOverloads struct {
LTyp types.T
Overloads []sortOverload
}

// sortOverloads maps type to distsqlpb.Ordering_Column_Direction to overload.
var typesToSortOverloads map[types.T]sortOverloads
// typesToSortOverloads maps types to whether nulls are handled to
// the overload representing the sort direction.
var typesToSortOverloads map[types.T]map[bool]sortOverloads

func genSortOps(wr io.Writer) error {
d, err := ioutil.ReadFile("pkg/sql/exec/sort_tmpl.go")
Expand All @@ -45,11 +47,13 @@ func genSortOps(wr io.Writer) error {

// Replace the template variables.
s = strings.Replace(s, "_GOTYPESLICE", "{{.LTyp.GoTypeSliceName}}", -1)
s = strings.Replace(s, "_TYPES_T", "types.{{.LTyp}}", -1)
s = strings.Replace(s, "_TYPE", "{{.LTyp}}", -1)
s = strings.Replace(s, "_TYPES_T", "types.{{$typ}}", -1)
s = strings.Replace(s, "_TYPE", "{{$typ}}", -1)
s = strings.Replace(s, "_DIR_ENUM", "{{.Dir}}", -1)
s = strings.Replace(s, "_DIR", "{{.DirString}}", -1)
s = strings.Replace(s, "_TemplateType", "{{.LTyp}}", -1)
s = strings.Replace(s, "_ISNULL", "{{$isNull}}", -1)
s = strings.Replace(s, "_HANDLES_NULLS", "{{if .Nulls}}WithNulls{{else}}{{end}}", -1)

assignLtRe := regexp.MustCompile(`_ASSIGN_LT\((.*),(.*),(.*)\)`)
s = assignLtRe.ReplaceAllString(s, "{{.Assign $1 $2 $3}}")
Expand All @@ -76,6 +80,7 @@ func genQuickSortOps(wr io.Writer) error {
// Replace the template variables.
s = strings.Replace(s, "_TYPE", "{{.LTyp}}", -1)
s = strings.Replace(s, "_DIR", "{{.DirString}}", -1)
s = strings.Replace(s, "_HANDLES_NULLS", "{{if .Nulls}}WithNulls{{else}}{{end}}", -1)

// Now, generate the op, from the template.
tmpl, err := template.New("quicksort").Parse(s)
Expand All @@ -89,17 +94,22 @@ func genQuickSortOps(wr io.Writer) error {
func init() {
registerGenerator(genSortOps, "sort.eg.go")
registerGenerator(genQuickSortOps, "quicksort.eg.go")
typesToSortOverloads = make(map[types.T]sortOverloads)
typesToSortOverloads = make(map[types.T]map[bool]sortOverloads)
for _, o := range comparisonOpToOverloads[tree.LT] {
typesToSortOverloads[o.LTyp] = sortOverloads{
LTyp: o.LTyp,
Overloads: []sortOverload{
{overload: o, Dir: "distsqlpb.Ordering_Column_ASC", DirString: "Asc"},
{}},
typesToSortOverloads[o.LTyp] = make(map[bool]sortOverloads)
for _, b := range []bool{true, false} {
typesToSortOverloads[o.LTyp][b] = sortOverloads{
LTyp: o.LTyp,
Overloads: []sortOverload{
{overload: o, Dir: "distsqlpb.Ordering_Column_ASC", DirString: "Asc", Nulls: b},
{}},
}
}
}
for _, o := range comparisonOpToOverloads[tree.GT] {
typesToSortOverloads[o.LTyp].Overloads[1] = sortOverload{
overload: o, Dir: "distsqlpb.Ordering_Column_DESC", DirString: "Desc"}
for _, b := range []bool{true, false} {
typesToSortOverloads[o.LTyp][b].Overloads[1] = sortOverload{
overload: o, Dir: "distsqlpb.Ordering_Column_DESC", DirString: "Desc", Nulls: b}
}
}
}
16 changes: 9 additions & 7 deletions pkg/sql/exec/quicksort_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ func maxDepth(n int) int {
}

// {{range .}} {{/* for each type */}}
// {{range .}} {{/* for null vs non null */}}
// {{range .Overloads}} {{/* for each direction */}}

// Insertion sort
func (p *sort_TYPE_DIROp) insertionSort(a, b int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) insertionSort(a, b int) {
for i := a + 1; i < b; i++ {
for j := i; j > a && p.Less(j, j-1); j-- {
p.Swap(j, j-1)
Expand All @@ -50,7 +51,7 @@ func (p *sort_TYPE_DIROp) insertionSort(a, b int) {

// siftDown implements the heap property on data[lo, hi).
// first is an offset into the array where the root of the heap lies.
func (p *sort_TYPE_DIROp) siftDown(lo, hi, first int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) siftDown(lo, hi, first int) {
root := lo
for {
child := 2*root + 1
Expand All @@ -68,7 +69,7 @@ func (p *sort_TYPE_DIROp) siftDown(lo, hi, first int) {
}
}

func (p *sort_TYPE_DIROp) heapSort(ctx context.Context, a, b int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) heapSort(ctx context.Context, a, b int) {
first := a
lo := 0
hi := b - a
Expand All @@ -90,7 +91,7 @@ func (p *sort_TYPE_DIROp) heapSort(ctx context.Context, a, b int) {
// ``Engineering a Sort Function,'' SP&E November 1993.

// medianOfThree moves the median of the three values data[m0], data[m1], data[m2] into data[m1].
func (p *sort_TYPE_DIROp) medianOfThree(m1, m0, m2 int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) medianOfThree(m1, m0, m2 int) {
// sort 3 elements
if p.Less(m1, m0) {
p.Swap(m1, m0)
Expand All @@ -106,13 +107,13 @@ func (p *sort_TYPE_DIROp) medianOfThree(m1, m0, m2 int) {
// now data[m0] <= data[m1] <= data[m2]
}

func (p *sort_TYPE_DIROp) swapRange(a, b, n int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) swapRange(a, b, n int) {
for i := 0; i < n; i++ {
p.Swap(a+i, b+i)
}
}

func (p *sort_TYPE_DIROp) doPivot(lo, hi int) (midlo, midhi int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) doPivot(lo, hi int) (midlo, midhi int) {
m := int(uint(lo+hi) >> 1) // Written like this to avoid integer overflow.
if hi-lo > 40 {
// Tukey's ``Ninther,'' median of three medians of three.
Expand Down Expand Up @@ -199,7 +200,7 @@ func (p *sort_TYPE_DIROp) doPivot(lo, hi int) (midlo, midhi int) {
return b - 1, c
}

func (p *sort_TYPE_DIROp) quickSort(ctx context.Context, a, b, maxDepth int) {
func (p *sort_TYPE_DIR_HANDLES_NULLSOp) quickSort(ctx context.Context, a, b, maxDepth int) {
for b-a > 12 { // Use ShellSort for slices <= 12 elements
if maxDepth == 0 {
p.heapSort(ctx, a, b)
Expand Down Expand Up @@ -232,3 +233,4 @@ func (p *sort_TYPE_DIROp) quickSort(ctx context.Context, a, b, maxDepth int) {

// {{end}}
// {{end}}
// {{end}}
18 changes: 11 additions & 7 deletions pkg/sql/exec/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsqlpb"
"github.com/cockroachdb/cockroach/pkg/sql/exec/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/exec/types"
"github.com/pkg/errors"
)

// NewSorter returns a new sort operator, which sorts its input on the columns
Expand All @@ -31,14 +32,12 @@ func NewSorter(
func newSorter(
input spooler, inputTypes []types.T, orderingCols []distsqlpb.Ordering_Column,
) (resettableOperator, error) {
sorters := make([]colSorter, len(orderingCols))
partitioners := make([]partitioner, len(orderingCols)-1)

var err error
for i, ord := range orderingCols {
sorters[i], err = newSingleSorter(inputTypes[ord.ColIdx], ord.Direction)
if err != nil {
return nil, err
if !isSorterSupported(inputTypes[ord.ColIdx], ord.Direction) {
return nil, errors.Errorf("sorter for type: %s and direction: %s not supported", inputTypes[ord.ColIdx], ord.Direction)
}
if i < len(orderingCols)-1 {
partitioners[i], err = newPartitioner(inputTypes[ord.ColIdx])
Expand All @@ -51,7 +50,7 @@ func newSorter(
return &sortOp{
input: input,
inputTypes: inputTypes,
sorters: sorters,
sorters: make([]colSorter, len(orderingCols)),
partitioners: partitioners,
orderingCols: orderingCols,
state: sortSpooling,
Expand Down Expand Up @@ -170,7 +169,10 @@ type sortOp struct {
// orderingCols is the ordered list of column orderings that the sorter should
// sort on.
orderingCols []distsqlpb.Ordering_Column
// sorters contains one colSorter per sort column.
// sorters contains one colSorter per sort column. The instantiation of
// sorters occurs within the sort method rather than during construction
// of the sortOp so that we can correctly choose a sorter based on
// whether the input has nulls or not.
sorters []colSorter
// partitioners contains one partitioner per sort column except for the last,
// which doesn't need to be partitioned.
Expand Down Expand Up @@ -283,7 +285,9 @@ func (p *sortOp) sort(ctx context.Context) {
}

for i := range p.orderingCols {
p.sorters[i].init(p.input.getValues(int(p.orderingCols[i].ColIdx)), p.order)
inputVec := p.input.getValues(int(p.orderingCols[i].ColIdx))
p.sorters[i] = newSingleSorter(p.inputTypes[p.orderingCols[i].ColIdx], p.orderingCols[i].Direction, inputVec.MaybeHasNulls())
p.sorters[i].init(inputVec, p.order)
}

// Now, sort each column in turn.
Expand Down
49 changes: 48 additions & 1 deletion pkg/sql/exec/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ func TestSort(t *testing.T) {
ordCols []distsqlpb.Ordering_Column
typ []types.T
}{
{
tuples: tuples{{1}, {2}, {nil}, {4}, {5}, {nil}},
expected: tuples{{nil}, {nil}, {1}, {2}, {4}, {5}},
typ: []types.T{types.Int64},
ordCols: []distsqlpb.Ordering_Column{{ColIdx: 0}},
},
{
tuples: tuples{{1, 2}, {1, 1}, {1, nil}, {2, nil}, {2, 3}, {2, nil}, {5, 1}},
expected: tuples{{1, nil}, {1, 1}, {1, 2}, {2, nil}, {2, nil}, {2, 3}, {5, 1}},
typ: []types.T{types.Int64, types.Int64},
ordCols: []distsqlpb.Ordering_Column{{ColIdx: 0}, {ColIdx: 1}},
},
{
tuples: tuples{{1, 2}, {1, 1}, {1, nil}, {2, nil}, {2, 3}, {2, nil}, {5, 1}},
expected: tuples{{5, 1}, {2, 3}, {2, nil}, {2, nil}, {1, 2}, {1, 1}, {1, nil}},
typ: []types.T{types.Int64, types.Int64},
ordCols: []distsqlpb.Ordering_Column{{ColIdx: 0, Direction: distsqlpb.Ordering_Column_DESC}, {ColIdx: 1, Direction: distsqlpb.Ordering_Column_DESC}},
},
{
tuples: tuples{{nil, nil}, {nil, 3}, {1, nil}, {nil, 1}, {1, 2}, {nil, nil}, {5, nil}},
expected: tuples{{nil, nil}, {nil, nil}, {nil, 1}, {nil, 3}, {1, nil}, {1, 2}, {5, nil}},
typ: []types.T{types.Int64, types.Int64},
ordCols: []distsqlpb.Ordering_Column{{ColIdx: 0}, {ColIdx: 1}},
},
{
tuples: tuples{{1}, {2}, {3}, {4}, {5}, {6}, {7}},
expected: tuples{{1}, {2}, {3}, {4}, {5}, {6}, {7}},
Expand Down Expand Up @@ -145,7 +169,11 @@ func TestSortRandomized(t *testing.T) {
tups[i] = make(tuple, nCols)
for j := range tups[i] {
// Small range so we can test partitioning
tups[i][j] = rng.Int63() % 2048
if rng.Float64() < nullProbability {
tups[i][j] = nil
} else {
tups[i][j] = rng.Int63() % 2048
}
}
// Enforce that the last ordering column is always unique. Otherwise
// there would be multiple valid sort orders.
Expand Down Expand Up @@ -337,6 +365,25 @@ func BenchmarkAllSpooler(b *testing.B) {
func less(tuples tuples, ordCols []distsqlpb.Ordering_Column) func(i, j int) bool {
return func(i, j int) bool {
for _, col := range ordCols {
n1 := tuples[i][col.ColIdx] == nil
n2 := tuples[j][col.ColIdx] == nil
if col.Direction == distsqlpb.Ordering_Column_ASC {
if n1 && n2 {
continue
} else if n1 {
return true
} else if n2 {
return false
}
} else {
if n1 && n2 {
continue
} else if n1 {
return false
} else if n2 {
return true
}
}
if tuples[i][col.ColIdx].(int64) < tuples[j][col.ColIdx].(int64) {
return col.Direction == distsqlpb.Ordering_Column_ASC
} else if tuples[i][col.ColIdx].(int64) > tuples[j][col.ColIdx].(int64) {
Expand Down
Loading

0 comments on commit 3efa0ee

Please sign in to comment.