rowexec: change lookup join batch size to be specified in bytes #48058
Conversation
Reviewed 3 of 3 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)
pkg/sql/rowexec/joinreader.go, line 44 at r1 (raw file):
// TODO(asubiotto): Eventually we might want to adjust this batch size
// dynamically based on whether the result row container spilled or not.
const joinReaderDefaultLookupBatchSizeBytes = 6 << 20 /* 6 MiB */
This number seems too "overfitted" to the query that we ran the benchmarks on, no? Also I think it should depend on the 10MiB constant, so maybe something like TargetBytesSize / 2 or TargetBytesSize / 4 would be better?
pkg/sql/rowexec/joinreader.go, line 97 at r1 (raw file):
// Batch size for fetches. Not a constant so we can lower for testing.
batchSizeBytes int64
curBatchSize   int64
nit: s/curBatchSize/curBatchSizeBytes/g for symmetry.
pkg/sql/rowexec/utils_test.go, line 62 at r1 (raw file):
case *joinReader:
	// Reduce batch size to exercise batching logic.
	pt.SetBatchSizeBytes(int64(inputRows[0].Size()))
nit: missing a factor of 2, not sure if it matters though.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, and @yuzefovich)
pkg/sql/rowexec/joinreader.go, line 44 at r1 (raw file):
Previously, yuzefovich wrote…
This number seems too "overfitted" to the query that we ran the benchmarks on, no? Also I think it should depend on the 10MiB constant, so maybe something like TargetBytesSize / 2 or TargetBytesSize / 4 would be better?
It could definitely be. The goal of this batch size is to be as big as possible while avoiding disk spilling (performance tanks due to random accesses of the disk container). I think it's a good starting point since the query in question has a big lookup to result byte ratio. I think that we might want to end up calculating a dynamic batch size by assuming that that ratio will stay more or less the same and tracking how much memory is used by the results.
Note that the 10MiB TargetBytes is a per-fetch limit, but we can't stop fetching until we've got all the results for our current lookup batch (since they might be out of order and we want to preserve the order of the input).
I'm also going to look into whether we can change the way we're storing rows on disk. If we match a result row with a lookup ordinal and sort by that, I think we can change the accesses to be sequential which might change this batch size equation.
I might be totally off here, but are there cases when we don't care about preserving the order? Maybe we could have different modes of the joinReader?
That's actually a very good observation. I'm going to ask the optimizer folks. I'm also wondering whether it would be more efficient to not order anything and, if we rely on ordering, plan a sorter after the lookup join. If we can't specify the sort in terms of some pre-existing ordering key on the left side, I think that we probably don't care about ordering at all (since the only "ordering" of the lookup side would be accidental in this case).
I had another thought: since the access pattern of the indexed row container is random, maybe try disabling its cache when we spill to disk? The comments say that disabling should only be "used in tests", but that could be ignored. Maybe then spilling to disk won't be that bad, and we can continue reusing the same row container implementation.
Possibly. I talked to the optimizer folks and it seems like we already communicate a required ordering when we need it. However, it looks like in the distsql physical planning code we expect a lookup joiner to always satisfy this ordering and only plan a merge ordering stage after it. I think we'll want to provide two different implementations with as much code sharing as possible for the required-ordering vs. no-ordering case. This will also allow us to have different batching strategies. I think what needs to be done is:
Before we make any improvements though, I think we need to work on having benchmarks for a lot of different cases. I'll work on adding these and distilling these thoughts into issues.
❌ The GitHub CI (Cockroach) build has failed on 2b6c92ec. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.
❌ The GitHub CI (Cockroach) build has failed on 95d334bb.
❌ The GitHub CI (Cockroach) build has failed on 832b3ef6.
Reviewed 4 of 4 files at r2.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)
pkg/sql/rowexec/joinreader.go, line 97 at r1 (raw file):
Previously, yuzefovich wrote…
nit: s/curBatchSize/curBatchSizeBytes/g for symmetry.
Ping.
pkg/sql/rowexec/utils_test.go, line 62 at r1 (raw file):
Previously, yuzefovich wrote…
nit: missing a factor of 2, not sure if it matters though.
Ping.
nit: the PR description needs an update.
Reviewable status: complete! 2 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)
The current status of this is that there are some performance regressions in microbenchmarks, which is curious given the TPCH improvements. I want to take a look at this before I merge the PR.
I focused on the regressed microbenchmark. I first tried modifying the benchmark to use disk (I already thought it did, but it was only using it for the temporary engine, not the primary store), but it looks like there's still a noticeable difference. Looking at the profiles, the biggest difference seems to be due to more timestamp cache use because of the bigger read batches. One thing that stood out in the profile was some expensive error checking that I'm fixing in #49324. The other thing is that it seems like we're allocating a lot in the timestamp cache.

