Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv,executor: redesign the latch scheduler #7711

Merged
merged 13 commits into from
Oct 9, 2018
3 changes: 1 addition & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1808,8 +1808,7 @@ func (s *testBypassSuite) TestBypassLatch(c *C) {

// txn1 and txn2 data range do not overlap, but using latches result in txn conflict.
fn()
_, err = tk1.Exec("commit")
c.Assert(err, NotNil)
tk1.MustExec("commit")

tk1.MustExec("truncate table t")
fn()
Expand Down
215 changes: 136 additions & 79 deletions store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package latch

import (
"bytes"
"math/bits"
"sort"
"sync"
Expand All @@ -22,32 +23,26 @@ import (
"github.com/spaolacci/murmur3"
)

// latch stores a key's waiting transactions information.
type latch struct {
// Whether there is any transaction in waitingQueue except head.
hasMoreWaiting bool
// The startTS of the transaction which is the head of waiting transactions.
waitingQueueHead uint64
maxCommitTS uint64
sync.Mutex
}
type node struct {
slotID int
key []byte
maxCommitTS uint64
value *Lock

func (l *latch) isEmpty() bool {
return l.waitingQueueHead == 0 && !l.hasMoreWaiting
next *node
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use list.List?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. list.List will make unnecessary allocation.

Use

type Element struct {

    // The value stored with this element.
    Value interface{}
    // contains filtered or unexported fields
}

is similar to

type node struct {
    Value *nodeValue
}
type nodeValue {
    slotID int
    key []byte
    maxCommitTS uint64
    value *Lock
}
  1. list.List is a doubly linked list, while a single linked list is sufficient here.
  2. list data struct is simple and common enough to implement

}

func (l *latch) free() {
l.waitingQueueHead = 0
}

func (l *latch) refreshCommitTS(commitTS uint64) {
l.Lock()
defer l.Unlock()
l.maxCommitTS = mathutil.MaxUint64(commitTS, l.maxCommitTS)
// latch stores a key's waiting transactions information.
type latch struct {
queue *node
count int
waiting []*Lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not each node has a waiting queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting queue is moved from each node to the latch for those reasons:

  1. nodes in the queue is inserted every now and then, if each node has is waiting queue, the queue would be created and destroyed. There will be more allocations, and it's less memory efficient.
  2. I have an assumption that the waiting queue would not be large, when a list is small enough, an array is very efficient.
  3. You may still remember the "first waiting one automatically become running" problem and the old code is complex enough to handle different states. If each node doesn't have the waiting queue, the problem could be avoid.
    @zhangjinpeng1987

sync.Mutex
}

// Lock is the locks' information required for a transaction.
type Lock struct {
keys [][]byte
// The slot IDs of the latches(keys) that a startTS must acquire before being able to processed.
requiredSlots []int
// The number of latches that the transaction has acquired. For status is stale, it include the
Expand Down Expand Up @@ -96,9 +91,20 @@ func (l *Lock) SetCommitTS(commitTS uint64) {
// but conceptually a latch is a queue, and a slot is an index to the queue
type Latches struct {
slots []latch
// The waiting queue for each slot(slotID => slice of Lock).
waitingQueues map[int][]*Lock
sync.RWMutex
}

type bytesSlice [][]byte

func (s bytesSlice) Len() int {
return len(s)
}

func (s bytesSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s bytesSlice) Less(i, j int) bool {
return bytes.Compare(s[i], s[j]) < 0
}

// NewLatches create a Latches with fixed length,
Expand All @@ -107,14 +113,15 @@ func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
slots: slots,
waitingQueues: make(map[int][]*Lock),
slots: slots,
}
}

// genLock generates Lock for the transaction with startTS and keys.
func (latches *Latches) genLock(startTS uint64, keys [][]byte) *Lock {
sort.Sort(bytesSlice(keys))
return &Lock{
keys: keys,
requiredSlots: latches.genSlotIDs(keys),
acquiredCount: 0,
startTS: startTS,
Expand All @@ -126,17 +133,7 @@ func (latches *Latches) genSlotIDs(keys [][]byte) []int {
for _, key := range keys {
slots = append(slots, latches.slotID(key))
}
sort.Ints(slots)
if len(slots) <= 1 {
return slots
}
dedup := slots[:1]
for i := 1; i < len(slots); i++ {
if slots[i] != slots[i-1] {
dedup = append(dedup, slots[i])
}
}
return dedup
return slots
}

// slotID return slotID for current key.
Expand All @@ -150,8 +147,7 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {
return acquireStale
}
for lock.acquiredCount < len(lock.requiredSlots) {
slotID := lock.requiredSlots[lock.acquiredCount]
status := latches.acquireSlot(slotID, lock)
status := latches.acquireSlot(lock)
if status != acquireSuccess {
return status
}
Expand All @@ -161,75 +157,136 @@ func (latches *Latches) acquire(lock *Lock) acquireResult {

// release releases all latches owned by the `lock` and returns the wakeup list.
// Preconditions: the caller must ensure the transaction's status is not locked.
func (latches *Latches) release(lock *Lock, commitTS uint64, wakeupList []*Lock) []*Lock {
func (latches *Latches) release(lock *Lock, wakeupList []*Lock) []*Lock {
wakeupList = wakeupList[:0]
for i := 0; i < lock.acquiredCount; i++ {
slotID := lock.requiredSlots[i]
if nextLock := latches.releaseSlot(slotID, commitTS); nextLock != nil {
for lock.acquiredCount > 0 {
if nextLock := latches.releaseSlot(lock); nextLock != nil {
wakeupList = append(wakeupList, nextLock)
}
}
return wakeupList
}

// refreshCommitTS refreshes commitTS for keys.
func (latches *Latches) refreshCommitTS(keys [][]byte, commitTS uint64) {
slotIDs := latches.genSlotIDs(keys)
for _, slotID := range slotIDs {
latches.slots[slotID].refreshCommitTS(commitTS)
}
}

func (latches *Latches) releaseSlot(slotID int, commitTS uint64) (nextLock *Lock) {
func (latches *Latches) releaseSlot(lock *Lock) (nextLock *Lock) {
key := lock.keys[lock.acquiredCount-1]
slotID := lock.requiredSlots[lock.acquiredCount-1]
latch := &latches.slots[slotID]
lock.acquiredCount--
latch.Lock()
defer latch.Unlock()
latch.maxCommitTS = mathutil.MaxUint64(latch.maxCommitTS, commitTS)
if !latch.hasMoreWaiting {
latch.free()

find := findNode(latch.queue, key)
if find.value != lock {
panic("releaseSlot wrong")
}
find.maxCommitTS = mathutil.MaxUint64(find.maxCommitTS, lock.commitTS)
find.value = nil
if len(latch.waiting) == 0 {
return nil
}
nextLock, latch.hasMoreWaiting = latches.popFromWaitingQueue(slotID)
latch.waitingQueueHead = nextLock.startTS
nextLock.acquiredCount++
if latch.maxCommitTS > nextLock.startTS {
nextLock.isStale = true

idx := 0
for i := 0; i < len(latch.waiting); i++ {
waiting := latch.waiting[i]
if bytes.Compare(waiting.keys[waiting.acquiredCount], key) == 0 {
nextLock = waiting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that there are more than 1 Locks in the waiting list have the same key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible! you find a bug.
I should only wake up the first one.
Wake up the first one is in a FIFO manner, there are still room for improvement here to choose which one to wake up.

} else {
idx++
latch.waiting[idx] = waiting
}
}
return nextLock
}
latch.waiting = latch.waiting[:idx]

func (latches *Latches) popFromWaitingQueue(slotID int) (front *Lock, hasMoreWaiting bool) {
latches.Lock()
defer latches.Unlock()
waiting := latches.waitingQueues[slotID]
front = waiting[0]
if len(waiting) == 1 {
delete(latches.waitingQueues, slotID)
} else {
latches.waitingQueues[slotID] = waiting[1:]
hasMoreWaiting = true
if nextLock != nil && find.maxCommitTS > nextLock.startTS {
nextLock.isStale = true
}
return
return nextLock
}

func (latches *Latches) acquireSlot(slotID int, lock *Lock) acquireResult {
func (latches *Latches) acquireSlot(lock *Lock) acquireResult {
key := lock.keys[lock.acquiredCount]
slotID := lock.requiredSlots[lock.acquiredCount]
latch := &latches.slots[slotID]
latch.Lock()
defer latch.Unlock()
if latch.maxCommitTS > lock.startTS {

find := findNode(latch.queue, key)
if find == nil {
tmp := &node{
slotID: slotID,
key: key,
value: lock,
}
tmp.next = latch.queue
latch.queue = tmp
latch.count++

lock.acquiredCount++
return acquireSuccess
}

// Try to limits the memory usage.
if latch.count > latchListCount {
latch.recycle(lock.startTS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should recyclebe moved into if find == nil {}?

}

if find.maxCommitTS > lock.startTS {
lock.isStale = true
return acquireStale
}

if latch.isEmpty() {
latch.waitingQueueHead = lock.startTS
if find.value == nil {
find.value = lock
lock.acquiredCount++
return acquireSuccess
}

// Push the current transaction into waitingQueue.
latch.hasMoreWaiting = true
latches.Lock()
defer latches.Unlock()
latches.waitingQueues[slotID] = append(latches.waitingQueues[slotID], lock)
latch.waiting = append(latch.waiting, lock)
return acquireLocked
}

// recycle is not thread safe, the latch should acquire its lock before executing this function.
func (l *latch) recycle(currentTS uint64) {
if l.queue == nil {
return
}

prev := l.queue
curr := l.queue.next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better initialize as prev = nil, curr = l.queue, so don't have to handle head node separately. Or you can consider using the indirect pointer hack suggested by Linus Torvalds https://medium.com/@bartobri/applying-the-linus-tarvolds-good-taste-coding-requirement-99749f37684a

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I found that trick myself before I saw any article about it. (I remember I wrote a blog post http://www.zenlife.tk/fake-list-head.md around that time)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admire you.


// Handle list nodes.
for curr != nil {
if tsoSub(currentTS, curr.maxCommitTS) >= expireDuration && curr.value == nil {
l.count--
prev.next = curr.next
zhangjinpeng87 marked this conversation as resolved.
Show resolved Hide resolved
} else {
prev = curr
}
curr = curr.next
}

// Handle the head node.
if tsoSub(currentTS, l.queue.maxCommitTS) >= expireDuration {
l.queue = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No count--?

}
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need return

}

func (latches *Latches) recycle(currentTS uint64) {
for i := 0; i < len(latches.slots); i++ {
latch := &latches.slots[i]
latch.Lock()
latch.recycle(currentTS)
latch.Unlock()
}
}

func findNode(list *node, key []byte) *node {
for n := list; n != nil; n = n.next {
if bytes.Compare(n.key, key) == 0 {
return n
}
}
return nil
}
Loading