Skip to content

Commit

Permalink
sql: Produce family specific spans for point lookups in index+lookup
Browse files Browse the repository at this point in the history
joiners

When only specific families are needed for a query, point lookups
in joiners can issue family specific spans to retrieve only
the data that they need, instead of querying for an entire row.

Release note: None
  • Loading branch information
rohany committed Jul 9, 2019
1 parent 5d37afc commit 14390c7
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 46 deletions.
22 changes: 21 additions & 1 deletion pkg/sql/distsqlrun/indexjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type indexJoiner struct {
// spans is the batch of spans we will next retrieve from the index.
spans roachpb.Spans

// neededFamilies holds the IDs of the column families that contain the
// needed columns, so that lookups can be restricted to those families.
neededFamilies []sqlbase.FamilyID

alloc sqlbase.DatumAlloc
}

Expand Down Expand Up @@ -117,6 +121,12 @@ func newIndexJoiner(
ij.fetcher = &rowFetcherWrapper{Fetcher: &fetcher}
}

ij.neededFamilies = sqlbase.NeededColumnFamilyIDs(
spec.Table.ColumnIdxMap(),
spec.Table.Families,
ij.out.neededColumns(),
)

return ij, nil
}

Expand Down Expand Up @@ -148,7 +158,7 @@ func (ij *indexJoiner) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata)
ij.MoveToDraining(err)
return nil, ij.DrainHelper()
}
ij.spans = append(ij.spans, span)
ij.spans = append(ij.spans, ij.maybeSplitSpanIntoSeparateFamilies(span)...)
}
if len(ij.spans) == 0 {
// All done.
Expand Down Expand Up @@ -203,6 +213,16 @@ func (ij *indexJoiner) generateSpan(row sqlbase.EncDatumRow) (roachpb.Span, erro
&ij.desc.PrimaryIndex, &ij.alloc)
}

// maybeSplitSpanIntoSeparateFamilies returns the spans to scan for the given
// lookup span. When a non-empty, strict subset of the table's column families
// is needed, the span is passed to sqlbase.SplitSpanIntoSeparateFamilies
// together with the needed family IDs; otherwise the span is returned
// unchanged as the only element of the result.
func (ij *indexJoiner) maybeSplitSpanIntoSeparateFamilies(span roachpb.Span) roachpb.Spans {
	numNeeded := len(ij.neededFamilies)
	// Nothing to narrow down when no families were identified, or when every
	// family of the table is needed anyway.
	if numNeeded == 0 || numNeeded >= len(ij.desc.Families) {
		return roachpb.Spans{span}
	}
	// No further checks are made here: the index joiner always looks up a
	// full primary key, so the span always addresses a single row.
	return sqlbase.SplitSpanIntoSeparateFamilies(span, ij.neededFamilies)
}

