Skip to content

Commit

Permalink
table/tables: add StateRemote interface for the cached table (#29152)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Nov 25, 2021
1 parent dde1e0b commit 7f93b09
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 16 deletions.
10 changes: 5 additions & 5 deletions table/tables/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (c *cachedTable) UpdateLockForRead(store kv.Storage, ts uint64) error {
// Load data from original table and the update lock information.
tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.LockForRead(tid, ts, lease)
succ, err := c.handle.LockForRead(context.Background(), tid, ts, lease)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -198,7 +198,7 @@ func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ..
return nil, err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -212,7 +212,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
err = c.handle.LockForWrite(ctx, c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -226,7 +226,7 @@ func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
return err
}
now := txn.StartTS()
err = c.handle.LockForWrite(c.Meta().ID, now, leaseFromTS(now))
err = c.handle.LockForWrite(context.Background(), c.Meta().ID, now, leaseFromTS(now))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -237,7 +237,7 @@ func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData)
return func() {
tid := c.Meta().ID
lease := leaseFromTS(ts)
succ, err := c.handle.RenewLease(tid, ts, lease, op)
succ, err := c.handle.RenewLease(context.Background(), tid, ts, lease, op)
if err != nil {
log.Warn("Renew read lease error")
}
Expand Down
270 changes: 259 additions & 11 deletions table/tables/state_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package tables

import (
"context"
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
)

Expand All @@ -37,43 +42,66 @@ const (
CachedTableLockWrite
)

// StateRemote Indicates the remote status information of the read-write lock
func (l CachedTableLockType) String() string {
switch l {
case CachedTableLockNone:
return "NONE"
case CachedTableLockRead:
return "READ"
case CachedTableLockIntend:
return "INTEND"
case CachedTableLockWrite:
return "WRITE"
}
panic("invalid CachedTableLockType value")
}

// StateRemote is the interface to control the remote state of the cached table's lock meta information.
type StateRemote interface {
// Load obtain the corresponding lock type and lease value according to the tableID
Load(tid int64) (CachedTableLockType, uint64, error)

// LockForRead try to add a read lock to the table with the specified tableID
LockForRead(tid int64, now, ts uint64) (bool, error)
Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error)

// LockForRead try to add a read lock to the table with the specified tableID.
// If this operation succeed, according to the protocol, the TiKV data will not be
// modified until the lease expire. It's safe for the caller to load the table data,
// cache and use the data.
// The parameter `now` means the current tso. Because the tso is get from PD, in
// the TiDB side, its value lags behind the real one forever, this doesn't matter.
// Because `now` is only used to clean up the orphan lock, as long as it's smaller
// than the real one, the correctness of the algorithm is not violated.
LockForRead(ctx context.Context, tid int64, now, lease uint64) (bool, error)

// LockForWrite try to add a write lock to the table with the specified tableID
LockForWrite(tid int64, now, ts uint64) error
LockForWrite(ctx context.Context, tid int64, now, ts uint64) error

// RenewLease attempt to renew the read / write lock on the table with the specified tableID
RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error)
RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error)
}

// mockStateRemoteHandle implement the StateRemote interface.
type mockStateRemoteHandle struct {
ch chan remoteTask
}

