Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental backup & recovery: restore-to-timestamp #13270

Merged
merged 27 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c76df4d
incremental backup and recovery: restore-to-timestamp
shlomi-noach Jun 8, 2023
5b4e575
debug info
shlomi-noach Jun 8, 2023
9bdb38e
use pointer
shlomi-noach Jun 8, 2023
75cea8c
a bit prettier
shlomi-noach Jun 8, 2023
419267c
temporary mistake for better logging
shlomi-noach Jun 8, 2023
78ffd6e
restore true file name
shlomi-noach Jun 8, 2023
f20bc71
debugging info. This only happens in GitHub CI
shlomi-noach Jun 8, 2023
d53de8f
adding validation
shlomi-noach Jun 8, 2023
3ce3a05
log request in error
shlomi-noach Jun 8, 2023
7d9f6b5
more validation
shlomi-noach Jun 8, 2023
71e00a1
remove debug info
shlomi-noach Jun 8, 2023
f076088
support for 5.7 timestamp encoding
shlomi-noach Jun 8, 2023
df2368d
refactor, unit test
shlomi-noach Jun 14, 2023
4981c30
fix refactor: only update variables when match is found
shlomi-noach Jun 14, 2023
531dc0c
Support --restore_to_timestamp
shlomi-noach Jun 14, 2023
057e35a
unexplode ApplyBinlogFileRequest in ApplyBinlogFile()
shlomi-noach Jun 14, 2023
19d9957
handle nil pointer
shlomi-noach Jun 15, 2023
eca663e
endtoend: testing --restore_to_timestamp
shlomi-noach Jun 15, 2023
22c1565
testing Xtrabackup and Mysqlctld
shlomi-noach Jun 15, 2023
fbc55d7
include Xtrabackup
shlomi-noach Jun 15, 2023
aefc777
resolved conflict
shlomi-noach Jul 5, 2023
a3ccd11
refactor: reuse 'backup_pitr' and 'backup_pitr_xtrabackup'
shlomi-noach Jul 5, 2023
e0c7804
removed restore_to_time endtoend tests as these have been merged in b…
shlomi-noach Jul 5, 2023
c72798a
make generate_ci_workflows
shlomi-noach Jul 5, 2023
15d63c5
resolved conflict
shlomi-noach Jul 13, 2023
1444de1
resolved conflict
shlomi-noach Jul 25, 2023
4e643ad
empty commit to kick CI
shlomi-noach Jul 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@v4
with:
go-version: 1.20.4
go-version: 1.20.5

- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
uses: actions/setup-go@v4
with:
go-version: 1.20.4
go-version: 1.20.5

- name: Set up python
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
Expand Down
24 changes: 24 additions & 0 deletions go/test/endtoend/backup/pitr/backup_pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,27 @@ func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
}
backup.ExecTestIncrementalBackupAndRestoreToPos(t, tcase)
}

// TestIncrementalBackupAndRestoreToTimestamp tests incremental backups and restores to a timestamp,
// here using the BuiltinBackup setup.
// The general outline of the test:
//   - Generate some schema with data.
//   - Take a full backup.
//   - Proceed to take a series of incremental backups. In between, inject data (insert rows), and keep
//     a record of which data (number of rows) is present in each backup, and at which timestamp.
//   - Expect backup success/failure per scenario.
//   - Next up, test restores: pick the recorded timestamps in random order and restore to those points in time.
//   - In each restore, expect to find the data (number of rows) recorded for said timestamp.
//   - Some restores should fail because the timestamp exceeds the last binlog.
//   - Do so for all recorded timestamps.
//   - Then, run a 2nd round where some backups are purged. This checks that a restore path can still be found
//     (of course only backups that still leave us with valid restore paths are deleted).
//
// All of the above is done for BuiltinBackup, XtraBackup, Mysqlctld (which is technically builtin).
func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
	backup.ExecTestIncrementalBackupAndRestoreToTimestamp(t, &backup.PITRTestCase{
		Name:           "BuiltinBackup",
		SetupType:      backup.BuiltinBackup,
		ComprssDetails: nil,
	})
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,29 @@ func TestIncrementalBackupAndRestoreToPos(t *testing.T) {
}
backup.ExecTestIncrementalBackupAndRestoreToPos(t, tcase)
}

