Skip to content

Commit

Permalink
txn: add a variable to control whether to lock unchanged unique keys (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 26, 2023
1 parent 63fb86b commit 38ef1f9
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 202 deletions.
71 changes: 53 additions & 18 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ func (e *InsertValues) initInsertColumns() error {
}
cols, missingColIdx = table.FindColumns(tableCols, columns, e.Table.Meta().PKIsHandle)
if missingColIdx >= 0 {
return errors.Errorf("INSERT INTO %s: unknown column %s",
e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O)
return errors.Errorf(
"INSERT INTO %s: unknown column %s",
e.Table.Meta().Name.O, e.Columns[missingColIdx].Name.O,
)
}
} else {
// If e.Columns are empty, use all columns instead.
Expand Down Expand Up @@ -389,7 +391,9 @@ func (e *InsertValues) evalRow(ctx context.Context, list []expression.Expression

var emptyRow chunk.Row

func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) fastEvalRow(ctx context.Context, list []expression.Expression, rowIdx int) (
[]types.Datum, error,
) {
rowLen := len(e.Table.Cols())
if e.hasExtraHandle {
rowLen++
Expand Down Expand Up @@ -648,7 +652,9 @@ func (e *InsertValues) fillColValue(
// `insert|replace values` can guarantee consecutive autoID in a batch.
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) ([]types.Datum, error) {
func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue []bool, rowIdx int) (
[]types.Datum, error,
) {
gCols := make([]*table.Column, 0)
tCols := e.Table.Cols()
if e.hasExtraHandle {
Expand Down Expand Up @@ -691,7 +697,12 @@ func (e *InsertValues) fillRow(ctx context.Context, row []types.Datum, hasValue
if !ok {
return nil, errors.Errorf("exchange partition process assert table partition failed")
}
err := p.CheckForExchangePartition(e.ctx, pt.Meta().Partition, row, tbl.ExchangePartitionInfo.ExchangePartitionDefID)
err := p.CheckForExchangePartition(
e.ctx,
pt.Meta().Partition,
row,
tbl.ExchangePartitionInfo.ExchangePartitionDefID,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -804,7 +815,9 @@ func setDatumAutoIDAndCast(ctx sessionctx.Context, d *types.Datum, id int64, col

// lazyAdjustAutoIncrementDatum is quite similar to adjustAutoIncrementDatum
// except it will cache auto increment datum previously for lazy batch allocation of autoID.
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) ([][]types.Datum, error) {
func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [][]types.Datum) (
[][]types.Datum, error,
) {
// Not in lazyFillAutoID mode means no need to fill.
if !e.lazyFillAutoID {
return rows, nil
Expand Down Expand Up @@ -896,7 +909,9 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
return rows, nil
}

func (e *InsertValues) adjustAutoIncrementDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustAutoIncrementDatum(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
id, ok := retryInfo.GetCurrAutoIncrementID()
Expand Down Expand Up @@ -972,7 +987,9 @@ func getAutoRecordID(d types.Datum, target *types.FieldType, isInsert bool) (int
return recordID, nil
}

func (e *InsertValues) adjustAutoRandomDatum(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustAutoRandomDatum(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
retryInfo := e.ctx.GetSessionVars().RetryInfo
if retryInfo.Retrying {
autoRandomID, ok := retryInfo.GetCurrAutoRandomID()
Expand Down Expand Up @@ -1071,7 +1088,9 @@ func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, f
return alloc.Rebase(ctx, autoRandomID, true)
}

func (e *InsertValues) adjustImplicitRowID(ctx context.Context, d types.Datum, hasValue bool, c *table.Column) (types.Datum, error) {
func (e *InsertValues) adjustImplicitRowID(
ctx context.Context, d types.Datum, hasValue bool, c *table.Column,
) (types.Datum, error) {
var err error
var recordID int64
if !hasValue {
Expand Down Expand Up @@ -1119,7 +1138,11 @@ func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64)
alloc := e.Table.Allocators(e.ctx).Get(autoid.RowIDAllocType)
tableInfo := e.Table.Meta()

shardFmt := autoid.NewShardIDFormat(types.NewFieldType(mysql.TypeLonglong), tableInfo.ShardRowIDBits, autoid.RowIDBitLength)
shardFmt := autoid.NewShardIDFormat(
types.NewFieldType(mysql.TypeLonglong),
tableInfo.ShardRowIDBits,
autoid.RowIDBitLength,
)
newTiDBRowIDBase := shardFmt.IncrementalMask() & recordID

return alloc.Rebase(ctx, newTiDBRowIDBase, true)
Expand Down Expand Up @@ -1147,9 +1170,11 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum,
func (e *InsertValues) batchCheckAndInsert(
ctx context.Context, rows [][]types.Datum,
addRecord func(ctx context.Context, row []types.Datum) error,
replace bool) error {
replace bool,
) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End()
Expand Down Expand Up @@ -1218,7 +1243,8 @@ CheckAndInsert:
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.ctx.GetSessionVars().LockUnchangedKeys {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedKeyForLock(r.handleKey.newKey)
}
Expand Down Expand Up @@ -1254,7 +1280,8 @@ CheckAndInsert:
} else {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic &&
e.ctx.GetSessionVars().LockUnchangedKeys {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedKeyForLock(uk.newKey)
}
Expand Down Expand Up @@ -1293,9 +1320,11 @@ func (e *InsertValues) removeRow(
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
logutil.BgLogger().Error(
"get old row failed when replace",
zap.String("handle", handle.String()),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)),
)
if kv.IsErrNotFound(err) {
err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %s", handle)
}
Expand All @@ -1310,7 +1339,11 @@ func (e *InsertValues) removeRow(
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}
if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, lockRowKey|lockUniqueKeys); err != nil {
keySet := lockRowKey
if e.ctx.GetSessionVars().LockUnchangedKeys {
keySet |= lockUniqueKeys
}
if _, err := addUnchangedKeysForLockByRow(e.ctx, r.t, handle, oldRow, keySet); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -1354,7 +1387,9 @@ func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}

func (e *InsertValues) addRecordWithAutoIDHint(ctx context.Context, row []types.Datum, reserveAutoIDCount int) (err error) {
func (e *InsertValues) addRecordWithAutoIDHint(
ctx context.Context, row []types.Datum, reserveAutoIDCount int,
) (err error) {
vars := e.ctx.GetSessionVars()
if !vars.ConstraintCheckInPlace {
vars.PresumeKeyNotExists = true
Expand Down
132 changes: 75 additions & 57 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1510,68 +1510,86 @@ func TestIssue32213(t *testing.T) {
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
func TestInsertLockUnchangedKeys(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
for _, shouldLock := range []bool{false} {
for _, tt := range []struct {
name string
ddl string
dml string
isClusteredPK bool
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
true,
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
false,
},
{
"insert-ignore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
true,
},
{
"insert-ignore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
false,
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
true,
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
false,
},
} {
t.Run(
tt.name+"-"+strconv.FormatBool(shouldLock), func(t *testing.T) {
tk1.MustExec(fmt.Sprintf("set @@tidb_lock_unchanged_keys = %v", shouldLock))
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
errCh := make(chan error)
go func() {
_, err := tk2.Exec("insert into t values (1)")
errCh <- err
}()
select {
case <-errCh:
if shouldLock {
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
}
close(errCh)
case <-time.After(200 * time.Millisecond):
if !shouldLock && !tt.isClusteredPK {
require.Failf(t, "txn2 is blocked by %q", tt.dml)
}
}
tk1.MustExec("commit")
<-errCh
tk1.MustQuery("select * from t").Check(testkit.Rows("1"))
},
)
}
}
}
Loading

0 comments on commit 38ef1f9

Please sign in to comment.