Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: optimize SQL processing speed #110

Merged
merged 6 commits into from
Jan 2, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}
Expand Down Expand Up @@ -129,6 +130,7 @@ func NewConfig() *Config {
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 8,
IOConcurrency: 5,
CheckRequirements: true,
},
TiDB: DBStore{
Expand Down
2 changes: 2 additions & 0 deletions lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ const (
ReadBlockSize int64 = 64 * _K
MinRegionSize int64 = 256 * _M

BufferSizeScale = 5

// kv import
KVMaxBatchSize int64 = 200 * _G
)
27 changes: 27 additions & 0 deletions lightning/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,30 @@ var (
Buckets: prometheus.ExponentialBuckets(1024, 2, 8),
},
)
ChunkParserReadBlockSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "chunk_parser_read_block_seconds",
Help: "time needed for chunk parser read a block",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
},
)
ChunkParserReadRowSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "chunk_parser_read_row_seconds",
Help: "time needed for chunk parser read a row",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
},
)
ApplyWorkerSecondsHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "lightning",
Name: "apply_worker_seconds",
Help: "time needed to apply a worker",
Buckets: prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
}, []string{"name"},
)
BlockEncodeSecondsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "lightning",
Expand Down Expand Up @@ -149,6 +173,9 @@ func init() {
prometheus.MustRegister(BlockDeliverSecondsHistogram)
prometheus.MustRegister(BlockDeliverBytesHistogram)
prometheus.MustRegister(ChecksumSecondsHistogram)
prometheus.MustRegister(ChunkParserReadRowSecondsHistogram)
prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram)
prometheus.MustRegister(ApplyWorkerSecondsHistogram)
}

func RecordTableCount(status string, err error) {
Expand Down
20 changes: 17 additions & 3 deletions lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package mydump
import (
"bytes"
"io"
"time"

"github.com/pkg/errors"

"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/worker"
)

// ChunkParser is a parser of the data files (the file containing only INSERT
Expand All @@ -29,6 +34,7 @@ type ChunkParser struct {
// cache
remainBuf *bytes.Buffer
appendBuf *bytes.Buffer
ioWorkers *worker.RestoreWorkerPool
}

// Chunk represents a portion of the data file.
Expand All @@ -46,12 +52,13 @@ type Row struct {
}

// NewChunkParser creates a new parser which can read chunks out of a file.
func NewChunkParser(reader io.Reader) *ChunkParser {
func NewChunkParser(reader io.Reader, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) *ChunkParser {
return &ChunkParser{
reader: reader,
blockBuf: make([]byte, 8192),
blockBuf: make([]byte, blockBufSize*config.BufferSizeScale),
remainBuf: &bytes.Buffer{},
appendBuf: &bytes.Buffer{},
ioWorkers: ioWorkers,
}
}

Expand Down Expand Up @@ -81,7 +88,13 @@ const (
)

func (parser *ChunkParser) readBlock() error {
n, err := io.ReadFull(parser.reader, parser.blockBuf)
startTime := time.Now()

// limit IO concurrency
w := parser.ioWorkers.Apply()
n, err := parser.reader.Read(parser.blockBuf)
parser.ioWorkers.Recycle(w)

switch err {
case io.ErrUnexpectedEOF, io.EOF:
parser.isLastChunk = true
Expand All @@ -95,6 +108,7 @@ func (parser *ChunkParser) readBlock() error {
parser.appendBuf.Write(parser.remainBuf.Bytes())
parser.appendBuf.Write(parser.blockBuf[:n])
parser.buf = parser.appendBuf.Bytes()
metric.ChunkParserReadBlockSecondsHistogram.Observe(time.Since(startTime).Seconds())
return nil
default:
return errors.Trace(err)
Expand Down
13 changes: 10 additions & 3 deletions lightning/mydump/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package mydump_test

import (
"context"
"io"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/mydump"
"github.com/pingcap/tidb-lightning/lightning/worker"

"github.com/pkg/errors"
)

Expand All @@ -24,7 +28,8 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) {
"insert another_table values (10, 11, 12, '(13)', '(', 14, ')');",
)

parser := mydump.NewChunkParser(reader)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
Expand Down Expand Up @@ -72,7 +77,8 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) {
INSERT foo VALUES (29,30,31,32),(33,34,35,36);
`)

parser := mydump.NewChunkParser(reader)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

chunks, err := parser.ReadChunks(32)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -118,7 +124,8 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) {
("789",CONVERT("[]" USING UTF8MB4));
`)

parser := mydump.NewChunkParser(reader)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)
chunks, err := parser.ReadChunks(96)

c.Assert(err, IsNil)
Expand Down
70 changes: 19 additions & 51 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/mydump"
verify "github.com/pingcap/tidb-lightning/lightning/verification"
"github.com/pingcap/tidb-lightning/lightning/worker"

tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/util/kvencoder"
Expand Down Expand Up @@ -94,8 +96,9 @@ type RestoreController struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
dbInfos map[string]*TidbDBInfo
tableWorkers *RestoreWorkerPool
regionWorkers *RestoreWorkerPool
tableWorkers *worker.RestoreWorkerPool
regionWorkers *worker.RestoreWorkerPool
ioWorkers *worker.RestoreWorkerPool
importer *kv.Importer
tidbMgr *TiDBManager
postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines
Expand Down Expand Up @@ -128,8 +131,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
rc := &RestoreController{
cfg: cfg,
dbMetas: dbMetas,
tableWorkers: NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
regionWorkers: NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
tableWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
ioWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
importer: importer,
tidbMgr: tidbMgr,

Expand Down Expand Up @@ -438,9 +442,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
// Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about
// the difference between restoring tables concurrently and restoring tables one by one.

worker := rc.tableWorkers.Apply()
restoreWorker := rc.tableWorkers.Apply()
wg.Add(1)
go func(w *RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
go func(w *worker.RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
defer wg.Done()

closedEngine, err := t.restore(ctx, rc, cp)
Expand All @@ -464,7 +468,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}

err = t.postProcess(ctx, closedEngine, rc, cp)
}(worker, tr, cp)
}(restoreWorker, tr, cp)
}
}

