diff --git a/executor/insert_common.go b/executor/insert_common.go index 4ed9d9b9774d9..36c187c13ef9f 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -152,8 +152,10 @@ func (e *InsertValues) initInsertColumns() error { } cols, missingColIdx = table.FindColumns(tableCols, columns, e.Table.Meta().PKIsHandle) if missingColIdx >= 0 { - return errors.Errorf("INSERT INTO %s: unknown column %s", - e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O) + return errors.Errorf( + "INSERT INTO %s: unknown column %s", + e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O, + ) } } else { // If e.Columns are empty, use all columns instead. @@ -389,7 +391,9 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression var emptyRow chunk.Row -func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) ([]types.Datum, error) { +func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) ( + []types.Datum, error, +) { rowLen := len(e.Table.Cols()) if e.hasExtraHandle { rowLen++ @@ -648,7 +652,9 @@ func (e *InsertValues) fillColValue( // `insert|replace values` can guarantee consecutive autoID in a batch. // Other statements like `insert select from` don't guarantee consecutive autoID. // https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html -func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) ([]types.Datum, error) { +func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) ( + []types.Datum, error, +) { gCols := make([]*table.Column, 0) tCols := e.Table.Cols() if e.hasExtraHandle { @@ -691,7 +697,12 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue if !ok { return nil, errors.Errorf("exchange partition process assert table partition failed") } - err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefID) + err := p.CheckForExchangePartition( + e.ctx, + pt.Meta().Partition, + row, + tbl.ExchangePartitionInfo.ExchangePartitionDefID, + ) if err != nil { return nil, err } @@ -804,7 +815,9 @@ func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col // lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum // except it will cache auto increment datum previously for lazy batch allocation of autoID. -func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) { +func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ( + [][]types.Datum, error, +) { // Not in lazyFillAutoID mode means no need to fill. 
if !e.lazyFillAutoID { return rows, nil @@ -896,7 +909,9 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] return rows, nil } -func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { +func (e *InsertValues) adjustAutoIncrementDatum( + ctx context.Context, d types.Datum, hasValue bool, c *table.Column, +) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { id, ok := retryInfo.GetCurrAutoIncrementID() @@ -972,7 +987,9 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int return recordID, nil } -func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { +func (e *InsertValues) adjustAutoRandomDatum( + ctx context.Context, d types.Datum, hasValue bool, c *table.Column, +) (types.Datum, error) { retryInfo := e.ctx.GetSessionVars().RetryInfo if retryInfo.Retrying { autoRandomID, ok := retryInfo.GetCurrAutoRandomID() @@ -1071,7 +1088,9 @@ func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, f return alloc.Rebase(ctx, autoRandomID, true) } -func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) { +func (e *InsertValues) adjustImplicitRowID( + ctx context.Context, d types.Datum, hasValue bool, c *table.Column, +) (types.Datum, error) { var err error var recordID int64 if !hasValue { @@ -1119,7 +1138,11 @@ func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64) alloc := e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType) tableInfo := e.Table.Meta() - shardFmt := autoid.NewShardIDFormat(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits, autoid.RowIDBitLength) + shardFmt := autoid.NewShardIDFormat( + types.NewFieldType(mysql.TypeLonglong), + tableInfo.ShardRowIDBits, + autoid.RowIDBitLength, + ) newTiDBRowIDBase := shardFmt.IncrementalMask() & recordID return alloc.Rebase(ctx, newTiDBRowIDBase, true) @@ -1147,9 +1170,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool { // batchCheckAndInsert checks rows with duplicate errors. // All duplicate rows will be ignored and appended as duplicate warnings. -func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, +func (e *InsertValues) batchCheckAndInsert( + ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error, - replace bool) error { + replace bool, +) error { // all the rows will be checked, so it is safe to set BatchCheck = true e.ctx.GetSessionVars().StmtCtx.BatchCheck = true defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End() @@ -1218,7 +1243,8 @@ CheckAndInsert: } } else { e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr) - if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic { + if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic && + e.ctx.GetSessionVars().LockUnchangedKeys { // lock duplicated row key on insert-ignore txnCtx.AddUnchangedKeyForLock(r.handleKey.newKey) } @@ -1254,7 +1280,8 @@ CheckAndInsert: } else { // If duplicate keys were found in BatchGet, mark row = nil. 
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr) - if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic { + if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic && + e.ctx.GetSessionVars().LockUnchangedKeys { // lock duplicated unique key on insert-ignore txnCtx.AddUnchangedKeyForLock(uk.newKey) } @@ -1293,9 +1320,11 @@ func (e *InsertValues) removeRow( newRow := r.row oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs) if err != nil { - logutil.BgLogger().Error("get old row failed when replace", + logutil.BgLogger().Error( + "get old row failed when replace", zap.String("handle", handle.String()), - zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row))) + zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)), + ) if kv.IsErrNotFound(err) { err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %s", handle) } @@ -1310,7 +1339,11 @@ func (e *InsertValues) removeRow( if inReplace { e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) } - if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, lockRowKey|lockUniqueKeys); err != nil { + keySet := lockRowKey + if e.ctx.GetSessionVars().LockUnchangedKeys { + keySet |= lockUniqueKeys + } + if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, keySet); err != nil { return false, err } return true, nil @@ -1354,7 +1387,9 @@ func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error { return e.addRecordWithAutoIDHint(ctx, row, 0) } -func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.Datum, reserveAutoIDCount int) (err error) { +func (e *InsertValues) addRecordWithAutoIDHint( + ctx context.Context, row []types.Datum, reserveAutoIDCount int, +) (err error) { vars := e.ctx.GetSessionVars() if !vars.ConstraintCheckInPlace { vars.PresumeKeyNotExists = true diff --git a/executor/insert_test.go b/executor/insert_test.go index d6e4b08f93252..a72bb308ac03e 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -1510,68 +1510,86 @@ func TestIssue32213(t *testing.T) { tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000")) } -func TestInsertLock(t *testing.T) { +func TestInsertLockUnchangedKeys(t *testing.T) { store := testkit.CreateMockStore(t) tk1 := testkit.NewTestKit(t, store) tk2 := testkit.NewTestKit(t, store) tk1.MustExec("use test") tk2.MustExec("use test") - for _, tt := range []struct { - name string - ddl string - dml string - }{ - { - "replace-pk", - "create table t (c int primary key clustered)", - "replace into t values (1)", - }, - { - "replace-uk", - "create table t (c int unique key)", - "replace into t values (1)", - }, - { - "insert-ingore-pk", - "create table t (c int primary key clustered)", - "insert ignore into t values (1)", - }, - { - "insert-ingore-uk", - "create table t (c int unique key)", - "insert ignore into t values (1)", - }, - { - "insert-update-pk", - "create table t (c int primary key clustered)", - "insert into t values (1) on duplicate key update c = values(c)", - }, - { - "insert-update-uk", - "create table t (c int unique key)", - "insert into t values (1) on duplicate key update c = values(c)", - }, - } { - t.Run(tt.name, func(t *testing.T) { - tk1.MustExec("drop table if exists t") - tk1.MustExec(tt.ddl) - tk1.MustExec("insert into t values (1)") - tk1.MustExec("begin") - tk1.MustExec(tt.dml) - done := make(chan struct{}) - go func() { - tk2.MustExec("delete from t") - done <- struct{}{} - }() - 
select { - case <-done: - require.Failf(t, "txn2 is not blocked by %q", tt.dml) - case <-time.After(100 * time.Millisecond): - } - tk1.MustExec("commit") - <-done - tk1.MustQuery("select * from t").Check([][]interface{}{}) - }) + for _, shouldLock := range []bool{false} { + for _, tt := range []struct { + name string + ddl string + dml string + isClusteredPK bool + }{ + { + "replace-pk", + "create table t (c int primary key clustered)", + "replace into t values (1)", + true, + }, + { + "replace-uk", + "create table t (c int unique key)", + "replace into t values (1)", + false, + }, + { + "insert-ignore-pk", + "create table t (c int primary key clustered)", + "insert ignore into t values (1)", + true, + }, + { + "insert-ignore-uk", + "create table t (c int unique key)", + "insert ignore into t values (1)", + false, + }, + { + "insert-update-pk", + "create table t (c int primary key clustered)", + "insert into t values (1) on duplicate key update c = values(c)", + true, + }, + { + "insert-update-uk", + "create table t (c int unique key)", + "insert into t values (1) on duplicate key update c = values(c)", + false, + }, + } { + t.Run( + tt.name+"-"+strconv.FormatBool(shouldLock), func(t *testing.T) { + tk1.MustExec(fmt.Sprintf("set @@tidb_lock_unchanged_keys = %v", shouldLock)) + tk1.MustExec("drop table if exists t") + tk1.MustExec(tt.ddl) + tk1.MustExec("insert into t values (1)") + tk1.MustExec("begin") + tk1.MustExec(tt.dml) + errCh := make(chan error) + go func() { + _, err := tk2.Exec("insert into t values (1)") + errCh <- err + }() + select { + case <-errCh: + if shouldLock { + require.Failf(t, "txn2 is not blocked by %q", tt.dml) + } + close(errCh) + case <-time.After(200 * time.Millisecond): + if !shouldLock && !tt.isClusteredPK { + require.Failf(t, "txn2 is blocked by %q", tt.dml) + } + } + tk1.MustExec("commit") + <-errCh + tk1.MustQuery("select * from t").Check(testkit.Rows("1")) + }, + ) + } } } diff --git a/executor/update_test.go b/executor/update_test.go index 2ef2093f12610..10f1b5cb87578 100644 --- a/executor/update_test.go +++ b/executor/update_test.go @@ -15,6 +15,8 @@ package executor_test import ( + "fmt" + "strconv" "testing" "time" @@ -34,10 +36,17 @@ func TestUpdateGenColInTxn(t *testing.T) { tk.MustExec(`begin;`) tk.MustExec(`insert into t(a) values(1);`) err := tk.ExecToErr(`update t set b=6 where b=2;`) - require.Equal(t, "[planner:3105]The value specified for generated column 'b' in table 't' is not allowed.", err.Error()) + require.Equal( + t, + "[planner:3105]The value specified for generated column 'b' in table 't' is not allowed.", + err.Error(), + ) tk.MustExec(`commit;`) - tk.MustQuery(`select * from t;`).Check(testkit.Rows( - `1 2`)) + tk.MustQuery(`select * from t;`).Check( + testkit.Rows( + `1 2`, + ), + ) } func TestUpdateWithAutoidSchema(t *testing.T) { @@ -174,10 +183,17 @@ func TestUpdateSchemaChange(t *testing.T) { tk.MustExec(`begin;`) tk.MustExec(`insert into t(a) values(1);`) err := tk.ExecToErr(`update t set b=6 where b=2;`) - require.Equal(t, "[planner:3105]The value specified for generated column 'b' in table 't' is not allowed.", err.Error()) + require.Equal( + t, + "[planner:3105]The value specified for generated column 'b' in table 't' is not allowed.", + err.Error(), + ) tk.MustExec(`commit;`) - tk.MustQuery(`select * from t;`).Check(testkit.Rows( - `1 2`)) + tk.MustQuery(`select * from t;`).Check( + testkit.Rows( + `1 2`, + ), + ) } func TestUpdateMultiDatabaseTable(t *testing.T) { @@ -272,8 +288,10 @@ func TestMultiUpdateOnSameTable(t 
*testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int primary key, b int)") tk.MustExec("insert into t values (1,3), (2,4)") - tk.MustGetErrMsg("update t m, t n set m.a = n.a+10, n.b = m.b+1 where m.a=n.a", - `[planner:1706]Primary key/partition key update is not allowed since the table is updated both as 'm' and 'n'.`) + tk.MustGetErrMsg( + "update t m, t n set m.a = n.a+10, n.b = m.b+1 where m.a=n.a", + `[planner:1706]Primary key/partition key update is not allowed since the table is updated both as 'm' and 'n'.`, + ) tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, c int, primary key(a, b))") @@ -282,8 +300,10 @@ func TestMultiUpdateOnSameTable(t *testing.T) { tk.MustQuery("select * from t").Check(testkit.Rows("11 13 5", "12 14 6")) tk.MustExec("update t m, t n, t q set q.c=m.a+n.b, n.c = m.a+1, m.c = n.b+1 where m.b=n.b AND m.a=q.a") tk.MustQuery("select * from t").Check(testkit.Rows("11 13 24", "12 14 26")) - tk.MustGetErrMsg("update t m, t n, t q set m.a = m.a+1, n.c = n.c-1, q.c = q.a+q.b where m.b=n.b and n.b=q.b", - `[planner:1706]Primary key/partition key update is not allowed since the table is updated both as 'm' and 'n'.`) + tk.MustGetErrMsg( + "update t m, t n, t q set m.a = m.a+1, n.c = n.c-1, q.c = q.a+q.b where m.b=n.b and n.b=q.b", + `[planner:1706]Primary key/partition key update is not allowed since the table is updated both as 'm' and 'n'.`, + ) } func TestUpdateClusterIndex(t *testing.T) { @@ -316,7 +336,10 @@ func TestUpdateClusterIndex(t *testing.T) { tk.MustExec(`update ut3pk set v = 666 where id1 = 'abc' and id2 = 'bbb2' and id3 = 222`) tk.MustQuery(`select id1, id2, id3, v from ut3pk`).Check(testkit.Rows("abc bbb2 222 666")) tk.MustExec(`insert into ut3pk(id1, id2, id3, v) values ('abc', 'bbb3', 222, 777)`) - tk.MustGetErrCode(`update ut3pk set id2 = 'bbb3' where id1 = 'abc' and id2 = 'bbb2' and id3 = 222`, errno.ErrDupEntry) + tk.MustGetErrCode( + `update ut3pk set id2 = 'bbb3' where id1 = 'abc' and id2 = 'bbb2' and id3 = 222`, + errno.ErrDupEntry, + ) tk.MustExec(`drop table if exists ut1pku`) tk.MustExec(`create table ut1pku(id varchar(200) primary key, uk int, v int, unique key ukk(uk))`) @@ -457,7 +480,8 @@ func TestOutOfRangeWithUnsigned(t *testing.T) { tk.MustExec(`insert into t values(1)`) tk.MustGetErrMsg( "update t set ts = IF(ts < (0 - ts), 1,1) where ts>0", - "[types:1690]BIGINT UNSIGNED value is out of range in '(0 - test.t.ts)'") + "[types:1690]BIGINT UNSIGNED value is out of range in '(0 - test.t.ts)'", + ) } func TestIssue21447(t *testing.T) { @@ -504,96 +528,117 @@ func TestLockUnchangedUniqueKeys(t *testing.T) { tk1.MustExec("use test") tk2.MustExec("use test") - for _, tt := range []struct { - name string - create string - insert string - update string - }{ - { - // ref https://github.com/pingcap/tidb/issues/36438 - "Issue36438", - "create table t (i varchar(10), unique key(i))", - "insert into t values ('a')", - "update t set i = 'a'", - }, - { - "ClusteredAndRowUnchanged", - "create table t (k int, v int, primary key(k) clustered, key sk(k))", - "insert into t values (1, 10)", - "update t force index(sk) set v = 10 where k = 1", - }, - { - "ClusteredAndRowUnchangedAndParted", - "create table t (k int, v int, primary key(k) clustered, key sk(k)) partition by hash(k) partitions 4", - "insert into t values (1, 10)", - "update t force index(sk) set v = 10 where k = 1", - }, - { - "ClusteredAndRowChanged", - "create table t (k int, v int, primary key(k) clustered, key 
sk(k))", - "insert into t values (1, 10)", - "update t force index(sk) set v = 11 where k = 1", - }, - { - "NonClusteredAndRowUnchanged", - "create table t (k int, v int, primary key(k) nonclustered, key sk(k))", - "insert into t values (1, 10)", - "update t force index(sk) set v = 10 where k = 1", - }, - { - "NonClusteredAndRowUnchangedAndParted", - "create table t (k int, v int, primary key(k) nonclustered, key sk(k)) partition by hash(k) partitions 4", - "insert into t values (1, 10)", - "update t force index(sk) set v = 10 where k = 1", - }, - { - "NonClusteredAndRowChanged", - "create table t (k int, v int, primary key(k) nonclustered, key sk(k))", - "insert into t values (1, 10)", - "update t force index(sk) set v = 11 where k = 1", - }, - { - "UniqueAndRowUnchanged", - "create table t (k int, v int, unique key uk(k), key sk(k))", - "insert into t values (1, 10)", - "update t force index(sk) set v = 10 where k = 1", - }, - { - "UniqueAndRowUnchangedAndParted", - "create table t (k int, v int, unique key uk(k), key sk(k)) partition by hash(k) partitions 4", - "insert into t values (1, 10)", - "update t force index(sk) set v = 10 where k = 1", - }, - { - "UniqueAndRowChanged", - "create table t (k int, v int, unique key uk(k), key sk(k))", - "insert into t values (1, 10)", - "update t force index(sk) set v = 11 where k = 1", - }, - } { - t.Run(tt.name, func(t *testing.T) { - tk1.MustExec("drop table if exists t") - tk1.MustExec(tt.create) - tk1.MustExec(tt.insert) - tk1.MustExec("begin pessimistic") - - tk1.MustExec(tt.update) - - errCh := make(chan error, 1) - go func() { - _, err := tk2.Exec(tt.insert) - errCh <- err - }() - - select { - case <-time.After(100 * time.Millisecond): - tk1.MustExec("rollback") - require.Error(t, <-errCh) - case err := <-errCh: - require.Error(t, err) - require.Fail(t, "insert is not blocked by update") - } - }) + for _, shouldLock := range []bool{true, false} { + for _, tt := range []struct { + name string + create string + insert string + update string + isClusteredPK bool + }{ + { + // ref https://github.com/pingcap/tidb/issues/36438 + "Issue36438", + "create table t (i varchar(10), unique key(i))", + "insert into t values ('a')", + "update t set i = 'a'", + false, + }, + { + "ClusteredAndRowUnchanged", + "create table t (k int, v int, primary key(k) clustered, key sk(k))", + "insert into t values (1, 10)", + "update t force index(sk) set v = 10 where k = 1", + true, + }, + { + "ClusteredAndRowUnchangedAndParted", + "create table t (k int, v int, primary key(k) clustered, key sk(k)) partition by hash(k) partitions 4", + "insert into t values (1, 10)", + "update t force index(sk) set v = 10 where k = 1", + true, + }, + { + "ClusteredAndRowChanged", + "create table t (k int, v int, primary key(k) clustered, key sk(k))", + "insert into t values (1, 10)", + "update t force index(sk) set v = 11 where k = 1", + true, + }, + { + "NonClusteredAndRowUnchanged", + "create table t (k int, v int, primary key(k) nonclustered, key sk(k))", + "insert into t values (1, 10)", + "update t force index(sk) set v = 10 where k = 1", + false, + }, + { + "NonClusteredAndRowUnchangedAndParted", + "create table t (k int, v int, primary key(k) nonclustered, key sk(k)) partition by hash(k) partitions 4", + "insert into t values (1, 10)", + "update t force index(sk) set v = 10 where k = 1", + false, + }, + { + "NonClusteredAndRowChanged", + "create table t (k int, v int, primary key(k) nonclustered, key sk(k))", + "insert into t values (1, 10)", + "update t force index(sk) 
set v = 11 where k = 1", + false, + }, + { + "UniqueAndRowUnchanged", + "create table t (k int, v int, unique key uk(k), key sk(k))", + "insert into t values (1, 10)", + "update t force index(sk) set v = 10 where k = 1", + false, + }, + { + "UniqueAndRowUnchangedAndParted", + "create table t (k int, v int, unique key uk(k), key sk(k)) partition by hash(k) partitions 4", + "insert into t values (1, 10)", + "update t force index(sk) set v = 10 where k = 1", + false, + }, + { + "UniqueAndRowChanged", + "create table t (k int, v int, unique key uk(k), key sk(k))", + "insert into t values (1, 10)", + "update t force index(sk) set v = 11 where k = 1", + false, + }, + } { + t.Run( + tt.name+"-"+strconv.FormatBool(shouldLock), func(t *testing.T) { + tk1.MustExec(fmt.Sprintf("set @@tidb_lock_unchanged_keys = %v", shouldLock)) + tk1.MustExec("drop table if exists t") + tk1.MustExec(tt.create) + tk1.MustExec(tt.insert) + tk1.MustExec("begin pessimistic") + + tk1.MustExec(tt.update) + + errCh := make(chan error, 1) + go func() { + _, err := tk2.Exec(tt.insert) + errCh <- err + }() + + select { + case <-time.After(100 * time.Millisecond): + if !shouldLock && !tt.isClusteredPK { + require.Fail(t, "insert is blocked by update") + } + tk1.MustExec("rollback") + require.Error(t, <-errCh) + case err := <-errCh: + require.Error(t, err) + if shouldLock { + require.Fail(t, "insert is not blocked by update") + } + } + }, + ) + } } } diff --git a/executor/write.go b/executor/write.go index 82d5cd25f7a4d..41e1e1d322206 100644 --- a/executor/write.go +++ b/executor/write.go @@ -50,8 +50,11 @@ var ( // The return values: // 1. changed (bool) : does the update really change the row values. e.g. update set i = 1 where i = 1; // 2. err (error) : error in the update. -func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table, - onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec) (bool, error) { +func updateRecord( + ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, + t table.Table, + onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec, +) (bool, error) { r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord") defer r.End() @@ -85,7 +88,12 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if !ok { return false, errors.Errorf("exchange partition process assert table partition failed") } - err := p.CheckForExchangePartition(sctx, pt.Meta().Partition, newData, tbl.ExchangePartitionInfo.ExchangePartitionDefID) + err := p.CheckForExchangePartition( + sctx, + pt.Meta().Partition, + newData, + tbl.ExchangePartitionInfo.ExchangePartitionDefID, + ) if err != nil { return false, err } @@ -137,7 +145,11 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 { sc.AddAffectedRows(1) } - _, err := addUnchangedKeysForLockByRow(sctx, t, h, oldData, lockRowKey|lockUniqueKeys) + keySet := lockRowKey + if sctx.GetSessionVars().LockUnchangedKeys { + keySet |= lockUniqueKeys + } + _, err := addUnchangedKeysForLockByRow(sctx, t, h, oldData, keySet) return false, err } @@ -199,9 +211,11 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old } return false, err } - // Lock unique keys when handle unchanged - if _, err := addUnchangedKeysForLockByRow(sctx, t, h, 
oldData, lockUniqueKeys); err != nil { - return false, err + if sctx.GetSessionVars().LockUnchangedKeys { + // Lock unique keys when handle unchanged + if _, err := addUnchangedKeysForLockByRow(sctx, t, h, oldData, lockUniqueKeys); err != nil { + return false, err + } } } for _, fkt := range fkChecks { @@ -232,9 +246,11 @@ const ( lockUniqueKeys ) -func addUnchangedKeysForLockByRow(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum, keyset int) (int, error) { +func addUnchangedKeysForLockByRow( + sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum, keySet int, +) (int, error) { txnCtx := sctx.GetSessionVars().TxnCtx - if !txnCtx.IsPessimistic || keyset == 0 { + if !txnCtx.IsPessimistic || keySet == 0 { return 0, nil } count := 0 @@ -246,12 +262,12 @@ func addUnchangedKeysForLockByRow(sctx sessionctx.Context, t table.Table, h kv.H } physicalID = p.GetPhysicalID() } - if keyset&lockRowKey > 0 { + if keySet&lockRowKey > 0 { unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h) txnCtx.AddUnchangedKeyForLock(unchangedRowKey) count++ } - if keyset&lockUniqueKeys > 0 { + if keySet&lockUniqueKeys > 0 { stmtCtx := sctx.GetSessionVars().StmtCtx clustered := t.Meta().HasClusteredIndex() for _, idx := range t.Indices() { @@ -263,7 +279,15 @@ func addUnchangedKeysForLockByRow(sctx sessionctx.Context, t table.Table, h kv.H if err != nil { return count, err } - unchangedUniqueKey, _, err := tablecodec.GenIndexKey(stmtCtx, idx.TableMeta(), meta, physicalID, ukVals, h, nil) + unchangedUniqueKey, _, err := tablecodec.GenIndexKey( + stmtCtx, + idx.TableMeta(), + meta, + physicalID, + ukVals, + h, + nil, + ) if err != nil { return count, err } @@ -274,7 +298,9 @@ func addUnchangedKeysForLockByRow(sctx sessionctx.Context, t table.Table, h kv.H return count, nil } -func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error { +func rebaseAutoRandomValue( + ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column, +) error { tableInfo := t.Meta() if !tableInfo.ContainsAutoRandomBits() { return nil diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 7dd080adc8524..2b4b08f4a8737 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1477,6 +1477,10 @@ type SessionVars struct { // OptimizerFixControl control some details of the optimizer behavior through the tidb_opt_fix_control variable. 
OptimizerFixControl map[uint64]string + + // Whether to lock duplicate keys in INSERT IGNORE and REPLACE statements, + // or unchanged unique keys in UPDATE statements, see PR #42210 and #42713 + LockUnchangedKeys bool } // planReplayerSessionFinishedTaskKeyLen is used to control the max size for the finished plan replayer task key in session diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 0f7d48ff79f23..a1716a99cf164 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -2681,6 +2681,16 @@ var defaultSysVars = []*SysVar{ }, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { return strconv.Itoa(ldap.LDAPSimpleAuthImpl.GetMaxCapacity()), nil }}, + { + Scope: ScopeGlobal | ScopeSession, + Name: TiDBLockUnchangedKeys, + Value: BoolToOnOff(DefTiDBLockUnchangedKeys), + Type: TypeBool, + SetSession: func(vars *SessionVars, s string) error { + vars.LockUnchangedKeys = TiDBOptOn(s) + return nil + }, + }, } func setTiFlashComputeDispatchPolicy(s *SessionVars, val string) error { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index b53eaf1fa2c99..236c496f0de08 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -866,6 +866,16 @@ const ( // TiDBOptFixControl makes the user able to control some details of the optimizer behavior. TiDBOptFixControl = "tidb_opt_fix_control" + + // TiDBLockUnchangedKeys indicates whether to lock duplicate keys in INSERT IGNORE and REPLACE statements, + // or unchanged unique keys in UPDATE statements, see PR #42210 and #42713 + TiDBLockUnchangedKeys = "tidb_lock_unchanged_keys" + + // TiDBFastCheckTable enables fast check table. + TiDBFastCheckTable = "tidb_enable_fast_table_check" + + // TiDBAnalyzeSkipColumnTypes indicates the column types whose statistics would not be collected when executing the ANALYZE command. + TiDBAnalyzeSkipColumnTypes = "tidb_analyze_skip_column_types" ) // TiDB vars that have only global scope @@ -1328,6 +1338,10 @@ const ( DefAuthenticationLDAPSimpleUserSearchAttr = "uid" DefAuthenticationLDAPSimpleInitPoolSize = 10 DefAuthenticationLDAPSimpleMaxPoolSize = 1000 + DefTiDBEnableFastCheckTable = true + DefRuntimeFilterType = "IN" + DefRuntimeFilterMode = "OFF" + DefTiDBLockUnchangedKeys = true ) // Process global variables. 
@@ -1396,17 +1410,27 @@ var ( TTLScanBatchSize = atomic.NewInt64(DefTiDBTTLScanBatchSize) TTLDeleteBatchSize = atomic.NewInt64(DefTiDBTTLDeleteBatchSize) TTLDeleteRateLimit = atomic.NewInt64(DefTiDBTTLDeleteRateLimit) - TTLJobScheduleWindowStartTime = atomic.NewTime(mustParseTime(FullDayTimeFormat, DefTiDBTTLJobScheduleWindowStartTime)) - TTLJobScheduleWindowEndTime = atomic.NewTime(mustParseTime(FullDayTimeFormat, DefTiDBTTLJobScheduleWindowEndTime)) - TTLScanWorkerCount = atomic.NewInt32(DefTiDBTTLScanWorkerCount) - TTLDeleteWorkerCount = atomic.NewInt32(DefTiDBTTLDeleteWorkerCount) - PasswordHistory = atomic.NewInt64(DefPasswordReuseHistory) - PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime) - IsSandBoxModeEnabled = atomic.NewBool(false) - MaxPreparedStmtCountValue = atomic.NewInt64(DefMaxPreparedStmtCount) - HistoricalStatsDuration = atomic.NewDuration(DefTiDBHistoricalStatsDuration) - EnableHistoricalStatsForCapture = atomic.NewBool(DefTiDBEnableHistoricalStatsForCapture) - TTLRunningTasks = atomic.NewInt32(DefTiDBTTLRunningTasks) + TTLJobScheduleWindowStartTime = atomic.NewTime( + mustParseTime( + FullDayTimeFormat, + DefTiDBTTLJobScheduleWindowStartTime, + ), + ) + TTLJobScheduleWindowEndTime = atomic.NewTime( + mustParseTime( + FullDayTimeFormat, + DefTiDBTTLJobScheduleWindowEndTime, + ), + ) + TTLScanWorkerCount = atomic.NewInt32(DefTiDBTTLScanWorkerCount) + TTLDeleteWorkerCount = atomic.NewInt32(DefTiDBTTLDeleteWorkerCount) + PasswordHistory = atomic.NewInt64(DefPasswordReuseHistory) + PasswordReuseInterval = atomic.NewInt64(DefPasswordReuseTime) + IsSandBoxModeEnabled = atomic.NewBool(false) + MaxPreparedStmtCountValue = atomic.NewInt64(DefMaxPreparedStmtCount) + HistoricalStatsDuration = atomic.NewDuration(DefTiDBHistoricalStatsDuration) + EnableHistoricalStatsForCapture = atomic.NewBool(DefTiDBEnableHistoricalStatsForCapture) + TTLRunningTasks = atomic.NewInt32(DefTiDBTTLRunningTasks) // always set the default value to false because the resource control in kv-client is not inited // It will be initialized to the right value after the first call of `rebuildSysVarCache` EnableResourceControl = atomic.NewBool(false)
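
Not part of the patch above: a minimal illustrative sketch of how the new `tidb_lock_unchanged_keys` session variable (default ON, per `DefTiDBLockUnchangedKeys = true`) is exercised, following the same testkit pattern as `TestInsertLockUnchangedKeys` and `TestLockUnchangedUniqueKeys` in the diff. The function name is hypothetical, and it assumes the package and imports of `executor/insert_test.go`; the expected blocking behavior is inferred from the tests, not asserted here.

func TestLockUnchangedKeysSketch(t *testing.T) {
	// A sketch only, assuming the testkit helpers used by the tests above.
	store := testkit.CreateMockStore(t)
	tk1 := testkit.NewTestKit(t, store)
	tk2 := testkit.NewTestKit(t, store)
	tk1.MustExec("use test")
	tk2.MustExec("use test")

	// The variable defaults to ON; turning it off skips locking duplicate keys
	// for INSERT IGNORE / REPLACE and unchanged unique keys for UPDATE in
	// pessimistic transactions.
	tk1.MustExec("set @@tidb_lock_unchanged_keys = false")
	tk1.MustExec("create table t (k int, v int, unique key uk(k))")
	tk1.MustExec("insert into t values (1, 10)")

	tk1.MustExec("begin pessimistic")
	// The duplicate is ignored; with the variable off, the duplicated unique
	// key is not added as an unchanged key for locking.
	tk1.MustExec("insert ignore into t values (1, 10)")
	// Per the non-clustered-PK cases in TestInsertLockUnchangedKeys, a
	// concurrent write is expected not to block on tk1's transaction here.
	tk2.MustExec("delete from t")
	tk1.MustExec("commit")
}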