diff --git a/pkg/providers/mysql/source.go b/pkg/providers/mysql/source.go index 78413a53..7220cb33 100644 --- a/pkg/providers/mysql/source.go +++ b/pkg/providers/mysql/source.go @@ -609,7 +609,10 @@ func NewSource(src *MysqlSource, transferID string, objects *model.DataObjects, } rollbacks.Add(storage.Close) - flavor, _ := CheckMySQLVersion(storage) + flavor, _, err := CheckMySQLVersion(storage) + if err != nil { + return nil, xerrors.Errorf("unable to check MySQL version: %w", err) + } config := new(Config) diff --git a/pkg/providers/mysql/storage.go b/pkg/providers/mysql/storage.go index 4b3f9eac..48511eda 100644 --- a/pkg/providers/mysql/storage.go +++ b/pkg/providers/mysql/storage.go @@ -332,7 +332,11 @@ func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription func (s *Storage) getBinlogPosition(ctx context.Context, tx Queryable) (string, uint32, error) { masterStatusQuery := "show master status;" - _, version := CheckMySQLVersion(s) + _, version, err := CheckMySQLVersion(s) + if err != nil { + return "", 0, xerrors.Errorf("unable to check MySQL version: %w", err) + } + if strings.HasPrefix(version, "8.4") { masterStatusQuery = " SHOW BINARY LOG STATUS;" } diff --git a/pkg/providers/mysql/sync_binlog_position.go b/pkg/providers/mysql/sync_binlog_position.go index a0d10f50..2d04dab3 100644 --- a/pkg/providers/mysql/sync_binlog_position.go +++ b/pkg/providers/mysql/sync_binlog_position.go @@ -49,7 +49,11 @@ func SyncBinlogPosition(src *MysqlSource, id string, cp coordinator.Coordinator) return xerrors.Errorf("failed to get log file position: %w", err) } - flavor, _ := CheckMySQLVersion(storage) + flavor, _, err := CheckMySQLVersion(storage) + if err != nil { + return xerrors.Errorf("unable to check MySQL version: %w", err) + } + gtidModeEnabled, err := IsGtidModeEnabled(storage, flavor) if err != nil { return xerrors.Errorf("Unable to check gtid mode: %w", err) diff --git a/pkg/providers/mysql/tasks.go b/pkg/providers/mysql/tasks.go index 527d30eb..899582ca 100644 --- a/pkg/providers/mysql/tasks.go +++ b/pkg/providers/mysql/tasks.go @@ -53,7 +53,11 @@ func RemoveTracker(src *MysqlSource, id string, cp coordinator.Coordinator) erro } defer func() { _ = storage.DB.Close() }() - flavor, _ := CheckMySQLVersion(storage) + flavor, _, err := CheckMySQLVersion(storage) + if err != nil { + return xerrors.Errorf("unable to check MySQL version: %w", err) + } + enabled, err := IsGtidModeEnabled(storage, flavor) if err != nil { return xerrors.Errorf("Unable to check gtid mode: %w", err) diff --git a/pkg/providers/mysql/utils.go b/pkg/providers/mysql/utils.go index 2ab2a646..b4a04f05 100644 --- a/pkg/providers/mysql/utils.go +++ b/pkg/providers/mysql/utils.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/cenkalti/backoff/v4" "github.com/doublecloud/transfer/internal/logger" "github.com/doublecloud/transfer/library/go/core/xerrors" "github.com/doublecloud/transfer/pkg/abstract" @@ -321,31 +322,42 @@ func IsGtidModeEnabled(storage *Storage, flavor string) (bool, error) { return strings.ToLower(gtidMode) == "on", nil } -func CheckMySQLVersion(storage *Storage) (string, string) { +func CheckMySQLVersion(storage *Storage) (string, string, error) { flavor := mysql.MySQLFlavor var version string - rows, err := storage.DB.Query("SHOW VARIABLES LIKE '%version%';") + var rows *sql.Rows + err := backoff.RetryNotify( + func() error { + var err error + rows, err = storage.DB.Query("SHOW VARIABLES LIKE '%version%';") + return err + }, + backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(time.Minute)), + func(err error, d time.Duration) { + msg := fmt.Sprintf("Unable to check version, will retry after %s", d.String()) + logger.Log.Warn(msg, log.Error(err)) + }, + ) if err != nil { - logger.Log.Warnf("Unable to check version: %v", err) + return "", "", xerrors.Errorf("unable to check version: %w", err) } + defer rows.Close() for rows.Next() { var name, val string if err := rows.Scan(&name, &val); err != nil { - logger.Log.Warnf("Unable to parse version variable: %v", err) - continue + return "", "", xerrors.Errorf("unable to parse version variable: %w", err) } if name == "version" { logger.Log.Infof("MySQL version is %v", val) - val = strings.ToLower(val) if strings.Contains(strings.ToLower(val), "mariadb") { - return mysql.MariaDBFlavor, val + return mysql.MariaDBFlavor, val, nil } version = val break } } - return flavor, version + return flavor, version, nil }