Skip to content

Commit

Permalink
Issue Number: ref pingcap#8854
Browse files Browse the repository at this point in the history
ID pools for global kill 32bit
  • Loading branch information
pingyu committed Apr 2, 2023
1 parent e6cf1ab commit 9f55ff3
Show file tree
Hide file tree
Showing 4 changed files with 828 additions and 0 deletions.
25 changes: 25 additions & 0 deletions util/globalconn/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "globalconn",
srcs = [
"globalconn.go",
"pool.go",
],
importpath = "github.com/pingcap/tidb/util/globalconn",
visibility = ["//visibility:public"],
deps = [
"@com_github_cznic_mathutil//:mathutil",
"@com_github_ngaut_sync2//:sync2",
],
)

go_test(
name = "globalconn_test",
srcs = ["pool_test.go"],
deps = [
":globalconn",
"@com_github_cznic_mathutil//:mathutil",
"@com_github_stretchr_testify//assert",
],
)
49 changes: 49 additions & 0 deletions util/globalconn/globalconn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package globalconn

// GlobalConnID is the global connection ID, providing UNIQUE connection IDs across the whole TiDB cluster.
// Used when GlobalKill feature is enable.
// See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md
// 32 bits version:
//
// 31 21 20 1 0
// +--------+------------------+------+
// |serverID| local connID |markup|
// | (11b) | (20b) | =0 |
// +--------+------------------+------+
//
// 64 bits version:
//
// 63 62 41 40 1 0
// +--+---------------------+--------------------------------------+------+
// | | serverId | local connId |markup|
// |=0| (22b) | (40b) | =1 |
// +--+---------------------+--------------------------------------+------+
const (
// MaxServerID32 is maximum serverID for 32bits global connection ID.
MaxServerID32 = 1<<11 - 1
// LocalConnIDBits32 is the number of bits of localConnID for 32bits global connection ID.
LocalConnIDBits32 = 20
// MaxLocalConnID32 is maximum localConnID for 32bits global connection ID.
MaxLocalConnID32 = 1<<LocalConnIDBits32 - 1

// MaxServerID64 is maximum serverID for 64bits global connection ID.
MaxServerID64 = 1<<22 - 1
// LocalConnIDBits64 is the number of bits of localConnID for 64bits global connection ID.
LocalConnIDBits64 = 40
// MaxLocalConnID64 is maximum localConnID for 64bits global connection ID.
MaxLocalConnID64 = 1<<LocalConnIDBits64 - 1
)
260 changes: 260 additions & 0 deletions util/globalconn/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package globalconn

import (
"fmt"
"math"
"runtime"
"sync"
"sync/atomic"

"github.com/cznic/mathutil"
"github.com/ngaut/sync2"
)

const (
// IDPoolInvalidValue indicates invalid value from IDPool.
IDPoolInvalidValue = math.MaxUint64
)

// IDPool is the pool allocating & deallocating IDs.
type IDPool interface {
fmt.Stringer
// Init initiates pool.
Init(sizeInBits uint32)
// Len returns length of available id's in pool.
// Note that Len() would return -1 when this method is NOT supported.
Len() int
// Put puts value to pool. "ok" is false when pool is full.
Put(val uint64) (ok bool)
// Get gets value from pool. "ok" is false when pool is empty.
Get() (val uint64, ok bool)
}

var (
_ IDPool = (*AutoIncPool)(nil)
_ IDPool = (*LockFreeCircularPool)(nil)
)

// AutoIncPool simply do auto-increment to allocate ID. Wrapping will happen.
type AutoIncPool struct {
lastID uint64
idMask uint64
tryCnt int

mu *sync.Mutex
existed map[uint64]struct{}
}

// Init initiates AutoIncPool.
func (p *AutoIncPool) Init(sizeInBits uint32) {
p.InitExt(sizeInBits, false, 1)
}

// InitExt initiates AutoIncPool with more parameters.
func (p *AutoIncPool) InitExt(sizeInBits uint32, checkExisted bool, tryCnt int) {
p.idMask = 1<<sizeInBits - 1
if checkExisted {
p.existed = make(map[uint64]struct{})
p.mu = &sync.Mutex{}
}
p.tryCnt = tryCnt
}

// Get id by auto-increment.
func (p *AutoIncPool) Get() (id uint64, ok bool) {
for i := 0; i < p.tryCnt; i++ {
id := atomic.AddUint64(&p.lastID, 1) & p.idMask
if p.existed != nil {
p.mu.Lock()
_, occupied := p.existed[id]
if occupied {
p.mu.Unlock()
continue
}
p.existed[id] = struct{}{}
p.mu.Unlock()
}
return id, true
}
return 0, false
}

// Put id back to pool.
func (p *AutoIncPool) Put(id uint64) (ok bool) {
if p.existed != nil {
p.mu.Lock()
delete(p.existed, id)
p.mu.Unlock()
}
return true
}

