diff --git a/.github/workflows/cluster_endtoend_backup_pitr.yml b/.github/workflows/cluster_endtoend_backup_pitr.yml index 8736f6dd4df..ae030975eed 100644 --- a/.github/workflows/cluster_endtoend_backup_pitr.yml +++ b/.github/workflows/cluster_endtoend_backup_pitr.yml @@ -84,15 +84,16 @@ jobs: if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' run: | - # Get key to latest MySQL repo - sudo apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 467B942D3A79BD29 - # Setup MySQL 8.0 - wget -c https://dev.mysql.com/get/mysql-apt-config_0.8.24-1_all.deb - echo mysql-apt-config mysql-apt-config/select-server select mysql-8.0 | sudo debconf-set-selections - sudo DEBIAN_FRONTEND="noninteractive" dpkg -i mysql-apt-config* + # Setup Percona Server for MySQL 8.0 sudo apt-get update + sudo apt-get install -y lsb-release gnupg2 curl + wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb + sudo DEBIAN_FRONTEND="noninteractive" dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb + sudo percona-release setup ps80 + sudo apt-get update + # Install everything else we need, and configure - sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata xz-utils libncurses5 + sudo apt-get install -y percona-server-server percona-server-client make unzip g++ etcd git wget eatmydata xz-utils libncurses5 sudo service mysql stop sudo service etcd stop @@ -103,6 +104,8 @@ jobs: # install JUnit report formatter go install github.com/vitessio/go-junit-report@HEAD + sudo apt-get install percona-xtrabackup-80 lz4 + - name: Setup launchable dependencies if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' run: | diff --git a/.github/workflows/cluster_endtoend_backup_pitr_mysql57.yml b/.github/workflows/cluster_endtoend_backup_pitr_mysql57.yml index 4ba286dea60..2cdbd328c95 100644 --- a/.github/workflows/cluster_endtoend_backup_pitr_mysql57.yml +++ b/.github/workflows/cluster_endtoend_backup_pitr_mysql57.yml @@ -13,6 +13,10 @@ env: LAUNCHABLE_WORKSPACE: "vitess-app" GITHUB_PR_HEAD_SHA: "${{ github.event.pull_request.head.sha }}" + # This is used if we need to pin the xtrabackup version used in tests. + # If this is NOT set then the latest version available will be used. + #XTRABACKUP_VERSION: "2.4.24-1" + jobs: build: name: Run endtoend tests on Cluster (backup_pitr) mysql57 @@ -114,6 +118,18 @@ jobs: # install JUnit report formatter go install github.com/vitessio/go-junit-report@HEAD + wget "https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb" + sudo apt-get install -y gnupg2 + sudo dpkg -i "percona-release_latest.$(lsb_release -sc)_all.deb" + sudo apt-get update + if [[ -n $XTRABACKUP_VERSION ]]; then + debfile="percona-xtrabackup-24_$XTRABACKUP_VERSION.$(lsb_release -sc)_amd64.deb" + wget "https://repo.percona.com/pxb-24/apt/pool/main/p/percona-xtrabackup-24/$debfile" + sudo apt install -y "./$debfile" + else + sudo apt-get install -y percona-xtrabackup-24 + fi + - name: Setup launchable dependencies if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' run: | diff --git a/go/test/endtoend/backup/pitr/backup_mysqlctld_pitr_test.go b/go/test/endtoend/backup/pitr/backup_mysqlctld_pitr_test.go index dfb869b4de2..8cb8261606c 100644 --- a/go/test/endtoend/backup/pitr/backup_mysqlctld_pitr_test.go +++ b/go/test/endtoend/backup/pitr/backup_mysqlctld_pitr_test.go @@ -52,161 +52,184 @@ func waitForReplica(t *testing.T) { } // TestIncrementalBackupMysqlctld - tests incremental backups using myslctld -func TestIncrementalBackupMysqlctld(t *testing.T) { +func TestIncrementalBackup(t *testing.T) { defer cluster.PanicHandler(t) - // setup cluster for the testing - code, err := backup.LaunchCluster(backup.Mysqlctld, "xbstream", 0, nil) - require.NoError(t, err, "setup failed with status code %d", code) - defer backup.TearDownCluster() - - backup.InitTestTable(t) - - rowsPerPosition := map[string]int{} - backupPositions := []string{} - - recordRowsPerPosition := func(t *testing.T) { - pos := backup.GetReplicaPosition(t) - msgs := backup.ReadRowsFromReplica(t) - if _, ok := rowsPerPosition[pos]; !ok { - backupPositions = append(backupPositions, pos) - rowsPerPosition[pos] = len(msgs) - } - } - var fullBackupPos mysql.Position - t.Run("full backup", func(t *testing.T) { - backup.InsertRowOnPrimary(t, "before-full-backup") - waitForReplica(t) - manifest, _ := backup.TestReplicaFullBackup(t) - fullBackupPos = manifest.Position - require.False(t, fullBackupPos.IsZero()) - // - msgs := backup.ReadRowsFromReplica(t) - pos := mysql.EncodePosition(fullBackupPos) - backupPositions = append(backupPositions, pos) - rowsPerPosition[pos] = len(msgs) - }) - - lastBackupPos := fullBackupPos - backup.InsertRowOnPrimary(t, "before-incremental-backups") - - tt := []struct { - name string - writeBeforeBackup bool - fromFullPosition bool - autoPosition bool - expectError string + tcases := []struct { + name string + setupType int + comprss *backup.CompressionDetails }{ { - name: "first incremental backup", + "BuiltinBackup", backup.BuiltinBackup, nil, }, { - name: "make writes, succeed", - writeBeforeBackup: true, + "XtraBackup", backup.XtraBackup, &backup.CompressionDetails{ + CompressorEngineName: "pgzip", + }, }, { - name: "fail, no binary logs to backup", - expectError: "no binary logs to backup", - }, - { - name: "make writes again, succeed", - writeBeforeBackup: true, - }, - { - name: "auto position, succeed", - writeBeforeBackup: true, - autoPosition: true, - }, - { - name: "fail auto position, no binary logs to backup", - autoPosition: true, - expectError: "no binary logs to backup", - }, - { - name: "auto position, make writes again, succeed", - writeBeforeBackup: true, - autoPosition: true, - }, - { - name: "from full backup position", - fromFullPosition: true, + "Mysqlctld", backup.Mysqlctld, nil, }, } - var fromFullPositionBackups []string - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - if tc.writeBeforeBackup { - backup.InsertRowOnPrimary(t, "") - } - // we wait for 1 second because backups are written to a directory named after the current timestamp, - // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this - // is only ever a problem in this end-to-end test, not in production. - // Also, we gie the replica a chance to catch up. - time.Sleep(1100 * time.Millisecond) - waitForReplica(t) - recordRowsPerPosition(t) - // configure --incremental-from-pos to either: - // - auto - // - explicit last backup pos - // - back in history to the original full backup - var incrementalFromPos mysql.Position - if !tc.autoPosition { - incrementalFromPos = lastBackupPos - if tc.fromFullPosition { - incrementalFromPos = fullBackupPos + for _, tcase := range tcases { + t.Run(tcase.name, func(t *testing.T) { + // setup cluster for the testing + code, err := backup.LaunchCluster(tcase.setupType, "xbstream", 0, tcase.comprss) + require.NoError(t, err, "setup failed with status code %d", code) + defer backup.TearDownCluster() + + backup.InitTestTable(t) + + rowsPerPosition := map[string]int{} + backupPositions := []string{} + + recordRowsPerPosition := func(t *testing.T) { + pos := backup.GetReplicaPosition(t) + msgs := backup.ReadRowsFromReplica(t) + if _, ok := rowsPerPosition[pos]; !ok { + backupPositions = append(backupPositions, pos) + rowsPerPosition[pos] = len(msgs) } } - manifest, backupName := backup.TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError) - if tc.expectError != "" { - return - } - defer func() { - lastBackupPos = manifest.Position - }() - if tc.fromFullPosition { - fromFullPositionBackups = append(fromFullPositionBackups, backupName) - } - require.False(t, manifest.FromPosition.IsZero()) - require.NotEqual(t, manifest.Position, manifest.FromPosition) - require.True(t, manifest.Position.GTIDSet.Contains(manifest.FromPosition.GTIDSet)) - gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, backup.GetReplicaGtidPurged(t)) - require.NoError(t, err) - fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) + var fullBackupPos mysql.Position + t.Run("full backup", func(t *testing.T) { + backup.InsertRowOnPrimary(t, "before-full-backup") + waitForReplica(t) + + manifest, _ := backup.TestReplicaFullBackup(t) + fullBackupPos = manifest.Position + require.False(t, fullBackupPos.IsZero()) + // + msgs := backup.ReadRowsFromReplica(t) + pos := mysql.EncodePosition(fullBackupPos) + backupPositions = append(backupPositions, pos) + rowsPerPosition[pos] = len(msgs) + }) - expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) - if !incrementalFromPos.IsZero() { - expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) + lastBackupPos := fullBackupPos + backup.InsertRowOnPrimary(t, "before-incremental-backups") + + tt := []struct { + name string + writeBeforeBackup bool + fromFullPosition bool + autoPosition bool + expectError string + }{ + { + name: "first incremental backup", + }, + { + name: "make writes, succeed", + writeBeforeBackup: true, + }, + { + name: "fail, no binary logs to backup", + expectError: "no binary logs to backup", + }, + { + name: "make writes again, succeed", + writeBeforeBackup: true, + }, + { + name: "auto position, succeed", + writeBeforeBackup: true, + autoPosition: true, + }, + { + name: "fail auto position, no binary logs to backup", + autoPosition: true, + expectError: "no binary logs to backup", + }, + { + name: "auto position, make writes again, succeed", + writeBeforeBackup: true, + autoPosition: true, + }, + { + name: "from full backup position", + fromFullPosition: true, + }, + } + var fromFullPositionBackups []string + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + if tc.writeBeforeBackup { + backup.InsertRowOnPrimary(t, "") + } + // we wait for 1 second because backups are written to a directory named after the current timestamp, + // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this + // is only ever a problem in this end-to-end test, not in production. + // Also, we gie the replica a chance to catch up. + time.Sleep(1100 * time.Millisecond) + waitForReplica(t) + recordRowsPerPosition(t) + // configure --incremental-from-pos to either: + // - auto + // - explicit last backup pos + // - back in history to the original full backup + var incrementalFromPos mysql.Position + if !tc.autoPosition { + incrementalFromPos = lastBackupPos + if tc.fromFullPosition { + incrementalFromPos = fullBackupPos + } + } + manifest, backupName := backup.TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError) + if tc.expectError != "" { + return + } + defer func() { + lastBackupPos = manifest.Position + }() + if tc.fromFullPosition { + fromFullPositionBackups = append(fromFullPositionBackups, backupName) + } + require.False(t, manifest.FromPosition.IsZero()) + require.NotEqual(t, manifest.Position, manifest.FromPosition) + require.True(t, manifest.Position.GTIDSet.Union(manifest.PurgedPosition.GTIDSet).Contains(manifest.FromPosition.GTIDSet)) + + gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, backup.GetReplicaGtidPurged(t)) + require.NoError(t, err) + fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) + + expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) + if !incrementalFromPos.IsZero() { + expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) + } + require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v, gtid_purged: %v, manifest.Position: %v", expectFromPosition, fromPositionIncludingPurged, gtidPurgedPos, manifest.Position) + }) } - require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v", expectFromPosition, fromPositionIncludingPurged) - }) - } - testRestores := func(t *testing.T) { - for _, r := range rand.Perm(len(backupPositions)) { - pos := backupPositions[r] - testName := fmt.Sprintf("%s, %d records", pos, rowsPerPosition[pos]) - t.Run(testName, func(t *testing.T) { - restoreToPos, err := mysql.DecodePosition(pos) - require.NoError(t, err) - backup.TestReplicaRestoreToPos(t, restoreToPos, "") - msgs := backup.ReadRowsFromReplica(t) - count, ok := rowsPerPosition[pos] - require.True(t, ok) - assert.Equalf(t, count, len(msgs), "messages: %v", msgs) + testRestores := func(t *testing.T) { + for _, r := range rand.Perm(len(backupPositions)) { + pos := backupPositions[r] + testName := fmt.Sprintf("%s, %d records", pos, rowsPerPosition[pos]) + t.Run(testName, func(t *testing.T) { + restoreToPos, err := mysql.DecodePosition(pos) + require.NoError(t, err) + backup.TestReplicaRestoreToPos(t, restoreToPos, "") + msgs := backup.ReadRowsFromReplica(t) + count, ok := rowsPerPosition[pos] + require.True(t, ok) + assert.Equalf(t, count, len(msgs), "messages: %v", msgs) + }) + } + } + t.Run("PITR", func(t *testing.T) { + testRestores(t) }) - } + t.Run("remove full position backups", func(t *testing.T) { + // Delete the fromFullPosition backup(s), which leaves us with less restore options. Try again. + for _, backupName := range fromFullPositionBackups { + backup.RemoveBackup(t, backupName) + } + }) + t.Run("PITR-2", func(t *testing.T) { + testRestores(t) + }) + }) } - t.Run("PITR", func(t *testing.T) { - testRestores(t) - }) - t.Run("remove full position backups", func(t *testing.T) { - // Delete the fromFullPosition backup(s), which leaves us with less restore options. Try again. - for _, backupName := range fromFullPositionBackups { - backup.RemoveBackup(t, backupName) - } - }) - t.Run("PITR-2", func(t *testing.T) { - testRestores(t) - }) } diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index 2bf86a14d6e..e3359e4d9f9 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -135,16 +135,24 @@ func Backup(ctx context.Context, params BackupParams) error { return vterrors.Wrap(err, "StartBackup failed") } - be, err := GetBackupEngine() - if err != nil { - return vterrors.Wrap(err, "failed to find backup engine") - } // Scope stats to selected backup engine. beParams := params.Copy() beParams.Stats = params.Stats.Scope( stats.Component(stats.BackupEngine), stats.Implementation(titleCase(backupEngineImplementation)), ) + var be BackupEngine + if isIncrementalBackup(beParams) { + // Incremental backups are always done via 'builtin' engine, which copies + // appropriate binlog files. + be = BackupRestoreEngineMap[builtinBackupEngineName] + } else { + be, err = GetBackupEngine() + if err != nil { + return vterrors.Wrap(err, "failed to find backup engine") + } + } + // Take the backup, and either AbortBackup or EndBackup. usable, err := be.ExecuteBackup(ctx, beParams, bh) logger := params.Logger @@ -291,6 +299,43 @@ func ShouldRestore(ctx context.Context, params RestoreParams) (bool, error) { return checkNoDB(ctx, params.Mysqld, params.DbName) } +// ensureRestoredGTIDPurgedMatchesManifest sees the following: when you restore a full backup, you want the MySQL server to have +// @@gtid_purged == . This then also implies that @@gtid_executed equals same value. This is because we restore without +// any binary logs. +func ensureRestoredGTIDPurgedMatchesManifest(ctx context.Context, manifest *BackupManifest, params *RestoreParams) error { + if manifest == nil { + return nil + } + if manifest.Position.GTIDSet == nil { + return nil + } + gtid := manifest.Position.GTIDSet.String() + if gtid == "" { + return nil + } + // Xtrabackup 2.4's restore seems to set @@gtid_purged to be the @@gtid_purged at the time of backup. But this is not + // the desired value. We want to set @@gtid_purged to be the @@gtid_executed of the backup. + // As reminder, when restoring from a full backup, setting @@gtid_purged also sets @@gtid_executed. + restoredGTIDPurgedPos, err := params.Mysqld.GetGTIDPurged(ctx) + if err != nil { + return vterrors.Wrapf(err, "failed to read gtid_purged after restore") + } + if restoredGTIDPurgedPos.Equal(manifest.Position) { + return nil + } + params.Logger.Infof("Restore: @@gtid_purged does not equal manifest's GTID position. Setting @@gtid_purged to %v", gtid) + // This is not good. We want to apply a new @@gtid_purged value. + query := "RESET MASTER" // required dialect in 5.7 + if _, err := params.Mysqld.FetchSuperQuery(ctx, query); err != nil { + return vterrors.Wrapf(err, "error issuing %v", query) + } + query = fmt.Sprintf("SET GLOBAL gtid_purged='%s'", gtid) + if _, err := params.Mysqld.FetchSuperQuery(ctx, query); err != nil { + return vterrors.Wrapf(err, "failed to apply `%s` after restore", query) + } + return nil +} + // Restore is the main entry point for backup restore. If there is no // appropriate backup on the BackupStorage, Restore logs an error // and returns ErrNoBackup. Any other error is returned. @@ -382,8 +427,7 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) // of those who can connect. params.Logger.Infof("Restore: starting mysqld for mysql_upgrade") // Note Start will use dba user for waiting, this is fine, it will be allowed. - err = params.Mysqld.Start(context.Background(), params.Cnf, "--skip-grant-tables", "--skip-networking") - if err != nil { + if err := params.Mysqld.Start(context.Background(), params.Cnf, "--skip-grant-tables", "--skip-networking"); err != nil { return nil, err } @@ -395,19 +439,23 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) // The MySQL manual recommends restarting mysqld after running mysql_upgrade, // so that any changes made to system tables take effect. params.Logger.Infof("Restore: restarting mysqld after mysql_upgrade") - err = params.Mysqld.Shutdown(context.Background(), params.Cnf, true) - if err != nil { + if err := params.Mysqld.Shutdown(context.Background(), params.Cnf, true); err != nil { return nil, err } - err = params.Mysqld.Start(context.Background(), params.Cnf) - if err != nil { + if err := params.Mysqld.Start(context.Background(), params.Cnf); err != nil { + return nil, err + } + if err = ensureRestoredGTIDPurgedMatchesManifest(ctx, manifest, ¶ms); err != nil { return nil, err } if handles := restorePath.IncrementalBackupHandles(); len(handles) > 0 { params.Logger.Infof("Restore: applying %v incremental backups", len(handles)) + // Incremental restores are always done via 'builtin' engine, which copies + // appropriate binlog files. + builtInRE := BackupRestoreEngineMap[builtinBackupEngineName] for _, bh := range handles { - manifest, err := re.ExecuteRestore(ctx, params, bh) + manifest, err := builtInRE.ExecuteRestore(ctx, params, bh) if err != nil { return nil, err } diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index d41780ca9e9..57d8a67cc13 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -167,6 +167,11 @@ func init() { } } +// isIncrementalBackup is a convenience function to check whether the params indicate an incremental backup request +func isIncrementalBackup(params BackupParams) bool { + return params.IncrementalFromPos != "" +} + func registerBackupEngineFlags(fs *pflag.FlagSet) { fs.StringVar(&backupEngineImplementation, "backup_engine_implementation", backupEngineImplementation, "Specifies which implementation to use for creating new backups (builtin or xtrabackup). Restores will always be done with whichever engine created a given backup.") } diff --git a/go/vt/mysqlctl/binlogs_gtid.go b/go/vt/mysqlctl/binlogs_gtid.go index 4d4c7fa3bcc..70f734b7cae 100644 --- a/go/vt/mysqlctl/binlogs_gtid.go +++ b/go/vt/mysqlctl/binlogs_gtid.go @@ -64,7 +64,6 @@ func ChooseBinlogsForIncrementalBackup( incrementalBackupToGTID string, err error, ) { - var prevGTIDsUnion mysql.GTIDSet for i, binlog := range binaryLogs { previousGtids, err := pgtids(ctx, binlog) @@ -108,6 +107,15 @@ func ChooseBinlogsForIncrementalBackup( if err != nil { return nil, "", "", vterrors.Wrapf(err, "cannot evaluate incremental backup from pos") } + if incrementalBackupFromGTID == "" { + // This can happen on the very first binary log file. It happens in two scenarios: + // 1. This is the first binlog ever in the history of the mysql server; the GTID is truly empty + // 2. A full backup was taken and restored, with all binlog scrapped. + // We take for granted that the first binary log file covers the + // requested "from GTID" + incrementalBackupFromGTID = backupFromGTIDSet.String() + } + // The Previous-GTIDs of the binary logs that _follows_ our binary-logs-to-backup indicates // the backup's position. incrementalBackupToGTID, err := pgtids(ctx, binaryLogs[len(binaryLogs)-1]) diff --git a/go/vt/mysqlctl/binlogs_gtid_test.go b/go/vt/mysqlctl/binlogs_gtid_test.go index a5d748d0d34..f6ccb1e9fbe 100644 --- a/go/vt/mysqlctl/binlogs_gtid_test.go +++ b/go/vt/mysqlctl/binlogs_gtid_test.go @@ -107,6 +107,45 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) { backupPos: "16b1039f-0000-0000-0000-000000000000:1-63", expectError: "Mismatching GTID entries", }, + { + name: "empty previous GTIDs in first binlog covering backup pos", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-30", + expectBinlogs: []string{"vt-bin.000001", "vt-bin.000002", "vt-bin.000003", "vt-bin.000004", "vt-bin.000005"}, + }, + { + name: "empty previous GTIDs in first binlog not covering backup pos", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-65", + expectBinlogs: []string{"vt-bin.000003", "vt-bin.000004", "vt-bin.000005"}, + }, + { + name: "empty previous GTIDs in first binlog not covering backup pos, 2", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-100", + expectBinlogs: []string{"vt-bin.000004", "vt-bin.000005"}, + }, { name: "match with non strictly monotonic sequence", previousGTIDs: map[string]string{ @@ -212,7 +251,9 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, binlogsToBackup) assert.Equal(t, tc.expectBinlogs, binlogsToBackup) - assert.Equal(t, tc.previousGTIDs[binlogsToBackup[0]], fromGTID) + if tc.previousGTIDs[binlogsToBackup[0]] != "" { + assert.Equal(t, tc.previousGTIDs[binlogsToBackup[0]], fromGTID) + } assert.Equal(t, tc.previousGTIDs[binlogs[len(binlogs)-1]], toGTID) assert.NotEqual(t, fromGTID, toGTID) }) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 1dabb41d131..0fc4e9fabff 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -153,11 +153,6 @@ func registerBuiltinBackupEngineFlags(fs *pflag.FlagSet) { fs.UintVar(&builtinBackupFileWriteBufferSize, "builtinbackup-file-write-buffer-size", builtinBackupFileWriteBufferSize, "write files using an IO buffer of this many bytes. Golang defaults are used when set to 0.") } -// isIncrementalBackup is a convenience function to check whether the params indicate an incremental backup request -func isIncrementalBackup(params BackupParams) bool { - return params.IncrementalFromPos != "" -} - // fullPath returns the full path of the entry, based on its type func (fe *FileEntry) fullPath(cnf *Mycnf) (string, error) { // find the root to use @@ -224,18 +219,18 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par return false, vterrors.Wrap(err, "can't get server uuid") } // @@gtid_purged - getPurgedGTIDSet := func() (mysql.Mysql56GTIDSet, error) { + getPurgedGTIDSet := func() (mysql.Position, mysql.Mysql56GTIDSet, error) { gtidPurged, err := params.Mysqld.GetGTIDPurged(ctx) if err != nil { - return nil, vterrors.Wrap(err, "can't get @@gtid_purged") + return gtidPurged, nil, vterrors.Wrap(err, "can't get @@gtid_purged") } purgedGTIDSet, ok := gtidPurged.GTIDSet.(mysql.Mysql56GTIDSet) if !ok { - return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID purged value: %v", gtidPurged) + return gtidPurged, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID purged value: %v", gtidPurged) } - return purgedGTIDSet, nil + return gtidPurged, purgedGTIDSet, nil } - purgedGTIDSet, err := getPurgedGTIDSet() + gtidPurged, purgedGTIDSet, err := getPurgedGTIDSet() if err != nil { return false, err } @@ -333,7 +328,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par // incrementalBackupFromGTID is the "previous GTIDs" of the first binlog file we back up. // It is a fact that incrementalBackupFromGTID is earlier or equal to params.IncrementalFromPos. // In the backup manifest file, we document incrementalBackupFromGTID, not the user's requested position. - if err := be.backupFiles(ctx, params, bh, incrementalBackupToPosition, mysql.Position{}, incrementalBackupFromPosition, binaryLogsToBackup, serverUUID); err != nil { + if err := be.backupFiles(ctx, params, bh, incrementalBackupToPosition, gtidPurged, incrementalBackupFromPosition, binaryLogsToBackup, serverUUID); err != nil { return false, err } return true, nil @@ -418,10 +413,6 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac return false, vterrors.Wrap(err, "can't get gtid_purged") } - if err != nil { - return false, vterrors.Wrap(err, "can't get purged position") - } - serverUUID, err := params.Mysqld.GetServerUUID(ctx) if err != nil { return false, vterrors.Wrap(err, "can't get server uuid") @@ -522,7 +513,6 @@ func (be *BuiltinBackupEngine) backupFiles( binlogFiles []string, serverUUID string, ) (finalErr error) { - // Get the files to backup. // We don't care about totalSize because we add each file separately. var fes []FileEntry @@ -863,7 +853,7 @@ func (be *BuiltinBackupEngine) executeRestoreIncrementalBackup(ctx context.Conte return vterrors.Wrap(err, "failed to restore file") } if err := mysqld.ApplyBinlogFile(ctx, binlogFile, params.RestoreToPos); err != nil { - return vterrors.Wrap(err, "failed to extract binlog file") + return vterrors.Wrapf(err, "failed to apply binlog file %v", binlogFile) } defer os.Remove(binlogFile) params.Logger.Infof("Applied binlog file: %v", binlogFile) @@ -1126,5 +1116,5 @@ func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, t } func init() { - BackupRestoreEngineMap["builtin"] = &BuiltinBackupEngine{} + BackupRestoreEngineMap[builtinBackupEngineName] = &BuiltinBackupEngine{} } diff --git a/go/vt/mysqlctl/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go index 0bd703a25de..9cf64914f67 100644 --- a/go/vt/mysqlctl/fakemysqldaemon.go +++ b/go/vt/mysqlctl/fakemysqldaemon.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "regexp" "strings" "sync" "sync/atomic" @@ -534,11 +535,15 @@ func (fmd *FakeMysqlDaemon) FetchSuperQuery(ctx context.Context, query string) ( return nil, fmt.Errorf("unexpected query: %v", query) } - qr, ok := fmd.FetchSuperQueryMap[query] - if !ok { - return nil, fmt.Errorf("unexpected query: %v", query) + if qr, ok := fmd.FetchSuperQueryMap[query]; ok { + return qr, nil + } + for k, qr := range fmd.FetchSuperQueryMap { + if ok, _ := regexp.MatchString(k, query); ok { + return qr, nil + } } - return qr, nil + return nil, fmt.Errorf("unexpected query: %v", query) } // Close is part of the MysqlDaemon interface diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 994e47c67c9..92c7b5aa48e 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -50,6 +50,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/mysqlctlclient" "vitess.io/vitess/go/vt/servenv" + "vitess.io/vitess/go/vt/vterrors" vtenv "vitess.io/vitess/go/vt/env" ) @@ -1063,7 +1064,7 @@ socket=%v `, connParams.Uname, connParams.Pass, connParams.UnixSocket) } - tmpfile, err := os.CreateTemp("", "example") + tmpfile, err := os.CreateTemp("", "defaults-extra-file-") if err != nil { return "", err } @@ -1202,12 +1203,13 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, binlogFile string, re mysqlbinlogCmd = exec.Command(name, args...) mysqlbinlogCmd.Dir = dir mysqlbinlogCmd.Env = env - log.Infof("ApplyBinlogFile: running %#v", mysqlbinlogCmd) + log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd) pipe, err = mysqlbinlogCmd.StdoutPipe() // to be piped into mysql if err != nil { return err } } + var mysqlErrFile *os.File { name, err := binaryPath(dir, "mysql") if err != nil { @@ -1219,13 +1221,19 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, binlogFile string, re } cnf, err := mysqld.defaultsExtraFile(params) if err != nil { - return err + return vterrors.Wrapf(err, "failed to create defaults extra file") } defer os.Remove(cnf) args := []string{ "--defaults-extra-file=" + cnf, } + mysqlErrFile, err = os.CreateTemp("", "err-mysql-") + if err != nil { + return err + } + defer os.Remove(mysqlErrFile.Name()) + // We disable super_read_only, in case it is in the default MySQL startup // parameters. We do it blindly, since this will fail on MariaDB, which doesn't // have super_read_only This is safe, since we're restarting MySQL after the restore anyway @@ -1252,20 +1260,29 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, binlogFile string, re mysqlCmd.Dir = dir mysqlCmd.Env = env mysqlCmd.Stdin = pipe // piped from mysqlbinlog + + mysqlCmd.Stderr = mysqlErrFile + log.Infof("ApplyBinlogFile: running mysql command: %#v with errfile=%v", mysqlCmd, mysqlErrFile.Name()) } // Run both processes, piped: if err := mysqlbinlogCmd.Start(); err != nil { return err } if err := mysqlCmd.Start(); err != nil { - return err + return vterrors.Wrapf(err, "failed to start mysql") } // Wait for both to complete: if err := mysqlbinlogCmd.Wait(); err != nil { - return err + return vterrors.Wrapf(err, "mysqlbinlog command failed") } if err := mysqlCmd.Wait(); err != nil { - return err + if mysqlErrFile != nil { + errFileContent, _ := os.ReadFile(mysqlErrFile.Name()) + if len(errFileContent) > 0 { + err = vterrors.Wrapf(err, "with error output: %s", string(errFileContent)) + } + } + return vterrors.Wrapf(err, "waiting on mysql command") } return nil } diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 5ae60266089..cfcf7344301 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -167,9 +167,18 @@ func closeFile(wc io.WriteCloser, fileName string, logger logutil.Logger, finalE } } -// ExecuteBackup returns a boolean that indicates if the backup is usable, +// ExecuteBackup runs a backup based on given params. This could be a full or incremental backup. +// The function returns a boolean that indicates if the backup is usable, and an overall error. +func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error) { + params.Logger.Infof("Executing Backup at %v for keyspace/shard %v/%v on tablet %v, concurrency: %v, compress: %v, incrementalFromPos: %v", + params.BackupTime, params.Keyspace, params.Shard, params.TabletAlias, params.Concurrency, backupStorageCompress, params.IncrementalFromPos) + + return be.executeFullBackup(ctx, params, bh) +} + +// executeFullBackup returns a boolean that indicates if the backup is usable, // and an overall error. -func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (complete bool, finalErr error) { +func (be *XtrabackupEngine) executeFullBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (complete bool, finalErr error) { if params.IncrementalFromPos != "" { return false, vterrors.New(vtrpc.Code_INVALID_ARGUMENT, "incremental backups not supported in xtrabackup engine.") @@ -232,14 +241,15 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara bm := &xtraBackupManifest{ // Common base fields BackupManifest: BackupManifest{ - BackupMethod: xtrabackupEngineName, - Position: replicationPosition, - ServerUUID: serverUUID, - TabletAlias: params.TabletAlias, - Keyspace: params.Keyspace, - Shard: params.Shard, - BackupTime: params.BackupTime.UTC().Format(time.RFC3339), - FinishedTime: time.Now().UTC().Format(time.RFC3339), + BackupMethod: xtrabackupEngineName, + Position: replicationPosition, + PurgedPosition: replicationPosition, + ServerUUID: serverUUID, + TabletAlias: params.TabletAlias, + Keyspace: params.Keyspace, + Shard: params.Shard, + BackupTime: params.BackupTime.UTC().Format(time.RFC3339), + FinishedTime: time.Now().UTC().Format(time.RFC3339), }, // XtraBackup-specific fields @@ -266,7 +276,14 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara return true, nil } -func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, backupFileName string, numStripes int, flavor string) (replicationPosition mysql.Position, finalErr error) { +func (be *XtrabackupEngine) backupFiles( + ctx context.Context, + params BackupParams, + bh backupstorage.BackupHandle, + backupFileName string, + numStripes int, + flavor string, +) (replicationPosition mysql.Position, finalErr error) { backupProgram := path.Join(xtrabackupEnginePath, xtrabackupBinaryName) flagsToExec := []string{"--defaults-file=" + params.Cnf.Path, diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index 5956add57f6..b845a6d7104 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -192,6 +192,11 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { "FAKE SET MASTER", "START SLAVE", } + sourceTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SHOW DATABASES": {}, + "RESET MASTER": {}, + "SET GLOBAL gtid_purged": {}, + } sourceTablet.StartActionLoop(t, wr) defer sourceTablet.StopActionLoop(t) @@ -239,7 +244,9 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { "START SLAVE", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, + "SHOW DATABASES": {}, + "RESET MASTER": {}, + "SET GLOBAL gtid_purged": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -281,7 +288,9 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { } primary.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, + "SHOW DATABASES": {}, + "RESET MASTER": {}, + "SET GLOBAL gtid_purged": {}, } primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{ "STOP SLAVE", @@ -493,7 +502,9 @@ func TestBackupRestoreLagged(t *testing.T) { "START SLAVE", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, + "SHOW DATABASES": {}, + "RESET MASTER": {}, + "SET GLOBAL gtid_purged": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = destTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -684,7 +695,9 @@ func TestRestoreUnreachablePrimary(t *testing.T) { "START SLAVE", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, + "SHOW DATABASES": {}, + "RESET MASTER": {}, + "SET GLOBAL gtid_purged": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) @@ -841,7 +854,9 @@ func TestDisableActiveReparents(t *testing.T) { "FAKE SET SLAVE POSITION", } destTablet.FakeMysqlDaemon.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SHOW DATABASES": {}, + "SHOW DATABASES": {}, + "RESET MASTER": {}, + "SET GLOBAL gtid_purged": {}, } destTablet.FakeMysqlDaemon.SetReplicationPositionPos = sourceTablet.FakeMysqlDaemon.CurrentPrimaryPosition destTablet.FakeMysqlDaemon.SetReplicationSourceInputs = append(destTablet.FakeMysqlDaemon.SetReplicationSourceInputs, topoproto.MysqlAddr(primary.Tablet)) diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 2cd716ee615..711beb23717 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -131,6 +131,7 @@ var ( clustersRequiringXtraBackup = []string{ "xb_backup", "xb_recovery", + "backup_pitr", } clustersRequiringMakeTools = []string{ "18",