Skip to content

Commit

Permalink
executor: revert #42285 and #42503 (#44000)
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored May 19, 2023
1 parent 29116c0 commit 5239c23
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 128 deletions.
35 changes: 35 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,23 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if err != nil {
return err
}
// Change the unique index LOCK into PUT record.
if len(indexKeys) > 0 {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
membuf := e.txn.GetMemBuffer()
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
}
}
}
}
// Fetch all values.
values, err = batchGetter.BatchGet(ctx, keys)
Expand All @@ -403,6 +420,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if e.lock && rc {
existKeys = make([]kv.Key, 0, 2*len(values))
}
changeLockToPutIdxKeys := make([]kv.Key, 0, len(indexKeys))
e.values = make([][]byte, 0, len(values))
for i, key := range keys {
val := values[string(key)]
Expand Down Expand Up @@ -437,6 +455,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
// lock primary key for clustered index table is redundant
if len(indexKeys) != 0 {
existKeys = append(existKeys, indexKeys[i])
changeLockToPutIdxKeys = append(changeLockToPutIdxKeys, indexKeys[i])
}
}
}
Expand All @@ -446,6 +465,22 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if err != nil {
return err
}
if len(changeLockToPutIdxKeys) > 0 {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
for _, idxKey := range changeLockToPutIdxKeys {
membuf := e.txn.GetMemBuffer()
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
return kv.ErrNotExist
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
}
}
}
}
e.handles = handles
return nil
Expand Down
12 changes: 0 additions & 12 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,10 +1166,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedRowKey(r.handleKey.newKey)
}
continue
}
} else if !kv.IsErrNotFound(err) {
Expand All @@ -1181,10 +1177,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if err == nil {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
}
skip = true
break
}
Expand Down Expand Up @@ -1233,10 +1225,6 @@ func (e *InsertValues) removeRow(ctx context.Context, txn kv.Transaction, r toBe
return err
}
if identical {
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return err
}
return nil
}

Expand Down
66 changes: 0 additions & 66 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,69 +1481,3 @@ func TestIssue32213(t *testing.T) {
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999"))
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
}
}
12 changes: 12 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

// Change the unique index LOCK into PUT record.
if e.lock {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
if err != nil {
return err
}
}

var iv kv.Handle
iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle
}
if rowUnchanged {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return false, err
}
return true, nil
}

Expand Down
36 changes: 16 additions & 20 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,22 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}
_, err := appendUnchangedRowForLock(sctx, t, h, oldData)
return false, err

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, nil
}

// Fill values into on-update-now fields, only if they are really changed.
Expand Down Expand Up @@ -217,24 +231,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
return true, nil
}

func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return false, nil
}
physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, row)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx.AddUnchangedRowKey(unchangedRowKey)
return true, nil
}

func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error {
tableInfo := t.Meta()
if !tableInfo.ContainsAutoRandomBits() {
Expand Down
75 changes: 49 additions & 26 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func TestOptimisticConflicts(t *testing.T) {
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("select * from conflict where id in (1, 2, 3)")
tk.MustExec("insert ignore into conflict values (1, 2)")

tk2.MustExec("update conflict set c = c - 1")

Expand Down Expand Up @@ -2816,37 +2816,60 @@ func TestAsyncCommitCalTSFail(t *testing.T) {
tk2.MustExec("commit")
}

func TestIssue28011(t *testing.T) {
func TestChangeLockToPut(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)

tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
lockQuery string
finalRows [][]interface{}
}{
{"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")},
{"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")},
{"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")},
{"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")},
} {
t.Run(tt.name, func(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))")
tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')")
tk.MustExec("begin")
tk.MustExec(tt.lockQuery)
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustExec("replace into t values ('b', 'y')")
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("commit")
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("admin check table t")
})
tk.MustExec("drop table if exists tk")
tk.MustExec("create table t1(c1 varchar(20) key, c2 int, c3 int, unique key k1(c2), key k2(c3))")
tk.MustExec(`insert into t1 values ("1", 1, 1), ("2", 2, 2), ("3", 3, 3)`)

// Test point get change lock to put.
for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} {
tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode))
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1"))
tk.MustExec("commit")
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1"))
tk.MustExec("commit")
tk.MustExec("admin check table t1")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k1) where c2 = "1" for update`).Check(testkit.Rows("1 1 1"))
tk2.MustQuery(`select * from t1 use index(k1) where c2 = "3" for update`).Check(testkit.Rows("3 3 3"))
tk2.MustExec("commit")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k2) where c3 = 1`).Check(testkit.Rows("1 1 1"))
tk2.MustQuery("select * from t1 use index(k2) where c3 > 1").Check(testkit.Rows("2 2 2", "3 3 3"))
tk2.MustExec("commit")
}

// Test batch point get change lock to put.
for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} {
tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode))
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 in ("1", "5", "3") for update`).Check(testkit.Rows("1 1 1", "3 3 3"))
tk.MustExec("commit")
tk.MustExec("begin pessimistic")
tk.MustQuery(`select * from t1 where c1 in ("1", "2", "8") for update`).Check(testkit.Rows("1 1 1", "2 2 2"))
tk.MustExec("commit")
tk.MustExec("admin check table t1")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k1) where c2 in ("1", "2", "3") for update`).Check(testkit.Rows("1 1 1", "2 2 2", "3 3 3"))
tk2.MustQuery(`select * from t1 use index(k2) where c2 in ("2") for update`).Check(testkit.Rows("2 2 2"))
tk2.MustExec("commit")
tk2.MustExec("begin")
tk2.MustQuery(`select * from t1 use index(k2) where c3 in (5, 8)`).Check(testkit.Rows())
tk2.MustQuery(`select * from t1 use index(k2) where c3 in (1, 8) for update`).Check(testkit.Rows("1 1 1"))
tk2.MustQuery(`select * from t1 use index(k2) where c3 > 1`).Check(testkit.Rows("2 2 2", "3 3 3"))
tk2.MustExec("commit")
}

tk.MustExec("admin check table t1")
}

func createTable(part bool, columnNames []string, columnTypes []string) string {
Expand Down

0 comments on commit 5239c23

Please sign in to comment.