Skip to content

Commit

Permalink
Incremental backup: fix calculation of binlog files to use (#13066)
Browse files Browse the repository at this point in the history
* Incremental backup: fix calculation of binlog files to use

Signed-off-by: Shlomi Noach <[email protected]>

* rewording

Signed-off-by: Shlomi Noach <[email protected]>

* Update go/vt/mysqlctl/binlogs_gtid.go

Co-authored-by: Deepthi Sigireddi <[email protected]>
Signed-off-by: Shlomi Noach <[email protected]>

* clarify error message

---------

Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: Deepthi Sigireddi <[email protected]>
  • Loading branch information
shlomi-noach and deepthi authored May 21, 2023
1 parent d7f86b3 commit 2a69ceb
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 54 deletions.
45 changes: 23 additions & 22 deletions go/vt/mysqlctl/binlogs_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ func (p *BackupManifestPath) String() string {
// possible, or is empty.
func ChooseBinlogsForIncrementalBackup(
ctx context.Context,
lookFromGTIDSet mysql.GTIDSet,
backupFromGTIDSet mysql.GTIDSet,
purgedGTIDSet mysql.GTIDSet,
binaryLogs []string,
pgtids func(ctx context.Context, binlog string) (gtids string, err error),
unionPreviousGTIDs bool,
) (
binaryLogsToBackup []string,
incrementalBackupFromGTID string,
Expand All @@ -71,43 +71,44 @@ func ChooseBinlogsForIncrementalBackup(
if err != nil {
return nil, "", "", vterrors.Wrapf(err, "cannot get previous gtids for binlog %v", binlog)
}
prevPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, previousGtids)
previousGTIDsPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, previousGtids)
if err != nil {
return nil, "", "", vterrors.Wrapf(err, "cannot decode binlog %s position in incremental backup: %v", binlog, prevPos)
return nil, "", "", vterrors.Wrapf(err, "cannot decode binlog %s position in incremental backup: %v", binlog, previousGTIDsPos)
}
if prevGTIDsUnion == nil {
prevGTIDsUnion = prevPos.GTIDSet
prevGTIDsUnion = previousGTIDsPos.GTIDSet
} else {
prevGTIDsUnion = prevGTIDsUnion.Union(prevPos.GTIDSet)
prevGTIDsUnion = prevGTIDsUnion.Union(previousGTIDsPos.GTIDSet)
}

containedInFromPos := lookFromGTIDSet.Contains(prevPos.GTIDSet)
// The binary logs are read in-order. They are build one on top of the other: we know
// the PreviousGTIDs of once binary log fully cover the previous binary log's.
if containedInFromPos {
// All previous binary logs are fully contained by backupPos. Carry on
continue
}
// We look for the first binary log whose "PreviousGTIDs" isn't already fully covered
// by "backupPos" (the position from which we want to create the inreemental backup).
// The binary logs are read in-order. They expand. For example, we know
// Previous-GTIDs of binlog file 0000018 contain those of binlog file 0000017.
// We look for the first binary log whose Previous-GTIDs isn't already fully covered
// by "backupPos" (the position from which we want to create the incremental backup).
// That means the *previous* binary log is the first binary log to introduce GTID events on top
// of "backupPos"
if i == 0 {
return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "the very first binlog file %v has PreviousGTIDs %s that exceed given incremental backup pos. There are GTID entries that are missing and this backup cannot run", binlog, prevPos)
if backupFromGTIDSet.Contains(previousGTIDsPos.GTIDSet) {
// Previous-GTIDs is contained by backupPos. So definitely all binlogs _prior_ to
// this binlog are not necessary. We still don't know about _this_ binlog. We can't tell yet if
// _this_ binlog introduces new GTID entries not covered by the last backup pos. But we will only
// know this when we look into the _next_ binlog file's Previous-GTIDs.
continue
}
if unionPreviousGTIDs {
prevPos.GTIDSet = prevGTIDsUnion
if i == 0 {
return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "Required entries have been purged. Oldest binary log %v expects entries not found in backup pos. Expected pos=%v", binlog, previousGTIDsPos)
}
if !prevPos.GTIDSet.Contains(lookFromGTIDSet) {
return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "binary log %v with previous GTIDS %s neither contains requested GTID %s nor contains it. Backup cannot take place", binlog, prevPos.GTIDSet, lookFromGTIDSet)
if !prevGTIDsUnion.Union(purgedGTIDSet).Contains(backupFromGTIDSet) {
return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION,
"Mismatching GTID entries. Requested backup pos has entries not found in the binary logs, and binary logs have entries not found in the requested backup pos. Neither fully contains the other. Requested pos=%v, binlog pos=%v",
backupFromGTIDSet, previousGTIDsPos.GTIDSet)
}
// We begin with the previous binary log, and we ignore the last binary log, because it's still open and being written to.
binaryLogsToBackup = binaryLogs[i-1 : len(binaryLogs)-1]
incrementalBackupFromGTID, err := pgtids(ctx, binaryLogsToBackup[0])
if err != nil {
return nil, "", "", vterrors.Wrapf(err, "cannot evaluate incremental backup from pos")
}
// The "previous GTIDs" of the binary logs that _follows_ our binary-logs-to-backup indicates
// The Previous-GTIDs of the binary logs that _follows_ our binary-logs-to-backup indicates
// the backup's position.
incrementalBackupToGTID, err := pgtids(ctx, binaryLogs[len(binaryLogs)-1])
if err != nil {
Expand Down
90 changes: 83 additions & 7 deletions go/vt/mysqlctl/binlogs_gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,47 +45,57 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) {
"vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-331",
}
tt := []struct {
name string
previousGTIDs map[string]string
backupPos string
gtidPurged string
expectBinlogs []string
expectError string
}{
{
name: "exact match",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78",
expectBinlogs: []string{"vt-bin.000004", "vt-bin.000005"},
},
{
name: "exact match, two binlogs with same previous GTIDs",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60",
expectBinlogs: []string{"vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
},
{
name: "inexact match",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-63",
expectBinlogs: []string{"vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
},
{
name: "one binlog match",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243",
expectBinlogs: []string{"vt-bin.000005"},
},
{
name: "last binlog excluded, no binlogs found",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-331",
expectError: "no binary logs to backup",
},
{
name: "backup pos beyond all binlogs",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-630000",
expectError: "no binary logs to backup",
},
{
name: "missing GTID entries",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f0000:1-63",
expectError: "There are GTID entries that are missing",
backupPos: "16b1039f-0000-0000-0000-000000000000:1-63",
expectError: "Required entries have been purged",
},
{
name: "empty previous GTIDs in first binlog",
previousGTIDs: map[string]string{
"vt-bin.000001": "",
"vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60",
Expand All @@ -94,10 +104,11 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) {
"vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243",
"vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-331",
},
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f0000:1-63",
expectError: "neither contains requested GTID",
backupPos: "16b1039f-0000-0000-0000-000000000000:1-63",
expectError: "Mismatching GTID entries",
},
{
name: "match with non strictly monotonic sequence",
previousGTIDs: map[string]string{
"vt-bin.000001": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-50",
"vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-60",
Expand All @@ -109,15 +120,81 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) {
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-63",
expectBinlogs: []string{"vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
},
{
name: "exact, gitd_purged",
previousGTIDs: map[string]string{
"vt-bin.000001": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-78",
"vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-100",
"vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-110",
"vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:2-300",
},
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-78",
gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-78",
expectBinlogs: []string{"vt-bin.000001", "vt-bin.000002", "vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
},
{
name: "exact, gitd_purged 2",
previousGTIDs: basePreviousGTIDs,
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-50",
gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-50",
expectBinlogs: []string{"vt-bin.000001", "vt-bin.000002", "vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
},
{
name: "inexact, gitd_purged, missing",
previousGTIDs: map[string]string{
"vt-bin.000001": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-78",
"vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-100",
"vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-110",
"vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:2-300",
},
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-63",
gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-2",
expectError: "Required entries have been purged",
},
{
name: "inexact, gitd_purged, missing 2",
previousGTIDs: map[string]string{
"vt-bin.000001": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-78",
"vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-100",
"vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-110",
"vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:2-300",
},
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-80",
gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1",
expectBinlogs: []string{"vt-bin.000001", "vt-bin.000002", "vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
expectError: "Mismatching GTID entries",
},
{
name: "inexact, gitd_purged, found",
previousGTIDs: map[string]string{
"vt-bin.000001": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-78",
"vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-90",
"vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-100",
"vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:3-110",
"vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:2-300",
},
backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-84",
gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-2",
expectBinlogs: []string{"vt-bin.000001", "vt-bin.000002", "vt-bin.000003", "vt-bin.000004", "vt-bin.000005"},
},
}
for i, tc := range tt {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
backupPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, tc.backupPos)
require.NoError(t, err)
gtidPurged, err := mysql.ParsePosition(mysql.Mysql56FlavorID, tc.gtidPurged)
require.NoError(t, err)
binlogsToBackup, fromGTID, toGTID, err := ChooseBinlogsForIncrementalBackup(
context.Background(),
backupPos.GTIDSet,
gtidPurged.GTIDSet,
binlogs,
func(ctx context.Context, binlog string) (gtids string, err error) {
gtids, ok := tc.previousGTIDs[binlog]
Expand All @@ -126,7 +203,6 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) {
}
return gtids, nil
},
true,
)
if tc.expectError != "" {
require.Error(t, err)
Expand Down
68 changes: 43 additions & 25 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,29 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupP
// - A valid position
// - "auto", indicating the incremental backup should begin with last successful backup end position.
func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error) {
// Collect MySQL status:
// UUID
serverUUID, err := params.Mysqld.GetServerUUID(ctx)
if err != nil {
return false, vterrors.Wrap(err, "can't get server uuid")
}
// @@gtid_purged
getPurgedGTIDSet := func() (mysql.Mysql56GTIDSet, error) {
gtidPurged, err := params.Mysqld.GetGTIDPurged(ctx)
if err != nil {
return nil, vterrors.Wrap(err, "can't get @@gtid_purged")
}
purgedGTIDSet, ok := gtidPurged.GTIDSet.(mysql.Mysql56GTIDSet)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID purged value: %v", gtidPurged)
}
return purgedGTIDSet, nil
}
purgedGTIDSet, err := getPurgedGTIDSet()
if err != nil {
return false, err
}

if params.IncrementalFromPos == autoIncrementalFromPos {
params.Logger.Infof("auto evaluating incremental_from_pos")
bs, err := backupstorage.GetBackupStorage()
Expand All @@ -226,38 +249,33 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos)
}

rp, err := mysql.DecodePosition(params.IncrementalFromPos)
if err != nil {
return false, vterrors.Wrapf(err, "cannot decode position in incremental backup: %v", params.IncrementalFromPos)
}
if !rp.MatchesFlavor(mysql.Mysql56FlavorID) {
// incrementalFromGtidSet, ok := rp.GTIDSet.(mysql.Mysql56GTIDSet)
// if !ok {
return false, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "incremental backup only supports MySQL GTID positions. Got: %v", params.IncrementalFromPos)
}
serverUUID, err := params.Mysqld.GetServerUUID(ctx)
if err != nil {
return false, vterrors.Wrap(err, "can't get server uuid")
// params.IncrementalFromPos is a string. We want to turn that into a MySQL GTID
getIncrementalFromPosGTIDSet := func() (mysql.Mysql56GTIDSet, error) {
pos, err := mysql.DecodePosition(params.IncrementalFromPos)
if err != nil {
return nil, vterrors.Wrapf(err, "cannot decode position in incremental backup: %v", params.IncrementalFromPos)
}
if !pos.MatchesFlavor(mysql.Mysql56FlavorID) {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "incremental backup only supports MySQL GTID positions. Got: %v", params.IncrementalFromPos)
}
ifPosGTIDSet, ok := pos.GTIDSet.(mysql.Mysql56GTIDSet)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID value: %v", pos)
}
return ifPosGTIDSet, nil
}
gtidPurged, err := params.Mysqld.GetGTIDPurged(ctx)
backupFromGTIDSet, err := getIncrementalFromPosGTIDSet()
if err != nil {
return false, vterrors.Wrap(err, "can't get gtid_purged")
}
rpGTID, ok := rp.GTIDSet.(mysql.Mysql56GTIDSet)
if !ok {
return false, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID value: %v", rpGTID)
}
purgedGTID, ok := gtidPurged.GTIDSet.(mysql.Mysql56GTIDSet)
if !ok {
return false, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID purged value: %v", rpGTID)
return false, err
}
// OK, we now have the formal MySQL GTID from which we want to take the incremental backip.

// binlogs may not contain information about purged GTIDs. e.g. some binlog.000003 may have
// previous GTIDs like 00021324-1111-1111-1111-111111111111:30-60, ie 1-29 range is missing. This can happen
// when a server is restored from backup and set with gtid_purged != "".
// This is fine!
// Shortly we will compare a binlog's "Previous GTIDs" with the backup's position. For the purpose of comparison, we
// ignore the purged GTIDs:
binlogCompareGTID := rpGTID.Difference(purgedGTID)

if err := params.Mysqld.FlushBinaryLogs(ctx); err != nil {
return false, vterrors.Wrapf(err, "cannot flush binary logs in incremental backup")
Expand All @@ -267,7 +285,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
return false, vterrors.Wrapf(err, "cannot get binary logs in incremental backup")
}
previousGTIDs := map[string]string{}
getPreviousGTIDs := func(ctx context.Context, binlog string) (gtids string, err error) {
getBinlogPreviousGTIDs := func(ctx context.Context, binlog string) (gtids string, err error) {
gtids, ok := previousGTIDs[binlog]
if ok {
// Found a cached entry! No need to query again
Expand All @@ -280,7 +298,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
previousGTIDs[binlog] = gtids
return gtids, nil
}
binaryLogsToBackup, incrementalBackupFromGTID, incrementalBackupToGTID, err := ChooseBinlogsForIncrementalBackup(ctx, binlogCompareGTID, binaryLogs, getPreviousGTIDs, true)
binaryLogsToBackup, incrementalBackupFromGTID, incrementalBackupToGTID, err := ChooseBinlogsForIncrementalBackup(ctx, backupFromGTIDSet, purgedGTIDSet, binaryLogs, getBinlogPreviousGTIDs)
if err != nil {
return false, vterrors.Wrapf(err, "cannot get binary logs to backup in incremental backup")
}
Expand Down

0 comments on commit 2a69ceb

Please sign in to comment.