// TestIncrementalBackupAndRestoreToTimestamp tests incremental backups and restores to a timestamp,
// here using the XtraBackup setup with the "pgzip" compressor engine.
// The general outline of the test:
//   - Generate some schema with data.
//   - Take a full backup.
//   - Proceed to take a series of incremental backups. In between, inject data (insert rows), and keep
//     a record of which data (number of rows) is present in each backup, and at which timestamp.
//   - Expect backup success/failure per scenario.
//   - Next up, test restores: pick the recorded timestamps in random order and restore to those points in time.
//   - In each restore, expect to find the data (number of rows) recorded for said timestamp.
//   - Some restores should fail because the timestamp exceeds the last binlog.
//   - Do so for all recorded timestamps.
//   - Then, run a 2nd round where some backups are purged. This checks that a restore path can still be found
//     (of course only backups that still leave us with valid restore paths are deleted).
//
// All of the above is done for BuiltinBackup, XtraBackup, Mysqlctld (which is technically builtin).
func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) {
	compression := &backup.CompressionDetails{
		CompressorEngineName: "pgzip",
	}
	backup.ExecTestIncrementalBackupAndRestoreToTimestamp(t, &backup.PITRTestCase{
		Name:           "XtraBackup",
		SetupType:      backup.XtraBackup,
		ComprssDetails: compression,
	})
}
13 changes: 13 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,19 @@ func TestReplicaRestoreToPos(t *testing.T, restoreToPos mysql.Position, expectEr
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
}

// TestReplicaRestoreToTimestamp runs "RestoreFromBackup --restore_to_timestamp" against replica1
// through the cluster's vtctlclient process and validates the outcome.
//
// restoreToTimestamp must be non-zero; it is handed to the command formatted by mysqlctl.FormatRFC3339.
// When expectError is non-empty, the command is required to fail and its output must contain expectError.
// When expectError is empty, the command is required to succeed and the tablet's restore stats are verified.
func TestReplicaRestoreToTimestamp(t *testing.T, restoreToTimestamp time.Time, expectError string) {
	require.False(t, restoreToTimestamp.IsZero())

	timestampArg := mysqlctl.FormatRFC3339(restoreToTimestamp)
	output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--", "--restore_to_timestamp", timestampArg, replica1.Alias)

	if expectError == "" {
		// No failure anticipated: the restore must succeed and leave sane restore stats behind.
		require.NoErrorf(t, err, "output: %v", output)
		verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
		return
	}

	// A failure is anticipated: the command must error out and report the expected message.
	require.Errorf(t, err, "expected: %v", expectError)
	require.Contains(t, output, expectError)
}

func verifyTabletBackupStats(t *testing.T, vars map[string]any) {
// Currently only the builtin backup engine instruments bytes-processed
// counts.
Expand Down
207 changes: 206 additions & 1 deletion go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/mysqlctl"
)

var (
	// minimalSleepDuration is slightly more than one second. Backups are written to a directory
	// named after the current timestamp in 1 second resolution, so the tests sleep this long
	// between backups to avoid two backups sharing the same pathname.
	minimalSleepDuration = 1100 * time.Millisecond
	// gracefulPostBackupDuration is a short pause taken after a backup, before sampling the
	// "post backup" timestamp, so that the sampled time is strictly after the backup time.
	gracefulPostBackupDuration = time.Millisecond * 10
)

type PITRTestCase struct {
Expand All @@ -36,6 +42,11 @@ type PITRTestCase struct {
ComprssDetails *CompressionDetails
}

// testedBackupTimestampInfo pairs the number of rows recorded for a backup with a timestamp
// sampled after that backup was taken.
// NOTE: values of this type are built with positional composite literals, so the field order
// (rows, then postTimestamp) must not change.
type testedBackupTimestampInfo struct {
// rows is the number of rows read from the replica for this backup.
rows int
// postTimestamp is the time.Now() value sampled after the backup; restores target this timestamp.
postTimestamp time.Time
}

func waitForReplica(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -150,7 +161,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we give the replica a chance to catch up.
time.Sleep(1100 * time.Millisecond)
time.Sleep(minimalSleepDuration)
waitForReplica(t)
recordRowsPerPosition(t)
// configure --incremental-from-pos to either:
Expand Down Expand Up @@ -219,3 +230,197 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
})
})
}

