Skip to content

Commit

Permalink
fix the incorrect untouch used in optimistic transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk committed Dec 8, 2021
1 parent 90ae7cc commit 0a63fcf
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 3 deletions.
11 changes: 10 additions & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,16 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
} else {
value = it.Value()
if len(value) > 0 {
if tablecodec.IsUntouchedIndexKValue(key, value) {
isUntouchedValue := tablecodec.IsUntouchedIndexKValue(key, value)
if isUntouchedValue {
if flags.HasPresumeKeyNotExists() {
logutil.BgLogger().Error("unexpected path the untouched key value with PresumeKeyNotExists flag",
zap.Stringer("key", key), zap.Stringer("value", kv.Key(value)),
zap.Uint16("flags", uint16(flags)), zap.Stack("stack"))
return errors.Errorf(
"unexpected path the untouched key=%s value=%s contains PresumeKeyNotExists flag keyFlags=%v",
key.String(), kv.Key(value).String(), flags)
}
if !flags.HasLocked() {
continue
}
Expand Down
18 changes: 16 additions & 2 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,22 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
// should not overwrite the key with un-commit flag.
// So if the key exists, just do nothing and return.
v, err := txn.GetMemBuffer().Get(ctx, key)
if err == nil && len(v) != 0 {
return nil, nil
if err == nil {
if len(v) != 0 {
return nil, nil
}
// The key is marked as deleted in the memory buffer, as the existence check is done lazily
// for optimistic transactions by default. The "untouched" key could still exist in the store,
// it's needed to commit this key to do the existence check so unset the untouched flag.
if !txn.IsPessimistic() {
keyFlags, err := txn.GetMemBuffer().GetFlags(key)
if err != nil {
return nil, err
}
if keyFlags.HasPresumeKeyNotExists() {
opt.Untouched = false
}
}
}
}

Expand Down
26 changes: 26 additions & 0 deletions table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,3 +746,29 @@ func (ts *testSuite) TestViewColumns(c *C) {
"Warning|1356|View 'test.va' references invalid table(s) or column(s) or function(s) or definer/invoker of view lack rights to use them"))
}
}

func (ts *testSuite) TestConstraintCheckForOptimisticUntouched(c *C) {
se, err := session.CreateSession4Test(ts.store)
c.Assert(err, IsNil)
c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue)
tk := testkit.NewTestKitWithSession(c, ts.store, se)

tk.MustExec("use test")
tk.MustExec("drop table if exists test_optimistic_untouched_flag;")
tk.MustExec(`create table test_optimistic_untouched_flag(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`)
tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, null, 'green');`)

// Insert a row with duplicated entry on the unique key, the commit should fail.
tk.MustExec("begin optimistic;")
tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`)
tk.MustExec(`delete from test_optimistic_untouched_flag where c1 is null;`)
tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';")
err = tk.ExecToErr("commit")
c.Assert(err, NotNil)

tk.MustExec("begin optimistic;")
tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`)
tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';")
err = tk.ExecToErr("commit")
c.Assert(err, NotNil)
}

0 comments on commit 0a63fcf

Please sign in to comment.