Skip to content

Commit

Permalink
executor: optimize point lock read process for rc (#36834)
Browse files Browse the repository at this point in the history
close #36833
  • Loading branch information
TonsnakeLin authored Sep 15, 2022
1 parent f80a42d commit e9a8577
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 35 deletions.
117 changes: 82 additions & 35 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,40 +254,44 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
}

e.handleVal, err = e.get(ctx, e.idxKey)
if err != nil {
if !kv.ErrNotExist.Equal(err) {
return err
}
}

// also lock key if read consistency read a value
// TODO: pessimistic lock support lock-if-exist.
if lockNonExistIdxKey || len(e.handleVal) > 0 {
if !lockNonExistIdxKey {
err = e.lockKeyIfNeeded(ctx, e.idxKey)
if err != nil {
e.handleVal, err = e.get(ctx, e.idxKey)
if err != nil {
if !kv.ErrNotExist.Equal(err) {
return err
}
}
// Change the unique index LOCK into PUT record.
if e.lock && len(e.handleVal) > 0 {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
} else {
if e.lock {
e.handleVal, err = e.lockKeyIfExists(ctx, e.idxKey)
if err != nil {
return err
}
} else {
e.handleVal, err = e.get(ctx, e.idxKey)
if err != nil {
if !kv.ErrNotExist.Equal(err) {
return err
}
}
}
}

if len(e.handleVal) == 0 {
return nil
}

// Change the unique index LOCK into PUT record.
if e.lock {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
if err != nil {
return err
}
}

var iv kv.Handle
iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle)
if err != nil {
Expand Down Expand Up @@ -353,17 +357,20 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) getAndLock(ctx context.Context, key kv.Key) (val []byte, err error) {
if e.ctx.GetSessionVars().IsPessimisticReadConsistency() {
// Only Lock the exist keys in RC isolation.
val, err = e.get(ctx, key)
if err != nil {
if !kv.ErrNotExist.Equal(err) {
// Only Lock the existing keys in RC isolation.
if e.lock {
val, err = e.lockKeyIfExists(ctx, key)
if err != nil {
return nil, err
}
return nil, nil
}
err = e.lockKeyIfNeeded(ctx, key)
if err != nil {
return nil, err
} else {
val, err = e.get(ctx, key)
if err != nil {
if !kv.ErrNotExist.Equal(err) {
return nil, err
}
return nil, nil
}
}
return val, nil
}
Expand All @@ -383,28 +390,68 @@ func (e *PointGetExecutor) getAndLock(ctx context.Context, key kv.Key) (val []by
}

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
_, err := e.lockKeyBase(ctx, key, false)
return err
}

// lockKeyIfExists locks the key if needed, but won't lock the key if it doesn't exis.
// Returns the value of the key if the key exist.
func (e *PointGetExecutor) lockKeyIfExists(ctx context.Context, key []byte) ([]byte, error) {
return e.lockKeyBase(ctx, key, true)
}

func (e *PointGetExecutor) lockKeyBase(ctx context.Context,
key []byte,
LockOnlyIfExists bool) ([]byte, error) {
if len(key) == 0 {
return nil
return nil, nil
}

if e.lock {
seVars := e.ctx.GetSessionVars()
lockCtx, err := newLockCtx(e.ctx, e.lockWaitTime, 1)
if err != nil {
return err
return nil, err
}
lockCtx.LockOnlyIfExists = LockOnlyIfExists
lockCtx.InitReturnValues(1)
err = doLockKeys(ctx, e.ctx, lockCtx, key)
if err != nil {
return err
return nil, err
}
lockCtx.IterateValuesNotLocked(func(k, v []byte) {
seVars.TxnCtx.SetPessimisticLockCache(k, v)
})
if len(e.handleVal) > 0 {
seVars.TxnCtx.SetPessimisticLockCache(e.idxKey, e.handleVal)
}
if LockOnlyIfExists {
return e.getValueFromLockCtx(ctx, lockCtx, key)
}
}
return nil

return nil, nil
}

func (e *PointGetExecutor) getValueFromLockCtx(ctx context.Context,
lockCtx *kv.LockCtx,
key []byte) ([]byte, error) {
if val, ok := lockCtx.Values[string(key)]; ok {
if val.Exists {
return val.Value, nil
} else if val.AlreadyLocked {
val, err := e.get(ctx, key)
if err != nil {
if !kv.ErrNotExist.Equal(err) {
return nil, err
}
return nil, nil
}
return val, nil
}
}

return nil, nil
}

// get will first try to get from txn buffer, then check the pessimistic lock cache,
Expand Down
29 changes: 29 additions & 0 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
regCtx.AcquireLatches(hashVals)
defer regCtx.ReleaseLatches(hashVals)

if req.LockOnlyIfExists && !req.ReturnValues {
return nil, errors.New("LockOnlyIfExists is set for LockKeys but ReturnValues is not set")
}
batch := store.dbWriter.NewWriteBatch(startTS, 0, reqCtx.rpcCtx)
var dup bool
for _, m := range mutations {
Expand Down Expand Up @@ -279,6 +282,9 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
if err1 != nil {
return nil, err1
}
if lock == nil {
continue
}
batch.PessimisticLock(m.Key, lock)
}
err = store.dbWriter.Write(batch)
Expand Down Expand Up @@ -588,6 +594,12 @@ func (store *MVCCStore) buildPessimisticLock(m *kvrpcpb.Mutation, item *badger.I
return nil, &kverrors.ErrKeyAlreadyExists{Key: m.Key}
}
}
if ok, err := doesNeedLock(item, req); !ok {
if err != nil {
return nil, err
}
return nil, nil
}
lock := &mvcc.Lock{
LockHdr: mvcc.LockHdr{
StartTS: req.StartVersion,
Expand Down Expand Up @@ -1822,3 +1834,20 @@ func (f *GCCompactionFilter) Guards() []badger.Guard {
baseGuard, raftGuard, metaGuard, metaExtraGuard, tableIndexGuard, tableExtraGuard,
}
}

func doesNeedLock(item *badger.Item,
req *kvrpcpb.PessimisticLockRequest) (bool, error) {
if req.LockOnlyIfExists {
if item == nil {
return false, nil
}
val, err := item.Value()
if err != nil {
return false, err
}
if len(val) == 0 {
return false, nil
}
}
return true, nil
}
Loading

0 comments on commit e9a8577

Please sign in to comment.