From e9f1c056cde6054f05e40206cfea0e16491616b5 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Thu, 14 Jul 2022 07:55:04 -0700
Subject: [PATCH] rowexec: increase the batch size for join reader ordering
 strategy

This commit increases the default value of the
`sql.distsql.join_reader_ordering_strategy.batch_size` cluster setting
(which determines the input row batch size for lookup joins when
ordering needs to be maintained) from 10KiB to 100KiB, since the
previous number is likely to have been too conservative. We have seen
support cases (https://github.com/cockroachlabs/support/issues/1627)
where bumping up this setting was needed to get reasonable performance,
and we also set it to 100KiB in our TPC-E setup
(https://github.com/cockroachlabs/tpc-e/blob/d47d3ea5ce71ecb1be5e0e466a8aa7c94af63d17/tier-a/src/schema.rs#L404).

I did some historical digging, and I believe that the number 10KiB was
chosen somewhat arbitrarily, with no real justification, in #48058. That
PR changed how we measure input row batches from the number of rows to
the memory footprint of the input rows. Prior to that change we had a
100-row limit, so my guess is that 10KiB was somewhat dependent on that
number.

The reason we don't want this batch size to be too large is that we
buffer all looked-up rows in a disk-backed container, so if too many
responses come back (because we included too many input rows in the
batch), that container has to spill to disk. To make sure we don't
regress in such scenarios, this commit teaches the join reader to lower
the batch size bytes limit whenever the container does spill to disk,
down to 10KiB, which is treated as the minimum.

Release note: None
---
 .../testdata/logic_test/information_schema    |  2 +-
 .../logictest/testdata/logic_test/pg_catalog  |  4 ++--
 .../logictest/testdata/logic_test/show_source |  2 +-
 pkg/sql/rowexec/joinreader.go                 | 19 +++++++++++++++++++
 pkg/sql/rowexec/joinreader_strategies.go      |  6 ++----
 5 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema
index 234310929302..a326ecb4f0eb 100644
--- a/pkg/sql/logictest/testdata/logic_test/information_schema
+++ b/pkg/sql/logictest/testdata/logic_test/information_schema
@@ -4657,7 +4657,7 @@ integer_datetimes on
 intervalstyle postgres
 intervalstyle_enabled on
 is_superuser on
-join_reader_ordering_strategy_batch_size 10 KiB
+join_reader_ordering_strategy_batch_size 100 KiB
 large_full_scan_rows 1000
 lc_collate C.UTF-8
 lc_ctype C.UTF-8
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index a8cd5c37fd6d..cb9bcd9f2580 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -4186,7 +4186,7 @@ inject_retry_errors_enabled off NULL
 integer_datetimes on NULL NULL NULL string
 intervalstyle postgres NULL NULL NULL string
 is_superuser on NULL NULL NULL string
-join_reader_ordering_strategy_batch_size 10 KiB NULL NULL NULL string
+join_reader_ordering_strategy_batch_size 100 KiB NULL NULL NULL string
 large_full_scan_rows 1000 NULL NULL NULL string
 lc_collate C.UTF-8 NULL NULL NULL string
 lc_ctype C.UTF-8 NULL NULL NULL string
@@ -4310,7 +4310,7 @@ inject_retry_errors_enabled off NULL
 integer_datetimes on NULL user NULL on on
 intervalstyle postgres NULL user NULL postgres postgres
 is_superuser on NULL user NULL on on
-join_reader_ordering_strategy_batch_size 10 KiB NULL user NULL 10 KiB 10 KiB
+join_reader_ordering_strategy_batch_size 100 KiB NULL user NULL 100 KiB 100 KiB
 large_full_scan_rows 1000 NULL user NULL 1000 1000
 lc_collate C.UTF-8 NULL user NULL C.UTF-8 C.UTF-8
 lc_ctype C.UTF-8 NULL user NULL C.UTF-8 C.UTF-8
diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source
index 9fd11d1270e9..e044c5b6f14b 100644
--- a/pkg/sql/logictest/testdata/logic_test/show_source
+++ b/pkg/sql/logictest/testdata/logic_test/show_source
@@ -78,7 +78,7 @@ inject_retry_errors_enabled off
 integer_datetimes on
 intervalstyle postgres
 is_superuser on
-join_reader_ordering_strategy_batch_size 10 KiB
+join_reader_ordering_strategy_batch_size 100 KiB
 large_full_scan_rows 1000
 lc_collate C.UTF-8
 lc_ctype C.UTF-8
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index b7f21e06c2e8..c95434ae76d9 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -1047,9 +1047,28 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
 	log.VEvent(jr.Ctx, 1, "done joining rows")
 	jr.strategy.prepareToEmit(jr.Ctx)
 
+	// Check if the strategy spilled to disk and reduce the batch size if it
+	// did.
+	// TODO(yuzefovich): we should probably also grow the batch size bytes limit
+	// dynamically if we haven't spilled and are not close to spilling (say not
+	// exceeding half of the memory limit of the disk-backed container), up to
+	// some limit. (This would only apply to the joinReaderOrderingStrategy
+	// since other strategies cannot spill in the first place.) Probably it'd be
+	// good to look at not just the current batch of input rows, but to keep
+	// some statistics over the last several batches to make a more informed
+	// decision.
+	if jr.strategy.spilled() && jr.batchSizeBytes > joinReaderMinBatchSize {
+		jr.batchSizeBytes = jr.batchSizeBytes / 2
+		if jr.batchSizeBytes < joinReaderMinBatchSize {
+			jr.batchSizeBytes = joinReaderMinBatchSize
+		}
+	}
+
 	return jrEmittingRows, nil
 }
 
+const joinReaderMinBatchSize = 10 << 10 /* 10 KiB */
+
 // emitRow returns the next row from jr.toEmit, if present. Otherwise it
 // prepares for another input batch.
 func (jr *joinReader) emitRow() (
diff --git a/pkg/sql/rowexec/joinreader_strategies.go b/pkg/sql/rowexec/joinreader_strategies.go
index e497fcbe73d6..36fde25c1455 100644
--- a/pkg/sql/rowexec/joinreader_strategies.go
+++ b/pkg/sql/rowexec/joinreader_strategies.go
@@ -451,7 +451,7 @@ var partialJoinSentinel = []int{-1}
 //
 // Say the joinReader looks up rows in order: (red, x), then (blue, y). Once
 // (red, x) is fetched, it is handed to
-// joinReaderOderingStrategy.processLookedUpRow(), which will match it against
+// joinReaderOrderingStrategy.processLookedUpRow(), which will match it against
 // all the corresponding input rows, producing (1, x), (4, x). These two rows
 // are not emitted because that would violate the input ordering (well, (1, x)
 // could be emitted, but we're not smart enough). So, they are buffered until
@@ -535,7 +535,7 @@ type joinReaderOrderingStrategy struct {
 	testingInfoSpilled bool
 }
 
-const joinReaderOrderingStrategyBatchSizeDefault = 10 << 10 /* 10 KiB */
+const joinReaderOrderingStrategyBatchSizeDefault = 100 << 10 /* 100 KiB */
 
 // JoinReaderOrderingStrategyBatchSize determines the size of input batches used
 // to construct a single lookup KV batch by joinReaderOrderingStrategy.
@@ -548,8 +548,6 @@ var JoinReaderOrderingStrategyBatchSize = settings.RegisterByteSizeSetting(
 )
 
 func (s *joinReaderOrderingStrategy) getLookupRowsBatchSizeHint(sd *sessiondata.SessionData) int64 {
-	// TODO(asubiotto): Eventually we might want to adjust this batch size
-	// dynamically based on whether the result row container spilled or not.
 	if sd.JoinReaderOrderingStrategyBatchSize == 0 {
 		// In some tests the session data might not be set - use the default
 		// value then.
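
For illustration, here is a minimal, standalone Go sketch of the feedback
loop the joinreader.go hunk adds: halve the input batch byte limit whenever
the disk-backed row container spills, clamping at the 10KiB floor. The
batchSizer type and its afterLookup method are hypothetical stand-ins for
this sketch, not the actual joinReader API (in the real code batchSizeBytes
lives on joinReader and the check happens inside performLookup).

package main

import "fmt"

const (
	// Floor introduced by this patch (joinReaderMinBatchSize).
	minBatchSizeBytes = 10 << 10 // 10 KiB
	// New default (joinReaderOrderingStrategyBatchSizeDefault).
	defaultBatchSizeBytes = 100 << 10 // 100 KiB
)

// batchSizer is a hypothetical stand-in for the joinReader's batch-size
// bookkeeping.
type batchSizer struct {
	batchSizeBytes int64
}

// afterLookup mirrors the logic added to performLookup: if the disk-backed
// container spilled while buffering looked-up rows, the input batch was too
// large, so halve the limit, but never drop below the 10 KiB minimum.
func (s *batchSizer) afterLookup(spilled bool) {
	if spilled && s.batchSizeBytes > minBatchSizeBytes {
		s.batchSizeBytes /= 2
		if s.batchSizeBytes < minBatchSizeBytes {
			s.batchSizeBytes = minBatchSizeBytes
		}
	}
}

func main() {
	s := batchSizer{batchSizeBytes: defaultBatchSizeBytes}
	// Simulate repeated spills: 100 KiB -> 50 -> 25 -> 12.5 -> 10 (floor).
	for i := 0; i < 5; i++ {
		s.afterLookup(true)
		fmt.Printf("after spill %d: %d bytes\n", i+1, s.batchSizeBytes)
	}
}

Halving, rather than dropping straight to the minimum, lets the limit settle
near the largest batch size that still fits in memory for the workload at
hand, which is why the commit can safely raise the default tenfold.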