Skip to content

Commit

Permalink
Merge branch 'master' into ddl-multiple-targets
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Dec 24, 2021
2 parents bc791a7 + 404895c commit daa23a3
Show file tree
Hide file tree
Showing 89 changed files with 2,518 additions and 1,477 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ coverage.out
*.iml
*.swp
*.log
*.test.bin
tags
profile.coverprofile
explain_test
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ devgotest: failpoint-enable
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -check.p true > gotest.log || { $(FAILPOINT_DISABLE); grep -v '^\([[]20\|PASS:\|ok \)' 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

ut: failpoint-enable tools/bin/ut
tools/bin/ut $(X);
@$(FAILPOINT_DISABLE)

gotest: failpoint-enable
@echo "Running in native mode."
@export log_level=info; export TZ='Asia/Shanghai'; \
Expand Down Expand Up @@ -220,6 +224,10 @@ failpoint-disable: tools/bin/failpoint-ctl
# Restoring gofail failpoints...
@$(FAILPOINT_DISABLE)

tools/bin/ut: tools/check/ut.go
cd tools/check; \
$(GO) build -o ../bin/ut ut.go

tools/bin/megacheck: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/megacheck honnef.co/go/tools/cmd/megacheck
Expand Down
15 changes: 9 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ type local struct {
duplicateDetection bool
duplicateDB *pebble.DB
errorMgr *errormanager.ErrorManager
}

var bufferPool = membuf.NewPool(1024, manual.Allocator{})
bufferPool *membuf.Pool
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
Expand Down Expand Up @@ -244,6 +244,8 @@ func NewLocalBackend(
checkTiKVAvaliable: cfg.App.CheckRequirements,
duplicateDB: duplicateDB,
errorMgr: errorMgr,

bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
}
local.conns = common.NewGRPCConns()
if err = local.checkMultiIngestSupport(ctx); err != nil {
Expand Down Expand Up @@ -423,6 +425,7 @@ func (local *local) Close() {
engine.unlock()
}
local.conns.Close()
local.bufferPool.Destroy()

if local.duplicateDB != nil {
// Check whether there are duplicates.
Expand Down Expand Up @@ -776,7 +779,7 @@ func (local *local) WriteToTiKV(
requests = append(requests, req)
}

bytesBuf := bufferPool.NewBuffer()
bytesBuf := local.bufferPool.NewBuffer()
defer bytesBuf.Destroy()
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
Expand Down Expand Up @@ -1664,14 +1667,14 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
}
engine := e.(*Engine)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64) (*Writer, error) {
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
memtableSizeLimit: cacheSize,
kvBuffer: bufferPool.NewBuffer(),
kvBuffer: kvBuffer,
isKVSorted: cfg.IsKVSorted,
isWriteBatchSorted: true,
}
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore"
Expand Down Expand Up @@ -357,7 +358,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
f.wg.Add(1)
go f.ingestSSTLoop()
sorted := needSort && !partitialSort
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024)
pool := membuf.NewPool()
defer pool.Destroy()
kvBuffer := pool.NewBuffer()
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func newTestClient(
}
}

// ScatterRegions scatters regions in a batch.
func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
return nil
}

func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
81 changes: 63 additions & 18 deletions br/pkg/membuf/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

package membuf

const bigValueSize = 1 << 16 // 64K

var allocBufLen = 1 << 20 // 1M
const (
defaultPoolSize = 1024
defaultBlockSize = 1 << 20 // 1M
defaultLargeAllocThreshold = 1 << 16 // 64K
)