Expand Down Expand Up @@ -547,15 +551,15 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
// 3. load kvs data (into kv deliver server)
// 4. flush kvs data (into tikv node)

cr, err := newChunkRestore(chunkIndex, chunk)
cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
if err != nil {
return nil, errors.Trace(err)
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()

worker := rc.regionWorkers.Apply()
restoreWorker := rc.regionWorkers.Apply()
wg.Add(1)
go func(w *RestoreWorker, cr *chunkRestore) {
go func(w *worker.RestoreWorker, cr *chunkRestore) {
// Restore a chunk.
defer func() {
cr.close()
Expand All @@ -580,7 +584,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T

handled := int(atomic.AddInt32(handledChunksCount, 1))
common.AppLogger.Infof("[%s] handled region count = %d (%s)", t.tableName, handled, common.Percent(handled, len(cp.Chunks)))
}(worker, cr)
}(restoreWorker, cr)
}

wg.Wait()
Expand Down Expand Up @@ -859,56 +863,18 @@ func (rc *RestoreController) getTables() []string {
return tables
}

////////////////////////////////////////////////////////////////

type RestoreWorkerPool struct {
limit int
workers chan *RestoreWorker
name string
}

type RestoreWorker struct {
ID int64
}

func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
workers := make(chan *RestoreWorker, limit)
for i := 0; i < limit; i++ {
workers <- &RestoreWorker{ID: int64(i + 1)}
}

metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
return &RestoreWorkerPool{
limit: limit,
workers: workers,
name: name,
}
}

func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
worker := <-pool.workers
metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
return worker
}
func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
pool.workers <- worker
metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
}

////////////////////////////////////////////////////////////////

// chunkRestore groups the state needed to restore a single chunk of a data
// file: the parser that reads the file, the chunk's index, and its checkpoint.
type chunkRestore struct {
// parser reads rows from the data file opened for this chunk.
parser *mydump.ChunkParser
// index is the chunk index handed to newChunkRestore by the caller.
index int
// chunk is the checkpoint record for this chunk; its Key.Path names the
// file to open and its Chunk.Offset / Chunk.PrevRowIDMax seed the parser.
chunk *ChunkCheckpoint
}

