Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: fix the incorrect untouch used in optimistic transactions(#30447) #30526

Merged
merged 3 commits into from
Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/tikv"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
tikvutil "github.com/pingcap/tidb/store/tikv/util"
Expand Down Expand Up @@ -544,14 +545,15 @@ func (s *session) doCommit(ctx context.Context) error {

type temporaryTableKVFilter map[int64]tableutil.TempTable

func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool {
func (m temporaryTableKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) {
tid := tablecodec.DecodeTableID(key)
if _, ok := m[tid]; ok {
return true
return true, nil
}

// This is the default filter for all tables.
return tablecodec.IsUntouchedIndexKValue(key, value)
defaultFilter := txn.TiDBKVFilter{}
return defaultFilter.IsUnnecessaryKeyValue(key, value, flags)
}

// errIsNoisy is used to filter DUPLCATE KEY errors.
Expand Down
15 changes: 13 additions & 2 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
tikverr "github.com/pingcap/tidb/store/tikv/error"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

type tikvTxn struct {
Expand Down Expand Up @@ -235,6 +237,15 @@ func (txn *tikvTxn) extractKeyExistsErr(key kv.Key) error {
type TiDBKVFilter struct{}

// IsUnnecessaryKeyValue defines which kinds of KV pairs from TiDB needn't be committed.
func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) bool {
return tablecodec.IsUntouchedIndexKValue(key, value)
func (f TiDBKVFilter) IsUnnecessaryKeyValue(key, value []byte, flags tikvstore.KeyFlags) (bool, error) {
isUntouchedValue := tablecodec.IsUntouchedIndexKValue(key, value)
if isUntouchedValue && flags.HasPresumeKeyNotExists() {
logutil.BgLogger().Error("unexpected path the untouched key value with PresumeKeyNotExists flag",
zap.Stringer("key", kv.Key(key)), zap.Stringer("value", kv.Key(value)),
zap.Uint16("flags", uint16(flags)), zap.Stack("stack"))
return false, errors.Errorf(
"unexpected path the untouched key=%s value=%s contains PresumeKeyNotExists flag keyFlags=%v",
kv.Key(key).String(), kv.Key(value).String(), flags)
}
return isUntouchedValue, nil
}
10 changes: 8 additions & 2 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (c *twoPhaseCommitter) extractKeyExistsErr(err *tikverr.ErrKeyExist) error
// KVFilter is a filter that filters out unnecessary KV pairs.
type KVFilter interface {
// IsUnnecessaryKeyValue returns whether this KV pair should be committed.
IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) bool
IsUnnecessaryKeyValue(key, value []byte, flags kv.KeyFlags) (bool, error)
}

func (c *twoPhaseCommitter) initKeysAndMutations() error {
Expand Down Expand Up @@ -353,7 +353,13 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
} else {
value = it.Value()
if len(value) > 0 {
isUnnecessaryKV := filter != nil && filter.IsUnnecessaryKeyValue(key, value, flags)
var isUnnecessaryKV bool
if filter != nil {
isUnnecessaryKV, err = filter.IsUnnecessaryKeyValue(key, value, flags)
if err != nil {
return err
}
}
if isUnnecessaryKV {
if !flags.HasLocked() {
continue
Expand Down
18 changes: 16 additions & 2 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,22 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
// should not overwrite the key with un-commit flag.
// So if the key exists, just do nothing and return.
v, err := txn.GetMemBuffer().Get(ctx, key)
if err == nil && len(v) != 0 {
return nil, nil
if err == nil {
if len(v) != 0 {
return nil, nil
}
// The key is marked as deleted in the memory buffer, as the existence check is done lazily
// for optimistic transactions by default. The "untouched" key could still exist in the store,
// it's needed to commit this key to do the existence check so unset the untouched flag.
if !txn.IsPessimistic() {
keyFlags, err := txn.GetMemBuffer().GetFlags(key)
if err != nil {
return nil, err
}
if keyFlags.HasPresumeKeyNotExists() {
opt.Untouched = false
}
}
}
}

Expand Down
26 changes: 26 additions & 0 deletions table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,3 +746,29 @@ func (ts *testSuite) TestViewColumns(c *C) {
"Warning|1356|View 'test.va' references invalid table(s) or column(s) or function(s) or definer/invoker of view lack rights to use them"))
}
}

func (ts *testSuite) TestConstraintCheckForOptimisticUntouched(c *C) {
se, err := session.CreateSession4Test(ts.store)
c.Assert(err, IsNil)
c.Assert(se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue)
tk := testkit.NewTestKitWithSession(c, ts.store, se)

tk.MustExec("use test")
tk.MustExec("drop table if exists test_optimistic_untouched_flag;")
tk.MustExec(`create table test_optimistic_untouched_flag(c0 int, c1 varchar(20), c2 varchar(20), unique key uk(c0));`)
tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, null, 'green');`)

// Insert a row with duplicated entry on the unique key, the commit should fail.
tk.MustExec("begin optimistic;")
tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`)
tk.MustExec(`delete from test_optimistic_untouched_flag where c1 is null;`)
tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';")
err = tk.ExecToErr("commit")
c.Assert(err, NotNil)

tk.MustExec("begin optimistic;")
tk.MustExec(`insert into test_optimistic_untouched_flag(c0, c1, c2) values (1, 'red', 'white');`)
tk.MustExec("update test_optimistic_untouched_flag set c2 = 'green' where c2 between 'purple' and 'white';")
err = tk.ExecToErr("commit")
c.Assert(err, NotNil)
}