Skip to content

Commit

Permalink
store/tikv,executor: redesign the latch scheduler (#7711)
Browse files Browse the repository at this point in the history
Check maxCommitTS on each key, instead of each slot, so hash collision
will not lead to transaction retry.
  • Loading branch information
tiancaiamao authored Oct 9, 2018
1 parent ee0d4d6 commit c19f8fb
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 98 deletions.
3 changes: 1 addition & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,8 +1815,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) {

// txn1 and txn2 data range do not overlap, but using latches result in txn conflict.
fn()
_, err = tk1.Exec("commit")
c.Assert(err, NotNil)
tk1.MustExec("commit")

tk1.MustExec("truncate table t")
fn()
Expand Down
206 changes: 128 additions & 78 deletions store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package latch

import (
"bytes"
"math/bits"
"sort"
"sync"
Expand All @@ -22,32 +23,26 @@ import (
"github.com/spaolacci/murmur3"
)

// latch stores a key's waiting transactions information.
type latch struct {
// Whether there is any transaction in waitingQueue except head.
hasMoreWaiting bool
// The startTS of the transaction which is the head of waiting transactions.
waitingQueueHead uint64
maxCommitTS uint64
sync.Mutex
}
type node struct {
slotID int
key []byte
maxCommitTS uint64
value *Lock

func (l *latch) isEmpty() bool {
return l.waitingQueueHead == 0 && !l.hasMoreWaiting
next *node
}

func (l *latch) free() {
l.waitingQueueHead = 0
}

func (l *latch) refreshCommitTS(commitTS uint64) {
l.Lock()
defer l.Unlock()
l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS)
// latch stores a key's waiting transactions information.
type latch struct {
queue *node
count int
waiting []*Lock
sync.Mutex
}

// Lock is the locks' information required for a transaction.
type Lock struct {
keys [][]byte
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// The number of latches that the transaction has acquired. For status is stale, it include the
Expand Down Expand Up @@ -96,9 +91,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) {
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []latch
// The waiting queue for each slot(slotID => slice of Lock).
waitingQueues map[int][]*Lock
sync.RWMutex
}

type bytesSlice [][]byte

func (s bytesSlice) Len() int {
return len(s)
}

func (s bytesSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s bytesSlice) Less(i, j int) bool {
return bytes.Compare(s[i], s[j]) < 0
}

// NewLatches create a Latches with fixed length,
Expand All @@ -107,14 +113,15 @@ func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
slots: slots,
waitingQueues: make(map[int][]*Lock),
slots: slots,
}
}

// genLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock {
sort.Sort(bytesSlice(keys))
return &Lock{
keys: keys,
requiredSlots: latches.genSlotIDs(keys),
acquiredCount: 0,
startTS: startTS,
Expand All @@ -126,17 +133,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int {
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
sort.Ints(slots)
if len(slots) <= 1 {
return slots
}
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
if slots[i] != slots[i-1] {
dedup = append(dedup, slots[i])
}
}
return dedup
return slots
}

// slotID return slotID for current key.
Expand All @@ -150,8 +147,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {
return acquireStale
}
for lock.acquiredCount < len(lock.requiredSlots) {
slotID := lock.requiredSlots[lock.acquiredCount]
status := latches.acquireSlot(slotID, lock)
status := latches.acquireSlot(lock)
if status != acquireSuccess {
return status
}
Expand All @@ -161,75 +157,129 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {

// release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction's status is not locked.
func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock {
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
wakeupList = wakeupList[:0]
for i := 0; i < lock.acquiredCount; i++ {
slotID := lock.requiredSlots[i]
if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil {
for lock.acquiredCount > 0 {
if nextLock := latches.releaseSlot(lock); nextLock != nil {
wakeupList = append(wakeupList, nextLock)
}
}
return wakeupList
}

// refreshCommitTS refreshes commitTS for keys.
func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) {
slotIDs := latches.genSlotIDs(keys)
for _, slotID := range slotIDs {
latches.slots[slotID].refreshCommitTS(commitTS)
}
}

func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) {
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
key := lock.keys[lock.acquiredCount-1]
slotID := lock.requiredSlots[lock.acquiredCount-1]
latch := &latches.slots[slotID]
lock.acquiredCount--
latch.Lock()
defer latch.Unlock()
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
if !latch.hasMoreWaiting {
latch.free()

find := findNode(latch.queue, key)
if find.value != lock {
panic("releaseSlot wrong")
}
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS)
find.value = nil
if len(latch.waiting) == 0 {
return nil
}
nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
latch.waitingQueueHead = nextLock.startTS
nextLock.acquiredCount++
if latch.maxCommitTS > nextLock.startTS {
nextLock.isStale = true

var idx int
for idx = 0; idx < len(latch.waiting); idx++ {
waiting := latch.waiting[idx]
if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 {
break
}
}
return nextLock
}
// Wake up the first one in waiting queue.
if idx < len(latch.waiting) {
nextLock = latch.waiting[idx]
// Delete element latch.waiting[idx] from the array.
copy(latch.waiting[idx:], latch.waiting[idx+1:])
latch.waiting[len(latch.waiting)-1] = nil
latch.waiting = latch.waiting[:len(latch.waiting)-1]

func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) {
latches.Lock()
defer latches.Unlock()
waiting := latches.waitingQueues[slotID]
front = waiting[0]
if len(waiting) == 1 {
delete(latches.waitingQueues, slotID)
} else {
latches.waitingQueues[slotID] = waiting[1:]
hasMoreWaiting = true
if find.maxCommitTS > nextLock.startTS {
nextLock.isStale = true
}
}

return
}

func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult {
func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
key := lock.keys[lock.acquiredCount]
slotID := lock.requiredSlots[lock.acquiredCount]
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if latch.maxCommitTS > lock.startTS {

// Try to recycle to limit the memory usage.
if latch.count >= latchListCount {
latch.recycle(lock.startTS)
}

find := findNode(latch.queue, key)
if find == nil {
tmp := &node{
slotID: slotID,
key: key,
value: lock,
}
tmp.next = latch.queue
latch.queue = tmp
latch.count++

lock.acquiredCount++
return acquireSuccess
}

if find.maxCommitTS > lock.startTS {
lock.isStale = true
return acquireStale
}

if latch.isEmpty() {
latch.waitingQueueHead = lock.startTS
if find.value == nil {
find.value = lock
lock.acquiredCount++
return acquireSuccess
}

// Push the current transaction into waitingQueue.
latch.hasMoreWaiting = true
latches.Lock()
defer latches.Unlock()
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock)
latch.waiting = append(latch.waiting, lock)
return acquireLocked
}

// recycle is not thread safe, the latch should acquire its lock before executing this function.
func (l *latch) recycle(currentTS uint64) {
fakeHead := node{next: l.queue}
prev := &fakeHead
for curr := prev.next; curr != nil; curr = curr.next {
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil {
l.count--
prev.next = curr.next
} else {
prev = curr
}
}
l.queue = fakeHead.next
}

func (latches *Latches) recycle(currentTS uint64) {
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
latch.Lock()
latch.recycle(currentTS)
latch.Unlock()
}
}

func findNode(list *node, key []byte) *node {
for n := list; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
return n
}
}
return nil
}
Loading

0 comments on commit c19f8fb

Please sign in to comment.