// ExecTestIncrementalBackupAndRestoreToTimestamp runs the restore-to-timestamp scenario for tcase.
// It launches a cluster, takes one full backup followed by a series of incremental backups while
// inserting rows on the primary, and records, for every successful backup, the replica's row count
// together with a timestamp sampled after the backup. It then restores the replica to each recorded
// timestamp (in random order) and checks either the resulting row count or the expected failure.
// The restore round runs twice: once with all backups in place, and once more after the incremental
// backups that were taken from the full backup position have been removed.
func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTestCase) {
defer cluster.PanicHandler(t)

// lastInsertedRowTimestamp is refreshed right after every insert made through insertRowOnPrimary.
// testRestores compares recorded backup timestamps against it to decide whether a restore must fail.
var lastInsertedRowTimestamp time.Time
insertRowOnPrimary := func(t *testing.T, hint string) {
InsertRowOnPrimary(t, hint)
lastInsertedRowTimestamp = time.Now()
}

t.Run(tcase.Name, func(t *testing.T) {
// setup cluster for the testing
// NOTE(review): the compression details are hard-coded here; tcase.ComprssDetails is not read
// anywhere in this function. Confirm this is intentional.
code, err := LaunchCluster(tcase.SetupType, "xbstream", 0, &CompressionDetails{
CompressorEngineName: "pgzip",
})
require.NoError(t, err, "setup failed with status code %d", code)
defer TearDownCluster()

InitTestTable(t)

// One entry is appended per successful backup (full or incremental).
testedBackups := []testedBackupTimestampInfo{}

var fullBackupPos mysql.Position
t.Run("full backup", func(t *testing.T) {
insertRowOnPrimary(t, "before-full-backup")
waitForReplica(t)

manifest, _ := TestReplicaFullBackup(t)
fullBackupPos = manifest.Position
require.False(t, fullBackupPos.IsZero())
// Record how many rows the replica holds, paired with the current time.
rows := ReadRowsFromReplica(t)
testedBackups = append(testedBackups, testedBackupTimestampInfo{len(rows), time.Now()})
})

lastBackupPos := fullBackupPos
insertRowOnPrimary(t, "before-incremental-backups")

// Incremental backup scenarios, executed in order.
tt := []struct {
name string
// writeBeforeBackup: insert a row on the primary before taking the backup.
writeBeforeBackup bool
// fromFullPosition: take the incremental backup from the full backup's position.
fromFullPosition bool
// autoPosition: leave the incremental-from position unset (zero value), i.e. "auto".
autoPosition bool
// expectError: when non-empty, the backup is expected to fail with this error.
expectError string
}{
{
name: "first incremental backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
},
{
name: "fail, no binary logs to backup",
expectError: "no binary logs to backup",
},
{
name: "make writes again, succeed",
writeBeforeBackup: true,
},
{
name: "auto position, succeed",
writeBeforeBackup: true,
autoPosition: true,
},
{
name: "fail auto position, no binary logs to backup",
autoPosition: true,
expectError: "no binary logs to backup",
},
{
name: "auto position, make writes again, succeed",
writeBeforeBackup: true,
autoPosition: true,
},
{
name: "from full backup position",
fromFullPosition: true,
},
}
// Names of the backups taken from the full backup position; these are removed before the 2nd restore round.
var fromFullPositionBackups []string
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
if tc.writeBeforeBackup {
insertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we give the replica a chance to catch up.
time.Sleep(minimalSleepDuration)
waitForReplica(t)
rowsBeforeBackup := ReadRowsFromReplica(t)
// configure --incremental-from-pos to either:
// - auto
// - explicit last backup pos
// - back in history to the original full backup
var incrementalFromPos mysql.Position
if !tc.autoPosition {
incrementalFromPos = lastBackupPos
if tc.fromFullPosition {
incrementalFromPos = fullBackupPos
}
}
manifest, backupName := TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError)
if tc.expectError != "" {
// The backup was expected to fail: nothing to record, nothing further to validate.
return
}
// We wish to mark the current post-backup timestamp. We will later on restore to this point in time.
// However, the restore is up to and _exclusive_ of the timestamp. So for test's sake, we sleep
// an extra few milliseconds just to ensure the timestamp we read is strictly after the backup time.
// This is basically to avoid weird flakiness in CI.
time.Sleep(gracefulPostBackupDuration)
testedBackups = append(testedBackups, testedBackupTimestampInfo{len(rowsBeforeBackup), time.Now()})
// lastBackupPos is only advanced when this subtest function returns, so the validations below
// still see the position of the previous backup.
defer func() {
lastBackupPos = manifest.Position
}()
if tc.fromFullPosition {
fromFullPositionBackups = append(fromFullPositionBackups, backupName)
}
require.False(t, manifest.FromPosition.IsZero())
require.NotEqual(t, manifest.Position, manifest.FromPosition)
require.True(t, manifest.Position.GTIDSet.Union(manifest.PurgedPosition.GTIDSet).Contains(manifest.FromPosition.GTIDSet))
{
// Validate the timestamp details recorded in the incremental backup manifest.
incrDetails := manifest.IncrementalDetails
require.NotNil(t, incrDetails)
require.NotEmpty(t, incrDetails.FirstTimestamp)
require.NotEmpty(t, incrDetails.FirstTimestampBinlog)
require.NotEmpty(t, incrDetails.LastTimestamp)
require.NotEmpty(t, incrDetails.LastTimestampBinlog)
require.GreaterOrEqual(t, incrDetails.LastTimestamp, incrDetails.FirstTimestamp)

if tc.fromFullPosition {
// A backup taken from the full backup position is expected to span more than one binlog file.
require.Greater(t, incrDetails.LastTimestampBinlog, incrDetails.FirstTimestampBinlog)
} else {
// No binlog rotation
require.Equal(t, incrDetails.LastTimestampBinlog, incrDetails.FirstTimestampBinlog)
}
}

gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, GetReplicaGtidPurged(t))
require.NoError(t, err)
fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet)

// The manifest's from-position (plus gtid_purged) must match the explicitly requested position when
// one was given, or the previous backup's position otherwise (the "auto" case).
expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
if !incrementalFromPos.IsZero() {
expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet)
}
require.Equalf(t, expectFromPosition, fromPositionIncludingPurged, "expected: %v, found: %v, gtid_purged: %v, manifest.Position: %v", expectFromPosition, fromPositionIncludingPurged, gtidPurgedPos, manifest.Position)
})
}

