Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor StartBackup to Support Custom Backup Handlers #849

Merged
merged 7 commits into from
Feb 4, 2024
46 changes: 34 additions & 12 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ import (
// StartBackup: Like mysqlbinlog remote raw backup
// Backup remote binlog from position (filename, offset) and write in backupDir
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
err := os.MkdirAll(backupDir, 0755)
if err != nil {
return errors.Trace(err)
}
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
}

// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
// The process will continue until the timeout is reached or an error occurs.
//
// Parameters:
// - p: The starting position in the binlog from which to begin the backup.
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
// If set to 0, a default very long timeout (30 days) is used instead.
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) {
if timeout == 0 {
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
Expand All @@ -22,10 +41,6 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
// Force use raw mode
b.parser.SetRawMode(true)

if err := os.MkdirAll(backupDir, 0755); err != nil {
return errors.Trace(err)
}

s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
Expand All @@ -34,10 +49,14 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
var filename string
var offset uint32

var f *os.File
var w io.WriteCloser
defer func() {
if f != nil {
f.Close()
var closeErr error
if w != nil {
closeErr = w.Close()
}
if retErr == nil {
retErr = closeErr
}
}()

Expand Down Expand Up @@ -67,26 +86,29 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new

if f != nil {
f.Close()
if w != nil {
if err = w.Close(); err != nil {
w = nil
return errors.Trace(err)
}
}

if len(filename) == 0 {
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
}

f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
w, err = handler(filename)
if err != nil {
return errors.Trace(err)
}

// write binlog header fe'bin'
if _, err = f.Write(BinLogFileHeader); err != nil {
if _, err = w.Write(BinLogFileHeader); err != nil {
return errors.Trace(err)
}
}

if n, err := f.Write(e.RawData); err != nil {
if n, err := w.Write(e.RawData); err != nil {
return errors.Trace(err)
} else if n != len(e.RawData) {
return errors.Trace(io.ErrShortWrite)
Expand Down
Loading