
txn: record resolving locks #473

Merged 10 commits on May 17, 2022
8 changes: 8 additions & 0 deletions txnkv/transaction/pessimistic.go
@@ -112,6 +112,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
}
lockWaitStartTime := action.WaitStartTime
var resolvingRecordToken *int
for {
// if lockWaitTime is set, refine the request's `WaitTimeout` field based on the timeout limit
if action.LockWaitTime() > 0 && action.LockWaitTime() != kv.LockAlwaysWait {
@@ -226,6 +227,13 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// Because we already waited on tikv, no need to Backoff here.
// tikv waits 3s by default (also the maximum wait value) when a lock error occurs
startTime = time.Now()
if resolvingRecordToken == nil {
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
resolvingRecordToken = &token
defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
}
msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
if err != nil {
return err
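Reviewer note: the hunk above (and its twin in prewrite.go below) follows one pattern — record the locks on the first conflict, update them under the same token on retries, and clean up once via `defer`. A condensed, illustrative sketch of that lifecycle; it assumes the code sits inside client-go (so `retry` and `txnlock` are the repo's own packages), and `fetchLocks` is a hypothetical stand-in for the code that turns key errors into locks in the real retry loops:

```go
// Sketch of the record/update/done pattern used in the retry loops.
func resolveWithRecording(
	lr *txnlock.LockResolver,
	bo *retry.Backoffer,
	startTS uint64,
	fetchLocks func() []*txnlock.Lock,
) error {
	var token *int
	for {
		locks := fetchLocks()
		if len(locks) == 0 {
			return nil // nothing left to resolve
		}
		if token == nil {
			// First conflict: record the locks and keep the returned token.
			t := lr.RecordResolvingLocks(locks, startTS)
			token = &t
			// Deferred exactly once; runs when this function returns.
			defer lr.ResolveLocksDone(startTS, *token)
		} else {
			// Retries overwrite the record under the same token.
			lr.UpdateResolvingLocks(locks, startTS, *token)
		}
		if _, err := lr.ResolveLocks(bo, startTS, locks); err != nil {
			return err
		}
	}
}
```

Recording lazily means a transaction that never meets a lock pays nothing, and the single deferred ResolveLocksDone removes the record however the batch exits.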
8 changes: 8 additions & 0 deletions txnkv/transaction/prewrite.go
@@ -225,6 +225,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B

req := c.buildPrewriteRequest(batch, txnSize)
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
var resolvingRecordToken *int
defer func() {
if err != nil {
// If we fail to receive response for async commit prewrite, it will be undetermined whether this
@@ -379,6 +380,13 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
locks = append(locks, lock)
}
start := time.Now()
if resolvingRecordToken == nil {
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
resolvingRecordToken = &token
defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
}
msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks)
if err != nil {
return err
76 changes: 76 additions & 0 deletions txnkv/txnlock/lock_resolver.go
@@ -62,6 +62,13 @@ type LockResolver struct {
resolveLockLiteThreshold uint64
mu struct {
sync.RWMutex
// These two fields are used to track lock resolving information.
// currentStartTS -> caller token -> resolving locks
resolving map[uint64][][]Lock
// currentStartTS -> number of lock-resolving processes in progress
// Counting concurrency makes it cheap to check whether the
// resources used in `resolving` can be freed.
resolvingConcurrency map[uint64]int
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
resolved map[uint64]TxnStatus
recentResolved *list.List
@@ -76,13 +83,23 @@
asyncResolveCancel func()
}

// ResolvingLock represents the information of a lock that is currently being resolved.
type ResolvingLock struct {
TxnID uint64
LockTxnID uint64
Key []byte
Primary []byte
}

// NewLockResolver creates a new LockResolver instance.
func NewLockResolver(store storage) *LockResolver {
r := &LockResolver{
store: store,
resolveLockLiteThreshold: config.GetGlobalConfig().TiKVClient.ResolveLockLiteThreshold,
}
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.resolving = make(map[uint64][][]Lock)
r.mu.resolvingConcurrency = make(map[uint64]int)
r.mu.recentResolved = list.New()
r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
return r
@@ -322,6 +339,45 @@ func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS u
return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
}

// RecordResolvingLocks records that the txn whose startTS is callerStartTS has started resolving locks.
// Call it when the txn begins trying to resolve locks.
// It returns a token to be passed to ResolveLocksDone afterwards.
func (lr *LockResolver) RecordResolvingLocks(locks []*Lock, callerStartTS uint64) int {
resolving := make([]Lock, 0, len(locks))
for _, lock := range locks {
resolving = append(resolving, *lock)
}
lr.mu.Lock()
lr.mu.resolvingConcurrency[callerStartTS]++
token := len(lr.mu.resolving[callerStartTS])
lr.mu.resolving[callerStartTS] = append(lr.mu.resolving[callerStartTS], resolving)
lr.mu.Unlock()
return token
}

// UpdateResolvingLocks updates the lock resolving information of the txn `callerStartTS`.
func (lr *LockResolver) UpdateResolvingLocks(locks []*Lock, callerStartTS uint64, token int) {
resolving := make([]Lock, 0, len(locks))
for _, lock := range locks {
resolving = append(resolving, *lock)
}
lr.mu.Lock()
lr.mu.resolving[callerStartTS][token] = resolving
lr.mu.Unlock()
}

// ResolveLocksDone removes the resolving-locks information recorded for callerStartTS.
func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
lr.mu.Lock()
// Clear only this caller's slot; other tokens under the same
// callerStartTS may still be live.
lr.mu.resolving[callerStartTS][token] = nil
lr.mu.resolvingConcurrency[callerStartTS]--
if lr.mu.resolvingConcurrency[callerStartTS] == 0 {
delete(lr.mu.resolving, callerStartTS)
delete(lr.mu.resolvingConcurrency, callerStartTS)
}
lr.mu.Unlock()
}

func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
@@ -425,6 +481,26 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
return msBeforeTxnExpired.value(), canIgnore, canAccess, nil
}

// Resolving returns the information of the locks that are currently being resolved.
func (lr *LockResolver) Resolving() []ResolvingLock {
result := []ResolvingLock{}
lr.mu.RLock()
defer lr.mu.RUnlock()
for txnID, items := range lr.mu.resolving {
for _, item := range items {
for _, lock := range item {
result = append(result, ResolvingLock{
TxnID: txnID,
LockTxnID: lock.TxnID,
Key: lock.Key,
Primary: lock.Primary,
})
}
}
}
return result
}

type txnExpireTime struct {
initialized bool
txnExpire int64
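Resolving() is the read side of the new bookkeeping. A sketch of how a caller might surface it for diagnostics — only Resolving() and the ResolvingLock fields come from this PR; the HTTP plumbing and handler name are assumptions:

```go
import (
	"fmt"
	"net/http"

	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

// resolvingHandler dumps the locks a LockResolver is currently resolving.
// (Hypothetical wiring; only lr.Resolving() is from this PR.)
func resolvingHandler(lr *txnlock.LockResolver) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		for _, rl := range lr.Resolving() {
			// TxnID is the txn doing the resolving; LockTxnID owns the lock.
			fmt.Fprintf(w, "txn %d resolving a lock of txn %d on key %q (primary %q)\n",
				rl.TxnID, rl.LockTxnID, rl.Key, rl.Primary)
		}
	}
}
```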
7 changes: 7 additions & 0 deletions txnkv/txnlock/test_probe.go
@@ -100,3 +100,10 @@ func (l LockResolverProbe) IsNonAsyncCommitLock(err error) bool {
_, ok := errors.Cause(err).(*nonAsyncCommitLock)
return ok
}

// SetResolving directly sets the resolving-lock state of a LockResolver, for tests.
func (l LockResolverProbe) SetResolving(currentStartTS uint64, locks []Lock) {
l.mu.Lock()
defer l.mu.Unlock()
l.mu.resolving[currentStartTS] = append(l.mu.resolving[currentStartTS], locks)
}
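SetResolving lets tests seed the resolving state without running an actual resolution. A minimal round-trip sketch, assuming LockResolverProbe embeds *LockResolver (as test_probe.go suggests) and that a nil store is acceptable because these methods only touch in-memory state:

```go
import (
	"testing"

	"github.com/tikv/client-go/v2/txnkv/txnlock"
)

func TestResolvingRoundTrip(t *testing.T) {
	// Assumed setup: a probe wrapping a fresh, store-less LockResolver.
	probe := txnlock.LockResolverProbe{LockResolver: txnlock.NewLockResolver(nil)}
	probe.SetResolving(42, []txnlock.Lock{{TxnID: 7, Key: []byte("k"), Primary: []byte("p")}})

	got := probe.Resolving()
	if len(got) != 1 || got[0].TxnID != 42 || got[0].LockTxnID != 7 {
		t.Fatalf("unexpected resolving info: %+v", got)
	}
}
```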
15 changes: 15 additions & 0 deletions txnkv/txnsnapshot/client_helper.go
@@ -97,6 +97,21 @@ func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS u
return msBeforeTxnExpired, nil
}

// UpdateResolvingLocks wraps the UpdateResolvingLocks function
func (ch *ClientHelper) UpdateResolvingLocks(locks []*txnlock.Lock, callerStartTS uint64, token int) {
ch.lockResolver.UpdateResolvingLocks(locks, callerStartTS, token)
}

// RecordResolvingLocks wraps the RecordResolvingLocks function
func (ch *ClientHelper) RecordResolvingLocks(locks []*txnlock.Lock, callerStartTS uint64) int {
return ch.lockResolver.RecordResolvingLocks(locks, callerStartTS)
}

// ResolveLocksDone wraps the ResolveLocksDone function
func (ch *ClientHelper) ResolveLocksDone(callerStartTS uint64, token int) {
ch.lockResolver.ResolveLocksDone(callerStartTS, token)
}

// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
func (ch *ClientHelper) SendReqCtx(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) {
sender := locate.NewRegionRequestSender(ch.regionCache, ch.client)
11 changes: 10 additions & 1 deletion txnkv/txnsnapshot/scan.go
@@ -200,6 +200,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
sender := locate.NewRegionRequestSender(s.snapshot.store.GetRegionCache(), s.snapshot.store.GetTiKVClient())
var reqEndKey, reqStartKey []byte
var loc *locate.KeyLocation
var resolvingRecordToken *int
var err error
for {
if !s.reverse {
@@ -293,7 +294,15 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
if err != nil {
return err
}
msBeforeExpired, err := txnlock.NewLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*txnlock.Lock{lock})
locks := []*txnlock.Lock{lock}
if resolvingRecordToken == nil {
token := s.snapshot.store.GetLockResolver().RecordResolvingLocks(locks, s.snapshot.version)
resolvingRecordToken = &token
defer s.snapshot.store.GetLockResolver().ResolveLocksDone(s.snapshot.version, *resolvingRecordToken)
} else {
s.snapshot.store.GetLockResolver().UpdateResolvingLocks(locks, s.snapshot.version, *resolvingRecordToken)
}
msBeforeExpired, err := s.snapshot.store.GetLockResolver().ResolveLocks(bo, s.snapshot.version, locks)
if err != nil {
return err
}
20 changes: 18 additions & 2 deletions txnkv/txnsnapshot/snapshot.go
@@ -356,6 +356,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mu.RUnlock()

pending := batch.keys
var resolvingRecordToken *int
for {
s.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
@@ -450,6 +451,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mergeExecDetail(batchGetResp.ExecDetailsV2)
}
if len(lockedKeys) > 0 {
if resolvingRecordToken == nil {
token := cli.RecordResolvingLocks(locks, s.version)
resolvingRecordToken = &token
defer cli.ResolveLocksDone(s.version, *resolvingRecordToken)
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
if err != nil {
return err
@@ -565,6 +573,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
}

var firstLock *txnlock.Lock
var resolvingRecordToken *int
for {
util.EvalFailpoint("beforeSendPointGet")
loc, err := s.store.GetRegionCache().LocateKey(bo, k)
@@ -617,8 +626,15 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
cli.resolvedLocks.Put(lock.TxnID)
continue
}

msBeforeExpired, err := cli.ResolveLocks(bo, s.version, []*txnlock.Lock{lock})
locks := []*txnlock.Lock{lock}
if resolvingRecordToken == nil {
token := cli.RecordResolvingLocks(locks, s.version)
resolvingRecordToken = &token
defer cli.ResolveLocksDone(s.version, *resolvingRecordToken)
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
if err != nil {
return nil, err
}
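One subtlety worth spelling out: resolvingConcurrency is a counter, not a flag, because batchGetSingleRegion runs per batch/region, so several goroutines of one snapshot can record locks under the same s.version at once. A sketch of the intended refcount semantics, with the counter values in the comments inferred from the lock_resolver.go hunk above:

```go
// lr is a *txnlock.LockResolver shared by two concurrent batches of the
// same snapshot (same startTS). Counter values follow ResolveLocksDone.
func refcountDemo(lr *txnlock.LockResolver, locksA, locksB []*txnlock.Lock, startTS uint64) {
	t1 := lr.RecordResolvingLocks(locksA, startTS) // concurrency == 1
	t2 := lr.RecordResolvingLocks(locksB, startTS) // concurrency == 2
	lr.ResolveLocksDone(startTS, t1)               // concurrency == 1: entry for startTS kept
	lr.ResolveLocksDone(startTS, t2)               // concurrency == 0: both maps drop startTS
}
```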