Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aggressive-locking mechanism and support locking with conflict #528

Merged
merged 41 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
46791bf
update kvproto
MyonKeminta Jun 2, 2022
b38f9d5
Update pessimistic lock request logic
MyonKeminta Jun 15, 2022
031ad12
Add handling for pessimistic lock returning locked with conflict
MyonKeminta Jun 16, 2022
27ef954
Add error check
MyonKeminta Jun 16, 2022
82cd7ff
Add aggressivelocking interface
MyonKeminta Jun 20, 2022
552885c
Finish cleanup logic and some refactor
MyonKeminta Jun 20, 2022
8a10d95
Add IsInAggressiveLockingMode for KVTxn
MyonKeminta Jun 20, 2022
d1b1dbf
Fix aggressiveLockingContext isn't reset after cancelling
MyonKeminta Jun 21, 2022
9954bb6
Add more info in KVTxn.String()
MyonKeminta Jun 21, 2022
1f0d628
Fix panic in GetMutation
MyonKeminta Jun 22, 2022
1078efa
sync
MyonKeminta Jun 22, 2022
542e8a5
Fix readonly ignoring aggressive locking
MyonKeminta Jun 29, 2022
6f0f4e2
Add method for checking if a key is staged for aggressive locking
MyonKeminta Jun 29, 2022
ec6081e
Check if locking can be skipped according to ttl
MyonKeminta Jun 29, 2022
152ef0a
Allow lockfirst mode only for single key requests
MyonKeminta Jul 27, 2022
6a5013c
Handle region error properly
MyonKeminta Jul 28, 2022
5ec76a1
Fix region error handling
MyonKeminta Aug 1, 2022
c3ee72a
Add logs for debugging
MyonKeminta Aug 2, 2022
e21588a
Try to fix invalid pessimistic rollback
MyonKeminta Aug 2, 2022
51f783b
Remove verbose logs
MyonKeminta Aug 2, 2022
fa6159d
Merge branch 'master' of https://github.com/tikv/client-go into m/pes…
MyonKeminta Aug 3, 2022
f12f8d6
Fix nil pointer dereference when a stmt locks 0 keys and mistake in
MyonKeminta Aug 3, 2022
8dafa31
Try to fix PessimisticLockNotFound error
MyonKeminta Aug 4, 2022
fe6ebca
Support skip resolving locks
MyonKeminta Aug 5, 2022
ed31e1b
Merge branch 'master' of https://github.com/tikv/client-go into m/pes…
MyonKeminta Oct 25, 2022
6b505f7
Add some simple integration tests
MyonKeminta Nov 23, 2022
72d3345
Merge branch 'master' of https://github.com/tikv/client-go into m/pes…
MyonKeminta Nov 23, 2022
b22ce72
Remove skip resolve lock related code; go mod tidy
MyonKeminta Nov 23, 2022
4c82029
Fix lint
MyonKeminta Nov 23, 2022
460ff53
Fix lint
MyonKeminta Nov 23, 2022
40987a9
Remove uncertain related code which is currently useless
MyonKeminta Nov 24, 2022
c451712
Renaming
MyonKeminta Nov 24, 2022
dee7e13
Remove some code for handling multikey
MyonKeminta Nov 24, 2022
9e1a07a
Address comments; add more comments
MyonKeminta Nov 29, 2022
e5bd5fe
Disable force lock behavior when LockOnlyIfExists is enabled
MyonKeminta Nov 29, 2022
16aa457
Merge branch 'master' of https://github.com/tikv/client-go into m/pes…
MyonKeminta Nov 29, 2022
3326e2b
Address comments
MyonKeminta Nov 30, 2022
dc66a8e
Remove useless code that handles partial-successful cases which won't…
MyonKeminta Nov 30, 2022
79c70dd
extract region error and key error handling logic
MyonKeminta Nov 30, 2022
c85f408
Fix lint
MyonKeminta Nov 30, 2022
a610577
Remove useless comments
MyonKeminta Nov 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
120 changes: 120 additions & 0 deletions integration_tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,126 @@ func (s *testCommitterSuite) TestPessimisticLockCheckExistence() {
s.Nil(txn.Rollback())
}

func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflict() {
key := []byte("key")

txn0 := s.begin()
txn0.SetPessimistic(true)
s.Nil(txn0.Set(key, key))
s.Nil(txn0.Commit(context.Background()))

// No conflict cases
for _, returnValues := range []bool{false, true} {
for _, checkExistence := range []bool{false, true} {
txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
if checkExistence {
lockCtx.InitCheckExistence(1)
}
if returnValues {
lockCtx.InitReturnValues(1)
}
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))
if checkExistence || returnValues {
s.Len(lockCtx.Values, 1)
s.True(lockCtx.Values[string(key)].Exists)
} else {
s.Len(lockCtx.Values, 0)
}
if returnValues {
s.Equal(key, lockCtx.Values[string(key)].Value)
} else {
s.Len(lockCtx.Values[string(key)].Value, 0)
}
s.Equal(uint64(0), lockCtx.Values[string(key)].LockedWithConflictTS)
s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS)