Given that this regression shows up only in the microbenchmarks, I'm fine with merging this as is. @yuzefovich / @jordanlewis lmk what you think and I'll go ahead.
The previous batch size of 100 rows was overly defensive in order to protect against result memory blowup. Since the addition of TargetBytes to the roachpb API, a result will no longer take more than 10MiB. Having a higher batch size allows us to amortize the lookup cost.

The lookup join with no ordering variant now has a batch size of 2MiB, which was determined by running TPCH queries 7, 9, 10, 11 and choosing the smallest batch size that showed a major performance improvement. The lookup join with ordering variant has a batch size of 10KiB. The goal is to keep it as close to the previous 100 row batch size as possible, since increasing the batch size could cause lookup joins that previously did not spill to disk to do so, because the number of results grows with the number of lookup rows and this strategy needs to buffer all result rows for a lookup batch.

This change shows a 1.5x-7.5x performance improvement on TPCH queries 7, 9, 10, and 11 compared to 20.1. Release note (performance improvement): lookup join performance has been improved.
The previous benchmark was using an in-memory primary store. To reflect the real cost of lookups, this commit changes that store to be on-disk. Release note: None (benchmark change)
I'm in favor of merging as is, but we should probably open an issue to investigate the timestamp cache allocations.
The timestamp cache allocates large slabs of memory at a time - 32 MB chunks by default. I think the problem here is that we dramatically reduce this allocation size for tests. This is kind of like how we use an in-memory storage engine for tests. We'll either want to configure the tscache properly in benchmarks with the usual allocation size so as to provide a more realistic environment, or just ignore it in general. For this PR though, you'll probably want to set the allocation size back to its usual value in the benchmark.
I wonder if we should do what Pebble does and double the size of the page on each allocation up to some maximum.
I like that idea a lot. Out of curiosity, where does Pebble do this? With its block cache? Its memtables? What are the minimum and maximum sizes that it uses?
It only does this with the memtable.
Thanks for the info. Increasing this page size did help, but the microbenchmarks are still showing a regression. Will investigate further independently from this PR. bors r=yuzefovich,jordanlewis
Build succeeded |
Addresses a suggestion from Peter: cockroachdb#48058 (comment). This change updates `intervalSkl` to dynamically grow the size of its pages as pages are rotated. This allows the structure to start off small (128 KB per page) and grow exponentially to a maximum size (32 MB per page) as it is used. The pages start small to limit the memory footprint of the data structure for short-lived tests but will settle upon the maximum page size under steady state on a long-lived process. This does not appear to have an impact on benchmarks:

```
➜ benchdiff --run='BenchmarkJoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384' ./pkg/sql/rowexec
checking out 'f575fa8'
building benchmark binaries for 'f575fa8' 1/1
checking out '3d46054'
building benchmark binaries for '3d46054' 1/1
pkg=1/1 iter=10/10 cockroachdb/cockroach/pkg/sql/rowexec

name                                                                        old time/op    new time/op    delta
JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16     1.34s ± 2%     1.34s ± 4%     ~     (p=1.000 n=9+10)

name                                                                        old speed      new speed      delta
JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16  3.23MB/s ± 2%  3.23MB/s ± 3%     ~     (p=0.953 n=9+10)

name                                                                        old alloc/op   new alloc/op   delta
JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16    72.1MB ± 0%    73.9MB ± 0%   +2.44%  (p=0.000 n=10+10)

name                                                                        old allocs/op  new allocs/op  delta
JoinReader/reqOrdering=false/matchratio=onetothirtytwo/lookuprows=16384-16      556k ± 0%      556k ± 0%     ~     (p=0.781 n=10+10)
```
49422: kv/tscache: dynamically size intervalSkl pages r=nvanbenschoten a=nvanbenschoten Co-authored-by: Nathan VanBenschoten <[email protected]>
This commit increases the default value of the `sql.distsql.join_reader_ordering_strategy.batch_size` cluster setting (which determines the input row batch size for lookup joins when ordering needs to be maintained) from 10KiB to 100KiB since the previous number is likely to have been too conservative. We have seen support cases (cockroachlabs/support#1627) where bumping up this setting was needed to get reasonable performance, and we also change this setting to 100KiB in our TPC-E setup (https://github.com/cockroachlabs/tpc-e/blob/d47d3ea5ce71ecb1be5e0e466a8aa7c94af63d17/tier-a/src/schema.rs#L404).

I did some historical digging, and I believe that the number 10KiB was chosen somewhat arbitrarily with no real justification in cockroachdb#48058. That PR changed how we measure the input row batches from the number of rows to the memory footprint of the input rows. Prior to that change we had a 100 row limit, so my guess is that 10KiB was somewhat dependent on that number.

The reason we don't want this batch size to be too large is that we buffer all looked up rows in a disk-backed container, so if too many responses come back (because we included too many input rows in the batch), that container has to spill to disk. To make sure we don't regress in such scenarios this commit teaches the join reader to lower the batch size bytes limit if the container does spill to disk, down to 10 KiB, which is treated as the minimum.

Release note: None
84324: sqlsmith: make order-dependent aggregation functions deterministic r=msirek,mgartner a=michae2 **cmd: add smith command** Add a new top-level command `smith` which dumps randomly-generated sqlsmith queries. This is useful for testing modifications to sqlsmith. Assists: #83024 Release note: None **sqlsmith: make order-dependent aggregation functions deterministic** Some aggregation functions (e.g. string_agg) have results that depend on the order of input rows. To make sqlsmith more deterministic, add ORDER BY clauses to these aggregation functions whenever their argument is a column reference. (When their argument is a constant, ordering doesn't matter.) Fixes: #83024 Release note: None

84398: colflow: prevent deadlocks when many queries spill to disk at same time r=yuzefovich a=yuzefovich **colflow: prevent deadlocks when many queries spill to disk at same time** This commit fixes a long-standing issue which could cause memory-intensive queries to deadlock on acquiring the file descriptors quota when vectorized execution spills to disk. This bug has been present since the introduction of disk-spilling (over two and a half years ago, introduced in #45318 and partially mitigated in #45892), but we haven't seen this in any user reports, only in `tpch_concurrency` roachtest runs, so the severity seems pretty minor. Consider the following query plan:

```
    Node 1                Node 2

 TableReader           TableReader
      |                     |
 HashRouter            HashRouter
      |   \  ___________ /  |
      |    \/__________     |
      |    /           \    |
HashAggregator       HashAggregator
```

and let's imagine that each hash aggregator has to spill to disk. This would require acquiring the file descriptors quota. Now, imagine that because of that hash aggregators' spilling, each of the hash routers has slow outputs causing them to spill too.
As a result, this query plan can require `A + 2 * R` number of FDs of a single node to succeed where `A` is the quota for a single hash aggregator (equal to 16 - with the default value of `COCKROACH_VEC_MAX_OPEN_FDS` environment variable which is 256) and `R` is the quota for a single router output (2). This means that we can estimate that 20 FDs from each node are needed for the query to finish execution with 16 FDs being acquired first.

Now imagine that this query is run with concurrency of 16. We can end up in such a situation that all hash aggregators have spilled, fully exhausting the global node limit on each node, so whenever the hash router outputs need to spill, they block forever since no FDs will ever be released, until a query is canceled or a node is shut down. In other words, we have a deadlock.

This commit fixes this situation by introducing a retry mechanism to exponentially backoff when trying to acquire the FD quota, until a timeout. The randomizations provided by the `retry` package should be sufficient so that some of the queries succeed while others result in an error. Unfortunately, I don't see a way to prevent this deadlock from occurring in the first place without a possible increase in latency in some cases. The difficult thing is that we currently acquire FDs only once we need them, meaning once a particular component spills to disk. We could acquire the maximum number of FDs that a query might need up-front, before the query execution starts, but that could lead to starvation of the queries that ultimately won't spill to disk. This seems like a much worse impact than receiving timeout errors on some analytical queries when run with high concurrency. We're not an OLAP database, so this behavior seems ok.

Fixes: #80290. Release note (bug fix): Previously, CockroachDB could deadlock when evaluating analytical queries if multiple queries had to spill to disk at the same time. This is now fixed by making some of the queries error out instead.
**roachtest: remove some debugging printouts in tpch_concurrency** This was added to track down the deadlock fixed in the previous commit, so we no longer need it. Release note: None 84430: sql/schemachanger/scplan: allow plan to move to NonRevertible earlier r=ajwerner a=ajwerner This is critical for DROP COLUMN. In `DROP COLUMN` we cannot perform the primary index swap until the dropping column reaches `DELETE_ONLY`, which is not revertible. The primary index swap itself is revertible. Given this fact, we need a mechanism to be able to "promote" revertible operations (operations which don't destroy information or cause irrevocable visible side effects) to be grouped with or after non-revertible operations. This commit makes that happen naturally. Release note: None 84433: rowexec: increase the batch size for join reader ordering strategy r=yuzefovich a=yuzefovich This commit increases the default value of `sql.distsql.join_reader_ordering_strategy.batch_size` cluster setting (which determines the input row batch size for the lookup joins when ordering needs to be maintained) from 10KiB to 100KiB since the previous number is likely to have been too conservative. We have seen support cases (https://github.com/cockroachlabs/support/issues/1627) where bumping up this setting was needed to get reasonable performance, we also change this setting to 100KiB in our TPC-E setup (https://github.com/cockroachlabs/tpc-e/blob/d47d3ea5ce71ecb1be5e0e466a8aa7c94af63d17/tier-a/src/schema.rs#L404). I did some historical digging, and I believe that the number 10KiB was chosen somewhat arbitrarily with no real justification in #48058. That PR changed how we measure the input row batches from the number of rows to the memory footprint of the input rows. Prior to that change we had a 100 row limit, so my guess is that 10KiB was somewhat dependent on that number.
The reason we don't want this batch size to be too large is that we buffer all looked up rows in a disk-backed container, so if too many responses come back (because we included too many input rows in the batch), that container has to spill to disk. To make sure we don't regress in such scenarios this commit teaches the join reader to lower the batch size bytes limit if the container does spill to disk, down to 10 KiB, which is treated as the minimum. Release note: None Co-authored-by: Michael Erickson <[email protected]> Co-authored-by: Yahor Yuzefovich <[email protected]> Co-authored-by: Andrew Werner <[email protected]>
The previous batch size of 100 rows was overly defensive in order to protect
against result memory blowup. Since the addition of TargetBytes to the roachpb
API, a result will no longer take more than 10MiB. Having a higher batch size
allows us to amortize the lookup cost.
The concern with a larger batch size is no longer whether the results will
cause a memory blowup, but whether the results need to be spilled to disk.
A variation of TPCH Q10, which has a lookup join with a ratio of 8 result bytes
per lookup byte, was used to determine that 6MiB was a good default batch size.
However, since this ratio changes based on the query and lookup columns, we
eventually will want to make this lookup batch size dynamic.
With the new default of 6MiB, the TPCH Q10 variation shows a 15% improvement
on local lookups (2.8s v 3.3s) and an 88% improvement (11s vs 97s) with an
artificial 150ms latency injected into kvfetcher fetches.
Release note (performance improvement): lookup join performance has been
improved.
Closes #39471