diff --git a/session/session.go b/session/session.go index c1d4c17a1a297..a9c2544b1ced4 100644 --- a/session/session.go +++ b/session/session.go @@ -69,6 +69,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/tikv" tikvstore "github.com/pingcap/tidb/store/tikv/kv" tikvutil "github.com/pingcap/tidb/store/tikv/util" @@ -544,14 +545,15 @@ func (s *session) doCommit(ctx context.Context) error { type temporaryTableKVFilter map[int64]tableutil.TempTable -func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { +func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) { tid := tablecodec.DecodeTableID(key) if _, ok := m[tid]; ok { - return true + return true, nil } // This is the default filter for all tables. - return tablecodec.IsUntouchedIndexKValue(key, value) + defaultFilter := txn.TiDBKVFilter{} + return defaultFilter.IsUnnecessaryKeyValue(key, value, flags) } // errIsNoisy is used to filter DUPLCATE KEY errors. diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 93a4f4a9508df..b7f011d6c1836 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -29,6 +29,8 @@ import ( tikverr "github.com/pingcap/tidb/store/tikv/error" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) type tikvTxn struct { @@ -235,6 +237,15 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error { type TiDBKVFilter struct{} // IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed. -func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool { - return tablecodec.IsUntouchedIndexKValue(key, value) +func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) { + isUntouchedValue := tablecodec.IsUntouchedIndexKValue(key, value) + if isUntouchedValue && flags.HasPresumeKeyNotExists() { + logutil.BgLogger().Error("unexpected path the untouched key value with PresumeKeyNotExists flag", + zap.Stringer("key", kv.Key(key)), zap.Stringer("value", kv.Key(value)), + zap.Uint16("flags", uint16(flags)), zap.Stack("stack")) + return false, errors.Errorf( + "unexpected path the untouched key=%s value=%s contains PresumeKeyNotExists flag keyFlags=%v", + kv.Key(key).String(), kv.Key(value).String(), flags) + } + return isUntouchedValue, nil } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f8be95e741a99..37e784b46de4a 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -323,7 +323,7 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error // KVFilter is a filter that filters out unnecessary KV pairs. type KVFilter interface { // IsUnnecessaryKeyValue returns whether this KV pair should be committed. - IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) bool + IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) (bool, error) } func (c *twoPhaseCommitter) initKeysAndMutations() error { @@ -353,7 +353,13 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { } else { value = it.Value() if len(value) > 0 { - isUnnecessaryKV := filter != nil && filter.IsUnnecessaryKeyValue(key, value, flags) + var isUnnecessaryKV bool + if filter != nil { + isUnnecessaryKV, err = filter.IsUnnecessaryKeyValue(key, value, flags) + if err != nil { + return err + } + } if isUnnecessaryKV { if !flags.HasLocked() { continue diff --git a/table/tables/index.go b/table/tables/index.go index aef03d0590aaa..e1e44823a911f 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -168,8 +168,22 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue // should not overwrite the key with un-commit flag. // So if the key exists, just do nothing and return. v, err := txn.GetMemBuffer().Get(ctx, key) - if err == nil && len(v) != 0 { - return nil, nil + if err == nil { + if len(v) != 0 { + return nil, nil + } + // The key is marked as deleted in the memory buffer, as the existence check is done lazily + // for optimistic transactions by default. The "untouched" key could still exist in the store, + // it's needed to commit this key to do the existence check so unset the untouched flag. + if !txn.IsPessimistic() { + keyFlags, err := txn.GetMemBuffer().GetFlags(key) + if err != nil { + return nil, err + } + if keyFlags.HasPresumeKeyNotExists() { + opt.Untouched = false + } + } } } diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index ec5b82e351c74..5fb6ebf1d1f06 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -746,3 +746,29 @@ func (ts *testSuite) TestViewColumns(c *C) { "Warning|1356|View 'test.va' references invalid table(s) or column(s) or function(s) or definer/invoker of view lack rights to use them")) } } + +func (ts *testSuite) TestConstraintCheckForOptimisticUntouched(c *C) { + se, err := session.CreateSession4Test(ts.store) + c.Assert(err, IsNil) + c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) + tk := testkit.NewTestKitWithSession(c, ts.store, se) + + tk.MustExec("use test") + tk.MustExec("drop table if exists test_optimistic_untouched_flag;") + tk.MustExec(`create table test_optimistic_untouched_flag(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`) + tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, null, 'green');`) + + // Insert a row with duplicated entry on the unique key, the commit should fail. + tk.MustExec("begin optimistic;") + tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`) + tk.MustExec(`delete from test_optimistic_untouched_flag where c1 is null;`) + tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';") + err = tk.ExecToErr("commit") + c.Assert(err, NotNil) + + tk.MustExec("begin optimistic;") + tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`) + tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';") + err = tk.ExecToErr("commit") + c.Assert(err, NotNil) +}