txn.DoneAggressiveLocking(context.Background())
s.Nil(txn.Rollback())
}
}

// Conflicting cases
for _, returnValues := range []bool{false, true} {
for _, checkExistence := range []bool{false, true} {
// Make different values
value := []byte(fmt.Sprintf("value-%v-%v", returnValues, checkExistence))
txn0 := s.begin()
txn0.SetPessimistic(true)
s.Nil(txn0.Set(key, value))

txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()

s.Nil(txn0.Commit(context.Background()))
s.Greater(txn0.GetCommitTS(), txn.StartTS())

lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}
if checkExistence {
lockCtx.InitCheckExistence(1)
}
if returnValues {
lockCtx.InitReturnValues(1)
}
s.Nil(txn.LockKeys(context.Background(), lockCtx, key))

s.Equal(txn0.GetCommitTS(), lockCtx.MaxLockedWithConflictTS)
v := lockCtx.Values[string(key)]
s.Equal(txn0.GetCommitTS(), v.LockedWithConflictTS)
s.True(v.Exists)
s.Equal(value, v.Value)

txn.CancelAggressiveLocking(context.Background())
s.Nil(txn.Rollback())
}
}
}

func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflictError() {
key := []byte("key")

for _, returnValues := range []bool{false, true} {
for _, checkExistence := range []bool{false, true} {
// Another transaction locked the key.
txn0 := s.begin()
txn0.SetPessimistic(true)
lockCtx := &kv.LockCtx{ForUpdateTS: txn0.StartTS(), WaitStartTime: time.Now()}
s.Nil(txn0.LockKeys(context.Background(), lockCtx, key))

// Test key is locked
txn := s.begin()
txn.SetPessimistic(true)
txn.StartAggressiveLocking()
lockCtx = kv.NewLockCtx(txn.StartTS(), 10, time.Now())
if checkExistence {
lockCtx.InitCheckExistence(1)
}
if returnValues {
lockCtx.InitReturnValues(1)
}
err := txn.LockKeys(context.Background(), lockCtx, key)
s.NotNil(err)
s.Equal(tikverr.ErrLockWaitTimeout.Error(), err.Error())
s.Equal([]string{}, txn.GetAggressiveLockingKeys())

// Abort the blocking transaction.
s.Nil(txn0.Rollback())

// Test region error
s.Nil(failpoint.Enable("tikvclient/tikvStoreSendReqResult", `1*return("PessimisticLockNotLeader")`))
err = txn.LockKeys(context.Background(), lockCtx, key)
s.Nil(err)
s.Nil(failpoint.Disable("tikvclient/tikvStoreSendReqResult"))
s.Equal([]string{"key"}, txn.GetAggressiveLockingKeys())
txn.CancelAggressiveLocking(context.Background())
s.Nil(txn.Rollback())
}
}
}

// TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time.
func (s *testCommitterSuite) TestElapsedTTL() {
key := []byte("key")
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f
github.com/pingcap/tidb v1.1.0-beta.0.20221101102559-97add26c8f84
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.0
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f h1:hnUlIU5nCH6PAO9DC5DhODX1cwqoTcXTNIODyvNI9q4=
github.com/pingcap/kvproto v0.0.0-20221123043343-cdc67325f05f/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
6 changes: 3 additions & 3 deletions integration_tests/raw/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,15 @@ func (s *apiTestSuite) TestEmptyValue() {
s.Empty(v)
// batch_get
vs := s.mustBatchGetBytes(prefix, []string{"key", "key1"})
s.Equal([][]byte{[]byte{}, nil}, vs)
s.Equal([][]byte{{}, nil}, vs)
// scan
keys, values := s.mustScanBytes(prefix, "key", "keyz", 10)
s.Equal([][]byte{[]byte("key")}, keys)
s.Equal([][]byte{[]byte{}}, values)
s.Equal([][]byte{{}}, values)
// reverse scan
keys, values = s.mustReverseScanBytes(prefix, "keyz", "key", 10)
s.Equal([][]byte{[]byte("key")}, keys)
s.Equal([][]byte{[]byte{}}, values)
s.Equal([][]byte{{}}, values)
}

verifyNotExist := func() {
Expand Down
6 changes: 6 additions & 0 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,12 @@ func (s *RegionRequestSender) SendReqCtx(
Resp: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
}, nil, nil
}
case "PessimisticLockNotLeader":
if req.Type == tikvrpc.CmdPessimisticLock {
return &tikvrpc.Response{
Resp: &kvrpcpb.PessimisticLockResponse{RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{}}},
}, nil, nil
}
case "GCServerIsBusy":
if req.Type == tikvrpc.CmdGC {
return &tikvrpc.Response{
Expand Down
Loading