diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 85017913841b..0d80bb8eeef6 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3667,6 +3667,10 @@ func (m *sessionDataMutator) SetUnsafeSettingInterlockKey(val string) { m.data.UnsafeSettingInterlockKey = val } +func (m *sessionDataMutator) SetOptimizerUseLockOpForSerializable(val bool) { + m.data.OptimizerUseLockOpForSerializable = val +} + // Utility functions related to scrubbing sensitive information on SQL Stats. // quantizeCounts ensures that the Count field in the diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 22f2250ca563..33ce9aa511b9 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -5568,6 +5568,7 @@ optimizer_use_improved_disjunction_stats on optimizer_use_improved_join_elimination on optimizer_use_improved_split_disjunction_for_joins on optimizer_use_limit_ordering_for_streaming_group_by on +optimizer_use_lock_op_for_serializable off optimizer_use_multicol_stats on optimizer_use_not_visible_indexes off override_multi_region_zone_config off diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index bb7a146623cc..b3fe36049ff6 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2886,6 +2886,7 @@ optimizer_use_improved_disjunction_stats on N optimizer_use_improved_join_elimination on NULL NULL NULL string optimizer_use_improved_split_disjunction_for_joins on NULL NULL NULL string optimizer_use_limit_ordering_for_streaming_group_by on NULL NULL NULL string +optimizer_use_lock_op_for_serializable off NULL NULL NULL string optimizer_use_multicol_stats on NULL NULL NULL string optimizer_use_not_visible_indexes off NULL NULL NULL string override_multi_region_zone_config off NULL NULL NULL 
string @@ -3048,6 +3049,7 @@ optimizer_use_improved_disjunction_stats on N optimizer_use_improved_join_elimination on NULL user NULL on on optimizer_use_improved_split_disjunction_for_joins on NULL user NULL on on optimizer_use_limit_ordering_for_streaming_group_by on NULL user NULL on on +optimizer_use_lock_op_for_serializable off NULL user NULL off off optimizer_use_multicol_stats on NULL user NULL on on optimizer_use_not_visible_indexes off NULL user NULL off off override_multi_region_zone_config off NULL user NULL off off @@ -3209,6 +3211,7 @@ optimizer_use_improved_disjunction_stats NULL NULL NULL optimizer_use_improved_join_elimination NULL NULL NULL NULL NULL optimizer_use_improved_split_disjunction_for_joins NULL NULL NULL NULL NULL optimizer_use_limit_ordering_for_streaming_group_by NULL NULL NULL NULL NULL +optimizer_use_lock_op_for_serializable NULL NULL NULL NULL NULL optimizer_use_multicol_stats NULL NULL NULL NULL NULL optimizer_use_not_visible_indexes NULL NULL NULL NULL NULL override_multi_region_zone_config NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index 3e97d9d51f38..71c772796b1e 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -124,6 +124,7 @@ optimizer_use_improved_disjunction_stats on optimizer_use_improved_join_elimination on optimizer_use_improved_split_disjunction_for_joins on optimizer_use_limit_ordering_for_streaming_group_by on +optimizer_use_lock_op_for_serializable off optimizer_use_multicol_stats on optimizer_use_not_visible_indexes off override_multi_region_zone_config off diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index 3cac4d7a076b..800a0634cc32 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -1134,3 +1134,26 @@ func unwrapProjectExprs(input memo.RelExpr) 
memo.RelExpr { } return input } + +func (b *Builder) buildLock(lock *memo.LockExpr) (execPlan, error) { + // Don't bother creating the lookup join if we don't need it. + locking, err := b.buildLocking(lock.Locking) + if err != nil { + return execPlan{}, err + } + if !locking.IsLocking() { + return b.buildRelational(lock.Input) + } + + private := &memo.LookupJoinPrivate{ + JoinType: opt.SemiJoinOp, + Table: lock.Table, + Index: cat.PrimaryIndex, + KeyCols: lock.KeyCols, + Cols: lock.Cols, + LookupColsAreTableKey: true, + Locking: locking, + } + join := lock.Memo().MemoizeLookupJoin(lock.Input, nil /* on */, private) + return b.buildLookupJoin(join.(*memo.LookupJoinExpr)) +} diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 97b2492bb4cc..eb500818cd1b 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -292,6 +292,9 @@ func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { case *memo.DeleteExpr: ep, err = b.buildDelete(t) + case *memo.LockExpr: + ep, err = b.buildLock(t) + case *memo.CreateTableExpr: ep, err = b.buildCreateTable(t) @@ -3626,7 +3629,8 @@ func (b *Builder) statementTag(expr memo.RelExpr) string { switch expr.Op() { case opt.OpaqueRelOp, opt.OpaqueMutationOp, opt.OpaqueDDLOp: return expr.Private().(*memo.OpaqueRelPrivate).Metadata.String() - + case opt.LockOp: + return "SELECT " + expr.Private().(*memo.LockPrivate).Locking.Strength.String() default: return expr.Op().SyntaxTag() } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed b/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed index f4fa3e98dda8..dc089a4e65e1 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed +++ b/pkg/sql/opt/exec/execbuilder/testdata/select_for_update_read_committed @@ -24,10 +24,10 @@ SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED query T 
EXPLAIN (OPT) SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE ---- -project +lock supermarket + ├── locking: for-update,durability-guaranteed └── scan supermarket - ├── constraint: /1: [/'matilda' - /'matilda'] - └── locking: for-update,durability-guaranteed + └── constraint: /1: [/'matilda' - /'matilda'] query T EXPLAIN (VERBOSE) SELECT aisle FROM supermarket WHERE person = 'matilda' FOR UPDATE @@ -38,13 +38,20 @@ vectorized: true • project │ columns: (aisle) │ -└── • scan - columns: (person, aisle) - estimated row count: 1 (missing stats) - table: supermarket@supermarket_pkey - spans: /"matilda"/0 - locking strength: for update - locking durability: guaranteed +└── • lookup join (semi) + │ columns: (person, aisle) + │ estimated row count: 1 (missing stats) + │ table: supermarket@supermarket_pkey + │ equality: (person) = (person) + │ equality cols are key + │ locking strength: for update + │ locking durability: guaranteed + │ + └── • scan + columns: (person, aisle) + estimated row count: 1 (missing stats) + table: supermarket@supermarket_pkey + spans: /"matilda"/0 query T EXPLAIN (OPT) @@ -58,10 +65,10 @@ update supermarket │ └── constraint: /7: [/'michael' - /'michael'] └── projections └── subquery - └── project + └── lock supermarket + ├── locking: for-update,durability-guaranteed └── scan supermarket - ├── constraint: /13: [/'matilda' - /'matilda'] - └── locking: for-update,durability-guaranteed + └── constraint: /13: [/'matilda' - /'matilda'] query T EXPLAIN (VERBOSE) @@ -80,7 +87,6 @@ vectorized: true │ │ estimated row count: 0 (missing stats) │ │ table: supermarket │ │ set: aisle -│ │ auto commit │ │ │ └── • render │ │ columns: (person, aisle, starts_with, ends_with, aisle_new) @@ -105,13 +111,21 @@ vectorized: true └── • project │ columns: (aisle) │ - └── • scan - columns: (person, aisle) - estimated row count: 1 (missing stats) - table: supermarket@supermarket_pkey - spans: /"matilda"/0 - locking strength: for update - locking durability: 
guaranteed + └── • lookup join (semi) + │ columns: (person, aisle) + │ estimated row count: 1 (missing stats) + │ table: supermarket@supermarket_pkey + │ equality: (person) = (person) + │ equality cols are key + │ locking strength: for update + │ locking durability: guaranteed + │ + └── • scan + columns: (person, aisle) + estimated row count: 1 (missing stats) + table: supermarket@supermarket_pkey + spans: /"matilda"/0 + locking strength: for update query T EXPLAIN (OPT) @@ -120,10 +134,10 @@ WITH s AS SELECT aisle + 1 FROM s ---- with &1 (s) - ├── project + ├── lock supermarket + │ ├── locking: for-update,durability-guaranteed │ └── scan supermarket - │ ├── constraint: /1: [/'matilda' - /'matilda'] - │ └── locking: for-update,durability-guaranteed + │ └── constraint: /1: [/'matilda' - /'matilda'] └── project ├── with-scan &1 (s) └── projections @@ -162,13 +176,20 @@ vectorized: true └── • project │ columns: (aisle) │ - └── • scan - columns: (person, aisle) - estimated row count: 1 (missing stats) - table: supermarket@supermarket_pkey - spans: /"matilda"/0 - locking strength: for update - locking durability: guaranteed + └── • lookup join (semi) + │ columns: (person, aisle) + │ estimated row count: 1 (missing stats) + │ table: supermarket@supermarket_pkey + │ equality: (person) = (person) + │ equality cols are key + │ locking strength: for update + │ locking durability: guaranteed + │ + └── • scan + columns: (person, aisle) + estimated row count: 1 (missing stats) + table: supermarket@supermarket_pkey + spans: /"matilda"/0 query T EXPLAIN (OPT) @@ -183,13 +204,14 @@ with &1 (names) ├── materialized ├── values │ └── ('matilda',) - └── project - └── inner-join (lookup supermarket) - ├── flags: force lookup join (into right side) - ├── lookup columns are key - ├── locking: for-update,durability-guaranteed - ├── with-scan &1 (names) - └── filters (true) + └── lock supermarket + ├── locking: for-update,durability-guaranteed + └── project + └── inner-join (lookup 
supermarket) + ├── flags: force lookup join (into right side) + ├── lookup columns are key + ├── with-scan &1 (names) + └── filters (true) query T EXPLAIN (VERBOSE) @@ -209,8 +231,8 @@ vectorized: true ├── • project │ │ columns: (aisle) │ │ -│ └── • lookup join (inner) -│ │ columns: (person, person, aisle) +│ └── • lookup join (semi) +│ │ columns: (person, aisle) │ │ estimated row count: 1 (missing stats) │ │ table: supermarket@supermarket_pkey │ │ equality: (person) = (person) @@ -218,10 +240,20 @@ vectorized: true │ │ locking strength: for update │ │ locking durability: guaranteed │ │ -│ └── • scan buffer -│ columns: (person) -│ estimated row count: 1 -│ label: buffer 1 (names) +│ └── • project +│ │ columns: (person, aisle) +│ │ +│ └── • lookup join (inner) +│ │ columns: (person, person, aisle) +│ │ estimated row count: 1 (missing stats) +│ │ table: supermarket@supermarket_pkey +│ │ equality: (person) = (person) +│ │ equality cols are key +│ │ +│ └── • scan buffer +│ columns: (person) +│ estimated row count: 1 +│ label: buffer 1 (names) │ └── • subquery │ id: @S1 @@ -244,13 +276,13 @@ SELECT aisle WHERE starts_with = 'm' FOR UPDATE ---- -project - └── index-join supermarket - ├── locking: for-update,durability-guaranteed - └── scan supermarket@supermarket_starts_with_idx - ├── constraint: /3/1: [/'m' - /'m'] - ├── flags: force-index=supermarket_starts_with_idx - └── locking: for-update,durability-guaranteed +lock supermarket + ├── locking: for-update,durability-guaranteed + └── project + └── index-join supermarket + └── scan supermarket@supermarket_starts_with_idx + ├── constraint: /3/1: [/'m' - /'m'] + └── flags: force-index=supermarket_starts_with_idx query T EXPLAIN (VERBOSE) @@ -265,21 +297,29 @@ vectorized: true • project │ columns: (aisle) │ -└── • index join - │ columns: (aisle, starts_with) +└── • lookup join (semi) + │ columns: (person, aisle) │ estimated row count: 10 (missing stats) │ table: supermarket@supermarket_pkey - │ key columns: person + │ 
equality: (person) = (person) + │ equality cols are key │ locking strength: for update │ locking durability: guaranteed │ - └── • scan - columns: (person, starts_with) - estimated row count: 10 (missing stats) - table: supermarket@supermarket_starts_with_idx - spans: /"m"-/"m"/PrefixEnd - locking strength: for update - locking durability: guaranteed + └── • project + │ columns: (person, aisle) + │ + └── • index join + │ columns: (person, aisle, starts_with) + │ estimated row count: 10 (missing stats) + │ table: supermarket@supermarket_pkey + │ key columns: person + │ + └── • scan + columns: (person, starts_with) + estimated row count: 10 (missing stats) + table: supermarket@supermarket_starts_with_idx + spans: /"m"-/"m"/PrefixEnd statement ok SET enable_zigzag_join = true @@ -291,17 +331,16 @@ SELECT aisle WHERE starts_with = 'm' AND ends_with = 'lda' FOR UPDATE ---- -project - └── inner-join (lookup supermarket) - ├── lookup columns are key - ├── locking: for-update,durability-guaranteed - ├── inner-join (zigzag supermarket@supermarket_starts_with_idx supermarket@supermarket_ends_with_idx) - │ ├── left locking: for-update,durability-guaranteed - │ ├── right locking: for-update,durability-guaranteed - │ └── filters - │ ├── starts_with = 'm' - │ └── ends_with = 'lda' - └── filters (true) +lock supermarket + ├── locking: for-update,durability-guaranteed + └── project + └── inner-join (lookup supermarket) + ├── lookup columns are key + ├── inner-join (zigzag supermarket@supermarket_starts_with_idx supermarket@supermarket_ends_with_idx) + │ └── filters + │ ├── starts_with = 'm' + │ └── ends_with = 'lda' + └── filters (true) query T EXPLAIN (VERBOSE) @@ -316,35 +355,38 @@ vectorized: true • project │ columns: (aisle) │ -└── • project - │ columns: (aisle, starts_with, ends_with) +└── • lookup join (semi) + │ columns: (person, aisle) + │ estimated row count: 1 (missing stats) + │ table: supermarket@supermarket_pkey + │ equality: (person) = (person) + │ equality cols are 
key + │ locking strength: for update + │ locking durability: guaranteed │ - └── • lookup join (inner) - │ columns: (person, starts_with, ends_with, aisle) - │ estimated row count: 1 (missing stats) - │ table: supermarket@supermarket_pkey - │ equality: (person) = (person) - │ equality cols are key - │ locking strength: for update - │ locking durability: guaranteed + └── • project + │ columns: (person, aisle) │ - └── • project - │ columns: (person, starts_with, ends_with) + └── • lookup join (inner) + │ columns: (person, starts_with, ends_with, aisle) + │ estimated row count: 1 (missing stats) + │ table: supermarket@supermarket_pkey + │ equality: (person) = (person) + │ equality cols are key │ - └── • zigzag join - columns: (person, starts_with, person, ends_with) - estimated row count: 1 (missing stats) - pred: (starts_with = 'm') AND (ends_with = 'lda') - left table: supermarket@supermarket_starts_with_idx - left columns: (person, starts_with) - left fixed values: 1 column - left locking strength: for update - left locking durability: guaranteed - right table: supermarket@supermarket_ends_with_idx - right columns: (person, ends_with) - right fixed values: 1 column - right locking strength: for update - right locking durability: guaranteed + └── • project + │ columns: (person, starts_with, ends_with) + │ + └── • zigzag join + columns: (person, starts_with, person, ends_with) + estimated row count: 1 (missing stats) + pred: (starts_with = 'm') AND (ends_with = 'lda') + left table: supermarket@supermarket_starts_with_idx + left columns: (person, starts_with) + left fixed values: 1 column + right table: supermarket@supermarket_ends_with_idx + right columns: (person, ends_with) + right fixed values: 1 column statement ok RESET enable_zigzag_join diff --git a/pkg/sql/opt/memo/check_expr.go b/pkg/sql/opt/memo/check_expr.go index efc7d2898d2f..f5696fdc8d40 100644 --- a/pkg/sql/opt/memo/check_expr.go +++ b/pkg/sql/opt/memo/check_expr.go @@ -306,6 +306,12 @@ func (m *Memo) 
CheckExpr(e opt.Expr) { m.checkColListLen(t.UpdateCols, tab.ColumnCount(), "UpdateCols") m.checkMutationExpr(t, &t.MutationPrivate) + case *LockExpr: + tab := m.Metadata().Table(t.Table) + m.checkColListLen( + opt.OptionalColList(t.KeyCols), tab.Index(cat.PrimaryIndex).KeyColumnCount(), "KeyCols", + ) + case *ZigzagJoinExpr: if len(t.LeftEqCols) != len(t.RightEqCols) { panic(errors.AssertionFailedf("zigzag join with mismatching eq columns")) diff --git a/pkg/sql/opt/memo/expr_format.go b/pkg/sql/opt/memo/expr_format.go index d89a9c15c84a..c5e4c1ab0d15 100644 --- a/pkg/sql/opt/memo/expr_format.go +++ b/pkg/sql/opt/memo/expr_format.go @@ -274,7 +274,7 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { f.Buffer.WriteByte(')') case *ScanExpr, *PlaceholderScanExpr, *IndexJoinExpr, *ShowTraceForSessionExpr, - *InsertExpr, *UpdateExpr, *UpsertExpr, *DeleteExpr, *SequenceSelectExpr, + *InsertExpr, *UpdateExpr, *UpsertExpr, *DeleteExpr, *LockExpr, *SequenceSelectExpr, *WindowExpr, *OpaqueRelExpr, *OpaqueMutationExpr, *OpaqueDDLExpr, *AlterTableSplitExpr, *AlterTableUnsplitExpr, *AlterTableUnsplitAllExpr, *AlterTableRelocateExpr, *AlterRangeRelocateExpr, *ControlJobsExpr, *CancelQueriesExpr, @@ -697,6 +697,9 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) { f.formatMutationCommon(tp, &t.MutationPrivate) } + case *LockExpr: + f.formatLocking(tp, t.Locking) + case *WithExpr: switch t.Mtr { case tree.CTEMaterializeAlways: @@ -1695,6 +1698,9 @@ func FormatPrivate(f *ExprFmtCtx, private interface{}, physProps *physical.Requi case *MutationPrivate: f.formatIndex(t.Table, cat.PrimaryIndex, false /* reverse */) + case *LockPrivate: + f.formatIndex(t.Table, cat.PrimaryIndex, false /* reverse */) + case *OrdinalityPrivate: if !t.Ordering.Any() { fmt.Fprintf(f.Buffer, " ordering=%s", t.Ordering) diff --git a/pkg/sql/opt/memo/logical_props_builder.go b/pkg/sql/opt/memo/logical_props_builder.go index 47f7d03a98ec..b84bd42a7c0f 100644 --- 
a/pkg/sql/opt/memo/logical_props_builder.go +++ b/pkg/sql/opt/memo/logical_props_builder.go @@ -1516,6 +1516,25 @@ func (b *logicalPropsBuilder) buildMutationProps(mutation RelExpr, rel *props.Re } } +func (b *logicalPropsBuilder) buildLockProps(lock *LockExpr, rel *props.Relational) { + BuildSharedProps(lock, &rel.Shared, b.evalCtx) + + private := lock.Private().(*LockPrivate) + inputProps := lock.Child(0).(RelExpr).Relational() + + rel.OutputCols = inputProps.OutputCols.Copy() + rel.NotNullCols = inputProps.NotNullCols.Copy() + rel.FuncDeps.CopyFrom(&inputProps.FuncDeps) + rel.Cardinality = inputProps.Cardinality + if private.Locking.WaitPolicy == tree.LockWaitSkipLocked { + // SKIP LOCKED can act like a filter. + rel.Cardinality = rel.Cardinality.AsLowAs(0) + } + if !b.disableStats { + b.sb.buildLock(lock, rel) + } +} + func (b *logicalPropsBuilder) buildCreateTableProps(ct *CreateTableExpr, rel *props.Relational) { BuildSharedProps(ct, &rel.Shared, b.evalCtx) } diff --git a/pkg/sql/opt/memo/memo.go b/pkg/sql/opt/memo/memo.go index c9210dd25ed4..cc1c4d49443e 100644 --- a/pkg/sql/opt/memo/memo.go +++ b/pkg/sql/opt/memo/memo.go @@ -172,6 +172,7 @@ type Memo struct { implicitFKLockingForSerializable bool durableLockingForSerializable bool sharedLockingForSerializable bool + useLockOpForSerializable bool // txnIsoLevel is the isolation level under which the plan was created. 
This // affects the planning of some locking operations, so it must be included in @@ -242,6 +243,7 @@ func (m *Memo) Init(ctx context.Context, evalCtx *eval.Context) { implicitFKLockingForSerializable: evalCtx.SessionData().ImplicitFKLockingForSerializable, durableLockingForSerializable: evalCtx.SessionData().DurableLockingForSerializable, sharedLockingForSerializable: evalCtx.SessionData().SharedLockingForSerializable, + useLockOpForSerializable: evalCtx.SessionData().OptimizerUseLockOpForSerializable, txnIsoLevel: evalCtx.TxnIsoLevel, } m.metadata.Init() @@ -386,6 +388,7 @@ func (m *Memo) IsStale( m.implicitFKLockingForSerializable != evalCtx.SessionData().ImplicitFKLockingForSerializable || m.durableLockingForSerializable != evalCtx.SessionData().DurableLockingForSerializable || m.sharedLockingForSerializable != evalCtx.SessionData().SharedLockingForSerializable || + m.useLockOpForSerializable != evalCtx.SessionData().OptimizerUseLockOpForSerializable || m.txnIsoLevel != evalCtx.TxnIsoLevel { return true, nil } @@ -585,3 +588,9 @@ var GetLookupJoinLookupTableDistribution func( required *physical.Required, optimizer interface{}, ) (physicalDistribution physical.Distribution) + +// CopyLockGroupIntoLookupJoin helps us create a mock LookupJoinExpr in +// execbuilder during building of LockExpr. +func CopyLockGroupIntoLookupJoin(lock *LockExpr, join *LookupJoinExpr) { + join.grp = lock.grp +} diff --git a/pkg/sql/opt/memo/memo_test.go b/pkg/sql/opt/memo/memo_test.go index b00e86fa8a99..4cdd4acc9d00 100644 --- a/pkg/sql/opt/memo/memo_test.go +++ b/pkg/sql/opt/memo/memo_test.go @@ -406,6 +406,12 @@ func TestMemoIsStale(t *testing.T) { evalCtx.SessionData().SharedLockingForSerializable = false notStale() + // Stale optimizer_use_lock_op_for_serializable. + evalCtx.SessionData().OptimizerUseLockOpForSerializable = true + stale() + evalCtx.SessionData().OptimizerUseLockOpForSerializable = false + notStale() + // Stale txn isolation level. 
evalCtx.TxnIsoLevel = isolation.ReadCommitted stale() diff --git a/pkg/sql/opt/memo/statistics_builder.go b/pkg/sql/opt/memo/statistics_builder.go index 1791289b7df6..c3d322050fc5 100644 --- a/pkg/sql/opt/memo/statistics_builder.go +++ b/pkg/sql/opt/memo/statistics_builder.go @@ -477,6 +477,9 @@ func (sb *statisticsBuilder) colStat(colSet opt.ColSet, e RelExpr) *props.Column case opt.InsertOp, opt.UpdateOp, opt.UpsertOp, opt.DeleteOp: return sb.colStatMutation(colSet, e) + case opt.LockOp: + return sb.colStatLock(colSet, e.(*LockExpr)) + case opt.SequenceSelectOp: return sb.colStatSequenceSelect(colSet, e.(*SequenceSelectExpr)) @@ -2656,6 +2659,37 @@ func (sb *statisticsBuilder) colStatMutation( return colStat } +// +------+ +// | Lock | +// +------+ + +func (sb *statisticsBuilder) buildLock(lock *LockExpr, relProps *props.Relational) { + s := relProps.Statistics() + if zeroCardinality := s.Init(relProps); zeroCardinality { + // Short cut if cardinality is 0. + return + } + s.Available = sb.availabilityFromInput(lock) + + inputStats := lock.Input.Relational().Statistics() + + s.RowCount = inputStats.RowCount + sb.finalizeFromCardinality(relProps) +} + +func (sb *statisticsBuilder) colStatLock(colSet opt.ColSet, lock *LockExpr) *props.ColumnStatistic { + s := lock.Relational().Statistics() + + inColStat := sb.colStatFromChild(colSet, lock, 0 /* childIdx */) + + // Construct colstat using the corresponding input stats. 
+ colStat, _ := s.ColStats.Add(colSet) + colStat.DistinctCount = inColStat.DistinctCount + colStat.NullCount = inColStat.NullCount + sb.finalizeFromRowCountAndDistinctCounts(colStat, s) + return colStat +} + // +-----------------+ // | Sequence Select | // +-----------------+ diff --git a/pkg/sql/opt/ops/README.md b/pkg/sql/opt/ops/README.md index 6c6a06650230..e130ec0dc60b 100644 --- a/pkg/sql/opt/ops/README.md +++ b/pkg/sql/opt/ops/README.md @@ -73,4 +73,3 @@ Tags for relational operators: - `WithBinding`: used for operators which associate a `WithID` with the expression in the first child. Such expressions must implement a `WithBindingID()` method. - diff --git a/pkg/sql/opt/ops/mutation.opt b/pkg/sql/opt/ops/mutation.opt index b170c6112993..4db94d421a1a 100644 --- a/pkg/sql/opt/ops/mutation.opt +++ b/pkg/sql/opt/ops/mutation.opt @@ -346,3 +346,50 @@ define UniqueChecksItemPrivate { # in the error message. KeyCols ColList } + +# Lock evaluates a relational input expression, and locks rows in the given +# table based on primary key columns provided by the input expression. Lock is +# produced by FOR UPDATE and FOR SHARE clauses on SELECT statements: +# +# SELECT * FROM ab WHERE a > 0 ORDER BY b LIMIT 10 FOR UPDATE +# +# The locking strength, wait policy, form, and durability are specified by the +# operator, meaning that a single Lock operator represents locking one table at +# one strength. A single statement using FOR UPDATE or FOR SHARE may require +# multiple Lock operators in order to lock multiple tables, or even to lock the +# same table at multiple different strengths. +# +# The input expression is prohibited from using certain operations such as outer +# joins and aggregations in order to clarify the semantics of which rows are +# locked. Outside of these prohibitions the input can be arbitrary, including +# inner joins and subqueries, but must provide the primary key columns of the +# table being locked. 
+# +# The Lock operator does not necessarily have to be at the top of a plan. It +# could appear within a subtree of the plan, and in this case acts as an +# optimization barrier to ensure the correct rows are locked. +[Relational, Mutation] +define Lock { + Input RelExpr + _ LockPrivate +} + +[Private] +define LockPrivate { + # Table identifies the table to lock. Currently only the primary index of + # the table is locked. + Table TableID + + # Locking represents the row-level locking mode to use when locking the + # primary index. + Locking Locking + + # KeyCols are the primary key columns produced by the input, used to lookup + # into the primary index of the table. They must be in the same order as the + # primary key columns of the table. + KeyCols ColList + + # Cols is the set of columns from the input expression returned by the Lock + # operator. Cols contains all of the KeyCols. + Cols ColSet +} diff --git a/pkg/sql/opt/optbuilder/join.go b/pkg/sql/opt/optbuilder/join.go index b3492f674672..4e7fa5890e33 100644 --- a/pkg/sql/opt/optbuilder/join.go +++ b/pkg/sql/opt/optbuilder/join.go @@ -33,6 +33,8 @@ import ( func (b *Builder) buildJoin( join *tree.JoinTableExpr, lockCtx lockingContext, inScope *scope, ) (outScope *scope) { + // TODO(michae2, 97434): Poison the lockCtx for the null-extended side(s) if + // this is an outer join. 
leftScope := b.buildDataSource(join.Left, nil /* indexFlags */, lockCtx, inScope) inScopeRight := inScope diff --git a/pkg/sql/opt/optbuilder/locking.go b/pkg/sql/opt/optbuilder/locking.go index 48391a56e784..e6550a406a21 100644 --- a/pkg/sql/opt/optbuilder/locking.go +++ b/pkg/sql/opt/optbuilder/locking.go @@ -11,7 +11,11 @@ package optbuilder import ( + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/sql/opt" + "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -68,6 +72,31 @@ type lockingItem struct { // targetsFound is used to validate that we matched all of the lock targets. targetsFound intsets.Fast + + // builders has one lockBuilder for each data source that matched this + // item. Each lockBuilder here will become one Lock operator in the plan. + builders []*lockBuilder +} + +// lockBuilder is a helper for building Lock operators for a single data +// source. It keeps track of the PK columns of the table. The same lockBuilder +// may be referenced by multiple lockingItems. +type lockBuilder struct { + table opt.TableID + keyCols opt.ColList +} + +// newLockBuilder constructs a lockBuilder for the passed table. 
+func newLockBuilder(tabMeta *opt.TableMeta) *lockBuilder { + primaryIndex := tabMeta.Table.Index(cat.PrimaryIndex) + lb := &lockBuilder{ + table: tabMeta.MetaID, + keyCols: make(opt.ColList, primaryIndex.KeyColumnCount()), + } + for i := range lb.keyCols { + lb.keyCols[i] = lb.table.IndexColumnID(primaryIndex, i) + } + return lb } // lockingSpec maintains a collection of FOR [KEY] UPDATE/SHARE items that apply @@ -202,6 +231,49 @@ func (lockCtx *lockingContext) withoutTargets() { // > clause within the WITH query. func (lm lockingSpec) ignoreLockingForCTE() {} +// analyzeLockArgs analyzes all locking clauses currently in scope and adds the +// PK columns needed for those clauses to lockScope. +func (b *Builder) analyzeLockArgs( + lockCtx lockingContext, inScope, projectionsScope *scope, +) (lockScope *scope) { + if !b.shouldBuildLockOp() { + return nil + } + + // Get all the PK cols of all lockBuilders in scope. + var pkCols opt.ColSet + for _, item := range lockCtx.lockScope { + for _, lb := range item.builders { + for _, col := range lb.keyCols { + pkCols.Add(col) + } + } + } + + if pkCols.Empty() { + return nil + } + + lockScope = inScope.push() + lockScope.cols = make([]scopeColumn, 0, pkCols.Len()) + + for i := range inScope.cols { + if pkCols.Contains(inScope.cols[i].id) { + lockScope.appendColumn(&inScope.cols[i]) + } + } + return lockScope +} + +// buildLockArgs adds the PK columns needed for all locking clauses currently in +// scope to the projectionsScope. +func (b *Builder) buildLockArgs(inScope, projectionsScope, lockScope *scope) { + if lockScope == nil { + return + } + projectionsScope.addExtraColumns(lockScope.cols) +} + // validate checks that the locking item is well-formed, and that all of its // targets matched a data source in the FROM clause. 
func (item *lockingItem) validate() { @@ -269,6 +341,65 @@ func (item *lockingItem) validate() { } } +// shouldUseGuaranteedDurability returns whether we should use +// guaranteed-durable locking for SELECT FOR UPDATE, SELECT FOR SHARE, or +// constraint checks. +func (b *Builder) shouldUseGuaranteedDurability() bool { + return b.evalCtx.Settings.Version.IsActive(b.ctx, clusterversion.V23_2) && + (b.evalCtx.TxnIsoLevel != isolation.Serializable || + b.evalCtx.SessionData().DurableLockingForSerializable) +} + +// shouldBuildLockOp returns whether we should use the Lock operator for SELECT +// FOR UPDATE or SELECT FOR SHARE. +func (b *Builder) shouldBuildLockOp() bool { + return b.evalCtx.Settings.Version.IsActive(b.ctx, clusterversion.V23_2) && + (b.evalCtx.TxnIsoLevel != isolation.Serializable || + b.evalCtx.SessionData().OptimizerUseLockOpForSerializable) +} + +// buildLocking constructs one Lock operator for each data source that this +// lockingItem applied to. +func (b *Builder) buildLocking(item *lockingItem, inScope *scope) { + locking := lockingSpec{item}.get() + // Under weaker isolation levels we use fully-durable locks for SELECT FOR + // UPDATE. + if b.shouldUseGuaranteedDurability() { + locking.Durability = tree.LockDurabilityGuaranteed + } + for i := range item.builders { + b.buildLock(item.builders[i], locking, inScope) + } +} + +// buildLock constructs a Lock operator for a single data source at a single +// locking strength. +func (b *Builder) buildLock(lb *lockBuilder, locking opt.Locking, inScope *scope) { + md := b.factory.Metadata() + tab := md.Table(lb.table) + private := &memo.LockPrivate{ + Table: lb.table, + Locking: locking, + KeyCols: lb.keyCols, + Cols: inScope.colSetWithExtraCols(), + } + // Validate that all of the PK cols are found within the input scope. 
+ scopeCols := private.Cols + for _, keyCol := range private.KeyCols { + if !scopeCols.Contains(keyCol) { + panic(errors.AssertionFailedf("cols missing key column %d", keyCol)) + } + } + if private.Locking.WaitPolicy == tree.LockWaitSkipLocked && tab.FamilyCount() > 1 { + // TODO(rytaft): We may be able to support this if enough columns are + // pruned that only a single family is scanned. + panic(pgerror.Newf(pgcode.FeatureNotSupported, + "SKIP LOCKED cannot be used for tables with multiple column families", + )) + } + inScope.expr = b.factory.ConstructLock(inScope.expr, private) +} + // lockingSpecForClause converts a lockingClause to a lockingSpec. func lockingSpecForClause(lockingClause tree.LockingClause) (lm lockingSpec) { for _, li := range lockingClause { diff --git a/pkg/sql/opt/optbuilder/scope.go b/pkg/sql/opt/optbuilder/scope.go index d1ff62b2ae71..9c5effbdd6ff 100644 --- a/pkg/sql/opt/optbuilder/scope.go +++ b/pkg/sql/opt/optbuilder/scope.go @@ -72,8 +72,9 @@ type scope struct { // distinctOnCols records the DISTINCT ON columns by ID. distinctOnCols opt.ColSet - // extraCols contains columns specified by the ORDER BY or DISTINCT ON clauses - // which don't appear in cols. + // extraCols contains columns specified by the ORDER BY or DISTINCT ON + // clauses, or needed by FOR UPDATE or FOR SHARE clauses, which don't appear + // in cols. extraCols []scopeColumn // expr is the SQL node built with this scope. @@ -569,7 +570,8 @@ func (s *scope) hasSameColumns(other *scope) bool { } // removeHiddenCols removes hidden columns from the scope (and moves them to -// extraCols, in case they are referenced by ORDER BY or DISTINCT ON). +// extraCols, in case they are referenced by ORDER BY or DISTINCT ON or needed +// by FOR UPDATE or FOR SHARE). 
func (s *scope) removeHiddenCols() { n := 0 for i := range s.cols { diff --git a/pkg/sql/opt/optbuilder/select.go b/pkg/sql/opt/optbuilder/select.go index eb9f5153d4ac..1d2cc235ee01 100644 --- a/pkg/sql/opt/optbuilder/select.go +++ b/pkg/sql/opt/optbuilder/select.go @@ -14,8 +14,6 @@ import ( "context" "fmt" - "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" @@ -148,6 +146,16 @@ func (b *Builder) buildDataSource( switch t := ds.(type) { case cat.Table: tabMeta := b.addTable(t, &resName) + locking := lockCtx.locking + if locking.isSet() { + lb := newLockBuilder(tabMeta) + for _, item := range locking { + item.builders = append(item.builders, lb) + } + } + if b.shouldBuildLockOp() { + locking = nil + } return b.buildScan( tabMeta, tableOrdinals(t, columnKinds{ @@ -155,7 +163,7 @@ func (b *Builder) buildDataSource( includeSystem: true, includeInverted: false, }), - indexFlags, lockCtx.locking, inScope, + indexFlags, locking, inScope, false, /* disableNotVisibleIndex */ ) @@ -461,7 +469,15 @@ func (b *Builder) buildScanFromTableRef( tn := tree.MakeUnqualifiedTableName(tab.Name()) tabMeta := b.addTable(tab, &tn) - + if locking.isSet() { + lb := newLockBuilder(tabMeta) + for _, item := range locking { + item.builders = append(item.builders, lb) + } + } + if b.shouldBuildLockOp() { + locking = nil + } return b.buildScan( tabMeta, ordinals, indexFlags, locking, inScope, false, /* disableNotVisibleIndex */ ) @@ -702,9 +718,7 @@ func (b *Builder) buildScan( } if locking.isSet() { private.Locking = locking.get() - if b.evalCtx.Settings.Version.IsActive(b.ctx, clusterversion.V23_2) && - (b.evalCtx.TxnIsoLevel != isolation.Serializable || - b.evalCtx.SessionData().DurableLockingForSerializable) { + if 
b.shouldUseGuaranteedDurability() { // Under weaker isolation levels we use fully-durable locks for SELECT FOR // UPDATE statements, SELECT FOR SHARE statements, and constraint checks // (e.g. FK checks), regardless of locking strength and wait policy. @@ -1158,11 +1172,15 @@ func (b *Builder) buildSelectStmtWithoutParens( b.buildLimit(limit, inScope, outScope) } - // Remove locking items from scope, and validate that they were found within - // the FROM clause. + // Remove locking items from scope, validate that they were found within the + // FROM clause, and build them. for range lockingClause { item := lockCtx.pop() item.validate() + if b.shouldBuildLockOp() { + // TODO(michae2): Combine multiple buildLock calls for the same table. + b.buildLocking(item, outScope) + } } // TODO(rytaft): Support FILTER expression. @@ -1203,6 +1221,7 @@ func (b *Builder) buildSelectClause( orderByScope := b.analyzeOrderBy(orderBy, fromScope, projectionsScope, exprKindOrderBy, tree.RejectGenerators) distinctOnScope := b.analyzeDistinctOnArgs(sel.DistinctOn, fromScope, projectionsScope) + lockScope := b.analyzeLockArgs(lockCtx, fromScope, projectionsScope) var having opt.ScalarExpr needsAgg := b.needsAggregation(sel, fromScope) @@ -1217,6 +1236,7 @@ func (b *Builder) buildSelectClause( b.buildProjectionList(fromScope, projectionsScope) b.buildOrderBy(fromScope, projectionsScope, orderByScope) b.buildDistinctOnArgs(fromScope, projectionsScope, distinctOnScope) + b.buildLockArgs(fromScope, projectionsScope, lockScope) b.buildProjectSet(fromScope) if needsAgg { diff --git a/pkg/sql/opt/ordering/mutation.go b/pkg/sql/opt/ordering/mutation.go index a52fef711a09..54cbce92a495 100644 --- a/pkg/sql/opt/ordering/mutation.go +++ b/pkg/sql/opt/ordering/mutation.go @@ -57,3 +57,23 @@ func mutationBuildProvided(expr memo.RelExpr, required *props.OrderingChoice) op // Ensure that provided ordering only uses projected columns. 
return remapProvided(provided, &fdset, expr.Relational().OutputCols) } + +func lockCanProvideOrdering(expr memo.RelExpr, required *props.OrderingChoice) bool { + // The lock operator can always pass through ordering to its input. + return true +} + +func lockBuildChildReqOrdering( + parent memo.RelExpr, required *props.OrderingChoice, childIdx int, +) props.OrderingChoice { + if childIdx != 0 { + return props.OrderingChoice{} + } + return *required +} + +func lockBuildProvided(expr memo.RelExpr, required *props.OrderingChoice) opt.Ordering { + lock := expr.(*memo.LockExpr) + // Pass through the input's provided ordering. TODO: must lookup join remember it? + return lock.Input.ProvidedPhysical().Ordering +} diff --git a/pkg/sql/opt/ordering/ordering.go b/pkg/sql/opt/ordering/ordering.go index bb1ffd7830c9..a58d18a2409a 100644 --- a/pkg/sql/opt/ordering/ordering.go +++ b/pkg/sql/opt/ordering/ordering.go @@ -266,6 +266,11 @@ func init() { buildChildReqOrdering: mutationBuildChildReqOrdering, buildProvidedOrdering: mutationBuildProvided, } + funcMap[opt.LockOp] = funcs{ + canProvideOrdering: lockCanProvideOrdering, + buildChildReqOrdering: lockBuildChildReqOrdering, + buildProvidedOrdering: lockBuildProvided, + } funcMap[opt.ExplainOp] = funcs{ canProvideOrdering: canNeverProvideOrdering, buildChildReqOrdering: explainBuildChildReqOrdering, diff --git a/pkg/sql/opt/xform/physical_props.go b/pkg/sql/opt/xform/physical_props.go index 9373594024b2..fe72934588cb 100644 --- a/pkg/sql/opt/xform/physical_props.go +++ b/pkg/sql/opt/xform/physical_props.go @@ -108,7 +108,7 @@ func BuildChildPhysicalProps( } } - case opt.IndexJoinOp: + case opt.IndexJoinOp, opt.LockOp: // For an index join, every input row results in exactly one output row. 
childProps.LimitHint = parentProps.LimitHint diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index a472edce3be1..cee2741e009e 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -435,7 +435,6 @@ message LocalOnlySessionData { // automatic retries to perform for statements in explicit READ COMMITTED // transactions that see a transaction retry error. int32 max_retries_for_read_committed = 110; - // StrictDDLAtomicity causes errors when the client attempts DDL // operations inside an explicit txn and CockroachDB cannot // guarantee the DDL to be performed atomically. @@ -448,10 +447,17 @@ message LocalOnlySessionData { // not occur any more (at the expense of disabling certain // forms of DDL inside explicit txns). bool strict_ddl_atomicity = 111 [(gogoproto.customname) = "StrictDDLAtomicity"]; - // UnsafeSettingInterlockKey needs to be set to a special string // before SET CLUSTER SETTING is allowed on an unsafe setting. string unsafe_setting_interlock_key = 113; + // OptimizerUseLockOpForSerializable, when true, instructs the optimizer to + // implement SELECT FOR UPDATE and SELECT FOR SHARE statements using the Lock + // operator under serializable isolation. + // + // For correctness, under weaker isolation levels the optimizer always + // implements SELECT FOR UPDATE and SELECT FOR SHARE using the Lock operator, + // regardless of this setting. 
+ bool optimizer_use_lock_op_for_serializable = 114; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 60b977f06879..10df42358ecb 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -2947,6 +2947,23 @@ var varGen = map[string]sessionVar{ }, GlobalDefault: func(_ *settings.Values) string { return "" }, }, + + // CockroachDB extension. + `optimizer_use_lock_op_for_serializable`: { + GetStringVal: makePostgresBoolGetStringValFn(`optimizer_use_lock_op_for_serializable`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := paramparse.ParseBoolVar("optimizer_use_lock_op_for_serializable", s) + if err != nil { + return err + } + m.SetOptimizerUseLockOpForSerializable(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatBoolAsPostgresSetting(evalCtx.SessionData().OptimizerUseLockOpForSerializable), nil + }, + GlobalDefault: globalFalse, + }, } func ReplicationModeFromString(s string) (sessiondatapb.ReplicationMode, error) {