feat: support concurrent apply dml binlog #1285

6 changes: 6 additions & 0 deletions doc/command-line-flags.md
@@ -125,6 +125,12 @@ Why is this behavior configurable? Different workloads have different characteri

Noteworthy is that setting `--dml-batch-size` to a higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimum. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log; it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.

### dml-batch-concurrency-size

`gh-ost` reads binlog events and can apply them concurrently onto the _ghost_ table.

The `--dml-batch-concurrency-size` flag controls the number of concurrent workers that apply batched binlog DML events; each worker applies up to `--dml-batch-size` events in a single transaction. Allowed values are `1 - 100`. Default value is `1`. Concurrent binlog application is only allowed when every column of the migration's unique key is of an integer type; if the unique key contains a non-integer column, the concurrency is forced to `1`.
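
To make the arithmetic concrete, here is a minimal, self-contained sketch (illustrative only; these names are not part of `gh-ost`) of how one round of events is partitioned into per-worker batches:

```go
package main

import "fmt"

// splitIntoBatches partitions one round of pulled binlog events into
// per-worker batches of at most batchSize events; each batch is applied
// in its own transaction by one worker.
func splitIntoBatches(events []int, batchSize int) [][]int {
	var batches [][]int
	for i := 0; i < len(events); i += batchSize {
		end := i + batchSize
		if end > len(events) {
			end = len(events)
		}
		batches = append(batches, events[i:end])
	}
	return batches
}

func main() {
	// With --dml-batch-size=10 and --dml-batch-concurrency-size=4, up to
	// 40 events are pulled per round; 37 available events yield 4 batches.
	events := make([]int, 37)
	for _, batch := range splitIntoBatches(events, 10) {
		fmt.Println(len(batch)) // 10, 10, 10, 7
	}
}
```

As with `--dml-batch-size` itself, these are upper bounds, not minimums: fewer available events simply produce fewer, or smaller, batches.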

### exact-rowcount

A `gh-ost` execution needs to copy whatever rows you have in your existing table onto the ghost table. This can, and often will be, a large number. Exactly what that number is?
19 changes: 16 additions & 3 deletions go/base/context.go
@@ -49,9 +49,10 @@ const (
)

const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
MaxEventsBatchConcurrencySize = 100
ETAUnknown = math.MinInt64
)

var (
@@ -195,6 +196,7 @@ type MigrationContext struct {
TotalRowsCopied int64
TotalDMLEventsApplied int64
DMLBatchSize int64
DMLBatchConcurrencySize int64
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint
@@ -277,6 +279,7 @@ func NewMigrationContext() *MigrationContext {
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
DMLBatchConcurrencySize: 1,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
@@ -626,6 +629,16 @@ func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
}

func (this *MigrationContext) SetDMLBatchConcurrencySize(batchConcurrencySize int64) {
if batchConcurrencySize < 1 {
batchConcurrencySize = 1
}
if batchConcurrencySize > MaxEventsBatchConcurrencySize {
batchConcurrencySize = MaxEventsBatchConcurrencySize
}
atomic.StoreInt64(&this.DMLBatchConcurrencySize, batchConcurrencySize)
}
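
// For illustration: out-of-range values are clamped rather than rejected.
// SetDMLBatchConcurrencySize(0) stores 1, and SetDMLBatchConcurrencySize(500)
// stores MaxEventsBatchConcurrencySize (100).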

func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
2 changes: 2 additions & 0 deletions go/cmd/gh-ost/main.go
@@ -105,6 +105,7 @@ func main() {
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
dmlBatchConcurrencySize := flag.Int64("dml-batch-concurrency-size", 1, "number of concurrent workers applying batched DML events (allowed range: 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
@@ -295,6 +296,7 @@ func main() {
migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetDMLBatchConcurrencySize(*dmlBatchConcurrencySize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetThrottleQuery(*throttleQuery)
migrationContext.SetThrottleHTTP(*throttleHTTP)
111 changes: 98 additions & 13 deletions go/logic/applier.go
@@ -27,18 +27,20 @@
)

type dmlBuildResult struct {
query string
args []interface{}
rowsDelta int64
err error
query string
args []interface{}
rowsDelta int64
err error
uniqueValues []interface{} // unique key values affected by this DML event
}

func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult {
func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, uniqueValues []interface{}, err error) *dmlBuildResult {
return &dmlBuildResult{
query: query,
args: args,
rowsDelta: rowsDelta,
err: err,
query: query,
args: args,
rowsDelta: rowsDelta,
err: err,
uniqueValues: uniqueValues,
}
}

@@ -72,15 +74,22 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}

func (this *Applier) InitDBConnections() (err error) {
// this.db is used for applying DML events, creating the ghost table, and so on
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
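// Raise the connection cap when the configured apply concurrency exceeds
// the pool's default limit, so that concurrent workers are not starved
// for connections.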
if int(this.migrationContext.DMLBatchConcurrencySize) > mysql.MaxDBPoolConnections {
this.db.SetMaxOpenConns(int(this.migrationContext.DMLBatchConcurrencySize))
}

// cut-over phase requires a singleton connection to the applier
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)

version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
if err != nil {
return err
@@ -1110,12 +1119,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, uniqueKeyArgs, err))
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues())
return append(results, newDmlBuildResult(query, sharedArgs, 1, uniqueKeyArgs, err))
}
case binlog.UpdateDML:
{
@@ -1130,7 +1139,7 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return append(results, newDmlBuildResult(query, args, 0, err))
return append(results, newDmlBuildResult(query, args, 0, uniqueKeyArgs, err))
}
}
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
@@ -1196,6 +1205,82 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return nil
}

func (this *Applier) BuildDMLEventQueryMap(dmlEvents []*binlog.BinlogDMLEvent) (resultMap map[string][]*dmlBuildResult, err error) {
resultMap = make(map[string][]*dmlBuildResult)
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return nil, this.migrationContext.Log.Errore(buildResult.err)
}
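// Group results by the row's unique key values so that all events touching
// the same row land in the same slice, in binlog order.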
key := fmt.Sprintf("%v", buildResult.uniqueValues)
resultMap[key] = append(resultMap[key], buildResult)
}
}
return resultMap, nil
}
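
// Illustration (hypothetical values): for the event stream
//   INSERT id=1, UPDATE id=1, INSERT id=2
// the map comes out as {"[1]": [insert, update], "[2]": [insert]}; the
// concurrent apply path later keeps only the last result per key.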

// ApplyDMLQueries applies a batch of DML queries onto the _ghost_ table in a
// single transaction; multiple batches may be applied concurrently
func (this *Applier) ApplyDMLQueries(dmlResults []*dmlBuildResult) error {
var totalDelta int64

if len(dmlResults) == 0 {
return nil
}

err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}

rollback := func(err error) error {
tx.Rollback()
return err
}

sessionQuery := "SET SESSION time_zone = '+00:00'"
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
for _, buildResult := range dmlResults {
if buildResult.err != nil {
return rollback(buildResult.err)
}

result, err := tx.Exec(buildResult.query, buildResult.args...)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
rowsAffected = 1
}
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
totalDelta += buildResult.rowsDelta * rowsAffected
}

if err := tx.Commit(); err != nil {
return err
}
return nil
}()

if err != nil {
return this.migrationContext.Log.Errore(err)
}
// no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlResults)))
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
}
this.migrationContext.Log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlResults))
return nil
}

func (this *Applier) Teardown() {
this.migrationContext.Log.Debugf("Tearing down...")
this.db.Close()
9 changes: 9 additions & 0 deletions go/logic/inspect.go
@@ -196,6 +196,12 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
}

for _, column := range this.migrationContext.UniqueKey.Columns.Columns() {
// If any unique key column is not an integer, DML events cannot be safely
// partitioned by unique key value, so force the binlog apply concurrency to 1.
if this.migrationContext.DMLBatchConcurrencySize > 1 && !column.IsInteger {
this.migrationContext.Log.Warningf("Column %s in the unique key is not of an integer type; forcing binlog apply concurrency to 1", column.Name)
this.migrationContext.SetDMLBatchConcurrencySize(1)
}

if this.migrationContext.GhostTableVirtualColumns.GetColumn(column.Name) != nil {
// this is a virtual column
continue
@@ -598,6 +604,9 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
if strings.Contains(columnType, "unsigned") {
column.IsUnsigned = true
}
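// Classify integer columns: int, tinyint, smallint, mediumint and bigint all
// contain the substring "int"; the spatial "point" type does too, hence the
// explicit exclusion below.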
if strings.Contains(columnType, "int") {
column.IsInteger = true
}
if strings.Contains(columnType, "mediumint") {
column.Type = sql.MediumIntColumnType
}
99 changes: 90 additions & 9 deletions go/logic/migrator.go
@@ -13,6 +13,7 @@ import (
"math"
"os"
"strings"
"sync"
"sync/atomic"
"time"

@@ -1226,6 +1227,9 @@ func (this *Migrator) iterateChunks() error {
}

func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
concurrencySize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchConcurrencySize))

handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
if eventStruct.writeFunc != nil {
if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
@@ -1234,6 +1238,79 @@
}
return nil
}

handleSerialApplyDMLEvent := func(dmlEvents []*binlog.BinlogDMLEvent) error {
// Create a task to apply the DML events; this will be executed by executeWriteFuncs()
var applyEventFunc tableWriteFunc = func() error {
return this.applier.ApplyDMLEventQueries(dmlEvents)
}
if err := this.retryOperation(applyEventFunc); err != nil {
return this.migrationContext.Log.Errore(err)
}
return nil
}

handleConcurrencyApplyDMLEvent := func(dmlEvents []*binlog.BinlogDMLEvent) error {
// dmlResultsMap is a map[uniqueKey][]*dmlBuildResult: build results grouped
// by unique key, each slice kept in binlog event order
var dmlResultsMap map[string][]*dmlBuildResult
var buildEventFunc tableWriteFunc = func() (err error) {
dmlResultsMap, err = this.applier.BuildDMLEventQueryMap(dmlEvents)
return err
}

if err := this.retryOperation(buildEventFunc); err != nil {
return this.migrationContext.Log.Errore(err)
}

var applyDMLResults []*dmlBuildResult
for _, dmlResults := range dmlResultsMap {
// For a given unique key only the last DML result needs to be applied,
// since any earlier result would be overwritten by it anyway.
applyDMLResults = append(applyDMLResults, dmlResults[len(dmlResults)-1])
}

wg := sync.WaitGroup{}

// Up to batchSize*concurrencySize events are pulled per round. Because an
// update that modifies unique key columns is split into a delete plus an
// insert, len(applyDMLResults) may exceed batchSize*concurrencySize, and the
// worker count, ceil(len(applyDMLResults)/batchSize), may reach twice the
// configured concurrency. Buffer errCh for the full worker count so that a
// failing worker never blocks on send.
numWorkers := (len(applyDMLResults) + batchSize - 1) / batchSize
errCh := make(chan error, numWorkers)
defer func() {
close(errCh)
for range errCh {
// drain errCh
}
}()

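// Fan out: each worker applies one slice of at most batchSize results in
// its own transaction, retrying independently; errors are collected on errCh.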
for i := 0; i < len(applyDMLResults); i += batchSize {
end := i + batchSize
if end > len(applyDMLResults) {
end = len(applyDMLResults)
}
wg.Add(1)
go func(begin, end int) {
defer wg.Done()

var applyBatchFunc tableWriteFunc = func() error {
return this.applier.ApplyDMLQueries(applyDMLResults[begin:end])
}

if err := this.retryOperation(applyBatchFunc); err != nil {
errCh <- err
}
}(i, end)
}

wg.Wait()

if len(errCh) > 0 {
return <-errCh
}
return nil
}

if eventStruct.dmlEvent == nil {
return handleNonDMLEventStruct(eventStruct)
}
@@ -1243,11 +1320,10 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
var nonDmlStructToApply *applyEventStruct

availableEvents := len(this.applyEventsQueue)
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
if availableEvents > batchSize-1 {
if availableEvents > batchSize*concurrencySize-1 {
// The "- 1" is because we already consumed one event: the original event that led to this function getting called.
// So, if DMLBatchSize==1 we wish to not process any further events
availableEvents = batchSize - 1
availableEvents = batchSize*concurrencySize - 1
}
for i := 0; i < availableEvents; i++ {
additionalStruct := <-this.applyEventsQueue
@@ -1258,13 +1334,18 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
}
dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
}
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
var applyEventFunc tableWriteFunc = func() error {
return this.applier.ApplyDMLEventQueries(dmlEvents)
}
if err := this.retryOperation(applyEventFunc); err != nil {
return this.migrationContext.Log.Errore(err)

// Apply the DML events: serially if concurrencySize == 1, otherwise concurrently.
if concurrencySize == 1 {
if err := handleSerialApplyDMLEvent(dmlEvents); err != nil {
return this.migrationContext.Log.Errore(err)
}
} else {
if err := handleConcurrencyApplyDMLEvent(dmlEvents); err != nil {
return this.migrationContext.Log.Errore(err)
}
}

if nonDmlStructToApply != nil {
// We pulled DML events from the queue, and then we hit a non-DML event. Wait!
// We need to handle it!