// testRestores restores the replica to every recorded timestamp, visiting the backups in random order.
testRestores := func(t *testing.T) {
numFailedRestores := 0
numSuccessfulRestores := 0
for _, backupIndex := range rand.Perm(len(testedBackups)) {
testedBackup := testedBackups[backupIndex]
testName := fmt.Sprintf("backup num%v at %v, %v rows", backupIndex, mysqlctl.FormatRFC3339(testedBackup.postTimestamp), testedBackup.rows)
t.Run(testName, func(t *testing.T) {
expectError := ""
if testedBackup.postTimestamp.After(lastInsertedRowTimestamp) {
// The restore_to_timestamp value is beyond the last incremental
// There is no path to restore to this timestamp.
expectError = "no path found"
}
TestReplicaRestoreToTimestamp(t, testedBackup.postTimestamp, expectError)
if expectError == "" {
// A successful restore must expose exactly the number of rows recorded for this backup.
msgs := ReadRowsFromReplica(t)
assert.Equalf(t, testedBackup.rows, len(msgs), "messages: %v", msgs)
numSuccessfulRestores++
} else {
numFailedRestores++
}
})
}
// Integrity check for the test itself: ensure we have both successful and failed restores.
require.NotZero(t, numFailedRestores)
require.NotZero(t, numSuccessfulRestores)
}
t.Run("PITR", func(t *testing.T) {
testRestores(t)
})
t.Run("remove full position backups", func(t *testing.T) {
// Delete the fromFullPosition backup(s), which leaves us with fewer restore options. Try again.
for _, backupName := range fromFullPositionBackups {
RemoveBackup(t, backupName)
}
})
t.Run("PITR-2", func(t *testing.T) {
testRestores(t)
})
})
}
12 changes: 12 additions & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ func init() {
}
}

// FormatRFC3339 renders t as a string in the time.RFC3339 layout
// ("2006-01-02T15:04:05Z07:00"), using t's own location for the zone offset.
func FormatRFC3339(t time.Time) string {
	const layout = time.RFC3339
	return t.Format(layout)
}

// ParseRFC3339 parses timestamp, which must be in the time.RFC3339 layout
// ("2006-01-02T15:04:05Z07:00"), and returns the corresponding time.
// Any error reported by time.Parse is returned unchanged.
func ParseRFC3339(timestamp string) (time.Time, error) {
	const layout = time.RFC3339
	return time.Parse(layout, timestamp)
}

// ParseBinlogTimestamp parses timestamp using the layout "060102 15:04:05": a two-digit year,
// month and day, a space, then hours, minutes and seconds (e.g. "230608 12:34:56").
// The layout carries no time zone information, so time.Parse yields a time in UTC.
// Any error reported by time.Parse is returned unchanged.
func ParseBinlogTimestamp(timestamp string) (time.Time, error) {
	const binlogTimestampLayout = "060102 15:04:05"
	return time.Parse(binlogTimestampLayout, timestamp)
}

func registerBackupFlags(fs *pflag.FlagSet) {
fs.BoolVar(&backupStorageCompress, "backup_storage_compress", backupStorageCompress, "if set, the backup files will be compressed.")
fs.IntVar(&backupCompressBlockSize, "backup_storage_block_size", backupCompressBlockSize, "if backup_storage_compress is true, backup_storage_block_size sets the byte size for each block while compressing (default is 250000).")
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func createFakeBackupRestoreEnv(t *testing.T) (*fakeBackupRestoreEnv, func()) {
}

manifest := BackupManifest{
BackupTime: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
BackupTime: FormatRFC3339(time.Now().Add(-1 * time.Hour)),
BackupMethod: "fake",
Keyspace: "test",
Shard: "-",
Expand Down
Loading