Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowexec: change lookup join batch size to be specified in bytes #48058

Merged
merged 2 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ import (
"github.com/opentracing/opentracing-go"
)

// TODO(radu): we currently create one batch at a time and run the KV operations
// on this node. In the future we may want to build separate batches for the
// nodes that "own" the respective ranges, and send out flows on those nodes.
const joinReaderBatchSize = 100

// joinReaderState represents the state of the processor.
type joinReaderState int

Expand Down Expand Up @@ -82,7 +77,8 @@ type joinReader struct {
lookupCols []uint32

// Batch size for fetches. Not a constant so we can lower for testing.
batchSize int
batchSizeBytes int64
curBatchSizeBytes int64

// State variables for each batch of input rows.
scratchInputRows sqlbase.EncDatumRows
Expand All @@ -109,7 +105,6 @@ func newJoinReader(
input: input,
inputTypes: input.OutputTypes(),
lookupCols: spec.LookupColumns,
batchSize: joinReaderBatchSize,
}

var err error
Expand Down Expand Up @@ -183,6 +178,7 @@ func newJoinReader(
}

jr.initJoinReaderStrategy(flowCtx, jr.desc.ColumnTypesWithMutations(returnMutations), len(columnIDs), spec.MaintainOrdering)
jr.batchSizeBytes = jr.strategy.getLookupRowsBatchSizeHint()

// TODO(radu): verify the input types match the index key types
return jr, nil
Expand Down Expand Up @@ -259,9 +255,9 @@ func getIndexColSet(
return cols
}

// SetBatchSize sets the desired batch size. It should only be used in tests.
func (jr *joinReader) SetBatchSize(batchSize int) {
jr.batchSize = batchSize
// SetBatchSizeBytes overrides the lookup batch size limit, expressed in
// bytes. It exists so tests can force multiple small input batches and
// should only be used in tests.
func (jr *joinReader) SetBatchSizeBytes(batchSize int64) {
	jr.batchSizeBytes = batchSize
}

// Spilled returns whether the joinReader spilled to disk.
Expand Down Expand Up @@ -331,7 +327,7 @@ func (jr *joinReader) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata
// readInput reads the next batch of input rows and starts an index scan.
func (jr *joinReader) readInput() (joinReaderState, *execinfrapb.ProducerMetadata) {
// Read the next batch of input rows.
for len(jr.scratchInputRows) < jr.batchSize {
for jr.curBatchSizeBytes < jr.batchSizeBytes {
row, meta := jr.input.Next()
if meta != nil {
if meta.Err != nil {
Expand All @@ -343,6 +339,7 @@ func (jr *joinReader) readInput() (joinReaderState, *execinfrapb.ProducerMetadat
if row == nil {
break
}
jr.curBatchSizeBytes += int64(row.Size())
jr.scratchInputRows = append(jr.scratchInputRows, jr.rowAlloc.CopyRow(row))
}

Expand All @@ -360,6 +357,7 @@ func (jr *joinReader) readInput() (joinReaderState, *execinfrapb.ProducerMetadat
return jrStateUnknown, jr.DrainHelper()
}
jr.scratchInputRows = jr.scratchInputRows[:0]
jr.curBatchSizeBytes = 0
if len(spans) == 0 {
// All of the input rows were filtered out. Skip the index lookup.
return jrEmittingRows, nil
Expand Down
28 changes: 24 additions & 4 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type defaultSpanGenerator struct {

indexKeyRow sqlbase.EncDatumRow
keyToInputRowIndices map[string][]int

scratchSpans roachpb.Spans
}

// Generate spans for a given row.
Expand Down Expand Up @@ -69,7 +71,7 @@ func (g *defaultSpanGenerator) generateSpans(rows []sqlbase.EncDatumRow) (roachp
}
// We maintain a map from index key to the corresponding input rows so we can
// join the index results to the inputs.
var spans roachpb.Spans
g.scratchSpans = g.scratchSpans[:0]
for i, inputRow := range rows {
if g.hasNullLookupColumn(inputRow) {
continue
Expand All @@ -80,15 +82,18 @@ func (g *defaultSpanGenerator) generateSpans(rows []sqlbase.EncDatumRow) (roachp
}
inputRowIndices := g.keyToInputRowIndices[string(generatedSpan.Key)]
if inputRowIndices == nil {
spans = g.spanBuilder.MaybeSplitSpanIntoSeparateFamilies(
spans, generatedSpan, len(g.lookupCols), containsNull)
g.scratchSpans = g.spanBuilder.MaybeSplitSpanIntoSeparateFamilies(
g.scratchSpans, generatedSpan, len(g.lookupCols), containsNull)
}
g.keyToInputRowIndices[string(generatedSpan.Key)] = append(inputRowIndices, i)
}
return spans, nil
return g.scratchSpans, nil
}

type joinReaderStrategy interface {
// getLookupRowsBatchSizeHint returns the size in bytes of the batch of lookup
// rows.
getLookupRowsBatchSizeHint() int64
// processLookupRows consumes the rows the joinReader has buffered and should
// return the lookup spans.
processLookupRows(rows []sqlbase.EncDatumRow) (roachpb.Spans, error)
Expand Down Expand Up @@ -141,6 +146,15 @@ type joinReaderNoOrderingStrategy struct {
}
}

// getLookupRowsBatchSizeHint returns the lookup-rows batch size (in bytes)
// used by the no-ordering strategy. The value was chosen empirically by
// running TPCH queries 7, 9, 10, and 11 with varying batch sizes and picking
// the smallest size that offered a significant performance improvement;
// larger batches yielded small to no marginal gains.
func (s *joinReaderNoOrderingStrategy) getLookupRowsBatchSizeHint() int64 {
	const twoMiB = 2 << 20
	return twoMiB
}

func (s *joinReaderNoOrderingStrategy) processLookupRows(
rows []sqlbase.EncDatumRow,
) (roachpb.Spans, error) {
Expand Down Expand Up @@ -294,6 +308,12 @@ type joinReaderOrderingStrategy struct {
}
}

// getLookupRowsBatchSizeHint returns the lookup-rows batch size (in bytes)
// used by the ordering strategy.
//
// TODO(asubiotto): Eventually we might want to adjust this batch size
// dynamically based on whether the result row container spilled or not.
func (s *joinReaderOrderingStrategy) getLookupRowsBatchSizeHint() int64 {
	const tenKiB = 10 << 10
	return tenKiB
}

func (s *joinReaderOrderingStrategy) processLookupRows(
rows []sqlbase.EncDatumRow,
) (roachpb.Spans, error) {
Expand Down
31 changes: 19 additions & 12 deletions pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func TestJoinReader(t *testing.T) {
}

// Set a lower batch size to force multiple batches.
jr.(*joinReader).SetBatchSize(3 /* batchSize */)
jr.(*joinReader).SetBatchSizeBytes(int64(encRows[0].Size() * 3))

jr.Run(ctx)

Expand Down Expand Up @@ -742,33 +742,40 @@ func BenchmarkJoinReader(b *testing.B) {
b.Skip()
}

// Create an *on-disk* store spec for the primary store and temp engine to
// reflect the real costs of lookups and spilling.
primaryStoragePath, cleanupPrimaryDir := testutils.TempDir(b)
defer cleanupPrimaryDir()
storeSpec, err := base.NewStoreSpec(fmt.Sprintf("path=%s", primaryStoragePath))
require.NoError(b, err)

var (
logScope = log.Scope(b)
ctx = context.Background()
s, sqlDB, kvDB = serverutils.StartServer(b, base.TestServerArgs{})
st = s.ClusterSettings()
evalCtx = tree.MakeTestingEvalContext(st)
diskMonitor = execinfra.NewTestDiskMonitor(ctx, st)
flowCtx = execinfra.FlowCtx{
s, sqlDB, kvDB = serverutils.StartServer(b, base.TestServerArgs{
StoreSpecs: []base.StoreSpec{storeSpec},
})
st = s.ClusterSettings()
evalCtx = tree.MakeTestingEvalContext(st)
diskMonitor = execinfra.NewTestDiskMonitor(ctx, st)
flowCtx = execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
DiskMonitor: diskMonitor,
Settings: st,
},
}
path, cleanupTempDir = testutils.TempDir(b)
)
defer logScope.Close(b)
defer s.Stopper().Stop(ctx)
defer evalCtx.Stop(ctx)
defer diskMonitor.Stop(ctx)
defer cleanupTempDir()

// Create an *on-disk* temp engine for benchmark iterations that spill to
// disk.
storeSpec, err := base.NewStoreSpec(fmt.Sprintf("path=%s", path))
tempStoragePath, cleanupTempDir := testutils.TempDir(b)
defer cleanupTempDir()
tempStoreSpec, err := base.NewStoreSpec(fmt.Sprintf("path=%s", tempStoragePath))
require.NoError(b, err)
tempEngine, _, err := storage.NewTempEngine(ctx, storage.DefaultStorageEngine, base.TempStorageConfig{Path: path, Mon: diskMonitor}, storeSpec)
tempEngine, _, err := storage.NewTempEngine(ctx, storage.DefaultStorageEngine, base.TempStorageConfig{Path: tempStoragePath, Mon: diskMonitor}, tempStoreSpec)
require.NoError(b, err)
defer tempEngine.Close()
flowCtx.Cfg.TempStorage = tempEngine
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func runProcessorTest(
switch pt := p.(type) {
case *joinReader:
// Reduce batch size to exercise batching logic.
pt.SetBatchSize(2 /* batchSize */)
pt.SetBatchSizeBytes(2 * int64(inputRows[0].Size()))
case *indexJoiner:
// Reduce batch size to exercise batching logic.
pt.SetBatchSize(2 /* batchSize */)
Expand Down