// outputStatsToTrace outputs the collected indexJoiner stats to the trace. Will
// fail silently if the indexJoiner is not collecting stats.
func (ij *indexJoiner) outputStatsToTrace() {
Expand Down
32 changes: 31 additions & 1 deletion pkg/sql/distsqlrun/indexjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ func TestIndexJoiner(t *testing.T) {
99,
sqlutils.ToRowFn(aFn, bFn, sumFn, sqlutils.RowEnglishFn))

sqlutils.CreateTable(t, sqlDB, "t2",
"a INT, b INT, sum INT, s STRING, PRIMARY KEY (a,b), FAMILY f1 (a, b), FAMILY f2 (s), FAMILY f3 (sum), INDEX bs (b,s)",
99,
sqlutils.ToRowFn(aFn, bFn, sumFn, sqlutils.RowEnglishFn))

td := sqlbase.GetTableDescriptor(kvDB, "test", "t")
tdf := sqlbase.GetTableDescriptor(kvDB, "test", "t2")

v := [10]sqlbase.EncDatum{}
for i := range v {
Expand All @@ -61,13 +67,15 @@ func TestIndexJoiner(t *testing.T) {

testCases := []struct {
description string
desc *sqlbase.TableDescriptor
post distsqlpb.PostProcessSpec
input sqlbase.EncDatumRows
outputTypes []types.T
expected sqlbase.EncDatumRows
}{
{
description: "Test selecting rows using the primary index",
desc: td,
post: distsqlpb.PostProcessSpec{
Projection: true,
OutputColumns: []uint32{0, 1, 2},
Expand All @@ -88,6 +96,7 @@ func TestIndexJoiner(t *testing.T) {
},
{
description: "Test a filter in the post process spec and using a secondary index",
desc: td,
post: distsqlpb.PostProcessSpec{
Filter: distsqlpb.Expression{Expr: "@3 <= 5"}, // sum <= 5
Projection: true,
Expand All @@ -112,12 +121,33 @@ func TestIndexJoiner(t *testing.T) {
{sqlbase.StrEncDatum("five-zero")},
},
},
{
description: "Test selecting rows using the primary index with multiple family spans",
desc: tdf,
post: distsqlpb.PostProcessSpec{
Projection: true,
OutputColumns: []uint32{0, 1, 2},
},
input: sqlbase.EncDatumRows{
{v[0], v[2]},
{v[0], v[5]},
{v[1], v[0]},
{v[1], v[5]},
},
outputTypes: sqlbase.ThreeIntCols,
expected: sqlbase.EncDatumRows{
{v[0], v[2], v[2]},
{v[0], v[5], v[5]},
{v[1], v[0], v[1]},
{v[1], v[5], v[6]},
},
},
}

for _, c := range testCases {
t.Run(c.description, func(t *testing.T) {
spec := distsqlpb.JoinReaderSpec{
Table: *td,
Table: *c.desc,
IndexIdx: 0,
}
txn := client.NewTxn(context.Background(), s.DB(), s.NodeID(), client.RootTxn)
Expand Down
27 changes: 26 additions & 1 deletion pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ type joinReader struct {
finalLookupBatch bool
toEmit sqlbase.EncDatumRows

// neededFamilies holds the IDs of the column families that contain the
// needed columns, so that lookups can be restricted to those families.
neededFamilies []sqlbase.FamilyID

// A few scratch buffers, to avoid re-allocating.
lookupRows []lookupRow
indexKeyRow sqlbase.EncDatumRow
Expand Down Expand Up @@ -207,6 +211,12 @@ func newJoinReader(

jr.indexKeyPrefix = sqlbase.MakeIndexKeyPrefix(&jr.desc, jr.index.ID)

jr.neededFamilies = sqlbase.NeededColumnFamilyIDs(
spec.Table.ColumnIdxMap(),
spec.Table.Families,
jr.neededRightCols(),
)

// TODO(radu): verify the input types match the index key types
return jr, nil
}
Expand Down Expand Up @@ -277,6 +287,21 @@ func (jr *joinReader) generateSpan(row sqlbase.EncDatumRow) (roachpb.Span, error
jr.index, &jr.alloc)
}

// maybeSplitSpanIntoSeparateFamilies returns the spans to scan for the given
// lookup span. The span is passed to sqlbase.SplitSpanIntoSeparateFamilies
// only when all of the following hold; otherwise it is returned unchanged as
// the only element of the result:
//   - at least one needed family has been identified
//   - the lookup index is unique
//   - the number of lookup columns equals the number of index columns
//   - fewer families are needed than the table has
func (jr *joinReader) maybeSplitSpanIntoSeparateFamilies(span roachpb.Span) roachpb.Spans {
	numNeeded := len(jr.neededFamilies)
	// The checks are evaluated in the same order as listed above and stop at
	// the first one that rules out splitting.
	if numNeeded == 0 ||
		!jr.index.Unique ||
		len(jr.lookupCols) != len(jr.index.ColumnIDs) ||
		numNeeded >= len(jr.desc.Families) {
		return roachpb.Spans{span}
	}
	return sqlbase.SplitSpanIntoSeparateFamilies(span, jr.neededFamilies)
}

// Next is part of the RowSource interface.
func (jr *joinReader) Next() (sqlbase.EncDatumRow, *distsqlpb.ProducerMetadata) {
// The lookup join is implemented as follows:
Expand Down Expand Up @@ -367,7 +392,7 @@ func (jr *joinReader) readInput() (joinReaderState, *distsqlpb.ProducerMetadata)
}
inputRowIndices := jr.keyToInputRowIndices[string(span.Key)]
if inputRowIndices == nil {
spans = append(spans, span)
spans = append(spans, jr.maybeSplitSpanIntoSeparateFamilies(span)...)
}
jr.keyToInputRowIndices[string(span.Key)] = append(inputRowIndices, i)
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/sql/distsqlrun/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ func TestJoinReader(t *testing.T) {
outputTypes: sqlbase.ThreeIntCols,
expected: "[[0 2 2] [0 2 2] [0 5 5] [1 0 0] [1 5 5]]",
},
{
description: "Test lookup join queries with separate families",
post: distsqlpb.PostProcessSpec{
Projection: true,
OutputColumns: []uint32{0, 1, 3, 4},
},
input: [][]tree.Datum{
{aFn(2), bFn(2)},
{aFn(5), bFn(5)},
{aFn(10), bFn(10)},
{aFn(15), bFn(15)},
},
lookupCols: []uint32{0, 1},
inputTypes: sqlbase.TwoIntCols,
outputTypes: sqlbase.FourIntCols,
expected: "[[0 2 2 2] [0 5 5 5] [1 0 0 1] [1 5 5 6]]",
},
{
description: "Test lookup joins preserve order of left input",
post: distsqlpb.PostProcessSpec{
Expand Down Expand Up @@ -404,7 +421,6 @@ func TestJoinReader(t *testing.T) {
Settings: st,
txn: client.NewTxn(ctx, s.DB(), s.NodeID(), client.RootTxn),
}

encRows := make(sqlbase.EncDatumRows, len(c.input))
for rowIdx, row := range c.input {
encRow := make(sqlbase.EncDatumRow, len(row))
Expand Down
72 changes: 72 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,75 @@ SELECT a,b from small WHERE a+b<20 AND EXISTS (SELECT a FROM data WHERE small.a=
4 8
5 10
6 12

# The following tests check that when a joiner can split a row request into
# separate families, it does so, generating a span for each needed family
# instead of reading the entire row.

statement ok
CREATE TABLE family_split_1 (x INT, PRIMARY KEY (x))

statement ok
INSERT INTO family_split_1 VALUES (1)

statement ok
CREATE TABLE family_split_2 (x INT, y INT, z INT, PRIMARY KEY (x), FAMILY f1 (x), FAMILY f2 (y), FAMILY f3 (z))

statement ok
INSERT INTO family_split_2 VALUES (1, 2, 3)

statement ok
SET tracing = on; SELECT family_split_2.x, family_split_2.z FROM family_split_1 INNER LOOKUP JOIN family_split_2 ON family_split_1.x = family_split_2.x; SET tracing = off

query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'Scan /Table/70/1/1/{0-1}, /Table/70/1/1/2/{1-2}'
----
Scan /Table/70/1/1/{0-1}, /Table/70/1/1/2/{1-2}

statement ok
CREATE TABLE family_index_join (x INT PRIMARY KEY, y INT, z INT, w INT, INDEX (y), FAMILY f1 (x), FAMILY f2 (y), FAMILY f3 (z), FAMILY f4(w))

statement ok
INSERT INTO family_index_join VALUES (1, 2, 3, 4)

statement ok
SET tracing = on

query II
SELECT y,w FROM family_index_join@family_index_join_y_idx WHERE y = 2
----
2 4

statement ok
SET tracing = off

query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'Scan /Table/71/1/1/1/{1-2}, /Table/71/1/1/3/{1-2}'
----
Scan /Table/71/1/1/1/{1-2}, /Table/71/1/1/3/{1-2}

# Test generating tighter spans on interleaved tables.
statement ok
CREATE TABLE family_interleave_1 (x INT, y INT, z INT, PRIMARY KEY (x), FAMILY f1 (x), FAMILY f2 (y), FAMILY f3 (z))

statement ok
CREATE TABLE family_interleave_2 (x INT, y INT, PRIMARY KEY (x, y)) INTERLEAVE IN PARENT family_interleave_1 (x)

statement ok
INSERT INTO family_interleave_1 VALUES (1, 2, 3)

statement ok
INSERT INTO family_interleave_2 VALUES (1, 3)

statement ok
SET TRACING = on

query II
SELECT family_interleave_1.x, family_interleave_1.z FROM family_interleave_2 INNER LOOKUP JOIN family_interleave_1 ON family_interleave_1.x = family_interleave_2.x
----
1 3

query T
SELECT message FROM [SHOW TRACE FOR SESSION] WHERE message LIKE 'Scan /Table/72/1/1/{0-1}, /Table/72/1/1/2/{1-2}'
----
Scan /Table/72/1/1/{0-1}, /Table/72/1/1/2/{1-2}
44 changes: 2 additions & 42 deletions pkg/sql/opt_index_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"fmt"
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/constraint"
Expand Down Expand Up @@ -681,23 +680,9 @@ func appendSpansFromConstraintSpan(
len(tableDesc.Families) > 1 &&
cs.StartKey().Length() == len(tableDesc.PrimaryIndex.ColumnIDs) &&
s.Key.Equal(s.EndKey) {
neededFamilyIDs := neededColumnFamilyIDs(tableDesc, needed)
neededFamilyIDs := sqlbase.NeededColumnFamilyIDs(tableDesc.ColumnIdxMap(), tableDesc.Families, needed)
if len(neededFamilyIDs) < len(tableDesc.Families) {
for i, familyID := range neededFamilyIDs {
var span roachpb.Span
span.Key = make(roachpb.Key, len(s.Key))
copy(span.Key, s.Key)
span.Key = keys.MakeFamilyKey(span.Key, uint32(familyID))
span.EndKey = span.Key.PrefixEnd()
if i > 0 && familyID == neededFamilyIDs[i-1]+1 {
// This column family is adjacent to the previous one. We can merge
// the two spans into one.
spans[len(spans)-1].EndKey = span.EndKey
} else {
spans = append(spans, span)
}
}
return spans, nil
return sqlbase.SplitSpanIntoSeparateFamilies(s, neededFamilyIDs), nil
}
}

Expand All @@ -711,28 +696,3 @@ func appendSpansFromConstraintSpan(
}
return append(spans, s), nil
}

// neededColumnFamilyIDs returns the IDs of the families in tableDesc that
// contain at least one column whose ordinal is in neededCols. IDs are
// appended in the order the families appear in tableDesc.Families, and the
// result is nil when no family matches.
func neededColumnFamilyIDs(
	tableDesc *sqlbase.ImmutableTableDescriptor, neededCols exec.ColumnOrdinalSet,
) []sqlbase.FamilyID {
	ordinalByColID := tableDesc.ColumnIdxMap()

	var familyIDs []sqlbase.FamilyID
families:
	for idx := range tableDesc.Families {
		fam := &tableDesc.Families[idx]
		for _, colID := range fam.ColumnIDs {
			if !neededCols.Contains(ordinalByColID[colID]) {
				continue
			}
			// One needed column is enough to require the whole family, so
			// record it once and move on to the next family.
			familyIDs = append(familyIDs, fam.ID)
			continue families
		}
	}

	// TODO(solon): There is a further optimization possible here: if there is at
	// least one non-nullable column in the needed column families, we can
	// potentially omit the primary family, since the primary keys are encoded
	// in all families. (Note that composite datums are an exception.)

	return familyIDs
}
Loading

0 comments on commit 14390c7

Please sign in to comment.