From 8c4d31f036f1389b1a7aae0658bcc09ed800510c Mon Sep 17 00:00:00 2001 From: wenyihu3 Date: Tue, 28 Jun 2022 15:43:13 -0400 Subject: [PATCH 01/11] rfc: add rfc for invisible index feature This commit adds an RFC for the invisible index feature. Related issue: https://github.com/cockroachdb/cockroach/issues/72576, https://github.com/cockroachdb/cockroach/issues/82363 Release justification: low risk to the existing functionality; this commit just adds rfc. Release Note: none --- docs/RFCS/20220628_invisible_index.md | 428 ++++++++++++++++++++++++++ 1 file changed, 428 insertions(+) create mode 100644 docs/RFCS/20220628_invisible_index.md diff --git a/docs/RFCS/20220628_invisible_index.md b/docs/RFCS/20220628_invisible_index.md new file mode 100644 index 000000000000..bb55f13a8553 --- /dev/null +++ b/docs/RFCS/20220628_invisible_index.md @@ -0,0 +1,428 @@ +- Feature Name: Invisible Index +- Status: in-progress +- Start Date: 2022-06-28 +- Authors: Wenyi Hu +- RFC PR: https://github.com/cockroachdb/cockroach/pull/83531 +- Cockroach Issue: https://github.com/cockroachdb/cockroach/issues/72576, + https://github.com/cockroachdb/cockroach/issues/82363 + +# Summary + +An invisible index is an index that is maintained up-to-date but is ignored by +the optimizer unless explicitly selected with [index +hinting](https://www.cockroachlabs.com/docs/v22.1/table-expressions#force-index-selection) +or for constraint purposes. + +The main purpose of this RFC is to introduce the feature, document +implementation decisions, and propose a technical design. + +# Motivation: use cases + +### 1. Roll out new indexes with more confidence. +When you create a new index, all queries are able to pick it which could have +an immediate effect on the workload. Currently, some users with large +production scales are concerned about the impact of introducing new indexes and +potentially affecting their applications significantly. + +With invisible indexes, you can introduce the index as invisible first. In a new +session, you could give a workout and observe the impact of the new index by +turning `optimizer_use_not_visible_indexes` on or with index hinting. If this +index does turn out to be useful, you can then change this index to be visible +in your database. + +Note that this allows us to see the impact more safely; the maintenance cost +associated with an index during inserts, upserts, updates, or delete is still +needed. + +### 2. Drop indexes with less risk. +A question that comes up frequently about indexes is whether an index is +actually useful for queries or if it is just sitting around and wasting +maintenance cost. Currently, the only way you can test this is by dropping the +index and then recreating it if the index turns out to be useful. However, when +the table gets large, recreating the index can become really expensive. + +With invisible indexes, you can mark the index as invisible first, wait for a +few weeks to measure the impact, and then drop the index if no drop in +performance is observed. If the index turns out to be needed, you can easily +change the index back to visible without the cost of rebuilding an index. + +Note that using an invisible index reduces the risk associated with dropping the +index but not with no risks. First, just because an index is not used during +this observation period, this does not mean it will not be used in the future. +Second, invisible indexes are still used behind the scene by the optimizer for +any constraint check and maintenance purposes (more details below). In that +case, you cannot expect the database to behave in the exact same way as dropping +an index. + +### 3. Debugging. +If queries suddenly start using an index unexpectedly and is causing performance +issues, you can change the index to be invisible as a short term solution. You +can then investigate what the problem might be using +`optimizer_use_not_visible_indexes` or index hinting in a new session. Once the +issue has been solved, the index can be made visible again. + +### 4. Make indexes only available to specific queries. +If you know certain queries have problems and creating an index would help, you +can use invisible index and make this index available only to queries you want +with index hinting. In this way you can leave the rest of your application +unaffected. + +# Implementation Decisions +### Conclusion: +- Users can create an invisible index or change an existing index to be invisible. +- By default, indexes are visible. +- Primary indexes cannot be invisible. +- Constraints cannot be created with invisible indexes. Creating unique or foreign key constraints with invisible indexes is not supported. +- Partial invisible indexes or inverted invisible indexes are both supported. The behavior is as expected. +- Queries can be instructed to use invisible indexes explicitly through [index + hinting](https://www.cockroachlabs.com/docs/v22.1/table-expressions#force-index-selection). +- Session variable, `optimizer_use_not_visible_indexes`, can be set to true to tell the optimizer to treat invisible indexes as they are visible. By default, `optimizer_use_not_visible_indexes` is set to false. + +The following points are where things might be unexpected as making an index invisible is not exactly the same as dropping the index. +- Force index or index hinting with invisible index is allowed and will override the invisible index feature. + - If the index is dropped instead, this will throw an error. +- Invisible indexes will be treated as visible while policing unique or foreign key constraints. In other words, we will temporarily disable the invisible index feature during any constraint check. + - If the index is dropped, the query plan for constraint check could be different and lead to a full table scan. + +### 1. What types of indexes can be invisible? +Primary indexes cannot be invisible. Any secondary indexes including unique +indexes can be invisible. + +In [MySQL](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html), all +indexes other than primary keys can be invisible. In MySQL, a table with no +explicit primary key will use the first unique index on NOT NULL columns as an +implicit primary key. Implicit and explicit primary indexes are both not allowed +to be invisible. However, a table with no explicit primary key creates a new +rowid column in CRDB, and creating a unique index on a null column will not +change the primary key. + +### 2. Can constraints be invisible? Can constraints be created with invisible indexes? +No. Constraints cannot be invisible, and they cannot be created with invisible +indexes. Having invisible constraint means that the constraint can be on and off +at different times. Since constraint is an insert-time enforcement, allowing +invisible constraint could lead to corrupted indexes. + +One might think creating unique constraints with invisible indexes is similar to +creating unique constraints without indexes (which is something CRDB is +currently supporting). But they have very different semantic meanings. First, +creating a constraint without index is not user-friendly and was created for +multi-tenant testing purposes. Second, creating a constraint with an invisible +index is still an index but just ignored by the optimizer. + +Overall, only indexes can be invisible. Creating a unique constraint or a +foreign key constraint with invisible indexes will not be supported. This leads +to an issue with the parser. This behavior aligns with MySQL. + +This leads to another issue in the parser; creating an invisible unique index +inside a `CREATE TABLE` definition is supported by the grammar rule, but the +parser will throw an error. This is because the parser is doing a round trip in +`pkg/testutils/sqlutils/pretty.go`. In sql.y, creating a unique index in a +`CREATE TABLE` statement returns a new structure +`tree.UniqueConstraintTableDef`. However, creating unique constraints with not +visible indexes is not supported by the parser. When the parser does a round +trip for the following statement, it formats it to a pretty statement using the +unique constraint definition. But the parser does not support unique constraint +with not visible index syntax. So it will fail while parsing the pretty +statement. Since logictest also performs a roundtrip check in +`pkg/sql/logictest/logic.go`, logictest would also fail. But creating a unique +index inside a `CREATE TABLE` definition will still work in a cluster. This is a +known issue. See more details in +https://github.com/cockroachdb/cockroach/pull/65825. + +### 3. Should Force Index with invisible index be allowed? +Using index hinting with invisible indexes is allowed and is part of the feature +design. Although this may lead to different behavior with the index being +dropped, this offers more flexibility with the feature. For example, the +fallback of some queries might be a full table scan and may be too expensive. + +In MySQL, index hinting with invisible indexes +[errors](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html). +Instead, MySQL supports index hinting with the session variable +[optimizer_use_not_visible_indexes](https://dev.mysql.com/doc/refman/8.0/en/switchable-optimizations.html#optflag_use-invisible-indexes). +Users can instruct queries to use invisible indexes by setting this session +variable to true for only specific queries. + +### 4. Are invisible indexes still maintained and up-to-date? +Yes. Just like any other indexes, an invisible index consumes maintenance cost +and resources. Regardless of visibility, indexes are maintained up-to-date with +insert, delete, upsert, and update. + +This behavior aligns with [MySQL](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html). + +### 5. Are unique constraints with invisible indexes still in effect? +Regardless of index visibility, unique indexes still prevent checks for +duplicate values when inserting or updating data. Creating a foreign key +constraint requires unique indexes or constraints on the parent table. Foreign +key constraints are still enforced even if the unique indexes on the parent +table become invisible; if a column in a child table is referencing another +column in the parent table, then this value in the parent table must exist. + +This behavior aligns with [MySQL](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html). + +### 6. Scope of Invisibility: to what extent should the optimizer ignore invisible indexes? Should constraint check use or ignore invisible indexes? +Consider the following situation. Creating a child table requires a parent table +to have a unique index on the FK columns. What happens if the unique index is +invisible here? What happens if this unique index is changed to invisible after +the child table has been created? Consider another case. What happens if INSERT +ON CONFLICT is performed on an invisible unique index? + +The first option would be to ignore the invisible index completely. However, +this means that when insert on the child table may require a full table scan to +police the foreign key check. The same situation applies if a parent table +performs delete or update, or if a child table performs insert, upsert, or +update. This would not only lead to performance issues; having a unique +constraint was necessary to create the child table or to perform INSERT ON +CONFLICT. If the index becomes invisible, does it really make sense to allow +these operations? Overall, this option is not viable. + +The second option would be to throw an error when an operation requires a +constraint check using invisible indexes. The justification behind this option +would be if someone wants to test the impact of dropping an index, this would be +the expected behavior. + +However, if someone wants to drop an index, it does not make sense if they still +want to have a foreign key constraint on it or to perform `INSERT ON CONFLICT`. +In addition, this makes this feature much more limited. As described above in +the motivation section, there are other use cases other than testing the impact +of dropping an index. For example, this feature is also helpful for a staged +rollout of a new index. Throwing an error with `INSERT ON CONFLICT` could lead +to confusion. + +The only option left with us is to allow the optimizer to still use invisible +indexes while policing foreign key constraints or unique constraints. This +obviously has some drawbacks; users can no longer expect dropping an index to +behave exactly the same as marking an index as invisible. But we will try our +best to document this well and log messages on occasions where they cannot +expect the same behavior. On the bright side, this should be the more +standardized way based on MySQL and Oracle. + +We should log warning messages on occasions where users cannot expect the invisible index to be equivalent to dropping an index. +We will log this message: +- if users are changing an existing visible index to invisible or if users are dropping an invisible index +- if this invisible index may be used to police constraint check + - when this invisible index is unique + - or when this invisible index is on a child table, and the first column stored by the index is part of the FK constraint. + +**Conclusion** + +The optimizer will treat all invisible indexes as they are visible for any +unique or foreign key constraint purposes. + +### 7. How to observe the impact on invisible indexes? +- SQL statements or queries will have different execution plans. + - You can see this using `EXPLAIN`. + - If you want to know whether invisible indexes are used for constraint check, + you can use `EXPLAIN (VERBOSE)` and check if `disabled not visible index + feature` is set as part of the scan flags. +- Queries or workload will have different performance. + +# Technical Design +## 1. How does the optimizer support this feature? +As discussed above, to fully support the invisible index feature, we need to +ignore the invisible index unless it is used for constraint check or used during +force index. + +First, let’s ignore the part where we need to disable the invisible index +feature and focus on how the optimizer will ignore invisible indexes in general. + +During exploration, the optimizer will explore every possible query plan using +transformation rules. While constructing equivalent memo groups, the optimizer +will enumerate indexes on a given Scan operator’s table using `ForEach` under +`pkg/sql/opt/xform/scan_index_iter.go`. This is where we can hide the index away +from the optimizer. While enumerating every index, the optimizer can check if +the index is invisible and ignore if it is. The optimizer can effectively ignore +the invisible index by blocking the creation of query plans with invisible +indexes. + +Second, let’s think about what happens when force index is used with invisible +index. Force index will override the invisible index feature. We will just need +to check if the flag for force index is set before ignoring invisible indexes +during exploration. + +Third, let’s think about how to disable invisible index features during +constraint check. During Optbuild, we are constructing scan expression on a +given table using `buildScan` under `pkg/sql/opt/optbuilder/select.go`. We can +add a flag to `ScanPrivate` to indicate if this Scan expression was built for a +constraint check. When the factory constructs the scan expression, this flag +will be passed along as a scan operator property. + +When the optimizer enumerates indexes on a given Scan operator under +`pkg/sql/opt/xform/scan_index_iter.go`, the optimizer can then check if the scan +is built for constraint check before ignoring the invisible index. + +### Foreign key constraint check will be needed: + - When a parent table performs an `UPDATE` or `DELETE` operation, FK check on the child table is needed. + - When a child table performs an `INSERT`, `UPSERT`,or `UPDATE` operation, FK check on the parent table is needed. + - There may be different foreign key actions `[UPDATE | DELETE] [ON CASCADE | SET DEFAULT | SET NULL | NO ACTION | RESTRICT| ON CONSTRAINT]`. +### Unique constraint check will be needed: + - When `INSERT [ON CONFLICT DO NOTHING | DO UPDATE SET | ON CONSTRAINT | DISTINCT ON]` + - When `UPSERT`, `UPDATE` + +## 2. Syntax +### a. CREATE INDEX, CREATE TABLE, ALTER INDEX statements +#### Create Index Statements +```sql +CREATE [UNIQUE | INVERTED] INDEX [CONCURRENTLY] [IF NOT EXISTS] [] + ON ( [ASC | DESC] [, ...] ) + [USING HASH] [STORING ( )] + [PARTITION BY ] + [WITH ] [WHERE ] + [VISIBLE | NOT VISIBLE] +``` +- Example + +```sql +CREATE INDEX a ON b.c (d) VISIBLE +CREATE INDEX a ON b.c (d) NOT VISIBLE + +CREATE INDEX a ON b (c) WITH (fillfactor = 100, y_bounds = 50) VISIBLE +CREATE INDEX a ON b (c) WITH (fillfactor = 100, y_bounds = 50) NOT VISIBLE + +CREATE INDEX geom_idx ON t USING GIST(geom) VISIBLE +CREATE INDEX geom_idx ON t USING GIST(geom) NOT VISIBLE + +CREATE UNIQUE INDEX IF NOT EXISTS a ON b (c) WHERE d > 3 VISIBLE +CREATE UNIQUE INDEX IF NOT EXISTS a ON b (c) WHERE d > 3 NOT VISIBLE +``` + +#### Create Table Statements +```sql +CREATE [[GLOBAL | LOCAL] {TEMPORARY | TEMP}] TABLE [IF NOT EXISTS] [table_element_list] [] +``` + +```sql +table_element_list: index_def +[UNIQUE | INVERTED] INDEX [] ( [ASC | DESC] [, ...] + [USING HASH] [{STORING | INCLUDE | COVERING} ( )] + [PARTITION BY ] + [WITH ] [WHERE ] + [VISIBLE | NOT VISIBLE] +``` + +- Example: +```sql +CREATE TABLE a (b INT8, c STRING, INDEX (b ASC, c DESC) STORING (c) VISIBLE) +CREATE TABLE a (b INT8, c STRING, INDEX (b ASC, c DESC) STORING (c) NOT VISIBLE) + +CREATE TABLE a (b INT, UNIQUE INDEX foo (b) WHERE c > 3 VISIBLE) +CREATE TABLE a (b INT, UNIQUE INDEX foo (b) WHERE c > 3 NOT VISIBLE) +``` + +#### ALTER INDEX Statements +```sql +ALTER INDEX [IF EXISTS] [VISIBLE | NOT VISIBLE] +``` + +```sql +ALTER INDEX a@b VISIBLE +ALTER INDEX a@b NOT VISIBLE +``` + +### b. SHOW INDEX Statements +A new column needs to be added to the output of following SQL statements: +```sql +SHOW INDEX FROM (table_name) +SHOW INDEXES FROM(table_name) +SHOW KEYS FROM (table_name) + +SHOW INDEX FROM DATABASE(database_name) +SHOW INDEXES FROM DATABASE (database_name) +SHOW KEYS FROM DATABASE (database_name) +``` + +``` +table_name index_name non_unique seq_in_index column_name direction storing implicit visible +``` + +### c. Tables that store indexes information +A new column needs to be added to the output of `crdb_internal.table_indexes` and `information_schema.statistics`. +`crdb_internal.table_indexes` +``` +descriptor_id descriptor_name index_id index_name index_type is_unique is_inverted is_sharded ***is_visible*** shard_bucket_count created_at +``` + +`information_schema.statistics` +``` +table_catalog table_schema table_name non_unique index_schema index_name seq_in_index column_name COLLATION cardinality direction storing implicit ***is_visible*** +``` + +## d. Alternative Syntax Considered +### a. CREATE INDEX, CREATE TABLE, ALTER INDEX statements +Invisible index feature is introducing four new user facing syntax. Since +PostgreSQL does not support the invisible index feature yet, we will use MySQL +and Oracle as a reference for the standardized syntax. + +The two options that we have discussed are `NOT VISIBLE` and `INVISIBLE`. + +- Reason why `NOT VISIBLE` is good: CRDB currently supports a similar feature, +invisible column feature. And invisible column feature is using `NOT VISIBLE` +for its syntax. If you are wondering about why the invisible column feature chose +`NOT VISIBLE` over `INVISIBLE`, please look at this PR +https://github.com/cockroachdb/cockroach/pull/26644 for more information. +- Reason why `INVISIBLE` is good: MySQL and Oracle both support `INVISIBLE`. + +**Conclusion**: we have decided that being consistent internally with what CRDB +already has is more important than being consistent with other database engines. + +There has been discussion about supporting INVISIBLE as an alias. But this could +lead to more issues: +1. If we support INVISIBLE as an alias for invisible index +feature, we would have to support INVISIBLE as an alias for the invisible column +feature as well. There are some technical issues in the grammar to do that. +2. If users are migrating from other database to CRDB, they would need to rewrite +their SQL anyway. +3. This might lead to confusion when user tries to create invisible columns or + indexes. Overall, supporting `INVISIBLE` as an alias doesn't seem to provide + a large benefit. + + +### b. SHOW INDEX Statements +The three options are `is_hidden`, `is_visible`, and `visible`. + +- Reason why `is_hidden` is good: invisible column feature is using `is_hidden` for [`SHOW COLUMNS`](https://www.cockroachlabs.com/docs/stable/show-columns.html). +- Reason why `visible` is good: this is more consistent what we chose with the first syntax --- VISIBLE | NOT VISIBLE. MySQL is also using [`visible`](https://dev.mysql.com/doc/refman/8.0/en/show-index.html). +- Reason why `is_visible` is bad: less consistent with other columns in [`SHOW INDEX`](https://www.cockroachlabs.com/docs/stable/show-index.html) such as `storing, implicit, non_unique`. + +**Conclusion**: `visible` it is more important to stay consistent with the first user-facing syntax. + +### c. Tables that store indexes information: `crdb_internal.table_indexes` and `information_schema.statistics` +The three options are `is_hidden`, `is_visible`, and `visible`. + +- Reason why `hidden` is good: Invisible column feature uses `hidden` for `table_columns`. +- Reason why `is_visible` is good: MySQL uses `is_visible`. Also, this is more consistent with other columns in `table_indexes`, such as is_unique, is_inverted, is_sharded. +- Reason why `visibility` is good: Oracle uses `visibility`. + +- **Conclusion**: `is_visible` it is more important to stay consistent with the second-user facing syntax. + +### d. Index Descriptor + +We are also introducing another field in the index descriptor (just for internal +use). The options are `Hidden`, `Invisible`, or `NotVisible`. The invisible +column feature is using `Hidden` in the column descriptor. Using visible or +visibility would be odd as well since the default boolean value is false (by +default, index should be visible). + +- **Conclusion**: `NotVisible`. Since we chose `visible` for all invisible index features above, choosing `NotVisible` or `Invisible` here is more consistent. `NotVisible` is preferred here because we are trying to stay away from the keyword `Invisible` to avoid confusion for the first user-facing syntax. + +For more context on how this conclusion was drawn, please see https://github.com/cockroachdb/cockroach/pull/83388 and this RFC PR’s discussion + +# Fine Grained Control of Index Visibility +As of now, the plan is to introduce the general feature of invisible index +first. The design and implementation details for fine-grained control of index +visibility will be discussed in the future. + +Later on, we want to extend this feature and allow a more fine-grained control +of index visibility by introducing the following two features. + +1. Indexes are not restricted to just being visible or invisible; users can + experiment with different levels of visibility. In other words, instead of + using a boolean invisible flag, users can set a float invisible flag between + 0.0 and 100.0. The index would be made invisible only to a corresponding + fraction of queries. Related: + https://github.com/cockroachdb/cockroach/issues/72576#issuecomment-1034301996 + +2. Different sessions of users can set different index visibility. + Related: https://github.com/cockroachdb/cockroach/issues/82363 + +3. We can consider introducing another session variable or another type of + indexes that provides the exact same behaviour as dropping an index. From 091c13b303eed06ab6d2ebc6412a6c146cfcf05a Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Mon, 15 Aug 2022 10:59:31 -0700 Subject: [PATCH 02/11] allocator: rename candidate selection function In preparation for adding a new selection function for a good enough candidate, rename the existing "good" to "best". Release note: None --- .../allocator/allocatorimpl/allocator.go | 2 +- .../allocatorimpl/allocator_scorer.go | 8 +- .../allocatorimpl/allocator_scorer_test.go | 120 +++++++++--------- 3 files changed, 65 insertions(+), 65 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index cfb40e9a7295..6fda3a73727f 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -1101,7 +1101,7 @@ func (a Allocator) RemoveTarget( ) log.VEventf(ctx, 3, "remove %s: %s", targetType, rankedCandidates) - if bad := rankedCandidates.selectBad(a.randGen); bad != nil { + if bad := rankedCandidates.selectWorst(a.randGen); bad != nil { for _, exist := range existingReplicas { if exist.StoreID == bad.store.StoreID { log.VEventf(ctx, 3, "remove target: %s", bad) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go index cb79b1017424..2a30899403d7 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go @@ -836,9 +836,9 @@ func (cl candidateList) betterThan(c candidate) candidateList { return cl } -// selectGood randomly chooses a good candidate store from a sorted (by score -// reversed) candidate list using the provided random generator. -func (cl candidateList) selectGood(randGen allocatorRand) *candidate { +// selectBest randomly chooses one of the best candidate stores from a sorted +// (by score reversed) candidate list using the provided random generator. +func (cl candidateList) selectBest(randGen allocatorRand) *candidate { cl = cl.best() if len(cl) == 0 { return nil @@ -1570,7 +1570,7 @@ func bestRebalanceTarget( if len(option.candidates) == 0 { continue } - target := option.candidates.selectGood(randGen) + target := option.candidates.selectBest(randGen) if target == nil { continue } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go index f6d6e1111ab9..a1f243ce0422 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go @@ -95,9 +95,9 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) { } } -// TestSelectGoodPanic is a basic regression test against a former panic in -// selectGood when called with just invalid/full stores. -func TestSelectGoodPanic(t *testing.T) { +// TestSelectBestPanic is a basic regression test against a former panic in +// selectBest when called with just invalid/full stores. +func TestSelectBestPanic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -107,12 +107,12 @@ func TestSelectGoodPanic(t *testing.T) { }, } allocRand := makeAllocatorRand(rand.NewSource(0)) - if good := cl.selectGood(allocRand); good != nil { - t.Errorf("cl.selectGood() got %v, want nil", good) + if good := cl.selectBest(allocRand); good != nil { + t.Errorf("cl.selectBest() got %v, want nil", good) } } -// TestCandidateSelection tests select{good,bad} and {best,worst}constraints. +// TestCandidateSelection tests select{best,worst} and {best,worst}constraints. func TestCandidateSelection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -153,60 +153,60 @@ func TestCandidateSelection(t *testing.T) { } testCases := []struct { - candidates []scoreTuple - best []scoreTuple - worst []scoreTuple - good scoreTuple - bad scoreTuple + candidates []scoreTuple + best []scoreTuple + worst []scoreTuple + bestChosen scoreTuple + worstChosen scoreTuple }{ { - candidates: []scoreTuple{{0, 0}}, - best: []scoreTuple{{0, 0}}, - worst: []scoreTuple{{0, 0}}, - good: scoreTuple{0, 0}, - bad: scoreTuple{0, 0}, + candidates: []scoreTuple{{0, 0}}, + best: []scoreTuple{{0, 0}}, + worst: []scoreTuple{{0, 0}}, + bestChosen: scoreTuple{0, 0}, + worstChosen: scoreTuple{0, 0}, }, { - candidates: []scoreTuple{{0, 0}, {0, 1}}, - best: []scoreTuple{{0, 0}, {0, 1}}, - worst: []scoreTuple{{0, 0}, {0, 1}}, - good: scoreTuple{0, 0}, - bad: scoreTuple{0, 1}, + candidates: []scoreTuple{{0, 0}, {0, 1}}, + best: []scoreTuple{{0, 0}, {0, 1}}, + worst: []scoreTuple{{0, 0}, {0, 1}}, + bestChosen: scoreTuple{0, 0}, + worstChosen: scoreTuple{0, 1}, }, { - candidates: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - best: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - worst: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - good: scoreTuple{0, 1}, - bad: scoreTuple{0, 2}, + candidates: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + best: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + worst: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + bestChosen: scoreTuple{0, 1}, + worstChosen: scoreTuple{0, 2}, }, { - candidates: []scoreTuple{{1, 0}, {0, 1}}, - best: []scoreTuple{{1, 0}}, - worst: []scoreTuple{{0, 1}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 1}, + candidates: []scoreTuple{{1, 0}, {0, 1}}, + best: []scoreTuple{{1, 0}}, + worst: []scoreTuple{{0, 1}}, + bestChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 1}, }, { - candidates: []scoreTuple{{1, 0}, {0, 1}, {0, 2}}, - best: []scoreTuple{{1, 0}}, - worst: []scoreTuple{{0, 1}, {0, 2}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 2}, + candidates: []scoreTuple{{1, 0}, {0, 1}, {0, 2}}, + best: []scoreTuple{{1, 0}}, + worst: []scoreTuple{{0, 1}, {0, 2}}, + bestChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 2}, }, { - candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}}, - best: []scoreTuple{{1, 0}, {1, 1}}, - worst: []scoreTuple{{0, 2}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 2}, + candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}}, + best: []scoreTuple{{1, 0}, {1, 1}}, + worst: []scoreTuple{{0, 2}}, + bestChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 2}, }, { - candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}, {0, 3}}, - best: []scoreTuple{{1, 0}, {1, 1}}, - worst: []scoreTuple{{0, 2}, {0, 3}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 3}, + candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}, {0, 3}}, + best: []scoreTuple{{1, 0}, {1, 1}}, + worst: []scoreTuple{{0, 2}, {0, 3}}, + bestChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 3}, }, } @@ -227,24 +227,24 @@ func TestCandidateSelection(t *testing.T) { t.Errorf("expected:%s actual:%s diff:%v", formatter(e), formatter(a), pretty.Diff(e, a)) } }) - t.Run(fmt.Sprintf("good-%s", formatter(cl)), func(t *testing.T) { - good := cl.selectGood(allocRand) - if good == nil { - t.Fatalf("no good candidate found") + t.Run(fmt.Sprintf("select-best-%s", formatter(cl)), func(t *testing.T) { + best := cl.selectBest(allocRand) + if best == nil { + t.Fatalf("no 'best' candidate found") } - actual := scoreTuple{int(good.diversityScore + 0.5), good.rangeCount} - if actual != tc.good { - t.Errorf("expected:%v actual:%v", tc.good, actual) + actual := scoreTuple{int(best.diversityScore + 0.5), best.rangeCount} + if actual != tc.bestChosen { + t.Errorf("expected:%v actual:%v", tc.bestChosen, actual) } }) - t.Run(fmt.Sprintf("bad-%s", formatter(cl)), func(t *testing.T) { - bad := cl.selectBad(allocRand) - if bad == nil { - t.Fatalf("no bad candidate found") + t.Run(fmt.Sprintf("select-worst-%s", formatter(cl)), func(t *testing.T) { + worst := cl.selectWorst(allocRand) + if worst == nil { + t.Fatalf("no 'worst' candidate found") } - actual := scoreTuple{int(bad.diversityScore + 0.5), bad.rangeCount} - if actual != tc.bad { - t.Errorf("expected:%v actual:%v", tc.bad, actual) + actual := scoreTuple{int(worst.diversityScore + 0.5), worst.rangeCount} + if actual != tc.worstChosen { + t.Errorf("expected:%v actual:%v", tc.worstChosen, actual) } }) } From 3ce0fd5483a6bf3d4a5102da9cf9775904f5c572 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Mon, 15 Aug 2022 11:03:43 -0700 Subject: [PATCH 03/11] allocator: select a good enough store for decom/recovery Until now, when decommissioning a node, or when recovering from a dead node, the allocator tries to pick one of the best possible stores as the target for the recovery. Because of that, we sometimes see multiple stores recover replicas to the same store, for example, when decommissioning a node and at the same time adding a new node. This PR changes the way we select a destination store by choosing a random store out of all the stores that are "good enough" for the replica. The risk diversity is still enforced, but we may recover a replica to a store that is considered "over full", for example. Note that during upreplication the allocator will still try to use one of the "best" stores as targets. Fixes: #86265 Release note: None Release justification: a relatively small change, and it can be reverted by setting kv.allocator.recovery_store_selector=best. --- .../allocator/allocatorimpl/allocator.go | 74 ++++++++++- .../allocatorimpl/allocator_scorer.go | 38 +++++- .../allocatorimpl/allocator_scorer_test.go | 48 +++++-- .../allocator/allocatorimpl/allocator_test.go | 122 ++++++++++++------ pkg/kv/kvserver/allocator_impl_test.go | 6 +- pkg/kv/kvserver/replica_command.go | 1 + pkg/kv/kvserver/replicate_queue.go | 6 +- 7 files changed, 239 insertions(+), 56 deletions(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 6fda3a73727f..bb1b92eb7380 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -87,6 +87,20 @@ var leaseRebalancingAggressiveness = settings.RegisterFloatSetting( settings.NonNegativeFloat, ) +// recoveryStoreSelector controls the strategy for choosing a store to recover +// replicas to: either to any valid store ("good") or to a store that has low +// range count ("best"). With this set to "good", recovering from a dead node or +// from a decommissioning node can be faster, because nodes can send replicas to +// more target stores (instead of multiple nodes sending replicas to a few +// stores with a low range count). +var recoveryStoreSelector = settings.RegisterStringSetting( + settings.SystemOnly, + "kv.allocator.recovery_store_selector", + "if set to 'good', the allocator may recover replicas to any valid store, if set "+ + "to 'best' it will pick one of the most ideal stores", + "good", +) + // AllocatorAction enumerates the various replication adjustments that may be // recommended by the allocator. type AllocatorAction int @@ -850,14 +864,64 @@ type decisionDetails struct { Existing string `json:",omitempty"` } +// CandidateSelector is an interface to select a store from a list of +// candidates. +type CandidateSelector interface { + selectOne(cl candidateList) *candidate +} + +// BestCandidateSelector in used to choose the best store to allocate. +type BestCandidateSelector struct { + randGen allocatorRand +} + +// NewBestCandidateSelector returns a CandidateSelector for choosing the best +// candidate store. +func (a *Allocator) NewBestCandidateSelector() CandidateSelector { + return &BestCandidateSelector{a.randGen} +} + +func (s *BestCandidateSelector) selectOne(cl candidateList) *candidate { + return cl.selectBest(s.randGen) +} + +// GoodCandidateSelector is used to choose a random store out of the stores that +// are good enough. +type GoodCandidateSelector struct { + randGen allocatorRand +} + +// NewGoodCandidateSelector returns a CandidateSelector for choosing a random store +// out of the stores that are good enough. +func (a *Allocator) NewGoodCandidateSelector() CandidateSelector { + return &GoodCandidateSelector{a.randGen} +} + +func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate { + return cl.selectGood(s.randGen) +} + func (a *Allocator) allocateTarget( ctx context.Context, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string, error) { candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled) + // If the replica is alive we are upreplicating, and in that case we want to + // allocate new replicas on the best possible store. Otherwise, the replica is + // dead or decommissioned, and we want to recover the missing replica as soon + // as possible, and therefore any store that is good enough will be + // considered. + var selector CandidateSelector + if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" { + selector = a.NewBestCandidateSelector() + } else { + selector = a.NewGoodCandidateSelector() + } + target, details := a.AllocateTargetFromList( ctx, candidateStoreList, @@ -865,6 +929,7 @@ func (a *Allocator) allocateTarget( existingVoters, existingNonVoters, a.ScorerOptions(ctx), + selector, // When allocating a *new* replica, we explicitly disregard nodes with any // existing replicas. This is important for multi-store scenarios as // otherwise, stores on the nodes that have existing replicas are simply @@ -902,8 +967,9 @@ func (a *Allocator) AllocateVoter( ctx context.Context, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, VoterTarget) + return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) } // AllocateNonVoter returns a suitable store for a new allocation of a @@ -913,8 +979,9 @@ func (a *Allocator) AllocateNonVoter( ctx context.Context, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, NonVoterTarget) + return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) } // AllocateTargetFromList returns a suitable store for a new allocation of a @@ -926,6 +993,7 @@ func (a *Allocator) AllocateTargetFromList( conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, options ScorerOptions, + selector CandidateSelector, allowMultipleReplsPerNode bool, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string) { @@ -967,7 +1035,7 @@ func (a *Allocator) AllocateTargetFromList( ) log.VEventf(ctx, 3, "allocate %s: %s", targetType, candidates) - if target := candidates.selectGood(a.randGen); target != nil { + if target := selector.selectOne(candidates); target != nil { log.VEventf(ctx, 3, "add target: %s", target) details := decisionDetails{Target: target.compactString()} detailsBytes, err := json.Marshal(details) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go index 2a30899403d7..6df62e184ae3 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go @@ -778,6 +778,23 @@ func (cl candidateList) best() candidateList { return cl } +// good returns all the elements in a sorted (by score reversed) candidate list +// that share the highest diversity score and are valid. +func (cl candidateList) good() candidateList { + cl = cl.onlyValidAndHealthyDisk() + if len(cl) <= 1 { + return cl + } + for i := 1; i < len(cl); i++ { + if cl[i].necessary == cl[0].necessary && + scoresAlmostEqual(cl[i].diversityScore, cl[0].diversityScore) { + continue + } + return cl[:i] + } + return cl +} + // worst returns all the elements in a sorted (by score reversed) candidate list // that share the lowest constraint score (for instance, the set of candidates // that result in the lowest diversity score for the range, or the set of @@ -858,9 +875,26 @@ func (cl candidateList) selectBest(randGen allocatorRand) *candidate { return best } -// selectBad randomly chooses a bad candidate store from a sorted (by score +// selectGood randomly chooses a good candidate store from a sorted (by score // reversed) candidate list using the provided random generator. -func (cl candidateList) selectBad(randGen allocatorRand) *candidate { +func (cl candidateList) selectGood(randGen allocatorRand) *candidate { + cl = cl.good() + if len(cl) == 0 { + return nil + } + if len(cl) == 1 { + return &cl[0] + } + randGen.Lock() + r := randGen.Intn(len(cl)) + randGen.Unlock() + c := &cl[r] + return c +} + +// selectWorst randomly chooses one of the worst candidate stores from a sorted +// (by score reversed) candidate list using the provided random generator. +func (cl candidateList) selectWorst(randGen allocatorRand) *candidate { cl = cl.worst() if len(cl) == 0 { return nil diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go index a1f243ce0422..f0d49c02eb34 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) type storeScore struct { @@ -95,9 +96,8 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) { } } -// TestSelectBestPanic is a basic regression test against a former panic in -// selectBest when called with just invalid/full stores. -func TestSelectBestPanic(t *testing.T) { +// TestNilSelection verifies selection with just invalid/full stores. +func TestNilSelection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -107,12 +107,11 @@ func TestSelectBestPanic(t *testing.T) { }, } allocRand := makeAllocatorRand(rand.NewSource(0)) - if good := cl.selectBest(allocRand); good != nil { - t.Errorf("cl.selectBest() got %v, want nil", good) - } + require.Nil(t, cl.selectBest(allocRand)) + require.Nil(t, cl.selectGood(allocRand)) } -// TestCandidateSelection tests select{best,worst} and {best,worst}constraints. +// TestCandidateSelection tests select{Best,Good,Worst} and {best,good,worst}constraints. func TestCandidateSelection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -155,57 +154,73 @@ func TestCandidateSelection(t *testing.T) { testCases := []struct { candidates []scoreTuple best []scoreTuple + good []scoreTuple worst []scoreTuple bestChosen scoreTuple + goodChosen scoreTuple worstChosen scoreTuple }{ { candidates: []scoreTuple{{0, 0}}, best: []scoreTuple{{0, 0}}, + good: []scoreTuple{{0, 0}}, worst: []scoreTuple{{0, 0}}, bestChosen: scoreTuple{0, 0}, + goodChosen: scoreTuple{0, 0}, worstChosen: scoreTuple{0, 0}, }, { candidates: []scoreTuple{{0, 0}, {0, 1}}, best: []scoreTuple{{0, 0}, {0, 1}}, + good: []scoreTuple{{0, 0}, {0, 1}}, worst: []scoreTuple{{0, 0}, {0, 1}}, bestChosen: scoreTuple{0, 0}, + goodChosen: scoreTuple{0, 1}, worstChosen: scoreTuple{0, 1}, }, { candidates: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, best: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + good: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, worst: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - bestChosen: scoreTuple{0, 1}, - worstChosen: scoreTuple{0, 2}, + bestChosen: scoreTuple{0, 0}, + goodChosen: scoreTuple{0, 0}, + worstChosen: scoreTuple{0, 1}, }, { candidates: []scoreTuple{{1, 0}, {0, 1}}, best: []scoreTuple{{1, 0}}, + good: []scoreTuple{{1, 0}}, worst: []scoreTuple{{0, 1}}, bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 0}, worstChosen: scoreTuple{0, 1}, }, { candidates: []scoreTuple{{1, 0}, {0, 1}, {0, 2}}, best: []scoreTuple{{1, 0}}, + good: []scoreTuple{{1, 0}}, worst: []scoreTuple{{0, 1}, {0, 2}}, bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 0}, worstChosen: scoreTuple{0, 2}, }, { candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}}, best: []scoreTuple{{1, 0}, {1, 1}}, + good: []scoreTuple{{1, 0}, {1, 1}}, worst: []scoreTuple{{0, 2}}, bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 1}, worstChosen: scoreTuple{0, 2}, }, { candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}, {0, 3}}, best: []scoreTuple{{1, 0}, {1, 1}}, + good: []scoreTuple{{1, 0}, {1, 1}}, worst: []scoreTuple{{0, 2}, {0, 3}}, bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 0}, worstChosen: scoreTuple{0, 3}, }, } @@ -218,6 +233,11 @@ func TestCandidateSelection(t *testing.T) { t.Errorf("expected:%s actual:%s diff:%v", formatter(e), formatter(a), pretty.Diff(e, a)) } }) + t.Run(fmt.Sprintf("good-%s", formatter(cl)), func(t *testing.T) { + if a, e := cl.good(), genCandidates(tc.good, 1); !reflect.DeepEqual(a, e) { + t.Errorf("expected:%s actual:%s diff:%v", formatter(e), formatter(a), pretty.Diff(e, a)) + } + }) t.Run(fmt.Sprintf("worst-%s", formatter(cl)), func(t *testing.T) { // Shifting the ids is required to match the end of the list. if a, e := cl.worst(), genCandidates( @@ -237,6 +257,16 @@ func TestCandidateSelection(t *testing.T) { t.Errorf("expected:%v actual:%v", tc.bestChosen, actual) } }) + t.Run(fmt.Sprintf("select-good-%s", formatter(cl)), func(t *testing.T) { + good := cl.selectGood(allocRand) + if good == nil { + t.Fatalf("no 'good' candidate found") + } + actual := scoreTuple{int(good.diversityScore + 0.5), good.rangeCount} + if actual != tc.goodChosen { + t.Errorf("expected:%v actual:%v", tc.goodChosen, actual) + } + }) t.Run(fmt.Sprintf("select-worst-%s", formatter(cl)), func(t *testing.T) { worst := cl.selectWorst(allocRand) if worst == nil { diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 8699c700d5bb..3fb2113e9430 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -559,6 +559,7 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { ctx, simpleSpanConfig, nil /* existingVoters */, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -579,6 +580,7 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { ctx, simpleSpanConfig, nil /* existingVoters */, nil, /* existingNonVoters */ + Dead, ) if !roachpb.Empty(result) { t.Errorf("expected nil result: %+v", result) @@ -594,64 +596,84 @@ func TestAllocatorReadAmpCheck(t *testing.T) { ctx := context.Background() type testCase struct { - name string - stores []*roachpb.StoreDescriptor - conf roachpb.SpanConfig - expectedAddTarget roachpb.StoreID - enforcement StoreHealthEnforcement + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + // The expected store to add when replicas are alive. The allocator should + // pick one of the best stores, with low range count. + expectedTargetIfAlive roachpb.StoreID + // The expected store to add when a replica is dead or decommissioning. The + // allocator should pick a store that is good enough, ignoring the range + // count. + expectedTargetIfDead roachpb.StoreID + enforcement StoreHealthEnforcement } tests := []testCase{ { - name: "ignore read amp on allocation when StoreHealthNoAction enforcement", + name: "ignore read amp on allocation when StoreHealthNoAction enforcement", + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), // NB: All stores have high read amp, this should be ignored and // allocate to the store with the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthNoAction, + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthNoAction, }, { name: "ignore read amp on allocation when storeHealthLogOnly enforcement", // NB: All stores have high read amp, this should be ignored and // allocate to the store with the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthLogOnly, + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthLogOnly, }, { name: "ignore read amp on allocation when StoreHealthBlockRebalanceTo enforcement", // NB: All stores have high read amp, this should be ignored and // allocate to the store with the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthBlockRebalanceTo, + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthBlockRebalanceTo, }, { name: "don't allocate to stores when all have high read amp and StoreHealthBlockAll", // NB: All stores have high read amp (limit + 1), none are above the watermark, select the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthBlockAll, + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthBlockAll, }, { name: "allocate to store below the mean when all have high read amp and StoreHealthBlockAll", // NB: All stores have high read amp, however store 1 is below the watermark mean read amp. - stores: allStoresHighReadAmpSkewed, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(1), - enforcement: StoreHealthBlockAll, + stores: allStoresHighReadAmpSkewed, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(1), + expectedTargetIfDead: roachpb.StoreID(1), + enforcement: StoreHealthBlockAll, }, { name: "allocate to lowest range count store without high read amp when StoreHealthBlockAll enforcement", // NB: Store 1, 2 and 3 have high read amp and are above the watermark, the lowest range count (4) // should be selected. - stores: threeStoresHighReadAmpAscRangeCount, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(4), - enforcement: StoreHealthBlockAll, + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(4), + expectedTargetIfDead: roachpb.StoreID(4), + enforcement: StoreHealthBlockAll, }, } @@ -661,22 +683,45 @@ func TestAllocatorReadAmpCheck(t *testing.T) { for i, test := range tests { t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { - stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */) + stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */) defer stopper.Stop(ctx) sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) // Enable read disk health checking in candidate exclusion. l0SublevelsThresholdEnforce.Override(ctx, &a.StorePool.St.SV, int64(test.enforcement)) + + // Allocate a voter where all replicas are alive (e.g. up-replicating a valid range). add, _, err := a.AllocateVoter( ctx, test.conf, nil, nil, + Alive, + ) + require.NoError(t, err) + require.Truef(t, + chk(add, test.expectedTargetIfAlive), + "the addition target %+v from AllocateVoter doesn't match expectation", + add) + + // Allocate a voter where we have a dead (or decommissioning) replica. + add, _, err = a.AllocateVoter( + ctx, + test.conf, + nil, + nil, + // Dead and Decommissioning should behave the same here, use either. + func() ReplicaStatus { + if i%2 == 0 { + return Dead + } + return Decommissioning + }(), ) require.NoError(t, err) require.Truef(t, - chk(add, test.expectedAddTarget), + chk(add, test.expectedTargetIfDead), "the addition target %+v from AllocateVoter doesn't match expectation", add) }) @@ -695,6 +740,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { ctx, multiDCConfigSSD, nil /* existingVoters */, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -706,6 +752,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { NodeID: result1.NodeID, StoreID: result1.StoreID, }}, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -729,6 +776,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { StoreID: result2.StoreID, }, }, nil, /* existingNonVoters */ + Dead, ) if err == nil { t.Errorf("expected error on allocation without available stores: %+v", result3) @@ -762,6 +810,7 @@ func TestAllocatorExistingReplica(t *testing.T) { StoreID: 2, }, }, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -865,6 +914,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { { result, _, err := a.AllocateVoter( ctx, emptySpanConfig(), tc.existing, nil, + Dead, ) if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a { t.Errorf( @@ -2920,7 +2970,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { // Allocate the voting replica first, before the non-voter. This is the // order in which we'd expect the allocator to repair a given range. See // TestAllocatorComputeAction. - voterTarget, _, err := a.AllocateVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters) + voterTarget, _, err := a.AllocateVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldVoterAllocFail { require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget) } else { @@ -2929,7 +2979,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...) } - nonVoterTarget, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters) + nonVoterTarget, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldNonVoterAllocFail { require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget) } else { @@ -3003,7 +3053,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { StoreID: storeID, } } - targetStore, details, err := a.AllocateVoter(ctx, emptySpanConfig(), existingRepls, nil) + targetStore, details, err := a.AllocateVoter(ctx, emptySpanConfig(), existingRepls, nil, Dead) if err != nil { t.Fatal(err) } @@ -3469,7 +3519,7 @@ func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) - result, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters) + result, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldFail { require.Error(t, err) require.Regexp(t, test.expError, err) diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index 04d174baa883..ba454a8bcf01 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -252,14 +252,14 @@ func TestAllocatorThrottled(t *testing.T) { defer stopper.Stop(ctx) // First test to make sure we would send the replica to purgatory. - _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) + _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); !ok { t.Fatalf("expected a purgatory error, got: %+v", err) } // Second, test the normal case in which we can allocate to the store. gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) + result, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if err != nil { t.Fatalf("unable to perform allocation: %+v", err) } @@ -276,7 +276,7 @@ func TestAllocatorThrottled(t *testing.T) { } storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) a.StorePool.DetailsMu.Unlock() - _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) + _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { t.Fatalf("expected a non purgatory error, got: %+v", err) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4d669222f666..4bd0a531071a 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3226,6 +3226,7 @@ func (r *Replica) relocateOne( existingVoters, existingNonVoters, r.store.allocator.ScorerOptions(ctx), + r.store.allocator.NewBestCandidateSelector(), // NB: Allow the allocator to return target stores that might be on the // same node as an existing replica. This is to ensure that relocations // that require "lateral" movement of replicas within a node can succeed. diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index cdd55b16f6a7..329cec560dc3 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -991,7 +991,7 @@ func (rq *replicateQueue) addOrReplaceVoters( // we're removing it (i.e. dead or decommissioning). If we left the replica in // the slice, the allocator would not be guaranteed to pick a replica that // fills the gap removeRepl leaves once it's gone. - newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters) + newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus) if err != nil { return false, err } @@ -1023,7 +1023,7 @@ func (rq *replicateQueue) addOrReplaceVoters( oldPlusNewReplicas, roachpb.ReplicaDescriptor{NodeID: newVoter.NodeID, StoreID: newVoter.StoreID}, ) - _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters) + _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus) if err != nil { // It does not seem possible to go to the next odd replica state. Note // that AllocateVoter returns an allocatorError (a PurgatoryError) @@ -1106,7 +1106,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( desc, conf := repl.DescAndSpanConfig() existingNonVoters := desc.Replicas().NonVoterDescriptors() - newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas) + newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus) if err != nil { return false, err } From a0d6fa448c418707f5d6f8d84e002c024356d061 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 18 Aug 2022 12:14:21 +0000 Subject: [PATCH 04/11] clusterversion: rework final minted version check Previously this isReleaseBranch could be easily forgotten during the release process leaving the check disabled. Now it will fail loudly if the binary version looks like a minted final ensuring we remember to set it. Release note: none. Release justification: low risk, high benefit change to make sure an existing safety check is used. --- pkg/clusterversion/cockroach_versions.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 0df42e33dbab..dd68b697b4e0 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -154,7 +154,7 @@ type Key int // //go:generate stringer -type=Key const ( - _ Key = iota - 1 // want first named one to start at zero + invalidVersionKey Key = iota - 1 // want first named one to start at zero // V21_2 is CockroachDB v21.2. It's used for all v21.2.x patch releases. V21_2 @@ -497,11 +497,12 @@ var ( ) func init() { - const isReleaseBranch = false - if isReleaseBranch { - if binaryVersion != ByKey(V21_2) { - panic("unexpected cluster version greater than release's binary version") + if finalVersion > invalidVersionKey { + if binaryVersion != ByKey(finalVersion) { + panic("binary version does not match final version") } + } else if binaryVersion.Internal == 0 { + panic("a non-upgrade cluster version must be the final version") } } From 07cb344e58fccc418c6f97dd66a5496b162fe22a Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 25 Aug 2022 11:05:33 -0400 Subject: [PATCH 05/11] kv: Include error information in `crdb_internal.active_range_feeds` Include error count, and the last error information in `crdb_internal.active_range_feeds` table whenever rangefeed disconnects due to an error. Release justification: observability improvement. Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go | 12 ++++++++++++ pkg/sql/crdb_internal.go | 12 +++++++++++- .../logictest/testdata/logic_test/create_statements | 8 ++++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 8ea19348e216..aeab508e8a01 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -225,6 +225,8 @@ type PartialRangeFeed struct { CreatedTime time.Time LastValueReceived time.Time Resolved hlc.Timestamp + NumErrs int + LastErr error } // ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure. @@ -280,6 +282,14 @@ func (a *activeRangeFeed) onRangeEvent( a.RangeID = rangeID } +func (a *activeRangeFeed) setLastError(err error) { + a.Lock() + defer a.Unlock() + a.LastErr = errors.Wrapf(err, "disconnect at %s: checkpoint %s/-%s", + timeutil.Now().Format(time.RFC3339), a.Resolved, timeutil.Since(a.Resolved.GoTime())) + a.NumErrs++ +} + // rangeFeedRegistry is responsible for keeping track of currently executing // range feeds. type rangeFeedRegistry struct { @@ -389,6 +399,8 @@ func (ds *DistSender) partialRangeFeed( startAfter.Forward(maxTS) if err != nil { + active.setLastError(err) + if log.V(1) { log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v", span, timeutil.Since(startAfter.GoTime()), err) diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index d8a7ee973a36..37b0d410983f 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -5664,7 +5664,9 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_start STRING, range_end STRING, resolved STRING, - last_event_utc INT + last_event_utc INT, + num_errs INT, + last_err STRING );`, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { return p.execCfg.DistSender.ForEachActiveRangeFeed( @@ -5675,6 +5677,12 @@ CREATE TABLE crdb_internal.active_range_feeds ( } else { lastEvent = tree.NewDInt(tree.DInt(rf.LastValueReceived.UTC().UnixNano())) } + var lastErr tree.Datum + if rf.LastErr == nil { + lastErr = tree.DNull + } else { + lastErr = tree.NewDString(rf.LastErr.Error()) + } return addRow( tree.NewDInt(tree.DInt(rfCtx.ID)), @@ -5688,6 +5696,8 @@ CREATE TABLE crdb_internal.active_range_feeds ( tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)), tree.NewDString(rf.Resolved.AsOfSystemTime()), lastEvent, + tree.NewDInt(tree.DInt(rf.NumErrs)), + lastErr, ) }, ) diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 7c528e201e62..319b5df3040f 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -43,7 +43,9 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_start STRING NULL, range_end STRING NULL, resolved STRING NULL, - last_event_utc INT8 NULL + last_event_utc INT8 NULL, + num_errs INT8 NULL, + last_err STRING NULL ) CREATE TABLE crdb_internal.active_range_feeds ( id INT8 NULL, tags STRING NULL, @@ -55,7 +57,9 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_start STRING NULL, range_end STRING NULL, resolved STRING NULL, - last_event_utc INT8 NULL + last_event_utc INT8 NULL, + num_errs INT8 NULL, + last_err STRING NULL ) {} {} CREATE TABLE crdb_internal.backward_dependencies ( descriptor_id INT8 NULL, From f4f3dc43435f93219c98a468d7433e47f3d55585 Mon Sep 17 00:00:00 2001 From: j82w Date: Thu, 25 Aug 2022 14:50:53 -0400 Subject: [PATCH 06/11] sql: fix cluster_execution_insights priority level closes #86900, closes #86867 Release justification: Category 2: Bug fixes and low-risk updates to new functionality Release note (sql change): Fix the insight execution priority to have correct value instead of always being default. Changed the column to be a string to avoid converting it in the ui. --- pkg/sql/conn_executor_exec.go | 1 + pkg/sql/crdb_internal.go | 4 +- .../testdata/logic_test/create_statements | 8 +- pkg/sql/sqlstats/insights/insights.proto | 2 +- .../insights/integration/insights_test.go | 106 ++++++++++++++++++ .../sqlstats/ssmemstorage/ss_mem_writer.go | 3 +- pkg/sql/sqlstats/ssprovider.go | 1 + 7 files changed, 117 insertions(+), 8 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index ade924808a02..e0512349e242 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2296,6 +2296,7 @@ func (ex *connExecutor) recordTransactionFinish( RowsRead: ex.extraTxnState.rowsRead, RowsWritten: ex.extraTxnState.rowsWritten, BytesRead: ex.extraTxnState.bytesRead, + Priority: ex.state.priority, } if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index d8a7ee973a36..a9b81ba63cb0 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -6344,7 +6344,7 @@ CREATE TABLE crdb_internal.%s ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING, exec_node_ids INT[] NOT NULL, @@ -6456,7 +6456,7 @@ func populateExecutionInsights( tree.NewDString(insight.Statement.PlanGist), tree.NewDInt(tree.DInt(insight.Statement.RowsRead)), tree.NewDInt(tree.DInt(insight.Statement.RowsWritten)), - tree.NewDFloat(tree.DFloat(insight.Transaction.UserPriority)), + tree.NewDString(insight.Transaction.UserPriority), tree.NewDInt(tree.DInt(insight.Statement.Retries)), autoRetryReason, execNodeIDs, diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 7c528e201e62..2a8a57affc42 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -261,7 +261,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -285,7 +285,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -977,7 +977,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -1001,7 +1001,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, diff --git a/pkg/sql/sqlstats/insights/insights.proto b/pkg/sql/sqlstats/insights/insights.proto index 31c4a2fac5f5..214e97e749ce 100644 --- a/pkg/sql/sqlstats/insights/insights.proto +++ b/pkg/sql/sqlstats/insights/insights.proto @@ -56,7 +56,7 @@ message Transaction { [(gogoproto.customname) = "FingerprintID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", (gogoproto.nullable) = false]; - double user_priority = 3; + string user_priority = 3; } message Statement { diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index 129aa003ecc0..2c4aa4da337d 100644 --- a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -123,6 +123,112 @@ func TestInsightsIntegration(t *testing.T) { }, 1*time.Second) } +func TestInsightsPriorityIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const appName = "TestInsightsPriorityIntegration" + + // Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}} + tc := testcluster.StartTestCluster(t, 1, args) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + + // Enable detection by setting a latencyThreshold > 0. + latencyThreshold := 50 * time.Millisecond + insights.LatencyThreshold.Override(ctx, &settings.SV, latencyThreshold) + + _, err := conn.ExecContext(ctx, "SET SESSION application_name=$1", appName) + require.NoError(t, err) + + _, err = conn.Exec("CREATE TABLE t (id string, s string);") + require.NoError(t, err) + + queryDelayInSeconds := 2 * latencyThreshold.Seconds() + // Execute a "long-running" statement, running longer than our latencyThreshold. + _, err = conn.ExecContext(ctx, "SELECT pg_sleep($1)", queryDelayInSeconds) + require.NoError(t, err) + + var priorities = []struct { + setPriorityQuery string + query string + queryNoValues string + expectedPriorityValue string + }{ + { + setPriorityQuery: "SET TRANSACTION PRIORITY LOW", + query: "INSERT INTO t(id, s) VALUES ('test', 'originalValue')", + queryNoValues: "INSERT INTO t(id, s) VALUES ('_', '_')", + expectedPriorityValue: "low", + }, + { + setPriorityQuery: "SET TRANSACTION PRIORITY NORMAL", + query: "UPDATE t set s = 'updatedValue' where id = 'test'", + queryNoValues: "UPDATE t SET s = '_' WHERE id = '_'", + expectedPriorityValue: "normal", + }, + { + setPriorityQuery: "SELECT 1", // use a dummy query to validate default scenario + query: "UPDATE t set s = 'updatedValue'", + queryNoValues: "UPDATE t SET s = '_'", + expectedPriorityValue: "normal", + }, + { + setPriorityQuery: "SET TRANSACTION PRIORITY HIGH", + query: "DELETE FROM t WHERE t.s = 'originalValue'", + queryNoValues: "DELETE FROM t WHERE t.s = '_'", + expectedPriorityValue: "high", + }, + } + + for _, p := range priorities { + testutils.SucceedsWithin(t, func() error { + tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{}) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, p.setPriorityQuery) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, p.query) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, "select pg_sleep(.1);") + require.NoError(t, errTxn) + errTxn = tx.Commit() + require.NoError(t, errTxn) + return nil + }, 2*time.Second) + + testutils.SucceedsWithin(t, func() error { + row := conn.QueryRowContext(ctx, "SELECT "+ + "query, "+ + "priority "+ + "FROM crdb_internal.node_execution_insights where "+ + "app_name = $1 and query = $2 ", appName, p.queryNoValues) + + var query, priority string + err = row.Scan(&query, &priority) + + if err != nil { + return err + } + + if query != p.queryNoValues { + return fmt.Errorf("expected '%s', but was %s", p.queryNoValues, query) + } + + if priority != p.expectedPriorityValue { + return fmt.Errorf("expected '%s', but was %s", p.expectedPriorityValue, priority) + } + + return nil + }, 2*time.Second) + } +} + func TestInsightsIntegrationForContention(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index d5faea4db507..4c5056637097 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -313,7 +313,8 @@ func (s *Container) RecordTransaction( s.insights.ObserveTransaction(value.SessionID, &insights.Transaction{ ID: value.TransactionID, - FingerprintID: key}) + FingerprintID: key, + UserPriority: value.Priority.String()}) return nil } diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index 650781052435..eb4d7b92a66a 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -235,4 +235,5 @@ type RecordedTxnStats struct { RowsRead int64 RowsWritten int64 BytesRead int64 + Priority roachpb.UserPriority } From ebb812e3d71c9428bc1d2dce2594c033f2317262 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Thu, 18 Aug 2022 12:12:22 +0000 Subject: [PATCH 07/11] clusterversion: prevent upgrades from master versions This change defines a new "unstableVersionsAbove" point on the cluster version line, above which any cluster versions are considered unstable development-only versions which are still subject to change. Performing an upgrade to a version while it is still unstable leaves a cluster in a state where it persists a version that claims it has done that upgrade and all prior, however those upgrades are still subject to change by nature of being unstable. If it subsequently upgraded to a stable version, this could result in subtle and nearly impossible to detect issues, as being at or above a particular version is used to assume that all subsequent version upgrades _as released_ were run; on a cluster that ran an earlier iteration of an upgrade this does not hold. Thus to prevent clusters which upgrade to development versions from subsequently upgrading to a stable version, we offset all development versions -- those above the unstableVersionsAbove point -- into the far future by adding one million to their major version e.g. v22.x-y becomes 1000022.x-y. This means an attempt to subsequently "upgrade" to a stable version -- such as v22.2 -- will look like a downgrade and be forbidden. On the release branch, prior to starting to publish upgradable releases, the unstableVersionsAbove value should be set to invalidVersionKey to reflect that all version upgrades in that release branch are now considered to be stable, meaning they must be treated as immutable and append-only. Release note (ops change): clusters that are upgraded to an alpha or other manual build from the development branch will not be able to be subsequently upgraded to a release build. Release justification: high-priority change to existing functionality, to allow releasing alphas with known version upgrade bugs while ensuring they do not subsequently upgrade into stable version but silently corrupted clusters. --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/ccl/backupccl/backup_test.go | 4 +- .../testdata/logic_test/crdb_internal_tenant | 4 +- pkg/clusterversion/cockroach_versions.go | 40 +++++++++++++++++-- pkg/clusterversion/key_string.go | 8 ++-- pkg/kv/kvserver/client_migration_test.go | 2 +- pkg/kv/kvserver/stores.go | 3 +- pkg/sql/crdb_internal_test.go | 2 +- .../testdata/logic_test/crdb_internal | 4 +- pkg/sql/ttl/ttljob/ttljob_test.go | 1 - ...efore_starting_an_upgrade_external_test.go | 3 +- .../wait_for_del_range_in_gc_job_test.go | 2 +- 13 files changed, 57 insertions(+), 20 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 72e07afa86b5..36ec622b9565 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -293,4 +293,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-62 set the active cluster version in the format '.' +version version 1000022.1-62 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index f53ad01bdb55..43bcf46e8b30 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -227,6 +227,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-62set the active cluster version in the format '.' +versionversion1000022.1-62set the active cluster version in the format '.' diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index d46e1637b9ce..dde171b6f341 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -8228,7 +8228,7 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, protoutil.Unmarshal(manifestData, &backupManifest)) // Bump the version and write it back out to make it look newer. - backupManifest.ClusterVersion = roachpb.Version{Major: 99, Minor: 1} + backupManifest.ClusterVersion = roachpb.Version{Major: math.MaxInt32, Minor: 1} manifestData, err = protoutil.Marshal(&backupManifest) require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath, manifestData, 0644 /* perm */)) @@ -8238,7 +8238,7 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, os.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Verify we reject it. - sqlDB.ExpectErr(t, "backup from version 99.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) + sqlDB.ExpectErr(t, "backup from version 2147483647.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) // Bump the version down and write it back out to make it look older. backupManifest.ClusterVersion = roachpb.Version{Major: 20, Minor: 2, Internal: 2} diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index 4108928e244e..3d6473ca810a 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -360,7 +360,7 @@ select crdb_internal.get_vmodule() · query T -select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); +select regexp_replace(regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''), '10000', ''); ---- 22.1 @@ -453,7 +453,7 @@ select * from crdb_internal.gossip_alerts # Anyone can see the executable version. query T -select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); +select regexp_replace(regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''), '10000', ''); ---- 22.1 diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index dd68b697b4e0..328224f65e51 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -302,8 +302,8 @@ const TODOPreV21_2 = V21_2 // previously referenced a < 22.1 version until that check/gate can be removed. const TODOPreV22_1 = V22_1 -// versionsSingleton lists all historical versions here in chronological order, -// with comments describing what backwards-incompatible features were +// rawVersionsSingleton lists all historical versions here in chronological +// order, with comments describing what backwards-incompatible features were // introduced. // // A roachpb.Version has the colloquial form MAJOR.MINOR[.PATCH][-INTERNAL], @@ -319,7 +319,11 @@ const TODOPreV22_1 = V22_1 // Such clusters would need to be wiped. As a result, do not bump the major or // minor version until we are absolutely sure that no new migrations will need // to be added (i.e., when cutting the final release candidate). -var versionsSingleton = keyedVersions{ +// +// rawVersionsSingleton is converted to versionsSingleton below, by adding a +// large number to every major if building from master, so as to ensure that +// master builds cannot be upgraded to release-branch builds. +var rawVersionsSingleton = keyedVersions{ { // V21_2 is CockroachDB v21.2. It's used for all v21.2.x patch releases. Key: V21_2, @@ -479,6 +483,36 @@ var versionsSingleton = keyedVersions{ // ************************************************* } +const ( + // unstableVersionsAbove is a cluster version Key above which any upgrades in + // this version are considered unstable development-only versions if it is not + // negative, and upgrading to them should permanently move a cluster to + // development versions. On master it should be the minted version of the last + // release, while on release branches it can be set to invalidVersionKey to + // disable marking any versions as development versions. + unstableVersionsAbove = V22_1 + + // finalVersion should be set on a release branch to the minted final cluster + // version key, e.g. to V22_2 on the release-22.2 branch once it is minted. + // Setting it has the effect of ensuring no versions are subsequently added. + finalVersion = invalidVersionKey +) + +var versionsSingleton = func() keyedVersions { + if unstableVersionsAbove > invalidVersionKey { + const devOffset = 1000000 + // Throw every version above the last release (which will be none on a release + // branch) 1 million major versions into the future, so any "upgrade" to a + // release branch build will be a downgrade and thus blocked. + for i := range rawVersionsSingleton { + if rawVersionsSingleton[i].Key > unstableVersionsAbove { + rawVersionsSingleton[i].Major += devOffset + } + } + } + return rawVersionsSingleton +}() + // TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version // feels out of place. A "cluster version" and a "binary version" are two // separate concepts. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 540216642d36..3a25cd0a7915 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -8,6 +8,7 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} + _ = x[invalidVersionKey - -1] _ = x[V21_2-0] _ = x[Start22_1-1] _ = x[ProbeRequest-2] @@ -47,13 +48,14 @@ func _() { _ = x[NoNonMVCCAddSSTable-36] } -const _Key_name = "V21_2Start22_1ProbeRequestEnableSpanConfigStoreEnableNewStoreRebalancerV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTable" +const _Key_name = "invalidVersionKeyV21_2Start22_1ProbeRequestEnableSpanConfigStoreEnableNewStoreRebalancerV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTable" -var _Key_index = [...]uint16{0, 5, 14, 26, 47, 71, 76, 85, 100, 140, 174, 208, 230, 250, 269, 302, 321, 341, 362, 397, 431, 461, 514, 528, 549, 580, 613, 644, 678, 700, 729, 756, 787, 820, 838, 862, 890, 909} +var _Key_index = [...]uint16{0, 17, 22, 31, 43, 64, 88, 93, 102, 117, 157, 191, 225, 247, 267, 286, 319, 338, 358, 379, 414, 448, 478, 531, 545, 566, 597, 630, 661, 695, 717, 746, 773, 804, 837, 855, 879, 907, 926} func (i Key) String() string { + i -= -1 if i < 0 || i >= Key(len(_Key_index)-1) { - return "Key(" + strconv.FormatInt(int64(i), 10) + ")" + return "Key(" + strconv.FormatInt(int64(i+-1), 10) + ")" } return _Key_name[_Key_index[i]:_Key_index[i+1]] } diff --git a/pkg/kv/kvserver/client_migration_test.go b/pkg/kv/kvserver/client_migration_test.go index ddb17fca56a6..6efacf3226ad 100644 --- a/pkg/kv/kvserver/client_migration_test.go +++ b/pkg/kv/kvserver/client_migration_test.go @@ -55,7 +55,7 @@ func TestStorePurgeOutdatedReplicas(t *testing.T) { t.Run(fmt.Sprintf("with-initial-version=%t", withInitialVersion), func(t *testing.T) { const numStores = 3 ctx := context.Background() - migrationVersion := roachpb.Version{Major: 42} + migrationVersion := roachpb.Version{Major: 1000042} storeKnobs := &kvserver.StoreTestingKnobs{ DisableEagerReplicaRemoval: true, diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index fde593a1c248..60cdd1b6d8fc 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + math "math" "sync" "unsafe" @@ -364,7 +365,7 @@ func SynthesizeClusterVersionFromEngines( origin string } - maxPossibleVersion := roachpb.Version{Major: 999999} // Sort above any real version. + maxPossibleVersion := roachpb.Version{Major: math.MaxInt32} // Sort above any real version. minStoreVersion := originVersion{ Version: maxPossibleVersion, origin: "(no store)", diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index 6b184003a87f..3208886a1118 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -923,7 +923,7 @@ func TestIsAtLeastVersion(t *testing.T) { errorRE string }{ {version: "21.2", expected: "true"}, - {version: "99.2", expected: "false"}, + {version: "1000099.2", expected: "false"}, {version: "foo", errorRE: ".*invalid version.*"}, } { query := fmt.Sprintf("SELECT crdb_internal.is_at_least_version('%s')", tc.version) diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 3e790d4f2350..46f1f3a07cbc 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -534,7 +534,7 @@ select crdb_internal.get_vmodule() query T select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); ---- -22.1 +1000022.1 query ITTT colnames select node_id, component, field, regexp_replace(regexp_replace(value, '^\d+$', ''), e':\\d+', ':') as value from crdb_internal.node_runtime_info @@ -693,7 +693,7 @@ select * from crdb_internal.node_inflight_trace_spans query T select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); ---- -22.1 +1000022.1 user root diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go index d4e1de778bff..cdc9fdd81878 100644 --- a/pkg/sql/ttl/ttljob/ttljob_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_test.go @@ -420,7 +420,6 @@ func TestRowLevelTTLJobMultipleNodes(t *testing.T) { tableName, ) const rowsPerRange = 10 - const expiredRowsPerRange = rowsPerRange / 2 splitPoints := make([]serverutils.SplitPoint, len(splitAts)) for i, splitAt := range splitAts { newLeaseHolderIdx := (leaseHolderIdx + 1 + i) % numNodes diff --git a/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go b/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go index c2cf38c85d25..413bc086e754 100644 --- a/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go +++ b/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go @@ -14,6 +14,7 @@ import ( "context" gosql "database/sql" "encoding/hex" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -107,7 +108,7 @@ func TestPreconditionBeforeStartingAnUpgrade(t *testing.T) { "There exists invalid descriptors as listed below. Fix these descriptors before attempting to upgrade again.\n"+ "Invalid descriptor: defaultdb.public.t (104) because 'relation \"t\" (104): invalid depended-on-by relation back reference: referenced descriptor ID 53: referenced descriptor not found'\n"+ "Invalid descriptor: defaultdb.public.temp_tbl (104) because 'no matching name info found in non-dropped relation \"t\"'", - err.Error()) + strings.ReplaceAll(err.Error(), "1000022", "22")) // The cluster version should remain at `v0`. tdb.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v0.String()}}) }) diff --git a/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go b/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go index f8c98867e5cd..41e6101f5258 100644 --- a/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go +++ b/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go @@ -94,7 +94,7 @@ SELECT count(*) WHERE job_type = 'SCHEMA CHANGE GC' AND status = 'paused'`, [][]string{{"2"}}) - tdb.ExpectErr(t, `verifying precondition for version 22.1-\d+: `+ + tdb.ExpectErr(t, `verifying precondition for version \d*22.1-\d+: `+ `paused GC jobs prevent upgrading GC job behavior: \[\d+ \d+]`, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()") From 73aa6f225cf28a6ff9c6a66edadbf72c73b09d2f Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Mon, 22 Aug 2022 23:06:14 -0400 Subject: [PATCH 08/11] kvserver: add additional testing to multiqueue Add testing for cancelation of multi-queue requests and fix a bug where the channel wasn't closed on task cancelation. Release justification: Test-only change. Release note: None --- pkg/kv/kvserver/multiqueue/multi_queue.go | 37 ++++---- .../kvserver/multiqueue/multi_queue_test.go | 94 +++++++++++++++++++ 2 files changed, 113 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 813c07bd3a6a..588c4556de46 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -120,8 +120,8 @@ type Permit struct { } // tryRunNextLocked will run the next task in order round-robin through the -// queues and in priority order within a queue. It will return true if it ran a -// task. The MultiQueue.mu lock must be held before calling this func. +// queues and in priority order within a queue. +// MultiQueue.mu lock must be held before calling this function. func (m *MultiQueue) tryRunNextLocked() { // If no permits are left, then we can't run anything. if m.remainingRuns <= 0 { @@ -130,7 +130,7 @@ func (m *MultiQueue) tryRunNextLocked() { for i := 0; i < len(m.outstanding); i++ { // Start with the next queue in order and iterate through all empty queues. - // If all queues are empty then return false signaling that nothing was run. + // If all queues are empty then return, as there is nothing to run. index := (m.lastQueueIndex + i + 1) % len(m.outstanding) if m.outstanding[index].Len() > 0 { task := heap.Pop(&m.outstanding[index]).(*Task) @@ -142,7 +142,7 @@ func (m *MultiQueue) tryRunNextLocked() { } } -// Add returns a Task that must be closed (calling Task.Close) to +// Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. The number of types is expected to // be relatively small and not be changing over time. func (m *MultiQueue) Add(queueType int, priority float64) *Task { @@ -166,10 +166,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { } heap.Push(&m.outstanding[pos], &newTask) - // Once we are done adding a task, signal the main loop in case it finished - // all its work and was waiting for more work. We are holding the mu lock when - // signaling, so we guarantee that it will not be able to respond to the - // signal until after we release the lock. + // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() return &newTask @@ -184,21 +181,25 @@ func (m *MultiQueue) Cancel(task *Task) { // Task will track its position within the queue. queueIdx := m.mapping[task.queueType] ok := m.outstanding[queueIdx].tryRemove(task) + // Close the permit channel so that waiters stop blocking. + if ok { + close(task.permitC) + return + } // If we get here, we are racing with the task being started. The concern is // that the caller may also call MultiQueue.Release since the task was // started. Either we get the permit or the caller, so we guarantee only one // release will be called. - if !ok { - select { - case p, ok := <-task.permitC: - // Only release if the channel is open, and we can get the permit. - if ok { - m.releaseLocked(p) - } - default: - // If we are not able to get the permit, this means the permit has already - // been given to the caller, and they must call Release on it. + select { + case p, ok := <-task.permitC: + // Only release if the channel is open, and we can get the permit. + if ok { + close(task.permitC) + m.releaseLocked(p) } + default: + // If we are not able to get the permit, this means the permit has already + // been given to the caller, and they must call Release on it. } } diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index ce7e676b3348..8e693a1e210f 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -121,6 +121,100 @@ func TestMultiQueueRemove(t *testing.T) { verifyOrder(t, queue, a3, b3, c3, a2, c2) } +func TestMultiQueueCancelOne(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + queue := NewMultiQueue(1) + task := queue.Add(1, 1) + queue.Cancel(task) +} + +func TestMultiQueueCancelInProgress(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + queue := NewMultiQueue(1) + + const a = 1 + const b = 2 + const c = 3 + + a3 := queue.Add(a, 5.0) + a2 := queue.Add(a, 4.0) + b1 := queue.Add(b, 1.1) + b2 := queue.Add(b, 2.1) + c3 := queue.Add(c, 2.2) + b3 := queue.Add(b, 6.1) + + queue.Cancel(b2) + queue.Cancel(b1) + + started := 0 + completed := 0 + startTask := func(task *Task) (*Permit, bool) { + select { + case permit, ok := <-task.GetWaitChan(): + if ok { + started++ + return permit, true + } + case <-time.After(time.Second): + t.Fatalf(`should not wait for task on queue %d with priority %f to start`, + task.queueType, task.priority, + ) + } + return nil, false + } + + completeTask := func(task *Task, permit *Permit) { + releaseStarted := make(chan struct{}) + releaseFinished := make(chan struct{}) + go func() { + close(releaseStarted) + queue.Release(permit) + close(releaseFinished) + }() + <-releaseStarted + select { + case <-releaseFinished: + completed++ + case <-time.After(time.Second): + t.Fatalf(`should not wait for task on queue %d with priority %f to complete`, + task.queueType, task.priority, + ) + } + } + + // Execute a3. + a3Permit, ok := startTask(a3) + require.True(t, ok) + completeTask(a3, a3Permit) + + // Cancel b3 before starting. Should not be able to get permit. + queue.Cancel(b3) + _, ok = startTask(b3) + require.False(t, ok) + + // Now, should be able to execute c3 immediately. + c3Permit, ok := startTask(c3) + require.True(t, ok) + + // A and C started + require.Equal(t, 2, started) + // A completed + require.Equal(t, 1, completed) + + // Complete c3 and cancel after completion. + completeTask(c3, c3Permit) + queue.Cancel(c3) + + // Start a2, which is the final item and also should not block to start. + startTask(a2) + + require.Equal(t, 3, started) + require.Equal(t, 2, completed) +} + // TestMultiQueueStress calls Add from multiple threads. It chooses different // names and different priorities for the requests. The goal is simply to make // sure that all the requests are serviced and nothing hangs or fails. From 9913ba58c622e32f968f79c3877872e242706371 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Thu, 18 Aug 2022 14:13:01 -0400 Subject: [PATCH 09/11] kvserver: incorporate sending queue priority into snapshot requests This change modifies the `(Delegated)SnapshotRequest` Raft RPCs in order to incorporate the name of the sending queue, as well as the sending queue's priority, in order to be used to prioritize queued snapshots on a receiving store. Release justification: Low-risk change to existing functionality. Release note: None --- pkg/kv/kvserver/kvserverpb/raft.proto | 27 +++++++++++++++++ pkg/kv/kvserver/raft_snapshot_queue.go | 2 +- pkg/kv/kvserver/replica_command.go | 42 +++++++++++++++++--------- pkg/kv/kvserver/replicate_queue.go | 33 ++++++++++++++------ 4 files changed, 79 insertions(+), 25 deletions(-) diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 1b02b41d473b..f5cbd37383a6 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -155,6 +155,15 @@ message SnapshotRequest { reserved 2; } + // QueueName indicates the source of the snapshot. Snapshots are prioritized + // within a queue and round-robin selected between queues for both the sending + // and receiving side. + enum QueueName { + OTHER = 0; + REPLICATE_QUEUE = 1; + RAFT_SNAPSHOT_QUEUE = 2; + } + message Header { // The replica state at the time the snapshot was generated. Note // that ReplicaState.Desc differs from the above range_descriptor @@ -170,12 +179,16 @@ message SnapshotRequest { int64 range_size = 3; // The priority of the snapshot. + // Deprecated, prefer sender_queue_priority. + // TODO(abaptist): Remove this field for v23.1. Priority priority = 6; // The strategy of the snapshot. Strategy strategy = 7; // The type of the snapshot. + // Deprecated, prefer sender_queue_name. + // TODO(abaptist): Remove this field for v23.1. Type type = 9; // Whether the snapshot uses the unreplicated RaftTruncatedState or not. @@ -190,6 +203,14 @@ message SnapshotRequest { // TODO(irfansharif): Remove this in v22.1. bool deprecated_unreplicated_truncated_state = 8; + // The sending queue's name, to be utilized to ensure fairness across + // different snapshot sending sources. + SnapshotRequest.QueueName sender_queue_name = 10; + + // The sending queue's priority, to be utilized to prioritize snapshots + // from a particular sending source. + double sender_queue_priority = 11; + reserved 1, 4; } @@ -237,6 +258,12 @@ message DelegateSnapshotRequest { // The priority of the snapshot. SnapshotRequest.Priority priority = 5; + // The sending queue's name. + SnapshotRequest.QueueName sender_queue_name = 9; + + // The sending queue's priority. + double sender_queue_priority = 10; + // The type of the snapshot. SnapshotRequest.Type type = 6; diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 61419699ecec..c3f1d848325a 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -140,7 +140,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) + err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4d669222f666..db180daaf455 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -981,13 +981,15 @@ func (r *Replica) ChangeReplicas( return nil, errors.New("must disable replicate queue to use ChangeReplicas manually") } } - return r.changeReplicasImpl(ctx, desc, priority, reason, details, chgs) + return r.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_OTHER, 0.0, reason, details, chgs) } func (r *Replica) changeReplicasImpl( ctx context.Context, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + senderName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, reason kvserverpb.RangeLogEventReason, details string, chgs roachpb.ReplicationChanges, @@ -1054,7 +1056,7 @@ func (r *Replica) changeReplicasImpl( _ = roachpb.ReplicaSet.LearnerDescriptors var err error desc, err = r.initializeRaftLearners( - ctx, desc, priority, reason, details, adds, roachpb.LEARNER, + ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.LEARNER, ) if err != nil { return nil, err @@ -1100,7 +1102,7 @@ func (r *Replica) changeReplicasImpl( // disruption to foreground traffic. See // https://github.com/cockroachdb/cockroach/issues/63199 for an example. desc, err = r.initializeRaftLearners( - ctx, desc, priority, reason, details, adds, roachpb.NON_VOTER, + ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.NON_VOTER, ) if err != nil { return nil, err @@ -1654,6 +1656,8 @@ func (r *Replica) initializeRaftLearners( ctx context.Context, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + senderName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, reason kvserverpb.RangeLogEventReason, details string, targets []roachpb.ReplicationTarget, @@ -1799,7 +1803,9 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil { + if err := r.sendSnapshot( + ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, + ); err != nil { return nil, err } } @@ -2584,6 +2590,8 @@ func (r *Replica) sendSnapshot( recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, priority kvserverpb.SnapshotRequest_Priority, + senderQueueName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, ) (retErr error) { defer func() { // Report the snapshot status to Raft, which expects us to do this once we @@ -2631,13 +2639,15 @@ func (r *Replica) sendSnapshot( // Create new delegate snapshot request with only required metadata. delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, } err = contextutil.RunWithTimeout( ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { @@ -2777,10 +2787,12 @@ func (r *Replica) followerSendSnapshot( Snapshot: snap.RaftSnap, }, }, - RangeSize: rangeSize, - Priority: req.Priority, - Strategy: kvserverpb.SnapshotRequest_KV_BATCH, - Type: req.Type, + RangeSize: rangeSize, + Priority: req.Priority, + SenderQueueName: req.SenderQueueName, + SenderQueuePriority: req.SenderQueuePriority, + Strategy: kvserverpb.SnapshotRequest_KV_BATCH, + Type: req.Type, } newBatchFn := func() storage.Batch { return r.store.Engine().NewUnindexedBatch(true /* writeOnly */) diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index cdd55b16f6a7..81c79fdbf271 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -774,7 +774,7 @@ func (rq *replicateQueue) processOneChange( // unavailability; see: _ = execChangeReplicasTxn - action, _ := rq.allocator.ComputeAction(ctx, conf, desc) + action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc) log.VEventf(ctx, 1, "next replica action: %s", action) switch action { @@ -787,13 +787,13 @@ func (rq *replicateQueue) processOneChange( // Add replicas. case allocatorimpl.AllocatorAddVoter: requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun, + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun, ) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err case allocatorimpl.AllocatorAddNonVoter: requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun, + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun, ) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err @@ -821,7 +821,7 @@ func (rq *replicateQueue) processOneChange( deadVoterReplicas[0], voterReplicas) } requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err case allocatorimpl.AllocatorReplaceDeadNonVoter: @@ -836,7 +836,7 @@ func (rq *replicateQueue) processOneChange( deadNonVoterReplicas[0], nonVoterReplicas) } requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err @@ -854,7 +854,7 @@ func (rq *replicateQueue) processOneChange( decommissioningVoterReplicas[0], voterReplicas) } requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) if err != nil { return requeue, decommissionPurgatoryError{err} @@ -872,7 +872,7 @@ func (rq *replicateQueue) processOneChange( decommissioningNonVoterReplicas[0], nonVoterReplicas) } requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) if err != nil { return requeue, decommissionPurgatoryError{err} @@ -919,6 +919,7 @@ func (rq *replicateQueue) processOneChange( repl, voterReplicas, nonVoterReplicas, + allocatorPrio, canTransferLeaseFrom, scatter, dryRun, @@ -963,6 +964,7 @@ func (rq *replicateQueue) addOrReplaceVoters( liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, replicaStatus allocatorimpl.ReplicaStatus, + allocatorPriority float64, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -1082,6 +1084,7 @@ func (rq *replicateQueue) addOrReplaceVoters( ops, desc, kvserverpb.SnapshotRequest_RECOVERY, + allocatorPriority, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -1101,6 +1104,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, replicaStatus allocatorimpl.ReplicaStatus, + allocatorPrio float64, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -1138,6 +1142,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( ops, desc, kvserverpb.SnapshotRequest_RECOVERY, + allocatorPrio, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -1326,6 +1331,7 @@ func (rq *replicateQueue) removeVoter( roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -1369,7 +1375,8 @@ func (rq *replicateQueue) removeNonVoter( repl, roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target), desc, - kvserverpb.SnapshotRequest_UNKNOWN, + kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -1429,6 +1436,7 @@ func (rq *replicateQueue) removeDecommissioning( roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonStoreDecommissioning, "", dryRun, ); err != nil { return false, err @@ -1475,6 +1483,7 @@ func (rq *replicateQueue) removeDead( roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonStoreDead, "", dryRun, @@ -1488,6 +1497,7 @@ func (rq *replicateQueue) considerRebalance( ctx context.Context, repl *Replica, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + allocatorPrio float64, canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool, scatter, dryRun bool, ) (requeue bool, _ error) { @@ -1574,6 +1584,7 @@ func (rq *replicateQueue) considerRebalance( chgs, desc, kvserverpb.SnapshotRequest_REBALANCE, + allocatorPrio, kvserverpb.ReasonRebalance, details, dryRun, @@ -1794,6 +1805,7 @@ func (rq *replicateQueue) changeReplicas( chgs roachpb.ReplicationChanges, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + allocatorPriority float64, reason kvserverpb.RangeLogEventReason, details string, dryRun bool, @@ -1804,7 +1816,10 @@ func (rq *replicateQueue) changeReplicas( // NB: this calls the impl rather than ChangeReplicas because // the latter traps tests that try to call it while the replication // queue is active. - if _, err := repl.changeReplicasImpl(ctx, desc, priority, reason, details, chgs); err != nil { + if _, err := repl.changeReplicasImpl( + ctx, desc, priority, kvserverpb.SnapshotRequest_REPLICATE_QUEUE, allocatorPriority, reason, + details, chgs, + ); err != nil { return err } rangeUsageInfo := rangeUsageInfoForRepl(repl) From 28db5f34b2e4098298db21a71d30a6194cb2c0f4 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 25 Aug 2022 17:51:09 -0700 Subject: [PATCH 10/11] dev: bump version This commit bumps version since there appears to have been a "merge skew" between #85095 and #86167, and somehow I had a `dev` binary that didn't include the benchmark fix from the latter. Release justification: test-only change. Release note: None --- dev | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev b/dev index 0ecc418e1c40..51c0d44c3582 100755 --- a/dev +++ b/dev @@ -8,7 +8,7 @@ fi set -euo pipefail # Bump this counter to force rebuilding `dev` on all machines. -DEV_VERSION=53 +DEV_VERSION=54 THIS_DIR=$(cd "$(dirname "$0")" && pwd) BINARY_DIR=$THIS_DIR/bin/dev-versions From bc2789e56a332ae81daf220062fee3b459373497 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Wed, 24 Aug 2022 14:38:59 -0400 Subject: [PATCH 11/11] ttl: add queries to job details This helps with observability so users can understand what the TTL job is doing behind the scenes. The job details in the DB console will show: ``` ttl for defaultdb.public.t -- for each range, iterate to find rows: SELECT id FROM [108 AS tbl_name] AS OF SYSTEM TIME '30s' WHERE <= $1 AND (id) > () AND (id) < () ORDER BY id LIMIT -- then delete with: DELETE FROM [108 AS tbl_name] WHERE <= $1 AND (id) IN () ``` Release note: None Release justification: low risk change --- pkg/BUILD.bazel | 2 + pkg/sql/ttl/ttlbase/BUILD.bazel | 12 ++++ pkg/sql/ttl/ttlbase/ttl_helpers.go | 52 ++++++++++++++++ pkg/sql/ttl/ttljob/BUILD.bazel | 3 +- pkg/sql/ttl/ttljob/ttljob.go | 3 +- pkg/sql/ttl/ttljob/ttljob_query_builder.go | 36 +++-------- .../ttl/ttljob/ttljob_query_builder_test.go | 62 +++++++++++++------ pkg/sql/ttl/ttlschedule/BUILD.bazel | 2 + pkg/sql/ttl/ttlschedule/ttlschedule.go | 31 +++++++++- 9 files changed, 153 insertions(+), 50 deletions(-) create mode 100644 pkg/sql/ttl/ttlbase/BUILD.bazel create mode 100644 pkg/sql/ttl/ttlbase/ttl_helpers.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 2d40d27a41e5..4427a90cf0e8 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1727,6 +1727,7 @@ GO_TARGETS = [ "//pkg/sql/syntheticprivilege:syntheticprivilege_test", "//pkg/sql/tests:tests", "//pkg/sql/tests:tests_test", + "//pkg/sql/ttl/ttlbase:ttlbase", "//pkg/sql/ttl/ttljob:ttljob", "//pkg/sql/ttl/ttljob:ttljob_test", "//pkg/sql/ttl/ttlschedule:ttlschedule", @@ -2752,6 +2753,7 @@ GET_X_DATA_TARGETS = [ "//pkg/sql/storageparam/tablestorageparam:get_x_data", "//pkg/sql/syntheticprivilege:get_x_data", "//pkg/sql/tests:get_x_data", + "//pkg/sql/ttl/ttlbase:get_x_data", "//pkg/sql/ttl/ttljob:get_x_data", "//pkg/sql/ttl/ttlschedule:get_x_data", "//pkg/sql/types:get_x_data", diff --git a/pkg/sql/ttl/ttlbase/BUILD.bazel b/pkg/sql/ttl/ttlbase/BUILD.bazel new file mode 100644 index 000000000000..b3cb815e5a72 --- /dev/null +++ b/pkg/sql/ttl/ttlbase/BUILD.bazel @@ -0,0 +1,12 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "ttlbase", + srcs = ["ttl_helpers.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase", + visibility = ["//visibility:public"], + deps = ["//pkg/sql/lexbase"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go new file mode 100644 index 000000000000..a7d1dac2818a --- /dev/null +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package ttlbase + +import ( + "bytes" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/lexbase" +) + +// DefaultAOSTDuration is the default duration to use in the AS OF SYSTEM TIME +// clause used in the SELECT query. +const DefaultAOSTDuration = -time.Second * 30 + +// SelectTemplate is the format string used to build SELECT queries for the +// TTL job. +const SelectTemplate = `SELECT %[1]s FROM [%[2]d AS tbl_name] +AS OF SYSTEM TIME %[3]s +WHERE %[4]s <= $1 +%[5]s%[6]s +ORDER BY %[1]s +LIMIT %[7]v` + +// DeleteTemplate is the format string used to build DELETE queries for the +// TTL job. +const DeleteTemplate = `DELETE FROM [%d AS tbl_name] +WHERE %s <= $1 +AND (%s) IN (%s)` + +// MakeColumnNamesSQL converts columns into an escape string +// for an order by clause, e.g.: +// {"a", "b"} => a, b +// {"escape-me", "b"} => "escape-me", b +func MakeColumnNamesSQL(columns []string) string { + var b bytes.Buffer + for i, pkColumn := range columns { + if i > 0 { + b.WriteString(", ") + } + lexbase.EncodeRestrictedSQLIdent(&b, pkColumn, lexbase.EncNoFlags) + } + return b.String() +} diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index 7b43ed3e990c..9edce1de6384 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -30,7 +30,6 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", - "//pkg/sql/lexbase", "//pkg/sql/physicalplan", "//pkg/sql/rowenc", "//pkg/sql/rowexec", @@ -39,6 +38,7 @@ go_library( "//pkg/sql/sessiondatapb", "//pkg/sql/sqltelemetry", "//pkg/sql/sqlutil", + "//pkg/sql/ttl/ttlbase", "//pkg/sql/types", "//pkg/util/ctxgroup", "//pkg/util/log", @@ -79,6 +79,7 @@ go_test( "//pkg/sql/parser", "//pkg/sql/randgen", "//pkg/sql/sem/tree", + "//pkg/sql/ttl/ttlbase", "//pkg/sql/types", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 77d9421e4a55..69044672e223 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -103,7 +104,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err details := t.job.Details().(jobspb.RowLevelTTLDetails) - aostDuration := -time.Second * 30 + aostDuration := ttlbase.DefaultAOSTDuration if knobs.AOSTDuration != nil { aostDuration = *knobs.AOSTDuration } diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index 547388ecacc0..aba6791ebd7d 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -11,7 +11,6 @@ package ttljob import ( - "bytes" "context" "fmt" "time" @@ -20,11 +19,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/errors" ) @@ -91,8 +90,8 @@ func makeSelectQueryBuilder( cachedArgs: cachedArgs, isFirst: true, - pkColumnNamesSQL: makeColumnNamesSQL(pkColumns), - endPKColumnNamesSQL: makeColumnNamesSQL(pkColumns[:len(endPK)]), + pkColumnNamesSQL: ttlbase.MakeColumnNamesSQL(pkColumns), + endPKColumnNamesSQL: ttlbase.MakeColumnNamesSQL(pkColumns[:len(endPK)]), } } @@ -117,7 +116,7 @@ func (b *selectQueryBuilder) buildQuery() string { var filterClause string if !b.isFirst { // After the first query, we always want (col1, ...) > (cursor_col_1, ...) - filterClause = fmt.Sprintf(" AND (%s) > (", b.pkColumnNamesSQL) + filterClause = fmt.Sprintf("AND (%s) > (", b.pkColumnNamesSQL) for i := range b.pkColumns { if i > 0 { filterClause += ", " @@ -129,7 +128,7 @@ func (b *selectQueryBuilder) buildQuery() string { filterClause += ")" } else if len(startPK) > 0 { // For the the first query, we want (col1, ...) >= (cursor_col_1, ...) - filterClause = fmt.Sprintf(" AND (%s) >= (", makeColumnNamesSQL(b.pkColumns[:len(startPK)])) + filterClause = fmt.Sprintf("AND (%s) >= (", ttlbase.MakeColumnNamesSQL(b.pkColumns[:len(startPK)])) for i := range startPK { if i > 0 { filterClause += ", " @@ -142,11 +141,7 @@ func (b *selectQueryBuilder) buildQuery() string { } return fmt.Sprintf( - `SELECT %[1]s FROM [%[2]d AS tbl_name] -AS OF SYSTEM TIME %[3]s -WHERE %[4]s <= $1%[5]s%[6]s -ORDER BY %[1]s -LIMIT %[7]d`, + ttlbase.SelectTemplate, b.pkColumnNamesSQL, b.tableID, tree.MustMakeDTimestampTZ(b.aost, time.Microsecond), @@ -254,7 +249,7 @@ func makeDeleteQueryBuilder( } func (b *deleteQueryBuilder) buildQuery(numRows int) string { - columnNamesSQL := makeColumnNamesSQL(b.pkColumns) + columnNamesSQL := ttlbase.MakeColumnNamesSQL(b.pkColumns) var placeholderStr string for i := 0; i < numRows; i++ { if i > 0 { @@ -271,7 +266,7 @@ func (b *deleteQueryBuilder) buildQuery(numRows int) string { } return fmt.Sprintf( - `DELETE FROM [%d AS tbl_name] WHERE %s <= $1 AND (%s) IN (%s)`, + ttlbase.DeleteTemplate, b.tableID, b.ttlExpr, columnNamesSQL, @@ -316,18 +311,3 @@ func (b *deleteQueryBuilder) run( ) return int64(rowCount), err } - -// makeColumnNamesSQL converts columns into an escape string -// for an order by clause, e.g.: -// {"a", "b"} => a, b -// {"escape-me", "b"} => "escape-me", b -func makeColumnNamesSQL(columns []string) string { - var b bytes.Buffer - for i, pkColumn := range columns { - if i > 0 { - b.WriteString(", ") - } - lexbase.EncodeRestrictedSQLIdent(&b, pkColumn, lexbase.EncNoFlags) - } - return b.String() -} diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go index fc518f116814..bbb3707d61b4 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" @@ -56,7 +57,8 @@ func TestSelectQueryBuilder(t *testing.T) { { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) >= ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) >= ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -72,7 +74,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -88,7 +91,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -117,6 +121,7 @@ LIMIT 2`, expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' WHERE crdb_internal_expiration <= $1 + ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -130,7 +135,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -145,7 +151,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -175,7 +182,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1) >= ($3) AND (col1) < ($2) +WHERE crdb_internal_expiration <= $1 +AND (col1) >= ($3) AND (col1) < ($2) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -191,7 +199,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($3, $4) AND (col1) < ($2) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -207,7 +216,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($3, $4) AND (col1) < ($2) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -237,7 +247,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 + AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -252,7 +263,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -268,7 +280,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -298,7 +311,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) >= ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) >= ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -313,7 +327,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -328,7 +343,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -382,7 +398,9 @@ func TestDeleteQueryBuilder(t *testing.T) { {tree.NewDInt(10), tree.NewDInt(15)}, {tree.NewDInt(12), tree.NewDInt(16)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3), ($4, $5))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3), ($4, $5))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(10), tree.NewDInt(15), @@ -401,7 +419,9 @@ func TestDeleteQueryBuilder(t *testing.T) { {tree.NewDInt(12), tree.NewDInt(16)}, {tree.NewDInt(12), tree.NewDInt(18)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(10), tree.NewDInt(15), @@ -415,7 +435,9 @@ func TestDeleteQueryBuilder(t *testing.T) { {tree.NewDInt(112), tree.NewDInt(116)}, {tree.NewDInt(112), tree.NewDInt(118)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(110), tree.NewDInt(115), @@ -427,7 +449,9 @@ func TestDeleteQueryBuilder(t *testing.T) { rows: []tree.Datums{ {tree.NewDInt(1210), tree.NewDInt(1215)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(1210), tree.NewDInt(1215), @@ -464,7 +488,7 @@ func TestMakeColumnNamesSQL(t *testing.T) { for _, tc := range testCases { t.Run(tc.expected, func(t *testing.T) { - require.Equal(t, tc.expected, makeColumnNamesSQL(tc.cols)) + require.Equal(t, tc.expected, ttlbase.MakeColumnNamesSQL(tc.cols)) }) } } diff --git a/pkg/sql/ttl/ttlschedule/BUILD.bazel b/pkg/sql/ttl/ttlschedule/BUILD.bazel index b22e534a08f7..6ec47978d9ee 100644 --- a/pkg/sql/ttl/ttlschedule/BUILD.bazel +++ b/pkg/sql/ttl/ttlschedule/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/scheduledjobs", "//pkg/security/username", "//pkg/sql", + "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/descs", "//pkg/sql/pgwire/pgcode", @@ -20,6 +21,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sqlerrors", "//pkg/sql/sqlutil", + "//pkg/sql/ttl/ttlbase", "//pkg/util/metric", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/ttl/ttlschedule/ttlschedule.go b/pkg/sql/ttl/ttlschedule/ttlschedule.go index 4d9589de875c..d7f9c0a40f5f 100644 --- a/pkg/sql/ttl/ttlschedule/ttlschedule.go +++ b/pkg/sql/ttl/ttlschedule/ttlschedule.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -209,6 +211,33 @@ func (s rowLevelTTLExecutor) GetCreateScheduleStatement( return fmt.Sprintf(`ALTER TABLE %s WITH (ttl = 'on', ...)`, tn.FQString()), nil } +func makeTTLJobDescription(tableDesc catalog.TableDescriptor, tn *tree.TableName) string { + pkColumns := tableDesc.GetPrimaryIndex().IndexDesc().KeyColumnNames + pkColumnNamesSQL := ttlbase.MakeColumnNamesSQL(pkColumns) + selectQuery := fmt.Sprintf( + ttlbase.SelectTemplate, + pkColumnNamesSQL, + tableDesc.GetID(), + fmt.Sprintf("'%v'", ttlbase.DefaultAOSTDuration), + "", + fmt.Sprintf("AND (%s) > ()", pkColumnNamesSQL), + fmt.Sprintf(" AND (%s) < ()", pkColumnNamesSQL), + "", + ) + deleteQuery := fmt.Sprintf( + ttlbase.DeleteTemplate, + tableDesc.GetID(), + "", + pkColumnNamesSQL, + "", + ) + return fmt.Sprintf(`ttl for %s +-- for each range, iterate to find rows: +%s +-- then delete with: +%s`, tn.FQString(), selectQuery, deleteQuery) +} + func createRowLevelTTLJob( ctx context.Context, createdByInfo *jobs.CreatedByInfo, @@ -226,7 +255,7 @@ func createRowLevelTTLJob( return 0, err } record := jobs.Record{ - Description: fmt.Sprintf("ttl for %s", tn.FQString()), + Description: makeTTLJobDescription(tableDesc, tn), Username: username.NodeUserName(), Details: jobspb.RowLevelTTLDetails{ TableID: ttlArgs.TableID,