Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hooks: reporting GH_OST_ETA_SECONDS. ETA as part of migration context #936

Merged
merged 30 commits into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9b2a04d
Merge pull request #2 from openark/workflow-upload-artifact
shlomi-noach Jul 28, 2020
9ccde4f
Merge pull request #5 from openark/parse-alter-statement
shlomi-noach Jul 29, 2020
b59a8ed
merged conflict
shlomi-noach Aug 2, 2020
6012e80
Merge pull request #8 from openark/ajm188-handle_driver_timeout_error
shlomi-noach Aug 2, 2020
ae22d84
v1.1.0
shlomi-noach Aug 5, 2020
ca0ca5a
Merge remote-tracking branch 'upstream/master' into updates-from-upst…
shlomi-noach Oct 18, 2020
e9f9af2
Merge pull request #11 from openark/updates-from-upstream-2020-10
shlomi-noach Oct 18, 2020
294d43b
WIP: copying AUTO_INCREMENT value to ghost table
shlomi-noach Dec 31, 2020
26f7602
greping for 'expect_table_structure' content
shlomi-noach Dec 31, 2020
75009db
Adding simple test for 'expect_table_structure' scenario
shlomi-noach Dec 31, 2020
eeab264
adding tests for AUTO_INCREMENT value after row deletes. Should initi…
shlomi-noach Dec 31, 2020
2d0281f
clear event beforehand
shlomi-noach Dec 31, 2020
af20211
parsing AUTO_INCREMENT from alter query, reading AUTO_INCREMENT from …
shlomi-noach Dec 31, 2020
31069ae
support GetUint64
shlomi-noach Dec 31, 2020
3d4dfaa
minor update to test
shlomi-noach Dec 31, 2020
63219ab
adding test for user defined AUTO_INCREMENT statement
shlomi-noach Dec 31, 2020
525a80d
Merge branch 'master' into copy-auto-increment
shlomi-noach Jan 5, 2021
ff82140
Merge pull request #12 from openark/copy-auto-increment
shlomi-noach Jan 5, 2021
7202076
Generated column as part of UNIQUE (or PRIMARY) KEY
shlomi-noach Jan 19, 2021
b7b3bfb
skip analysis of generated column data type in unique key
shlomi-noach Jan 19, 2021
253658d
Merge pull request #13 from openark/unique-key-generated-column
shlomi-noach Jan 27, 2021
4a36e24
Merge pull request #14 from ccoffey/cathal/safer_cut_over
shlomi-noach Feb 7, 2021
710c9dd
All MySQL DBs limited to max 3 concurrent/idle connections
shlomi-noach Feb 18, 2021
dea8d54
Merge branch 'master' into limit-mysql-connetions
shlomi-noach Feb 22, 2021
2b5cf78
Merge pull request #15 from openark/limit-mysql-connetions
shlomi-noach Feb 22, 2021
54000ab
hooks: reporting GH_OST_ETA_SECONDS. ETA stored as part of migration …
shlomi-noach Mar 7, 2021
51719a2
GH_OST_ETA_NANOSECONDS
shlomi-noach Mar 7, 2021
76b9c16
N/A denoted by negative value
shlomi-noach Mar 7, 2021
b688c58
ETAUnknown constant
shlomi-noach Mar 7, 2021
9ccee27
Merge branch 'master' into hooks-eta-seconds
shlomi-noach May 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64
)

var (
Expand Down Expand Up @@ -182,6 +183,7 @@ type MigrationContext struct {
lastHeartbeatOnChangelogMutex *sync.Mutex
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
ThrottleHTTPStatusCode int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
Expand Down Expand Up @@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext {
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
Expand Down Expand Up @@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) {
atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct))
}

func (this *MigrationContext) GetETADuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.etaNanoseonds))
}

func (this *MigrationContext) SetETADuration(etaDuration time.Duration) {
atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds())
}

func (this *MigrationContext) GetETASeconds() int64 {
nano := atomic.LoadInt64(&this.etaNanoseonds)
if nano < 0 {
return ETAUnknown
}
return nano / int64(time.Second)
}

// math.Float64bits([f=0..100])

// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
Expand Down
1 change: 1 addition & 0 deletions go/logic/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
Expand Down
19 changes: 14 additions & 5 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
}

var etaSeconds float64 = math.MaxFloat64
eta := "N/A"
var etaDuration = time.Duration(base.ETAUnknown)
if progressPct >= 100.0 {
eta = "due"
etaDuration = 0
} else if progressPct >= 0.1 {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time.Duration(etaSeconds) * time.Second
eta = base.PrettifyDurationOutput(etaDuration)
etaDuration = time.Duration(etaSeconds) * time.Second
} else {
eta = "due"
etaDuration = 0
}
}
this.migrationContext.SetETADuration(etaDuration)
var eta string
switch etaDuration {
case 0:
eta = "due"
case time.Duration(base.ETAUnknown):
eta = "N/A"
default:
eta = base.PrettifyDurationOutput(etaDuration)
}

state := "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
Expand Down
7 changes: 5 additions & 2 deletions go/logic/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,12 @@ func (this *Throttler) collectControlReplicasLag() {
dbUri := connectionConfig.GetDBUri("information_schema")

var heartbeatValue string
if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil {
db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri)
if err != nil {
return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
}

if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}

Expand Down
30 changes: 16 additions & 14 deletions go/mysql/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"github.com/outbrain/golib/sqlutils"
)

const MaxTableNameLength = 64
const MaxReplicationPasswordLength = 32
const (
MaxTableNameLength = 64
MaxReplicationPasswordLength = 32
MaxDBPoolConnections = 3
)

type ReplicationLagResult struct {
Key InstanceKey
Expand All @@ -39,23 +42,22 @@ func (this *ReplicationLagResult) HasLag() bool {
var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
var knownDBsMutex = &sync.Mutex{}

func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, err error) {
cacheKey := migrationUuid + ":" + mysql_uri

knownDBsMutex.Lock()
defer func() {
knownDBsMutex.Unlock()
}()

var exists bool
if _, exists = knownDBs[cacheKey]; !exists {
if db, err := gosql.Open("mysql", mysql_uri); err == nil {
knownDBs[cacheKey] = db
} else {
return db, exists, err
defer knownDBsMutex.Unlock()

if db, exists = knownDBs[cacheKey]; !exists {
db, err = gosql.Open("mysql", mysql_uri)
if err != nil {
return nil, false, err
}
db.SetMaxOpenConns(MaxDBPoolConnections)
db.SetMaxIdleConns(MaxDBPoolConnections)
knownDBs[cacheKey] = db
}
return knownDBs[cacheKey], exists, nil
return db, exists, nil
}

// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
Expand Down