// Len implements IDPool interface.
func (p *AutoIncPool) Len() int {
if p.existed != nil {
p.mu.Lock()
length := len(p.existed)
p.mu.Unlock()
return length
}
return -1
}

// String implements IDPool interface.
func (p AutoIncPool) String() string {
return fmt.Sprintf("lastID: %v", p.lastID)
}

// LockFreeCircularPool is a lock-free circular implementation of IDPool.
// Note that to reduce memory usage, LockFreeCircularPool supports 32bits IDs ONLY.
type LockFreeCircularPool struct {
_ uint64 // align to 64bits
head sync2.AtomicUint32 // first available slot
_ uint32 // padding to avoid false sharing
tail sync2.AtomicUint32 // first empty slot. `head==tail` means empty.
_ uint32 // padding to avoid false sharing

cap uint32
slots []lockFreePoolItem
}

type lockFreePoolItem struct {
value uint32

// seq indicates read/write status
// Sequence:
// seq==tail: writable ---> doWrite,seq:=tail+1 ---> seq==head+1:written/readable ---> doRead,seq:=head+size
// ^ |
// +----------------------------------------------------------------------------------------+
// slot[i].seq: i(writable) ---> i+1(readable) ---> i+cap(writable) ---> i+cap+1(readable) ---> i+2*cap ---> ...
seq uint32
}

// Init implements IDPool interface.
func (p *LockFreeCircularPool) Init(sizeInBits uint32) {
p.InitExt(sizeInBits, 0)
}

// InitExt initializes LockFreeCircularPool with more parameters.
// fillCount: fills pool with [1, min(fillCount, 1<<(sizeInBits-1)]. Pass "math.MaxUint32" to fulfill the pool.
func (p *LockFreeCircularPool) InitExt(sizeInBits uint32, fillCount uint32) {
p.cap = 1 << sizeInBits
p.slots = make([]lockFreePoolItem, p.cap)

fillCount = mathutil.MinUint32(p.cap-1, fillCount)
var i uint32
for i = 0; i < fillCount; i++ {
p.slots[i] = lockFreePoolItem{value: i + 1, seq: i + 1}
}
for ; i < p.cap; i++ {
p.slots[i] = lockFreePoolItem{value: math.MaxUint32, seq: i}
}

p.head.Set(0)
p.tail.Set(fillCount)
}

// InitForTest used to unit test overflow of head & tail.
func (p *LockFreeCircularPool) InitForTest(head uint32, fillCount uint32) {
fillCount = mathutil.MinUint32(p.cap-1, fillCount)
var i uint32
for i = 0; i < fillCount; i++ {
p.slots[i] = lockFreePoolItem{value: i + 1, seq: head + i + 1}
}
for ; i < p.cap; i++ {
p.slots[i] = lockFreePoolItem{value: math.MaxUint32, seq: head + i}
}

p.head.Set(head)
p.tail.Set(head + fillCount)
}

// Len implements IDPool interface.
func (p *LockFreeCircularPool) Len() int {
return int(p.tail.Get() - p.head.Get())
}

// String implements IDPool interface.
// Notice: NOT thread safe.
func (p LockFreeCircularPool) String() string {
head := p.head.Get()
tail := p.tail.Get()
headSlot := &p.slots[head&(p.cap-1)]
tailSlot := &p.slots[tail&(p.cap-1)]
length := tail - head

return fmt.Sprintf("cap:%v, length:%v; head:%x, slot:{%x,%x}; tail:%x, slot:{%x,%x}",
p.cap, length, head, headSlot.value, headSlot.seq, tail, tailSlot.value, tailSlot.seq)
}

// Put implements IDPool interface.
func (p *LockFreeCircularPool) Put(val uint64) (ok bool) {
for {
tail := p.tail.Get() // `tail` should be loaded before `head`, to avoid "false full".
head := p.head.Get()

if tail-head == p.cap-1 { // full
return false
}

if !p.tail.CompareAndSwap(tail, tail+1) {
continue
}

slot := &p.slots[tail&(p.cap-1)]
for {
seq := atomic.LoadUint32(&slot.seq)

if seq == tail { // writable
slot.value = uint32(val)
atomic.StoreUint32(&slot.seq, tail+1)
return true
}

runtime.Gosched()
}
}
}

// Get implements IDPool interface.
func (p *LockFreeCircularPool) Get() (val uint64, ok bool) {
for {
head := p.head.Get()
tail := p.tail.Get()
if head == tail { // empty
return IDPoolInvalidValue, false
}

if !p.head.CompareAndSwap(head, head+1) {
continue
}

slot := &p.slots[head&(p.cap-1)]
for {
seq := atomic.LoadUint32(&slot.seq)

if seq == head+1 { // readable
val = uint64(slot.value)
slot.value = math.MaxUint32
atomic.StoreUint32(&slot.seq, head+p.cap)
return val, true
}

runtime.Gosched()
}
}
}
Loading

0 comments on commit 9f55ff3

Please sign in to comment.