Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: add a variable to control whether to lock unchanged unique keys (#44598) #44938

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 53 additions & 18 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ func (e *InsertValues) initInsertColumns() error {
}
cols, missingColIdx = table.FindColumns(tableCols, columns, e.Table.Meta().PKIsHandle)
if missingColIdx >= 0 {
return errors.Errorf("INSERT INTO %s: unknown column %s",
e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O)
return errors.Errorf(
"INSERT INTO %s: unknown column %s",
e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O,
)
}
} else {
// If e.Columns are empty, use all columns instead.
Expand Down Expand Up @@ -389,7 +391,9 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression

var emptyRow chunk.Row

func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) (
[]types.Datum, error,
) {
rowLen := len(e.Table.Cols())
if e.hasExtraHandle {
rowLen++
Expand Down Expand Up @@ -648,7 +652,9 @@ func (e *InsertValues) fillColValue(
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) (
[]types.Datum, error,
) {
gCols := make([]*table.Column, 0)
tCols := e.Table.Cols()
if e.hasExtraHandle {
Expand Down Expand Up @@ -691,7 +697,12 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefID)
err := p.CheckForExchangePartition(
e.ctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -804,7 +815,9 @@ func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) {
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) (
[][]types.Datum, error,
) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
Expand Down Expand Up @@ -896,7 +909,9 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustAutoIncrementDatum(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, ok := retryInfo.GetCurrAutoIncrementID()
Expand Down Expand Up @@ -972,7 +987,9 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
return recordID, nil
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustAutoRandomDatum(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, ok := retryInfo.GetCurrAutoRandomID()
Expand Down Expand Up @@ -1071,7 +1088,9 @@ func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, f
return alloc.Rebase(ctx, autoRandomID, true)
}

func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustImplicitRowID(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
var err error
var recordID int64
if !hasValue {
Expand Down Expand Up @@ -1119,7 +1138,11 @@ func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64)
alloc := e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType)
tableInfo := e.Table.Meta()

shardFmt := autoid.NewShardIDFormat(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits, autoid.RowIDBitLength)
shardFmt := autoid.NewShardIDFormat(
types.NewFieldType(mysql.TypeLonglong),
tableInfo.ShardRowIDBits,
autoid.RowIDBitLength,
)
newTiDBRowIDBase := shardFmt.IncrementalMask() & recordID

return alloc.Rebase(ctx, newTiDBRowIDBase, true)
Expand Down Expand Up @@ -1147,9 +1170,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum,
func (e *InsertValues) batchCheckAndInsert(
ctx context.Context, rows [][]types.Datum,
addRecord func(ctx context.Context, row []types.Datum) error,
replace bool) error {
replace bool,
) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
Expand Down Expand Up @@ -1218,7 +1243,8 @@ CheckAndInsert:
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.ctx.GetSessionVars().LockUnchangedKeys {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedKeyForLock(r.handleKey.newKey)
}
Expand Down Expand Up @@ -1254,7 +1280,8 @@ CheckAndInsert:
} else {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.ctx.GetSessionVars().LockUnchangedKeys {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedKeyForLock(uk.newKey)
}
Expand Down Expand Up @@ -1293,9 +1320,11 @@ func (e *InsertValues) removeRow(
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
logutil.BgLogger().Error(
"get old row failed when replace",
zap.String("handle", handle.String()),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)),
)
if kv.IsErrNotFound(err) {
err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %s", handle)
}
Expand All @@ -1310,7 +1339,11 @@ func (e *InsertValues) removeRow(
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}
if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, lockRowKey|lockUniqueKeys); err != nil {
keySet := lockRowKey
if e.ctx.GetSessionVars().LockUnchangedKeys {
keySet |= lockUniqueKeys
}
if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, keySet); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -1354,7 +1387,9 @@ func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}

func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.Datum, reserveAutoIDCount int) (err error) {
func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int,
) (err error) {
vars := e.ctx.GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
Expand Down
132 changes: 75 additions & 57 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,68 +1510,86 @@ func TestIssue32213(t *testing.T) {
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
func TestInsertLockUnchangedKeys(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
for _, shouldLock := range []bool{false} {
for _, tt := range []struct {
name string
ddl string
dml string
isClusteredPK bool
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
true,
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
false,
},
{
"insert-ignore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
true,
},
{
"insert-ignore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
false,
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
true,
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
false,
},
} {
t.Run(
tt.name+"-"+strconv.FormatBool(shouldLock), func(t *testing.T) {
tk1.MustExec(fmt.Sprintf("set @@tidb_lock_unchanged_keys = %v", shouldLock))
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
errCh := make(chan error)
go func() {
_, err := tk2.Exec("insert into t values (1)")
errCh <- err
}()
select {
case <-errCh:
if shouldLock {
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
}
close(errCh)
case <-time.After(200 * time.Millisecond):
if !shouldLock && !tt.isClusteredPK {
require.Failf(t, "txn2 is blocked by %q", tt.dml)
}
}
tk1.MustExec("commit")
<-errCh
tk1.MustQuery("select * from t").Check(testkit.Rows("1"))
},
)
}
}
}
Loading