From d66dee1600131a13c20120e1d9c165cb8ffb0315 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 19 May 2023 14:14:35 +0800 Subject: [PATCH] Revert "executor: revert #25730 to fix #28011 (#42488) (#42503)" This reverts commit 6211ae68887bd469a3875660bc1f271e51a39b94. --- executor/batch_point_get.go | 35 +++++++++ executor/point_get.go | 12 +++ .../pessimistictest/pessimistic_test.go | 73 ++++++++++++------- 3 files changed, 95 insertions(+), 25 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 965708073a097..ee9808700aaec 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -392,6 +392,23 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if err != nil { return err } + // Change the unique index LOCK into PUT record. + if len(indexKeys) > 0 { + if !e.txn.Valid() { + return kv.ErrInvalidTxn + } + membuf := e.txn.GetMemBuffer() + for _, idxKey := range indexKeys { + handleVal := handleVals[string(idxKey)] + if len(handleVal) == 0 { + continue + } + err = membuf.Set(idxKey, handleVal) + if err != nil { + return err + } + } + } } // Fetch all values. values, err = batchGetter.BatchGet(ctx, keys) @@ -403,6 +420,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if e.lock && rc { existKeys = make([]kv.Key, 0, 2*len(values)) } + changeLockToPutIdxKeys := make([]kv.Key, 0, len(indexKeys)) e.values = make([][]byte, 0, len(values)) for i, key := range keys { val := values[string(key)] @@ -437,6 +455,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { // lock primary key for clustered index table is redundant if len(indexKeys) != 0 { existKeys = append(existKeys, indexKeys[i]) + changeLockToPutIdxKeys = append(changeLockToPutIdxKeys, indexKeys[i]) } } } @@ -446,6 +465,22 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if err != nil { return err } + if len(changeLockToPutIdxKeys) > 0 { + if !e.txn.Valid() { + return kv.ErrInvalidTxn + } + for _, idxKey := range changeLockToPutIdxKeys { + membuf := e.txn.GetMemBuffer() + handleVal := handleVals[string(idxKey)] + if len(handleVal) == 0 { + return kv.ErrNotExist + } + err = membuf.Set(idxKey, handleVal) + if err != nil { + return err + } + } + } } e.handles = handles return nil diff --git a/executor/point_get.go b/executor/point_get.go index bb958055dedd8..3e3cddb08d9ba 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -282,6 +282,18 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } + // Change the unique index LOCK into PUT record. + if e.lock { + if !e.txn.Valid() { + return kv.ErrInvalidTxn + } + memBuffer := e.txn.GetMemBuffer() + err = memBuffer.Set(e.idxKey, e.handleVal) + if err != nil { + return err + } + } + var iv kv.Handle iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle) if err != nil { diff --git a/tests/realtikvtest/pessimistictest/pessimistic_test.go b/tests/realtikvtest/pessimistictest/pessimistic_test.go index ad40080c765a9..ae7545e0e91f6 100644 --- a/tests/realtikvtest/pessimistictest/pessimistic_test.go +++ b/tests/realtikvtest/pessimistictest/pessimistic_test.go @@ -2816,37 +2816,60 @@ func TestAsyncCommitCalTSFail(t *testing.T) { tk2.MustExec("commit") } -func TestIssue28011(t *testing.T) { +func TestChangeLockToPut(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk2.MustExec("use test") - for _, tt := range []struct { - name string - lockQuery string - finalRows [][]interface{} - }{ - {"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")}, - {"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")}, - {"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")}, - {"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")}, - } { - t.Run(tt.name, func(t *testing.T) { - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))") - tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')") - tk.MustExec("begin") - tk.MustExec(tt.lockQuery) - tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c")) - tk.MustExec("replace into t values ('b', 'y')") - tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c")) - tk.MustQuery("select a, b from t order by a").Check(tt.finalRows) - tk.MustExec("commit") - tk.MustQuery("select a, b from t order by a").Check(tt.finalRows) - tk.MustExec("admin check table t") - }) + tk.MustExec("drop table if exists tk") + tk.MustExec("create table t1(c1 varchar(20) key, c2 int, c3 int, unique key k1(c2), key k2(c3))") + tk.MustExec(`insert into t1 values ("1", 1, 1), ("2", 2, 2), ("3", 3, 3)`) + + // Test point get change lock to put. + for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} { + tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode)) + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1")) + tk.MustExec("commit") + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1")) + tk.MustExec("commit") + tk.MustExec("admin check table t1") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k1) where c2 = "1" for update`).Check(testkit.Rows("1 1 1")) + tk2.MustQuery(`select * from t1 use index(k1) where c2 = "3" for update`).Check(testkit.Rows("3 3 3")) + tk2.MustExec("commit") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k2) where c3 = 1`).Check(testkit.Rows("1 1 1")) + tk2.MustQuery("select * from t1 use index(k2) where c3 > 1").Check(testkit.Rows("2 2 2", "3 3 3")) + tk2.MustExec("commit") } + + // Test batch point get change lock to put. + for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} { + tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode)) + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 in ("1", "5", "3") for update`).Check(testkit.Rows("1 1 1", "3 3 3")) + tk.MustExec("commit") + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 in ("1", "2", "8") for update`).Check(testkit.Rows("1 1 1", "2 2 2")) + tk.MustExec("commit") + tk.MustExec("admin check table t1") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k1) where c2 in ("1", "2", "3") for update`).Check(testkit.Rows("1 1 1", "2 2 2", "3 3 3")) + tk2.MustQuery(`select * from t1 use index(k2) where c2 in ("2") for update`).Check(testkit.Rows("2 2 2")) + tk2.MustExec("commit") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k2) where c3 in (5, 8)`).Check(testkit.Rows()) + tk2.MustQuery(`select * from t1 use index(k2) where c3 in (1, 8) for update`).Check(testkit.Rows("1 1 1")) + tk2.MustQuery(`select * from t1 use index(k2) where c3 > 1`).Check(testkit.Rows("2 2 2", "3 3 3")) + tk2.MustExec("commit") + } + + tk.MustExec("admin check table t1") } func createTable(part bool, columnNames []string, columnTypes []string) string {