// Allocator is the abstract interface for allocating and freeing memory.
type Allocator interface {
Expand All @@ -38,30 +40,71 @@ func (stdAllocator) Free(_ []byte) {}
// garbage collector which always release the memory so late. Use a fixed size chan to reuse
// can decrease the memory usage to 1/3 compare with sync.Pool.
type Pool struct {
allocator Allocator
recycleCh chan []byte
allocator Allocator
blockSize int
blockCache chan []byte
largeAllocThreshold int
}

// Option configures a pool.
type Option func(p *Pool)

// WithPoolSize configures how many blocks cached by this pool.
func WithPoolSize(size int) Option {
return func(p *Pool) {
p.blockCache = make(chan []byte, size)
}
}

// WithBlockSize configures the size of each block.
func WithBlockSize(size int) Option {
return func(p *Pool) {
p.blockSize = size
}
}

// WithAllocator specifies the allocator used by pool to allocate and free memory.
func WithAllocator(allocator Allocator) Option {
return func(p *Pool) {
p.allocator = allocator
}
}

// WithLargeAllocThreshold configures the threshold for large allocation of a Buffer.
// If allocate size is larger than this threshold, bytes will be allocated directly
// by the make built-in function and won't be tracked by the pool.
func WithLargeAllocThreshold(threshold int) Option {
return func(p *Pool) {
p.largeAllocThreshold = threshold
}
}

// NewPool creates a new pool.
func NewPool(size int, allocator Allocator) *Pool {
return &Pool{
allocator: allocator,
recycleCh: make(chan []byte, size),
func NewPool(opts ...Option) *Pool {
p := &Pool{
allocator: stdAllocator{},
blockSize: defaultBlockSize,
blockCache: make(chan []byte, defaultPoolSize),
largeAllocThreshold: defaultLargeAllocThreshold,
}
for _, opt := range opts {
opt(p)
}
return p
}

func (p *Pool) acquire() []byte {
select {
case b := <-p.recycleCh:
case b := <-p.blockCache:
return b
default:
return p.allocator.Alloc(allocBufLen)
return p.allocator.Alloc(p.blockSize)
}
}

func (p *Pool) release(b []byte) {
select {
case p.recycleCh <- b:
case p.blockCache <- b:
default:
p.allocator.Free(b)
}
Expand All @@ -72,10 +115,12 @@ func (p *Pool) NewBuffer() *Buffer {
return &Buffer{pool: p, bufs: make([][]byte, 0, 128), curBufIdx: -1}
}

var globalPool = NewPool(1024, stdAllocator{})

// NewBuffer creates a new buffer in global pool.
func NewBuffer() *Buffer { return globalPool.NewBuffer() }
func (p *Pool) Destroy() {
close(p.blockCache)
for b := range p.blockCache {
p.allocator.Free(b)
}
}

// Buffer represents the reuse buffer.
type Buffer struct {
Expand Down Expand Up @@ -123,12 +168,12 @@ func (b *Buffer) Destroy() {

// TotalSize represents the total memory size of this Buffer.
func (b *Buffer) TotalSize() int64 {
return int64(len(b.bufs) * allocBufLen)
return int64(len(b.bufs) * b.pool.blockSize)
}

// AllocBytes allocates bytes with the given length.
func (b *Buffer) AllocBytes(n int) []byte {
if n > bigValueSize {
if n > b.pool.largeAllocThreshold {
return make([]byte, n)
}
if b.curIdx+n > b.curBufLen {
Expand Down
20 changes: 14 additions & 6 deletions br/pkg/membuf/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ import (
"github.com/stretchr/testify/require"
)

func init() {
allocBufLen = 1024
}

type testAllocator struct {
allocs int
frees int
Expand All @@ -41,7 +37,13 @@ func (t *testAllocator) Free(_ []byte) {

func TestBufferPool(t *testing.T) {
allocator := &testAllocator{}
pool := NewPool(2, allocator)
pool := NewPool(
WithPoolSize(2),
WithAllocator(allocator),
WithBlockSize(1024),
WithLargeAllocThreshold(512),
)
defer pool.Destroy()

bytesBuf := pool.NewBuffer()
bytesBuf.AllocBytes(256)
Expand All @@ -53,6 +55,10 @@ func TestBufferPool(t *testing.T) {
bytesBuf.AllocBytes(767)
require.Equal(t, 2, allocator.allocs)

largeBytes := bytesBuf.AllocBytes(513)
require.Equal(t, 513, len(largeBytes))
require.Equal(t, 2, allocator.allocs)

require.Equal(t, 0, allocator.frees)
bytesBuf.Destroy()
require.Equal(t, 0, allocator.frees)
Expand All @@ -67,7 +73,9 @@ func TestBufferPool(t *testing.T) {
}

func TestBufferIsolation(t *testing.T) {
bytesBuf := NewBuffer()
pool := NewPool(WithBlockSize(1024))
defer pool.Destroy()
bytesBuf := pool.NewBuffer()
defer bytesBuf.Destroy()

b1 := bytesBuf.AllocBytes(16)
Expand Down
Loading

0 comments on commit daa23a3

Please sign in to comment.