func newChunkRestore(index int, chunk *ChunkCheckpoint) (*chunkRestore, error) {
func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) (*chunkRestore, error) {
reader, err := os.Open(chunk.Key.Path)
if err != nil {
return nil, errors.Trace(err)
}
parser := mydump.NewChunkParser(reader)
parser := mydump.NewChunkParser(reader, blockBufSize, ioWorkers)

reader.Seek(chunk.Chunk.Offset, io.SeekStart)
parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)
Expand Down Expand Up @@ -1354,6 +1320,7 @@ func (cr *chunkRestore) restore(
var sep byte = ' '
readLoop:
for cr.parser.Pos() < endOffset {
readRowStartTime := time.Now()
err := cr.parser.ReadRow()
switch errors.Cause(err) {
case nil:
Expand All @@ -1368,6 +1335,7 @@ func (cr *chunkRestore) restore(
buffer.WriteString(" VALUES ")
sep = ','
}
metric.ChunkParserReadRowSecondsHistogram.Observe(time.Since(readRowStartTime).Seconds())
lastRow := cr.parser.LastRow()
if cr.chunk.ShouldIncludeRowID {
buffer.Write(lastRow.Row[:len(lastRow.Row)-1])
Expand Down
44 changes: 44 additions & 0 deletions lightning/worker/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package worker
lonng marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"time"

"github.com/pingcap/tidb-lightning/lightning/metric"
)

// RestoreWorkerPool limits concurrency by handing out a fixed set of
// RestoreWorker tokens. A caller takes a token with Apply and must give it
// back with Recycle; Apply blocks while every token is checked out.
type RestoreWorkerPool struct {
// limit is the number of workers the pool was created with.
limit int
// workers is a buffered channel holding the currently idle workers.
workers chan *RestoreWorker
// name is the label value used when reporting this pool's metrics.
name string
}

// RestoreWorker is a token handed out by a RestoreWorkerPool.
type RestoreWorker struct {
// ID identifies the worker within its pool; NewRestoreWorkerPool assigns
// IDs starting from 1.
ID int64
}

// NewRestoreWorkerPool creates a pool pre-filled with limit workers whose IDs
// run from 1 to limit, and publishes the initial idle-worker count on the
// idle-workers gauge under the label name.
//
// A limit below 1 is raised to 1. A pool with zero workers could never hand
// one out, so every Apply call would block forever, and a negative limit
// would make the channel allocation panic. Because limit usually comes
// straight from user configuration, it is clamped here rather than trusted.
//
// ctx is accepted for interface compatibility; it is not used by the pool.
func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
	if limit < 1 {
		limit = 1
	}

	workers := make(chan *RestoreWorker, limit)
	for i := 0; i < limit; i++ {
		workers <- &RestoreWorker{ID: int64(i + 1)}
	}

	// Every worker starts out idle.
	metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
	return &RestoreWorkerPool{
		limit:   limit,
		workers: workers,
		name:    name,
	}
}

// Apply takes an idle worker out of the pool, blocking until one is
// available. It then refreshes the idle-worker gauge and records how long the
// call took in the apply-worker histogram, both labelled with the pool name.
// The returned worker must be handed back with Recycle.
func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
	begin := time.Now()

	w := <-pool.workers

	idle := float64(len(pool.workers))
	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(idle)

	elapsed := time.Since(begin).Seconds()
	metric.ApplyWorkerSecondsHistogram.WithLabelValues(pool.name).Observe(elapsed)

	return w
}
// Recycle returns a worker obtained from Apply to the pool so another caller
// can use it, then refreshes the idle-worker gauge for this pool.
func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
	pool.workers <- worker

	idle := len(pool.workers)
	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(idle))
}
6 changes: 6 additions & 0 deletions tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ table-concurrency = 8
# In mixed configuration, you can set it to 75% of the size of logical CPU cores.
# region-concurrency default to runtime.NumCPU()
# region-concurrency =
# io-concurrency controls the maximum number of concurrent IO operations.
# Excessive IO concurrency increases IO latency because the disk's internal
# buffer is refreshed frequently, causing cache misses. For different
lonng marked this conversation as resolved.
Show resolved Hide resolved
# disk media, concurrency has different effects on IO latency, so this value
# should be tuned according to monitoring.
# io-concurrency = 5

# logging
level = "info"
Expand Down