func (r *mockStateRemoteHandle) Load(tid int64) (CachedTableLockType, uint64, error) {
var _ StateRemote = &mockStateRemoteHandle{}

func (r *mockStateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) {
op := &loadOP{tid: tid}
op.Add(1)
r.ch <- op
op.Wait()
return op.lockType, op.lease, op.err
}

func (r *mockStateRemoteHandle) LockForRead(tid int64, now, ts uint64) (bool, error) {
func (r *mockStateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) (bool, error) {
op := &lockForReadOP{tid: tid, now: now, ts: ts}
op.Add(1)
r.ch <- op
op.Wait()
return op.succ, op.err
}

func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error {
func (r *mockStateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error {
op := &lockForWriteOP{tid: tid, now: now, ts: ts}
op.Add(1)
r.ch <- op
Expand Down Expand Up @@ -101,7 +129,7 @@ func (r *mockStateRemoteHandle) LockForWrite(tid int64, now, ts uint64) error {
return op.err
}

func (r *mockStateRemoteHandle) RenewLease(tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) {
func (r *mockStateRemoteHandle) RenewLease(ctx context.Context, tid int64, oldTs uint64, newTs uint64, op RenewLeaseType) (bool, error) {
switch op {
case RenewReadLease:
op := &renewLeaseForReadOP{tid: tid, oldTs: oldTs, newTs: newTs}
Expand Down Expand Up @@ -326,5 +354,225 @@ func (r *mockStateRemoteData) renewLeaseForRead(tid int64, oldTs uint64, newTs u
return true, nil
}
return false, errors.New("The new lease is smaller than the old lease is an illegal contract renewal operation")
}

type sqlExec interface {
AffectedRows() uint64
ExecuteInternal(context.Context, string, ...interface{}) (sqlexec.RecordSet, error)
GetStore() kv.Storage
}

type stateRemoteHandle struct {
exec sqlExec
sync.Mutex
}

// NewStateRemote creates a StateRemote object.
func NewStateRemote(exec sqlExec) *stateRemoteHandle {
return &stateRemoteHandle{
exec: exec,
}
}

var _ StateRemote = &stateRemoteHandle{}

func (h *stateRemoteHandle) Load(ctx context.Context, tid int64) (CachedTableLockType, uint64, error) {
lockType, lease, _, err := h.loadRow(ctx, tid)
return lockType, lease, err
}

// LockForRead try to lock the table, if this operation succeed, the remote data
// is "read locked" and will not be modified according to the protocol, until the lease expire.
func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, now, ts uint64) ( /*succ*/ bool, error) {
h.Lock()
defer h.Unlock()
succ := false
err := h.runInTxn(ctx, func(ctx context.Context) error {
lockType, lease, _, err := h.loadRow(ctx, tid)
if err != nil {
return errors.Trace(err)
}
// The old lock is outdated, clear orphan lock.
if now > lease {
succ = true
if err := h.updateRow(ctx, tid, "READ", ts); err != nil {
return errors.Trace(err)
}
return nil
}

switch lockType {
case CachedTableLockNone:
case CachedTableLockRead:
case CachedTableLockWrite, CachedTableLockIntend:
return nil
}
succ = true
if ts > lease { // Note the check, don't decrease lease value!
if err := h.updateRow(ctx, tid, "READ", ts); err != nil {
return errors.Trace(err)
}
}

return nil
})
return succ, err
}

func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, now, ts uint64) error {
h.Lock()
defer h.Unlock()
for {
waitAndRetry, err := h.lockForWriteOnce(ctx, tid, now, ts)
if err != nil {
return err
}
if waitAndRetry == 0 {
break
}

time.Sleep(waitAndRetry)
store := h.exec.GetStore()
o := store.GetOracle()
newTS, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: kv.GlobalTxnScope})
if err != nil {
return errors.Trace(err)
}
now, ts = newTS, leaseFromTS(newTS)
}
return nil
}

func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, now, ts uint64) (waitAndRetry time.Duration, err error) {
err = h.runInTxn(ctx, func(ctx context.Context) error {
lockType, lease, oldReadLease, err := h.loadRow(ctx, tid)
if err != nil {
return errors.Trace(err)
}
// The lease is outdated, so lock is invalid, clear orphan lock of any kind.
if now > lease {
if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
return nil
}

// The lease is valid.
switch lockType {
case CachedTableLockNone:
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
case CachedTableLockRead:
// Change from READ to INTEND
if _, err = h.execSQL(ctx, "update mysql.table_cache_meta set lock_type='INTEND', oldReadLease=%?, lease=%? where tid=%?", lease, ts, tid); err != nil {
return errors.Trace(err)
}
// Wait for lease to expire, and then retry.
waitAndRetry = waitForLeaseExpire(oldReadLease, now)
case CachedTableLockIntend, CachedTableLockWrite:
// `now` exceed `oldReadLease` means wait for READ lock lease is done, it's safe to read here.
if now > oldReadLease {
if lockType == CachedTableLockIntend {
if err = h.updateRow(ctx, tid, "WRITE", ts); err != nil {
return errors.Trace(err)
}
}
return nil
}
// Otherwise, the WRITE should wait for the READ lease expire.
// And then retry changing the lock to WRITE
waitAndRetry = waitForLeaseExpire(oldReadLease, now)
}
return nil
})

return
}

func waitForLeaseExpire(oldReadLease, now uint64) time.Duration {
if oldReadLease >= now {
t1 := oracle.GetTimeFromTS(oldReadLease)
t2 := oracle.GetTimeFromTS(now)
waitDuration := t1.Sub(t2)
return waitDuration
}
return 0
}

func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, now, newTs uint64, op RenewLeaseType) (bool, error) {
h.Lock()
defer h.Unlock()
// TODO: `now` should use the real current tso to check the old lease is not expired.
_, err := h.execSQL(ctx, "update mysql.table_cache_meta set lease = %? where tid = %? and lock_type ='READ'", newTs, tid)
if err != nil {
return false, errors.Trace(err)
}
succ := h.exec.AffectedRows() > 0
return succ, err
}

func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "begin")
return err
}

func (h *stateRemoteHandle) commitTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "commit")
return err
}

func (h *stateRemoteHandle) rollbackTxn(ctx context.Context) error {
_, err := h.execSQL(ctx, "rollback")
return err
}

func (h *stateRemoteHandle) runInTxn(ctx context.Context, fn func(ctx context.Context) error) error {
err := h.beginTxn(ctx)
if err != nil {
return errors.Trace(err)
}

err = fn(ctx)
if err != nil {
terror.Log(h.rollbackTxn(ctx))
return errors.Trace(err)
}

return h.commitTxn(ctx)
}

func (h *stateRemoteHandle) loadRow(ctx context.Context, tid int64) (CachedTableLockType, uint64, uint64, error) {
chunkRows, err := h.execSQL(ctx, "select lock_type, lease, oldReadLease from mysql.table_cache_meta where tid = %? for update", tid)
if err != nil {
return 0, 0, 0, errors.Trace(err)
}
if len(chunkRows) != 1 {
return 0, 0, 0, errors.Errorf("table_cache_meta tid not exist %d", tid)
}
col1 := chunkRows[0].GetEnum(0)
// Note, the MySQL enum value start from 1 rather than 0
lockType := CachedTableLockType(col1.Value - 1)
lease := chunkRows[0].GetUint64(1)
oldReadLease := chunkRows[0].GetUint64(2)
return lockType, lease, oldReadLease, nil
}

func (h *stateRemoteHandle) updateRow(ctx context.Context, tid int64, lockType string, lease uint64) error {
_, err := h.execSQL(ctx, "update mysql.table_cache_meta set lock_type = %?, lease = %? where tid = %?", lockType, lease, tid)
return err
}

func (h *stateRemoteHandle) execSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error) {
rs, err := h.exec.ExecuteInternal(ctx, sql, args...)
if rs != nil {
defer rs.Close()
}
if err != nil {
return nil, errors.Trace(err)
}
if rs != nil {
return sqlexec.DrainRecordSet(ctx, rs, 1)
}
return nil, nil
}
Loading

0 comments on commit 7f93b09

Please sign in to comment.