diff --git a/lightning/config/config.go b/lightning/config/config.go
index a1aed36b8..ba1bd4447 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -71,6 +71,7 @@ type Lightning struct {
 	common.LogConfig
 	TableConcurrency  int  `toml:"table-concurrency" json:"table-concurrency"`
 	RegionConcurrency int  `toml:"region-concurrency" json:"region-concurrency"`
+	IOConcurrency     int  `toml:"io-concurrency" json:"io-concurrency"`
 	ProfilePort       int  `toml:"pprof-port" json:"pprof-port"`
 	CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
 }
@@ -129,6 +130,7 @@ func NewConfig() *Config {
 		App: Lightning{
 			RegionConcurrency: runtime.NumCPU(),
 			TableConcurrency:  8,
+			IOConcurrency:     5,
 			CheckRequirements: true,
 		},
 		TiDB: DBStore{
diff --git a/lightning/config/const.go b/lightning/config/const.go
index 48c70441d..f7769a771 100644
--- a/lightning/config/const.go
+++ b/lightning/config/const.go
@@ -9,6 +9,8 @@ const (
 	ReadBlockSize int64 = 64 * _K
 	MinRegionSize int64 = 256 * _M
 
+	BufferSizeScale = 5
+
 	// kv import
 	KVMaxBatchSize int64 = 200 * _G
 )
diff --git a/lightning/metric/metric.go b/lightning/metric/metric.go
index b0856d8f2..f2d7135f7 100644
--- a/lightning/metric/metric.go
+++ b/lightning/metric/metric.go
@@ -102,6 +102,30 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(1024, 2, 8),
 		},
 	)
+	ChunkParserReadBlockSecondsHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "lightning",
+			Name:      "chunk_parser_read_block_seconds",
+			Help:      "time needed for the chunk parser to read a block",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
+		},
+	)
+	ChunkParserReadRowSecondsHistogram = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "lightning",
+			Name:      "chunk_parser_read_row_seconds",
+			Help:      "time needed for the chunk parser to read a row",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
+		},
+	)
+	ApplyWorkerSecondsHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "lightning",
+			Name:      "apply_worker_seconds",
+			Help:      "time needed to apply for a worker",
+			Buckets:   prometheus.ExponentialBuckets(0.001, 3.1622776601683795, 10),
+		}, []string{"name"},
+	)
 	BlockEncodeSecondsHistogram = prometheus.NewHistogram(
 		prometheus.HistogramOpts{
 			Namespace: "lightning",
@@ -149,6 +173,9 @@ func init() {
 	prometheus.MustRegister(BlockDeliverSecondsHistogram)
 	prometheus.MustRegister(BlockDeliverBytesHistogram)
 	prometheus.MustRegister(ChecksumSecondsHistogram)
+	prometheus.MustRegister(ChunkParserReadRowSecondsHistogram)
+	prometheus.MustRegister(ChunkParserReadBlockSecondsHistogram)
+	prometheus.MustRegister(ApplyWorkerSecondsHistogram)
 }
 
 func RecordTableCount(status string, err error) {
diff --git a/lightning/mydump/parser.go b/lightning/mydump/parser.go
index 74052a0c3..8a2d7598b 100644
--- a/lightning/mydump/parser.go
+++ b/lightning/mydump/parser.go
@@ -3,8 +3,13 @@ package mydump
 import (
 	"bytes"
 	"io"
+	"time"
 
 	"github.com/pkg/errors"
+
+	"github.com/pingcap/tidb-lightning/lightning/config"
+	"github.com/pingcap/tidb-lightning/lightning/metric"
+	"github.com/pingcap/tidb-lightning/lightning/worker"
 )
 
 // ChunkParser is a parser of the data files (the file containing only INSERT
@@ -29,6 +34,7 @@ type ChunkParser struct {
 	// cache
 	remainBuf *bytes.Buffer
 	appendBuf *bytes.Buffer
+	ioWorkers *worker.RestoreWorkerPool
 }
 
 // Chunk represents a portion of the data file.
@@ -46,12 +52,13 @@ type Row struct {
 }
 
 // NewChunkParser creates a new parser which can read chunks out of a file.
-func NewChunkParser(reader io.Reader) *ChunkParser {
+func NewChunkParser(reader io.Reader, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) *ChunkParser {
 	return &ChunkParser{
 		reader:    reader,
-		blockBuf:  make([]byte, 8192),
+		blockBuf:  make([]byte, blockBufSize*config.BufferSizeScale),
 		remainBuf: &bytes.Buffer{},
 		appendBuf: &bytes.Buffer{},
+		ioWorkers: ioWorkers,
 	}
 }
 
@@ -81,7 +88,13 @@ const (
 )
 
 func (parser *ChunkParser) readBlock() error {
-	n, err := io.ReadFull(parser.reader, parser.blockBuf)
+	startTime := time.Now()
+
+	// limit IO concurrency
+	w := parser.ioWorkers.Apply()
+	n, err := parser.reader.Read(parser.blockBuf)
+	parser.ioWorkers.Recycle(w)
+
 	switch err {
 	case io.ErrUnexpectedEOF, io.EOF:
 		parser.isLastChunk = true
@@ -95,6 +108,7 @@ func (parser *ChunkParser) readBlock() error {
 		parser.appendBuf.Write(parser.remainBuf.Bytes())
 		parser.appendBuf.Write(parser.blockBuf[:n])
 		parser.buf = parser.appendBuf.Bytes()
+		metric.ChunkParserReadBlockSecondsHistogram.Observe(time.Since(startTime).Seconds())
 		return nil
 	default:
 		return errors.Trace(err)
diff --git a/lightning/mydump/parser_test.go b/lightning/mydump/parser_test.go
index 165bc7220..68e1a8781 100644
--- a/lightning/mydump/parser_test.go
+++ b/lightning/mydump/parser_test.go
@@ -1,11 +1,15 @@
 package mydump_test
 
 import (
+	"context"
 	"io"
 	"strings"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb-lightning/lightning/config"
 	"github.com/pingcap/tidb-lightning/lightning/mydump"
+	"github.com/pingcap/tidb-lightning/lightning/worker"
+
 	"github.com/pkg/errors"
 )
 
@@ -24,7 +28,8 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) {
 		"insert another_table values (10, 11, 12, '(13)', '(', 14, ')');",
 	)
 
-	parser := mydump.NewChunkParser(reader)
+	ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
+	parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -72,7 +77,8 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) {
 		INSERT foo VALUES (29,30,31,32),(33,34,35,36);
 	`)
 
-	parser := mydump.NewChunkParser(reader)
+	ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
+	parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)
 
 	chunks, err := parser.ReadChunks(32)
 	c.Assert(err, IsNil)
@@ -118,7 +124,8 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) {
 		("789",CONVERT("[]" USING UTF8MB4));
 	`)
 
-	parser := mydump.NewChunkParser(reader)
+	ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
+	parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)
 
 	chunks, err := parser.ReadChunks(96)
 	c.Assert(err, IsNil)
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index fce2ed9b0..363c76b0a 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -24,6 +24,8 @@ import (
 	"github.com/pingcap/tidb-lightning/lightning/metric"
 	"github.com/pingcap/tidb-lightning/lightning/mydump"
 	verify "github.com/pingcap/tidb-lightning/lightning/verification"
+	"github.com/pingcap/tidb-lightning/lightning/worker"
+
 	tidbcfg "github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/util/kvencoder"
@@ -94,8 +96,9 @@ type RestoreController struct {
 	cfg       *config.Config
 	dbMetas   []*mydump.MDDatabaseMeta
 	dbInfos   map[string]*TidbDBInfo
-	tableWorkers  *RestoreWorkerPool
-	regionWorkers *RestoreWorkerPool
+	tableWorkers  *worker.RestoreWorkerPool
+	regionWorkers *worker.RestoreWorkerPool
+	ioWorkers     *worker.RestoreWorkerPool
 	importer      *kv.Importer
 	tidbMgr       *TiDBManager
 	postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines
@@ -128,8 +131,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
 	rc := &RestoreController{
 		cfg:     cfg,
 		dbMetas: dbMetas,
-		tableWorkers:  NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
-		regionWorkers: NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
+		tableWorkers:  worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
+		regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
+		ioWorkers:     worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
 
 		importer: importer,
 		tidbMgr:  tidbMgr,
@@ -438,9 +442,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 			// Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about
 			// the difference between restoring tables concurrently and restoring tables one by one.
-			worker := rc.tableWorkers.Apply()
+			restoreWorker := rc.tableWorkers.Apply()
 			wg.Add(1)
-			go func(w *RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
+			go func(w *worker.RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
 				defer wg.Done()
 
 				closedEngine, err := t.restore(ctx, rc, cp)
@@ -464,7 +468,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
 				}
 
 				err = t.postProcess(ctx, closedEngine, rc, cp)
-			}(worker, tr, cp)
+			}(restoreWorker, tr, cp)
 		}
 	}
 
@@ -547,15 +551,15 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
 		//  3. load kvs data (into kv deliver server)
 		//  4. flush kvs data (into tikv node)
-		cr, err := newChunkRestore(chunkIndex, chunk)
+		cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 		metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()
 
-		worker := rc.regionWorkers.Apply()
+		restoreWorker := rc.regionWorkers.Apply()
 		wg.Add(1)
-		go func(w *RestoreWorker, cr *chunkRestore) {
+		go func(w *worker.RestoreWorker, cr *chunkRestore) {
 			// Restore a chunk.
 			defer func() {
 				cr.close()
 				wg.Done()
@@ -580,7 +584,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
 
 			handled := int(atomic.AddInt32(handledChunksCount, 1))
 			common.AppLogger.Infof("[%s] handled region count = %d (%s)", t.tableName, handled, common.Percent(handled, len(cp.Chunks)))
-		}(worker, cr)
+		}(restoreWorker, cr)
 	}
 
 	wg.Wait()
@@ -859,56 +863,18 @@ func (rc *RestoreController) getTables() []string {
 	return tables
 }
 
-////////////////////////////////////////////////////////////////
-
-type RestoreWorkerPool struct {
-	limit   int
-	workers chan *RestoreWorker
-	name    string
-}
-
-type RestoreWorker struct {
-	ID int64
-}
-
-func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
-	workers := make(chan *RestoreWorker, limit)
-	for i := 0; i < limit; i++ {
-		workers <- &RestoreWorker{ID: int64(i + 1)}
-	}
-
-	metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
-	return &RestoreWorkerPool{
-		limit:   limit,
-		workers: workers,
-		name:    name,
-	}
-}
-
-func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
-	worker := <-pool.workers
-	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
-	return worker
-}
-func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
-	pool.workers <- worker
-	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
-}
-
-////////////////////////////////////////////////////////////////
-
 type chunkRestore struct {
 	parser *mydump.ChunkParser
 	index  int
 	chunk  *ChunkCheckpoint
 }
 
-func newChunkRestore(index int, chunk *ChunkCheckpoint) (*chunkRestore, error) {
+func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) (*chunkRestore, error) {
 	reader, err := os.Open(chunk.Key.Path)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
 
-	parser := mydump.NewChunkParser(reader)
+	parser := mydump.NewChunkParser(reader, blockBufSize, ioWorkers)
 	reader.Seek(chunk.Chunk.Offset, io.SeekStart)
 	parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)
 
@@ -1354,6 +1320,7 @@ func (cr *chunkRestore) restore(
 	var sep byte = ' '
 readLoop:
 	for cr.parser.Pos() < endOffset {
+		readRowStartTime := time.Now()
 		err := cr.parser.ReadRow()
 		switch errors.Cause(err) {
 		case nil:
@@ -1368,6 +1335,7 @@ func (cr *chunkRestore) restore(
 				buffer.WriteString(" VALUES ")
 				sep = ','
 			}
+			metric.ChunkParserReadRowSecondsHistogram.Observe(time.Since(readRowStartTime).Seconds())
 			lastRow := cr.parser.LastRow()
 			if cr.chunk.ShouldIncludeRowID {
 				buffer.Write(lastRow.Row[:len(lastRow.Row)-1])
diff --git a/lightning/worker/worker.go b/lightning/worker/worker.go
new file mode 100644
index 000000000..0ffa36942
--- /dev/null
+++ b/lightning/worker/worker.go
@@ -0,0 +1,49 @@
+package worker
+
+import (
+	"context"
+	"time"
+
+	"github.com/pingcap/tidb-lightning/lightning/metric"
+)
+
+type RestoreWorkerPool struct {
+	limit   int
+	workers chan *RestoreWorker
+	name    string
+}
+
+type RestoreWorker struct {
+	ID int64
+}
+
+func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
+	workers := make(chan *RestoreWorker, limit)
+	for i := 0; i < limit; i++ {
+		workers <- &RestoreWorker{ID: int64(i + 1)}
+	}
+
+	metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
+	return &RestoreWorkerPool{
+		limit:   limit,
+		workers: workers,
+		name:    name,
+	}
+}
+
+func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
+	start := time.Now()
+	worker := <-pool.workers
+	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
+	metric.ApplyWorkerSecondsHistogram.WithLabelValues(pool.name).Observe(time.Since(start).Seconds())
+	return worker
+}
+
+func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
+	pool.workers <- worker
+	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(len(pool.workers)))
+}
+
+func (pool *RestoreWorkerPool) HasWorker() bool {
+	return len(pool.workers) > 0
+}
diff --git a/lightning/worker/worker_test.go b/lightning/worker/worker_test.go
new file mode 100644
index 000000000..7ca740997
--- /dev/null
+++ b/lightning/worker/worker_test.go
@@ -0,0 +1,40 @@
+package worker_test
+
+import (
+	"context"
+	"testing"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/tidb-lightning/lightning/worker"
+)
+
+type testWorkerPool struct{}
+
+func (s *testWorkerPool) SetUpSuite(c *C)    {}
+func (s *testWorkerPool) TearDownSuite(c *C) {}
+
+var _ = Suite(&testWorkerPool{})
+
+func TestNewRestoreWorkerPool(t *testing.T) {
+	TestingT(t)
+}
+
+func (s *testWorkerPool) TestApplyRecycle(c *C) {
+	pool := worker.NewRestoreWorkerPool(context.Background(), 3, "test")
+
+	w1, w2, w3 := pool.Apply(), pool.Apply(), pool.Apply()
+	c.Assert(w1.ID, Equals, int64(1))
+	c.Assert(w2.ID, Equals, int64(2))
+	c.Assert(w3.ID, Equals, int64(3))
+	c.Assert(pool.HasWorker(), Equals, false)
+
+	pool.Recycle(w3)
+	c.Assert(pool.HasWorker(), Equals, true)
+	c.Assert(pool.Apply(), Equals, w3)
+	pool.Recycle(w2)
+	c.Assert(pool.Apply(), Equals, w2)
+	pool.Recycle(w1)
+	c.Assert(pool.Apply(), Equals, w1)
+
+	c.Assert(pool.HasWorker(), Equals, false)
+}
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 8b6ab77b8..9d15074be 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -13,6 +13,13 @@ table-concurrency = 8
 # In mixed configuration, you can set it to 75% of the size of logical CPU cores.
 # region-concurrency default to runtime.NumCPU()
 # region-concurrency =
+# io-concurrency controls the maximum IO concurrency.
+# Excessive IO concurrency increases IO latency because the disk's internal
+# buffer is frequently refreshed, causing cache misses. Concurrency affects
+# IO latency differently for different disk media, so adjust this value
+# according to monitoring.
+# Ref: https://en.wikipedia.org/wiki/Disk_buffer#Read-ahead/read-behind
+# io-concurrency = 5
 
 # logging
 level = "info"
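
The patch replaces the unconditional io.ReadFull in readBlock with a Read gated by the new "io" worker pool, so at most io-concurrency chunk parsers touch the disk at any moment. The pool itself is a channel-based semaphore: NewRestoreWorkerPool pre-fills a buffered channel with `limit` workers, Apply blocks on a receive until one is idle, and Recycle sends it back. The following self-contained Go sketch illustrates the same pattern outside Lightning (names are illustrative, not the patch's API; the Prometheus metrics are omitted):

package main

import (
	"fmt"
	"strings"
	"sync"
)

// Worker is a token granting permission for one IO operation,
// playing the role of worker.RestoreWorker above.
type Worker struct{ ID int64 }

// Pool is a channel-based semaphore: the buffered channel holds the idle
// workers, so a receive blocks whenever all of them are checked out.
type Pool struct{ workers chan *Worker }

func NewPool(limit int) *Pool {
	p := &Pool{workers: make(chan *Worker, limit)}
	for i := 0; i < limit; i++ {
		p.workers <- &Worker{ID: int64(i + 1)}
	}
	return p
}

// Apply blocks until a worker is idle, like RestoreWorkerPool.Apply.
func (p *Pool) Apply() *Worker { return <-p.workers }

// Recycle returns a worker to the pool, like RestoreWorkerPool.Recycle.
func (p *Pool) Recycle(w *Worker) { p.workers <- w }

func main() {
	ioWorkers := NewPool(2) // at most 2 reads in flight, i.e. io-concurrency = 2

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()

			// The same gating readBlock performs: take a worker, read, return it.
			w := ioWorkers.Apply()
			defer ioWorkers.Recycle(w)

			buf := make([]byte, 8)
			cnt, _ := strings.NewReader("stand-in for a chunk file").Read(buf)
			fmt.Printf("goroutine %d read %d bytes holding worker %d\n", n, cnt, w.ID)
		}(i)
	}
	wg.Wait()
}

Because the buffered channel serializes the hand-off of worker tokens, the pool needs no explicit locking, and its capacity directly bounds the number of reads in flight, which is exactly the property readBlock relies on.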