diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 395db2b949d..18c9f5ff01a 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -35,7 +35,6 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" - "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/servercfg" dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle" @@ -154,8 +153,6 @@ func NewSqlEngine( if err := configureBinlogPrimaryController(engine); err != nil { return nil, err } - pro.AddInitDatabaseHook(dblr.NewBinlogInitDatabaseHook(ctx, doltdb.DatabaseUpdateListeners)) - pro.AddDropDatabaseHook(dblr.NewBinlogDropDatabaseHook(ctx, doltdb.DatabaseUpdateListeners)) config.ClusterController.SetIsStandbyCallback(func(isStandby bool) { pro.SetIsStandby(isStandby) @@ -351,38 +348,6 @@ func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engin func configureBinlogPrimaryController(engine *gms.Engine) error { primaryController := dblr.NewDoltBinlogPrimaryController() engine.Analyzer.Catalog.BinlogPrimaryController = primaryController - - _, logBinValue, ok := sql.SystemVariables.GetGlobal("log_bin") - if !ok { - return fmt.Errorf("unable to load @@log_bin system variable") - } - logBin, ok := logBinValue.(int8) - if !ok { - return fmt.Errorf("unexpected type for @@log_bin system variable: %T", logBinValue) - } - if logBin == 1 { - logrus.Debug("Enabling binary logging") - binlogProducer, err := dblr.NewBinlogProducer(primaryController.StreamerManager()) - if err != nil { - return err - } - doltdb.RegisterDatabaseUpdateListener(binlogProducer) - primaryController.BinlogProducer = binlogProducer - } - - _, logBinBranchValue, ok := sql.SystemVariables.GetGlobal("log_bin_branch") - if !ok { - return fmt.Errorf("unable to load @@log_bin_branch system variable") - } - logBinBranch, ok := logBinBranchValue.(string) - if !ok { - return fmt.Errorf("unexpected type for @@log_bin_branch system variable: %T", logBinBranchValue) - } - if logBinBranch != "" { - logrus.Debugf("Setting binary logging branch to %s", logBinBranch) - dblr.BinlogBranch = logBinBranch - } - return nil } diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index e2c93401bee..fc0f723ae6c 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -305,6 +305,73 @@ func ConfigureServices( } controller.Register(PersistNondeterministicSystemVarDefaults) + InitBinlogging := &svcs.AnonService{ + InitF: func(context.Context) error { + primaryController := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.BinlogPrimaryController + doltBinlogPrimaryController, ok := primaryController.(*binlogreplication.DoltBinlogPrimaryController) + if !ok { + return fmt.Errorf("unexpected type of binlog controller: %T", primaryController) + } + + _, logBinValue, ok := sql.SystemVariables.GetGlobal("log_bin") + if !ok { + return fmt.Errorf("unable to load @@log_bin system variable") + } + logBin, ok := logBinValue.(int8) + if !ok { + return fmt.Errorf("unexpected type for @@log_bin system variable: %T", logBinValue) + } + + _, logBinBranchValue, ok := sql.SystemVariables.GetGlobal("log_bin_branch") + if !ok { + return fmt.Errorf("unable to load @@log_bin_branch system variable") + } + logBinBranch, ok := logBinBranchValue.(string) + if !ok { + return fmt.Errorf("unexpected type for @@log_bin_branch system variable: %T", logBinBranchValue) + } + if logBinBranch != "" { + // If an invalid branch has been configured, let the server start up so that it's + // easier for customers to correct the value, but log a warning and don't enable + // binlog replication. + if strings.Contains(logBinBranch, "/") { + logrus.Warnf("branch names containing '/' are not supported "+ + "for binlog replication. Not enabling binlog replication; fix "+ + "@@log_bin_branch value and restart Dolt (current value: %s)", logBinBranch) + return nil + } + + binlogreplication.BinlogBranch = logBinBranch + } + + if logBin == 1 { + logrus.Infof("Enabling binary logging for branch %s", logBinBranch) + binlogProducer, err := binlogreplication.NewBinlogProducer(dEnv.FS) + if err != nil { + return err + } + + logManager, err := binlogreplication.NewLogManager(fs) + if err != nil { + return err + } + binlogProducer.LogManager(logManager) + doltdb.RegisterDatabaseUpdateListener(binlogProducer) + doltBinlogPrimaryController.BinlogProducer(binlogProducer) + + // Register binlog hooks for database creation/deletion + provider := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.DbProvider + if doltProvider, ok := provider.(*sqle.DoltDatabaseProvider); ok { + doltProvider.AddInitDatabaseHook(binlogreplication.NewBinlogInitDatabaseHook(nil, doltdb.DatabaseUpdateListeners)) + doltProvider.AddDropDatabaseHook(binlogreplication.NewBinlogDropDatabaseHook(nil, doltdb.DatabaseUpdateListeners)) + } + } + + return nil + }, + } + controller.Register(InitBinlogging) + // Add superuser if specified user exists; add root superuser if no user specified and no existing privileges InitSuperUser := &svcs.AnonService{ InitF: func(context.Context) error { diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_file_utils.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_file_utils.go new file mode 100644 index 00000000000..84f904ef575 --- /dev/null +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_file_utils.go @@ -0,0 +1,132 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlogreplication + +import ( + "encoding/binary" + "fmt" + "os" + "strconv" + "strings" + + "github.com/dolthub/vitess/go/mysql" +) + +// binlogFileMagicNumber holds the four bytes that start off every +// MySQL binlog file and identify the file as a MySQL binlog. +var binlogFileMagicNumber = []byte{0xfe, 0x62, 0x69, 0x6e} + +// fileExists returns true if the specified |filepath| exists on disk and is not a directory, +// otherwise returns false. |filepath| is a fully specified path to a file on disk. +func fileExists(filepath string) bool { + info, err := os.Stat(filepath) + if os.IsNotExist(err) { + return false + } + return !info.IsDir() +} + +// openBinlogFileForReading opens the specified |logfile| for reading and reads the first four bytes to make sure they +// are the expected binlog file magic numbers. If any problems are encountered opening the file or reading the first +// four bytes, an error is returned. +func openBinlogFileForReading(logfile string) (*os.File, error) { + file, err := os.Open(logfile) + if err != nil { + return nil, err + } + buffer := make([]byte, len(binlogFileMagicNumber)) + bytesRead, err := file.Read(buffer) + if err != nil { + return nil, err + } + if bytesRead != len(binlogFileMagicNumber) || string(buffer) != string(binlogFileMagicNumber) { + return nil, fmt.Errorf("invalid magic number in binlog file!") + } + + return file, nil +} + +// readBinlogEventFromFile reads the next binlog event from the specified, open |file| and +// returns it. If no more events are available in the file, then io.EOF is returned. +func readBinlogEventFromFile(file *os.File) (mysql.BinlogEvent, error) { + headerBuffer := make([]byte, 4+1+4+4+4+2) + _, err := file.Read(headerBuffer) + if err != nil { + return nil, err + } + + // Event Header: + //timestamp := headerBuffer[0:4] + //eventType := headerBuffer[4] + //serverId := binary.LittleEndian.Uint32(headerBuffer[5:5+4]) + eventSize := binary.LittleEndian.Uint32(headerBuffer[9 : 9+4]) + + payloadBuffer := make([]byte, eventSize-uint32(len(headerBuffer))) + _, err = file.Read(payloadBuffer) + if err != nil { + return nil, err + } + + return mysql.NewMysql56BinlogEvent(append(headerBuffer, payloadBuffer...)), nil +} + +// readFirstGtidEventFromFile reads events from |file| until a GTID event is found, then +// returns that GTID event. If |file| has been read completely and no GTID events were found, +// then io.EOF is returned. +func readFirstGtidEventFromFile(file *os.File) (mysql.BinlogEvent, error) { + for { + binlogEvent, err := readBinlogEventFromFile(file) + if err != nil { + return nil, err + } + + if binlogEvent.IsGTID() { + return binlogEvent, nil + } + } +} + +// formatBinlogFilename formats a binlog filename using the specified |branch| and |sequence| number. The +// returned filename will be of the form "binlog-main.000001". +func formatBinlogFilename(branch string, sequence int) string { + return fmt.Sprintf("binlog-%s.%06d", branch, sequence) +} + +// parseBinlogFilename parses a binlog filename, of the form "binlog-main.000001", into its branch and +// sequence number. +func parseBinlogFilename(filename string) (branch string, sequence int, err error) { + if !strings.HasPrefix(filename, "binlog-") { + return "", 0, fmt.Errorf("invalid binlog filename: %s; must start with 'binlog-'", filename) + } + + filename = strings.TrimPrefix(filename, "binlog-") + + splits := strings.Split(filename, ".") + if len(splits) != 2 { + return "", 0, fmt.Errorf( + "unable to parse binlog filename: %s; expected format 'binlog-branch.sequence'", filename) + } + + branch = splits[0] + sequenceString := splits[1] + + sequence, err = strconv.Atoi(sequenceString) + if err != nil { + return "", 0, fmt.Errorf( + "unable to parse binlog sequence number: %s; %s", sequenceString, err.Error()) + } + + return branch, sequence, nil +} diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_position_store.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_position_store.go index 36b3b16bdbc..9f856251e15 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_position_store.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_position_store.go @@ -25,6 +25,7 @@ import ( "github.com/dolthub/vitess/go/mysql" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/libraries/utils/filesys" ) const binlogPositionDirectory = ".doltcfg" @@ -38,19 +39,16 @@ type binlogPositionStore struct { mu sync.Mutex } -// Load loads a mysql.Position instance from the .doltcfg/binlog-position file at the root of the provider's filesystem. +// Load loads a mysql.Position instance from the .doltcfg/binlog-position file at the root of the specified |filesystem|. // This file MUST be stored at the root of the provider's filesystem, and NOT inside a nested database's .doltcfg directory, // since the binlog position contains events that cover all databases in a SQL server. The returned mysql.Position // represents the set of GTIDs that have been successfully executed and applied on this replica. Currently only the // default binlog channel ("") is supported. If no .doltcfg/binlog-position file is stored, this method returns a nil // mysql.Position and a nil error. If any errors are encountered, a nil mysql.Position and an error are returned. -func (store *binlogPositionStore) Load(ctx *sql.Context) (*mysql.Position, error) { +func (store *binlogPositionStore) Load(filesys filesys.Filesys) (*mysql.Position, error) { store.mu.Lock() defer store.mu.Unlock() - doltSession := dsess.DSessFromSess(ctx.Session) - filesys := doltSession.Provider().FileSystem() - doltDirExists, _ := filesys.Exists(binlogPositionDirectory) if !doltDirExists { return nil, nil diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_controller.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_controller.go index 64f5da9614b..154f88c48df 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_controller.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_controller.go @@ -16,33 +16,36 @@ package binlogreplication import ( "fmt" + "os" + "path/filepath" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/binlogreplication" + gmstypes "github.com/dolthub/go-mysql-server/sql/types" "github.com/dolthub/vitess/go/mysql" "github.com/sirupsen/logrus" ) -// doltBinlogPrimaryController implements the binlogreplication.BinlogPrimaryController +// DoltBinlogPrimaryController implements the binlogreplication.BinlogPrimaryController // interface from GMS and is the main extension point where Dolt plugs in to GMS and // interprets commands and statements related to serving binlog events. -type doltBinlogPrimaryController struct { +type DoltBinlogPrimaryController struct { streamerManager *binlogStreamerManager - BinlogProducer *binlogProducer + binlogProducer *binlogProducer } -var _ binlogreplication.BinlogPrimaryController = (*doltBinlogPrimaryController)(nil) +var _ binlogreplication.BinlogPrimaryController = (*DoltBinlogPrimaryController)(nil) -// NewDoltBinlogPrimaryController creates a new doltBinlogPrimaryController instance. -func NewDoltBinlogPrimaryController() *doltBinlogPrimaryController { - controller := doltBinlogPrimaryController{ +// NewDoltBinlogPrimaryController creates a new DoltBinlogPrimaryController instance. +func NewDoltBinlogPrimaryController() *DoltBinlogPrimaryController { + return &DoltBinlogPrimaryController{ streamerManager: newBinlogStreamerManager(), } - return &controller } -func (d *doltBinlogPrimaryController) StreamerManager() *binlogStreamerManager { - return d.streamerManager +func (d *DoltBinlogPrimaryController) BinlogProducer(binlogProducer *binlogProducer) { + d.binlogProducer = binlogProducer + d.streamerManager.logManager = binlogProducer.logManager } // RegisterReplica implements the BinlogPrimaryController interface. @@ -52,21 +55,90 @@ func (d *doltBinlogPrimaryController) StreamerManager() *binlogStreamerManager { // to implement the ListReplicas method below. For now, this method is still useful to throw errors back to the // replica if bin logging isn't enabled, since errors returned from the BinlogDumpGtid method seem to be dropped // by the replica, instead of being displayed as an error. -func (d *doltBinlogPrimaryController) RegisterReplica(ctx *sql.Context, c *mysql.Conn, replicaHost string, replicaPort uint16) error { - if d.BinlogProducer == nil { +func (d *DoltBinlogPrimaryController) RegisterReplica(ctx *sql.Context, c *mysql.Conn, replicaHost string, replicaPort uint16) error { + if d.binlogProducer == nil { return fmt.Errorf("no binlog currently being recorded; make sure the server is started with @@log_bin enabled") } return nil } +// validateReplicationConfiguration checks that this server is properly configured to replicate databases, meaning +// that @@log_bin is enabled, @@gtid_mode is enabled, @@enforce_gtid_consistency is enabled, and the binlog producer +// has been instantiated. If any of this configuration is not valid, then an error is returned. +func (d *DoltBinlogPrimaryController) validateReplicationConfiguration() *mysql.SQLError { + if d.binlogProducer == nil { + return mysql.NewSQLError(mysql.ERMasterFatalReadingBinlog, "HY000", + "no binlog currently being recorded; make sure the server is started with @@log_bin enabled") + } + + _, logBinValue, ok := sql.SystemVariables.GetGlobal("log_bin") + if !ok { + return mysql.NewSQLError(mysql.ERUnknownError, "HY000", "unable to find system variable @@log_bin") + } + logBin, _, err := gmstypes.Boolean.Convert(logBinValue) + if err != nil { + return mysql.NewSQLError(mysql.ERUnknownError, "HY000", err.Error()) + } + if logBin.(int8) != 1 { + return mysql.NewSQLError(mysql.ERMasterFatalReadingBinlog, "HY000", + "no binlog currently being recorded; make sure the server is started with @@log_bin enabled") + } + + _, gtidModeValue, ok := sql.SystemVariables.GetGlobal("gtid_mode") + if !ok { + return mysql.NewSQLError(mysql.ERUnknownError, "HY000", "unable to find system variable @@log_bin") + } + gtidMode, ok := gtidModeValue.(string) + if !ok { + return mysql.NewSQLError(mysql.ERUnknownError, "HY000", "unexpected type for @@gtid_mode: %T", gtidModeValue) + } + if gtidMode != "ON" { + return mysql.NewSQLError(mysql.ERMasterFatalReadingBinlog, "HY000", + "@@gtid_mode must be enabled for binlog replication") + } + + _, enforceGtidConsistencyValue, ok := sql.SystemVariables.GetGlobal("enforce_gtid_consistency") + if !ok { + return mysql.NewSQLError(mysql.ERUnknownError, "HY000", "unable to find system variable @@log_bin") + } + enforceGtidConsistency, ok := enforceGtidConsistencyValue.(string) + if !ok { + return mysql.NewSQLError(mysql.ERUnknownError, "HY000", + "unexpected type for @@enforce_gtid_consistency: %T", enforceGtidConsistencyValue) + } + if enforceGtidConsistency != "ON" { + return mysql.NewSQLError(mysql.ERMasterFatalReadingBinlog, "HY000", + "@@enforce_gtid_consistency must be enabled for binlog replication") + } + + return nil +} + // BinlogDumpGtid implements the BinlogPrimaryController interface. -func (d *doltBinlogPrimaryController) BinlogDumpGtid(ctx *sql.Context, conn *mysql.Conn, gtidSet mysql.GTIDSet) error { - if d.BinlogProducer == nil { - return fmt.Errorf("no binlog currently being recorded; make sure the server is started with @@log_bin enabled") +func (d *DoltBinlogPrimaryController) BinlogDumpGtid(ctx *sql.Context, conn *mysql.Conn, replicaExecutedGtids mysql.GTIDSet) error { + if err := d.validateReplicationConfiguration(); err != nil { + return err + } + + if replicaExecutedGtids == nil { + replicaExecutedGtids = mysql.Mysql56GTIDSet{} + } + + primaryExecutedGtids := d.binlogProducer.gtidPosition.GTIDSet + missingGtids := d.binlogProducer.logManager.calculateMissingGtids(replicaExecutedGtids, primaryExecutedGtids) + if !missingGtids.Equal(mysql.Mysql56GTIDSet{}) { + // We must send back error code 1236 (ER_MASTER_FATAL_ERROR_READING_BINLOG) to the replica to signal an error, + // otherwise the replica won't expose the error in replica status and will just keep trying to reconnect and + // only log the error to MySQL's error log. + return mysql.NewSQLError(mysql.ERMasterFatalReadingBinlog, "HY000", + "Cannot replicate because the source purged required binary logs. Replicate the missing transactions "+ + "from elsewhere, or provision a new replica from backup. Consider increasing the source's binary log "+ + "expiration period. The GTID set sent by the replica is '%s', and the missing transactions are '%s'.", + replicaExecutedGtids.String(), missingGtids.String()) } - err := d.streamerManager.StartStream(ctx, conn, d.BinlogProducer.binlogFormat, d.BinlogProducer.binlogEventMeta) + err := d.streamerManager.StartStream(ctx, conn, replicaExecutedGtids, d.binlogProducer.binlogFormat, d.binlogProducer.binlogEventMeta) if err != nil { logrus.Warnf("exiting binlog streamer due to error: %s", err.Error()) } else { @@ -77,21 +149,45 @@ func (d *doltBinlogPrimaryController) BinlogDumpGtid(ctx *sql.Context, conn *mys } // ListReplicas implements the BinlogPrimaryController interface. -func (d *doltBinlogPrimaryController) ListReplicas(ctx *sql.Context) error { +func (d *DoltBinlogPrimaryController) ListReplicas(ctx *sql.Context) error { return fmt.Errorf("ListReplicas not implemented in Dolt yet") } // ListBinaryLogs implements the BinlogPrimaryController interface. -func (d *doltBinlogPrimaryController) ListBinaryLogs(_ *sql.Context) ([]binlogreplication.BinaryLogFileMetadata, error) { - // TODO: No log file support yet, so just return an empty list - return nil, nil +func (d *DoltBinlogPrimaryController) ListBinaryLogs(_ *sql.Context) ([]binlogreplication.BinaryLogFileMetadata, error) { + if d.binlogProducer == nil || d.binlogProducer.logManager == nil { + return nil, nil + } + logManager := d.binlogProducer.logManager + + logFiles, err := logManager.logFilesOnDiskForBranch(BinlogBranch) + if err != nil { + return nil, err + } + + logFileMetadata := make([]binlogreplication.BinaryLogFileMetadata, len(logFiles)) + for i, logFile := range logFiles { + fileStats, err := os.Stat(filepath.Join(logManager.binlogDirectory, logFile)) + if err != nil { + return nil, err + } + logFileMetadata[i] = binlogreplication.BinaryLogFileMetadata{ + Name: logFile, + Size: uint64(fileStats.Size()), + } + } + return logFileMetadata, nil } // GetBinaryLogStatus implements the BinlogPrimaryController interface. -func (d *doltBinlogPrimaryController) GetBinaryLogStatus(ctx *sql.Context) ([]binlogreplication.BinaryLogStatus, error) { +func (d *DoltBinlogPrimaryController) GetBinaryLogStatus(_ *sql.Context) ([]binlogreplication.BinaryLogStatus, error) { + if d.binlogProducer == nil || d.binlogProducer.logManager == nil { + return nil, nil + } + return []binlogreplication.BinaryLogStatus{{ - File: binlogFilename, - Position: uint(d.BinlogProducer.binlogEventMeta.NextLogPosition), - ExecutedGtids: d.BinlogProducer.currentGtidPosition(), + File: d.binlogProducer.logManager.currentBinlogFileName, + Position: uint(d.binlogProducer.logManager.currentPosition), + ExecutedGtids: d.binlogProducer.currentGtidPosition(), }}, nil } diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_log_manager.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_log_manager.go new file mode 100644 index 00000000000..4f592840f89 --- /dev/null +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_log_manager.go @@ -0,0 +1,629 @@ +// Copyright 2024 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlogreplication + +import ( + "encoding/binary" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/dolthub/go-mysql-server/sql" + gmstypes "github.com/dolthub/go-mysql-server/sql/types" + "github.com/dolthub/vitess/go/mysql" + "github.com/sirupsen/logrus" + + "github.com/dolthub/dolt/go/libraries/utils/filesys" +) + +var binlogDirectory = filepath.Join(".dolt", "binlog") + +// logManager is responsible for the binary log files on disk, including actually writing events to the log files, +// rotating the log files, listing the available log files, purging old log files, and keeping track of what GTIDs +// are available in the log files. +type logManager struct { + binlogFormat mysql.BinlogFormat + binlogEventMeta mysql.BinlogEventMetadata + + mu *sync.Mutex + currentBinlogFile *os.File + currentBinlogFileName string + currentPosition int + fs filesys.Filesys + binlogDirectory string + availableGtids mysql.GTIDSet +} + +// NewLogManager creates a new logManager instance where binlog files are stored in the .dolt/binlog directory +// underneath the specified |fs| filesystem. This method also initializes the binlog logging system, including +// rotating to a new log file and purging expired log files. +func NewLogManager(fs filesys.Filesys) (*logManager, error) { + binlogFormat := createBinlogFormat() + binlogEventMeta, err := createBinlogEventMetadata() + if err != nil { + return nil, err + } + + lm := &logManager{ + mu: &sync.Mutex{}, + fs: fs, + binlogFormat: *binlogFormat, + binlogEventMeta: *binlogEventMeta, + } + + // Initialize binlog file storage directory + if err := fs.MkDirs(binlogDirectory); err != nil { + return nil, err + } + abs, err := fs.Abs(binlogDirectory) + if err != nil { + return nil, err + } + lm.binlogDirectory = abs + + // Initialize a new binlog file + if err := lm.createNewBinlogFile(); err != nil { + return nil, err + } + + // Purge any expired log files + if err := lm.purgeExpiredLogFiles(); err != nil { + return nil, err + } + + // Ensure the previous log file has a Rotate event that points to the new log file + if err := lm.addRotateEventToPreviousLogFile(); err != nil { + return nil, err + } + + // Initialize @@gtid_purged based on the first GTID we see available in the available binary logs + // NOTE that we assume that all GTIDs are available after the first GTID we find in the logs. This won't + // be true if someone goes directly to the file system and deletes binary log files, but that isn't + // how we expect people to manage the binary log files. + if err := lm.initializePurgedGtids(); err != nil { + return nil, err + } + + // Initialize the set of GTIDs that are available in the current log files + if err := lm.initializeAvailableGtids(); err != nil { + return nil, err + } + + return lm, nil +} + +// initializeAvailableGtids sets the value of availableGtids by seeing what GTIDs have been executed (@@gtid_executed) +// and subtracting any GTIDs that have been marked as purged (@@gtid_purged). +func (lm *logManager) initializeAvailableGtids() (err error) { + // Initialize availableGtids from @@gtid_executed – we start by assuming we have all executed GTIDs available + // in the logs, and then adjust availableGtids based on which GTIDs we detect have been purged. + _, gtidExecutedValue, ok := sql.SystemVariables.GetGlobal("gtid_executed") + if !ok { + return fmt.Errorf("unable to find system variable @@gtid_executed") + } + if _, ok := gtidExecutedValue.(string); !ok { + return fmt.Errorf("unexpected type for @@gtid_executed: %T", gtidExecutedValue) + } + lm.availableGtids, err = mysql.ParseMysql56GTIDSet(gtidExecutedValue.(string)) + if err != nil { + return err + } + + _, gtidPurgedValue, ok := sql.SystemVariables.GetGlobal("gtid_purged") + if !ok { + return fmt.Errorf("unable to find system variable @@gtid_purged") + } + if _, ok := gtidExecutedValue.(string); !ok { + return fmt.Errorf("unexpected type for @@gtid_purged: %T", gtidPurgedValue) + } + purgedGtids, err := mysql.ParseMysql56GTIDSet(gtidPurgedValue.(string)) + if err != nil { + return err + } + + lm.availableGtids = lm.availableGtids.Subtract(purgedGtids) + logrus.Debugf("setting availableGtids to %s after removing purgedGtids %s", lm.availableGtids, purgedGtids) + return nil +} + +// purgeExpiredLogFiles removes any binlog files that are older than @@binlog_expire_logs_seconds. This automatic +// binlog purging currently happens only on server startup. Eventually this should also be hooked into the `FLUSH LOGS` +// (or `FLUSH BINARY LOGS`) statement as well to match MySQL's behavior. +// +// When this method is called, it is expected that the new, current binlog file has already been initialized and that +// adding a Rotate event to the previous log has NOT occurred yet (otherwise adding the Rotate event would update the +// log file's last modified time and would not be purged). +func (lm *logManager) purgeExpiredLogFiles() error { + expireLogsSeconds, err := lookupBinlogExpireLogsSeconds() + if expireLogsSeconds == 0 { + // If @@binlog_expire_logs_seconds is set to 0, then binlog files are never automatically expired + return nil + } + + purgeThresholdTime := time.Now().Add(-time.Duration(expireLogsSeconds) * time.Second) + + filenames, err := lm.logFilesOnDiskForBranch(BinlogBranch) + if err != nil { + return err + } + + logrus.WithField("logfiles", strings.Join(filenames, ", ")).Tracef("examining available log files for expiration...") + + for _, filename := range filenames { + fullLogFilepath := lm.resolveLogFile(filename) + stat, err := os.Stat(fullLogFilepath) + if err != nil { + return err + } + + logrus.WithField("file", filename). + WithField("mod_time", stat.ModTime()). + WithField("purge_threshold", purgeThresholdTime). + Tracef("checking log file") + + if stat.ModTime().Before(purgeThresholdTime) { + logrus.Debugf("purging expired binlog filename: %s", filename) + if err := os.Remove(fullLogFilepath); err != nil { + return err + } + } + } + return nil +} + +// initializePurgedGtids searches through the available binary logs to find the first GTID available +// in the binary logs. If a GTID is found in the available logs, then @@gtid_purged is set to the GTID immediately +// preceding the found GTID, unless the found GTID is sequence number 1. If no GTIDs are found in the available +// binary logs, then it is assumed that all GTIDs have been purged, so @@gtid_purged is set to the same value +// held in @@gtid_executed. +func (lm *logManager) initializePurgedGtids() error { + filenames, err := lm.logFilesOnDiskForBranch(BinlogBranch) + if err != nil { + return err + } + + for _, filename := range filenames { + gtid, err := lm.findFirstGtidInFile(filename) + if err == io.EOF { + continue + } else if err != nil { + return err + } + + // If the first found GTID in the available binary logs is anything other than sequence number 1, + // then we need to set @@gtid_purged to indicate that not all GTIDs are available in the logs, and + // all GTIDs before the first sequence number found have been purged. + sequenceNumber := gtid.SequenceNumber().(int64) + if sequenceNumber > 1 { + gtidPurged := fmt.Sprintf("%s:%d", gtid.SourceServer(), sequenceNumber-1) + logrus.Debugf("setting gtid_purged to: %s", gtidPurged) + return sql.SystemVariables.SetGlobal("gtid_purged", gtidPurged) + } else { + return nil + } + } + + // If there are no GTID events in any of the files, then all GTIDs have been purged, so + // initialize @@gtid_purged with the value of @@gtid_executed. + _, gtidExecutedValue, ok := sql.SystemVariables.GetGlobal("gtid_executed") + if !ok { + return fmt.Errorf("unable to find system variable @@gtid_executed") + } + logrus.Debugf("no available GTIDs found in logs, setting gtid_purged to: %s", gtidExecutedValue) + return sql.SystemVariables.SetGlobal("gtid_purged", gtidExecutedValue) +} + +// findLogFileForPosition searches through the available binlog files on disk for the first log file that +// contains GTIDs that are NOT present in |executedGtids|. This is determined by reading the first GTID event +// from each log file and selecting the previous file when the first GTID not in |executedGtids| is found. If +// the first GTID event in all available logs files is in |executedGtids|, then the current log file is returned. +func (lm *logManager) findLogFileForPosition(executedGtids mysql.GTIDSet) (string, error) { + files, err := lm.logFilesOnDiskForBranch(BinlogBranch) + if err != nil { + return "", err + } + + for i, f := range files { + binlogFilePath := filepath.Join(lm.binlogDirectory, f) + file, err := openBinlogFileForReading(binlogFilePath) + if err != nil { + return "", err + } + + binlogEvent, err := readFirstGtidEventFromFile(file) + if fileCloseErr := file.Close(); fileCloseErr != nil { + logrus.Errorf("unable to cleanly close binlog file %s: %s", f, err.Error()) + } + if err == io.EOF { + continue + } else if err != nil { + return "", err + } + + if binlogEvent.IsGTID() { + gtid, _, err := binlogEvent.GTID(lm.binlogFormat) + if err != nil { + return "", err + } + // If the first GTID in this file is contained in the set of GTIDs that the replica + // has already executed, then move on to check the next file. + if executedGtids.ContainsGTID(gtid) { + continue + } + + // If we found an unexecuted GTID in the first binlog file, return the first file, + // otherwise return the previous file + if i == 0 { + return binlogFilePath, nil + } else { + return filepath.Join(lm.binlogDirectory, files[i-1]), nil + } + } + } + + // If we don't find any GTIDs that are missing from |executedGtids|, then return the current + // log file so the streamer can reply events from it, potentially finding GTIDs later in the + // file that need to be sent to the connected replica, or waiting for new events to be written. + return lm.currentBinlogFilepath(), nil +} + +// findFirstGtidInFile opens the file with the base name |filename| in the binlog directory and +// reads the events until a GTID event is found, then extracts the GTID value and returns it. +func (lm *logManager) findFirstGtidInFile(filename string) (gtid mysql.GTID, err error) { + openFile, err := openBinlogFileForReading(lm.resolveLogFile(filename)) + if err != nil { + return nil, err + } + defer openFile.Close() + + binlogEvent, err := readFirstGtidEventFromFile(openFile) + if err != nil { + return nil, err + } + + gtid, _, err = binlogEvent.GTID(lm.binlogFormat) + if err != nil { + return nil, err + } + + return gtid, nil +} + +// addRotateEventToPreviousLogFile finds the previous binlog file and appends a Rotate event that points to the +// next binlog file. This is necessary so that as streamers are reading from the binlog files, they have a pointer +// to follow to the next binlog file. This function MUST be called after the new log file has been initialized, +// and after any expired log files have been purged. +func (lm *logManager) addRotateEventToPreviousLogFile() error { + previousLogFileName, err := lm.previousLogFile() + if err != nil { + return err + } + + // If the previous log file in the sequence has been purged, then there's nothing to do + if !fileExists(lm.resolveLogFile(previousLogFileName)) { + return nil + } + + // Open the log file and append a Rotate event + previousLogFile, err := os.OpenFile(lm.resolveLogFile(previousLogFileName), os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + defer previousLogFile.Close() + + rotateEvent := mysql.NewRotateEvent(lm.binlogFormat, lm.binlogEventMeta, 0, lm.currentBinlogFileName) + _, err = previousLogFile.Write(rotateEvent.Bytes()) + return err +} + +// createNewBinlogFile creates a new binlog file and initializes it with the binlog magic number, a Format Description +// event, and a Previous GTIDs event. The new binlog file is opened for append only writing. +func (lm *logManager) createNewBinlogFile() error { + nextLogFilename, err := lm.nextLogFile() + if err != nil { + return err + } + lm.currentBinlogFileName = nextLogFilename + + return lm.initializeCurrentLogFile(lm.binlogFormat, lm.binlogEventMeta) +} + +// nextLogFile returns the filename of the next bin log file in the current sequence. For example, if the +// current log file is "binlog-main.000008" the nextLogFile() method would return "binlog-main.000009". +// Note that this function returns the file name only, not the full file path. +func (lm *logManager) nextLogFile() (filename string, err error) { + mostRecentLogfile, err := lm.mostRecentLogFileForBranch(BinlogBranch) + if err != nil { + return "", err + } + + if mostRecentLogfile == "" { + return formatBinlogFilename(BinlogBranch, 1), nil + } else { + branch, sequence, err := parseBinlogFilename(mostRecentLogfile) + if err != nil { + return "", err + } + return formatBinlogFilename(branch, sequence+1), nil + } +} + +// previousLogFile returns the filename of the previous bin log file in the current sequence. For example, if +// the current log file is "binlog-main.000008" the previousLogFile() method would return "binlog-main.000007". +// Note that this function returns the file name only, not the full path, and doesn't guarantee that the named +// file actually exists on disk or not. +func (lm *logManager) previousLogFile() (filename string, err error) { + branch, sequence, err := parseBinlogFilename(lm.currentBinlogFileName) + if err != nil { + return "", err + } + return formatBinlogFilename(branch, sequence-1), nil +} + +// TODO: consider moving these to the helper function file + +func (lm *logManager) logFilesOnDisk() (files []string, err error) { + err = lm.fs.Iter(binlogDirectory, false, func(path string, size int64, isDir bool) (stop bool) { + base := filepath.Base(path) + if strings.HasPrefix(base, "binlog-") { + files = append(files, base) + } + + return false + }) + if err != nil { + return nil, err + } + + return files, nil +} + +func (lm *logManager) logFilesOnDiskForBranch(branch string) (files []string, err error) { + branch = strings.ToLower(branch) + err = lm.fs.Iter(binlogDirectory, false, func(path string, size int64, isDir bool) (stop bool) { + base := filepath.Base(path) + if strings.HasPrefix(base, "binlog-"+branch) { + files = append(files, base) + } + + return false + }) + if err != nil { + return nil, err + } + + return files, nil +} + +func (lm *logManager) mostRecentLogfile() (logFile string, err error) { + logFiles, err := lm.logFilesOnDisk() + if err != nil { + return "", err + } + + return logFiles[len(logFiles)-1], nil +} + +func (lm *logManager) mostRecentLogFileForBranch(branch string) (logFile string, err error) { + logFiles, err := lm.logFilesOnDiskForBranch(branch) + if err != nil { + return "", err + } + + // TODO: This assumes the list comes back sorted by time or by filename + if len(logFiles) == 0 { + return "", nil + } else { + return logFiles[len(logFiles)-1], nil + } +} + +// RotateLogFile rotates the current log file that is actively being written to. A new binlog file is created and +// initialized, including writing the first four bytes with the binlog magic number, and the old binlog file is closed. +// Rotation should occur when an administrator explicitly requests it with the `FLUSH LOGS` statement, during server +// shutdown or restart, or when the current binary log file size exceeds the maximum size defined by the +// @@max_binlog_size system variable. +func (lm *logManager) RotateLogFile() error { + nextLogFile, err := lm.nextLogFile() + if err != nil { + return err + } + logrus.Tracef("Rotating bin log file to: %s", nextLogFile) + + binlogEvent := mysql.NewRotateEvent(lm.binlogFormat, lm.binlogEventMeta, 0, nextLogFile) + if err = lm.writeEventsHelper(binlogEvent); err != nil { + return err + } + + // Close the current binlog file + if err = lm.currentBinlogFile.Close(); err != nil { + logrus.Errorf("error closing current binlog file before rotating to new file: %s", err.Error()) + } + + // Open and initialize a new binlog file + lm.currentBinlogFileName = nextLogFile + return lm.initializeCurrentLogFile(lm.binlogFormat, lm.binlogEventMeta) +} + +// initializeCurrentLogFile creates and opens the current binlog file for append only writing, writes the first four +// bytes with the binlog magic numbers, then writes a Format Description event and a Previous GTIDs event. +func (lm *logManager) initializeCurrentLogFile(binlogFormat mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error { + logrus.Tracef("Initializing binlog file: %s", lm.currentBinlogFilepath()) + + // Open the file in append mode + file, err := os.OpenFile(lm.currentBinlogFilepath(), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + lm.currentBinlogFile = file + lm.currentPosition = 0 + + // Write Magic Number + _, err = file.Write(binlogFileMagicNumber) + if err != nil { + return err + } + lm.currentPosition += len(binlogFileMagicNumber) + + // Write Format Description Event, the first event in each binlog file + binlogEvent := mysql.NewFormatDescriptionEvent(binlogFormat, binlogEventMeta) + if err = lm.writeEventsHelper(binlogEvent); err != nil { + return err + } + + // Write the Previous GTIDs event + // TODO: Instead of using the @@gtid_executed system variable, logManager could keep track of which GTIDs + // it has seen logged and use that as the source of truth for the Previous GTIDs event. This would + // eliminate a race condition. + _, rawValue, ok := sql.SystemVariables.GetGlobal("gtid_executed") + if !ok { + panic("unable to find @@gtid_executed system variable") + } + stringValue, ok := rawValue.(string) + if !ok { + panic(fmt.Sprintf("unexpected type for @@gtid_executed system variable: %T", rawValue)) + } + + gtidSet, err := mysql.ParseMysql56GTIDSet(stringValue) + if err != nil { + return err + } + return lm.writeEventsHelper(mysql.NewPreviousGtidsEvent(binlogFormat, binlogEventMeta, gtidSet.(mysql.Mysql56GTIDSet))) +} + +// WriteEvents writes |binlogEvents| to the current binlog file. Access to write to the binary log is synchronized, +// so that only one thread can write to the log file at a time. +func (lm *logManager) WriteEvents(binlogEvents ...mysql.BinlogEvent) error { + // synchronize on WriteEvents so that only one thread is writing to the log file at a time + lm.mu.Lock() + defer lm.mu.Unlock() + + return lm.writeEventsHelper(binlogEvents...) +} + +// writeEventsHelper writes |binlogEvents| to the current binlog file. This function is NOT synchronized, and is only +// intended to be used from code inside logManager that needs to be called transitively from the WriteEvents method. +func (lm *logManager) writeEventsHelper(binlogEvents ...mysql.BinlogEvent) error { + maxBinlogSize, err := lookupMaxBinlogSize() + if err != nil { + return err + } + + // Write to the file + rotateLogFile := false + for _, event := range binlogEvents { + // NOTE: When we write the event to file, we need to ensure the next log position field + // is correct. That means we have to serialize the events going into the log file and + // we update their NextLogPosition field in the header to ensure it's correct. Because + // we change the packet, we must recompute the checksum. + nextPosition := lm.currentPosition + len(event.Bytes()) + binary.LittleEndian.PutUint32(event.Bytes()[13:13+4], uint32(nextPosition)) + mysql.UpdateChecksum(lm.binlogFormat, event) + + lm.currentPosition = nextPosition + if nextPosition > maxBinlogSize && !event.IsRotate() { + rotateLogFile = true + } + + // Write the event to file + if _, err := lm.currentBinlogFile.Write(event.Bytes()); err != nil { + return err + } + + if event.IsGTID() { + gtid, _, err := event.GTID(lm.binlogFormat) + if err != nil { + return err + } + // TODO: Consider locking around lm.availableGtids + lm.availableGtids = lm.availableGtids.AddGTID(gtid) + } + } + + if rotateLogFile { + // NOTE: Rotate event should be the very last entry in the (completed) binlog file. + // Streamers will read the rotate event and know what file to open next. + return lm.RotateLogFile() + } + + return nil +} + +// calculateMissingGtids takes the set of GTIDs that a replica reports it has executed, |replicaExecutedGtids|, +// and the full set of GTIDs executed in this primary server, |primaryExecutedGtids|, and determines which GTIDs +// are needed to bring the replica in sync with this server, but not available in the current binary logs. The +// results are a returned as the set of GTIDs that this server is unable to provide to the replica, since they are +// no longer available in this server's binary logs. If the returned set of GTIDs is empty, then this server can +// accept the connection from the replica and start streaming it the GTIDs needed to get it in sync with this +// primary. +func (lm *logManager) calculateMissingGtids(replicaExecutedGtids mysql.GTIDSet, primaryExecutedGtids mysql.GTIDSet) mysql.GTIDSet { + // First, subtract all the GTIDs that the replica has executed from the GTIDs this server has executed, + // in order to determine which GTIDs are needed to get the replica in sync with the primary. + neededGtids := primaryExecutedGtids.Subtract(replicaExecutedGtids) + + // Next subtract all the GTIDs that are available in the logs to determine which GTIDs are missing. + missingGtids := neededGtids.Subtract(lm.availableGtids) + + logrus.Debugf("calculateMissingGtids: replicaExecutedGtids: %s, primaryExecutedGtids: %s, neededGtids: %s, availableGtids: %s, missingGtids: %s", replicaExecutedGtids, primaryExecutedGtids, neededGtids, lm.availableGtids, missingGtids) + + return missingGtids +} + +// resolveLogFile accepts a base filename of a binlog file and returns a fully qualified file +// path to the file in the binlog storage directory. +func (lm *logManager) resolveLogFile(filename string) string { + return filepath.Join(lm.binlogDirectory, filename) +} + +// TODO: Consider moving lookup SystemVar helper functions into a separate file + +func (lm *logManager) currentBinlogFilepath() string { + return lm.resolveLogFile(lm.currentBinlogFileName) +} + +// lookupMaxBinlogSize looks up the value of the @@max_binlog_size system variable and returns it, along with any +// errors encountered while looking it up. +func lookupMaxBinlogSize() (int, error) { + _, value, ok := sql.SystemVariables.GetGlobal("max_binlog_size") + if !ok { + return 0, fmt.Errorf("system variable @@max_binlog_size not found") + } + + intValue, _, err := gmstypes.Int32.Convert(value) + if err != nil { + return 0, err + } + return int(intValue.(int32)), nil +} + +// lookupBinlogExpireLogsSeconds looks up the value of the @@binlog_expire_logs_seconds system variable and returns +// it, along with any errors encountered while looking it up. +func lookupBinlogExpireLogsSeconds() (int, error) { + _, value, ok := sql.SystemVariables.GetGlobal("binlog_expire_logs_seconds") + if !ok { + return -1, fmt.Errorf("unable to find system variable @@binlog_expire_logs_seconds") + } + + int32Value, _, err := gmstypes.Int32.Convert(value) + if err != nil { + return -1, fmt.Errorf("unable to convert @@binlog_expire_logs_seconds value to integer: %s", err.Error()) + } + + return int(int32Value.(int32)), nil +} diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go index 6ddc4abf61c..ed57062fdd2 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_streamer.go @@ -16,6 +16,9 @@ package binlogreplication import ( "fmt" + "io" + "os" + "path/filepath" "sync" "time" @@ -29,17 +32,17 @@ import ( // It also sends heartbeat events to the replica over the same connection at // regular intervals. There is one streamer per connected replica. type binlogStreamer struct { - quitChan chan struct{} - eventChan chan []mysql.BinlogEvent - ticker *time.Ticker + quitChan chan struct{} + ticker *time.Ticker + skippingGtids bool + currentLogFile *os.File } // NewBinlogStreamer creates a new binlogStreamer instance. func newBinlogStreamer() *binlogStreamer { return &binlogStreamer{ - quitChan: make(chan struct{}), - eventChan: make(chan []mysql.BinlogEvent, 5), - ticker: time.NewTicker(30 * time.Second), + quitChan: make(chan struct{}), + ticker: time.NewTicker(30 * time.Second), } } @@ -49,46 +52,118 @@ func newBinlogStreamer() *binlogStreamer { // and |binlogEventMeta| records the position of the stream. This method blocks until an error // is received over the stream (e.g. the connection closing) or the streamer is closed, // through it's quit channel. -func (streamer *binlogStreamer) startStream(ctx *sql.Context, conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error { - if err := sendInitialEvents(ctx, conn, binlogFormat, &binlogEventMeta); err != nil { +func (streamer *binlogStreamer) startStream(ctx *sql.Context, conn *mysql.Conn, executedGtids mysql.GTIDSet, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata, logfile string) (err error) { + logrus.WithField("connection_id", conn.ConnectionID). + WithField("executed_gtids", executedGtids). + Trace("starting binlog stream") + + streamer.currentLogFile, err = openBinlogFileForReading(logfile) + if err != nil { return err } - for { - logrus.StandardLogger().Trace("binlog streamer is listening for messages") + // Send a fake rotate event to let the replica know what file and position we're at + binlogEventMeta.NextLogPosition = 4 + rotateEvent := mysql.NewFakeRotateEvent(*binlogFormat, *binlogEventMeta, filepath.Base(logfile)) + if err = conn.WriteBinlogEvent(rotateEvent, false); err != nil { + return err + } + _ = conn.FlushBuffer() + defer streamer.currentLogFile.Close() + + for { select { case <-streamer.quitChan: - logrus.StandardLogger().Trace("received message from streamer's quit channel") + logrus.Debug("received message from streamer's quit channel") streamer.ticker.Stop() return nil case <-streamer.ticker.C: - logrus.StandardLogger().Trace("sending binlog heartbeat") - if err := sendHeartbeat(conn, binlogFormat, binlogEventMeta); err != nil { + logrus.Debug("sending binlog heartbeat") + if err := sendHeartbeat(conn, binlogFormat, *binlogEventMeta); err != nil { return err } if err := conn.FlushBuffer(); err != nil { return fmt.Errorf("unable to flush binlog connection: %s", err.Error()) } - case events := <-streamer.eventChan: - // TODO: If an error occurs while sending an event, it would be nice to have a retry at this - // level. Technically the replica should be abel to automatically reconnect and restart - // the stream from the last GTID it executed successfully, but it would be better to - // avoid the extra work for the reconnection and restart if possible. - logrus.StandardLogger().Tracef("streaming %d binlog events", len(events)) - for _, event := range events { - if err := conn.WriteBinlogEvent(event, false); err != nil { - return err - } - binlogEventMeta.NextLogPosition += event.Length() + default: + logrus.Trace("checking binlog file for new events...") + // TODO: Being able to select on new updates from the file would be nicer + err := streamer.streamNextEvents(ctx, conn, + *binlogFormat, binlogEventMeta, filepath.Dir(logfile), executedGtids) + if err == io.EOF { + logrus.Debug("End of binlog file! Pausing for new events...") + time.Sleep(250 * time.Millisecond) + } else if err != nil { + return err } - if err := conn.FlushBuffer(); err != nil { - return fmt.Errorf("unable to flush binlog connection: %s", err.Error()) + } + } +} + +// streamNextEvents streams up to 50 of the next events from the current binary logfile to a replica connected on +// |conn|. |executedGtids| indicates which GTIDs the connected replica has already executed. |logFileDir| indicates +// where the streamer can look for more binary log files with the current file rotates. If an error, including +// io.EOF, occurs while reading from the file, it is returned. +func (streamer *binlogStreamer) streamNextEvents(_ *sql.Context, conn *mysql.Conn, binlogFormat mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata, logFileDir string, executedGtids mysql.GTIDSet) error { + for range 50 { + binlogEvent, err := readBinlogEventFromFile(streamer.currentLogFile) + if err != nil { + return err + } + + // Update next log position in the stream so that we can send the correct position + // when a heartbeat needs to be sent by the timer signal. + binlogEventMeta.NextLogPosition += binlogEvent.Length() + + if binlogEvent.IsRotate() { + bytes := binlogEvent.Bytes() + newLogfile := string(bytes[19+8 : (len(bytes) - 4)]) + logrus.Debugf("Rotatating to new binlog file: %s", newLogfile) + + if err = streamer.currentLogFile.Close(); err != nil { + logrus.Errorf("unable to close previous binlog file: %s", err.Error()) + } + + newLogfile = filepath.Join(logFileDir, newLogfile) + if streamer.currentLogFile, err = openBinlogFileForReading(newLogfile); err != nil { + return err + } + + // Reset log position to right after the 4 byte magic number for the file type + binlogEventMeta.NextLogPosition = 4 + continue + } + + if binlogEvent.IsGTID() { + gtid, _, err := binlogEvent.GTID(binlogFormat) + if err != nil { + return err + } + + // If the replica has already executed this GTID, then skip it. + if executedGtids.ContainsGTID(gtid) { + streamer.skippingGtids = true + } else { + streamer.skippingGtids = false } } + + if streamer.skippingGtids { + continue + } + + if err := conn.WriteBinlogEvent(binlogEvent, false); err != nil { + return err + } + if err := conn.FlushBuffer(); err != nil { + return err + } } + + return nil } // binlogStreamerManager manages a collection of binlogStreamers, one for reach connected replica, @@ -98,6 +173,7 @@ type binlogStreamerManager struct { streamers []*binlogStreamer streamersMutex sync.Mutex quitChan chan struct{} + logManager *logManager } // NewBinlogStreamerManager creates a new binlogStreamerManager instance. @@ -137,21 +213,20 @@ func (m *binlogStreamerManager) copyStreamers() []*binlogStreamer { // StartStream starts a new binlogStreamer and streams events over |conn| until the connection // is closed, the streamer is sent a quit signal over its quit channel, or the streamer receives // errors while sending events over the connection. Note that this method blocks until the -// streamer exits. -func (m *binlogStreamerManager) StartStream(ctx *sql.Context, conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error { +// streamer exits. Note that this function does NOT validate that the primary has the correct set +// of GTIDs available to get the replica in sync with the primary – it is expected for that +// validation to have been completed before starting a binlog stream. +func (m *binlogStreamerManager) StartStream(ctx *sql.Context, conn *mysql.Conn, executedGtids mysql.GTIDSet, binlogFormat *mysql.BinlogFormat, binlogEventMeta mysql.BinlogEventMetadata) error { streamer := newBinlogStreamer() m.addStreamer(streamer) defer m.removeStreamer(streamer) - return streamer.startStream(ctx, conn, binlogFormat, binlogEventMeta) -} - -// sendEvents sends |binlogEvents| to all the streams managed by this instance. -func (m *binlogStreamerManager) sendEvents(binlogEvents []mysql.BinlogEvent) { - for _, streamer := range m.copyStreamers() { - logrus.StandardLogger().Tracef("queuing %d binlog events\n", len(binlogEvents)) - streamer.eventChan <- binlogEvents + file, err := m.logManager.findLogFileForPosition(executedGtids) + if err != nil { + return err } + + return streamer.startStream(ctx, conn, executedGtids, binlogFormat, &binlogEventMeta, file) } // addStreamer adds |streamer| to the slice of streamers managed by this binlogStreamerManager. @@ -167,6 +242,10 @@ func (m *binlogStreamerManager) removeStreamer(streamer *binlogStreamer) { m.streamersMutex.Lock() defer m.streamersMutex.Unlock() + if len(m.streamers) == 0 { + return + } + m.streamers = make([]*binlogStreamer, len(m.streamers)-1, 0) for _, element := range m.streamers { if element != streamer { @@ -179,40 +258,6 @@ func sendHeartbeat(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEve binlogEventMeta.Timestamp = uint32(0) // Timestamp is zero for a heartbeat event logrus.WithField("log_position", binlogEventMeta.NextLogPosition).Tracef("sending heartbeat") - binlogEvent := mysql.NewHeartbeatEventWithLogFile(*binlogFormat, binlogEventMeta, binlogFilename) - return conn.WriteBinlogEvent(binlogEvent, false) -} - -// sendInitialEvents sends the initial binlog events (i.e. Rotate, FormatDescription) over a newly established binlog -// streaming connection. -func sendInitialEvents(_ *sql.Context, conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata) error { - err := sendRotateEvent(conn, binlogFormat, binlogEventMeta) - if err != nil { - return err - } - - err = sendFormatDescription(conn, binlogFormat, binlogEventMeta) - if err != nil { - return err - } - - return conn.FlushBuffer() -} - -func sendRotateEvent(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata) error { - binlogFilePosition := uint64(0) - binlogEventMeta.NextLogPosition = uint32(binlogFilePosition) - - // The Rotate event sent at the start of a stream is a "virtual" event that isn't actually - // recorded to the binary log file, but sent to the replica so it knows what file is being - // read from. Because it is virtual, we do NOT update the nextLogPosition field of - // BinlogEventMetadata. - binlogEvent := mysql.NewRotateEvent(*binlogFormat, *binlogEventMeta, binlogFilePosition, binlogFilename) - return conn.WriteBinlogEvent(binlogEvent, false) -} - -func sendFormatDescription(conn *mysql.Conn, binlogFormat *mysql.BinlogFormat, binlogEventMeta *mysql.BinlogEventMetadata) error { - binlogEvent := mysql.NewFormatDescriptionEvent(*binlogFormat, *binlogEventMeta) - binlogEventMeta.NextLogPosition += binlogEvent.Length() + binlogEvent := mysql.NewHeartbeatEvent(*binlogFormat, binlogEventMeta) return conn.WriteBinlogEvent(binlogEvent, false) } diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go index dc35c69ed77..f8d8f9bf138 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_primary_test.go @@ -16,6 +16,8 @@ package binlogreplication import ( "fmt" + "os" + "path/filepath" "testing" "time" @@ -33,13 +35,74 @@ var doltReplicationPrimarySystemVars = map[string]string{ "gtid_mode": "ON", } +// TestBinlogPrimary_BinlogNotEnabled tests that when binary logging is NOT enabled, primary commands such as +// SHOW BINARY LOGS still work, and that attempts to start replication fail with an error. +func TestBinlogPrimary_BinlogNotEnabled(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, map[string]string{ + "enforce_gtid_consistency": "ON", + "gtid_mode": "ON", + }) + setupForDoltToMySqlReplication() + + // When binary logging is NOT enabled, binary log commands such as SHOW MASTER STATUS, SHOW BINARY LOG STATUS, + // and SHOW BINARY LOGS should not error out. + requirePrimaryResults(t, "SHOW MASTER STATUS", [][]any{}) + requirePrimaryResults(t, "SHOW BINARY LOG STATUS", [][]any{}) + requirePrimaryResults(t, "SHOW BINARY LOGS", [][]any{}) + + startReplicationAndCreateTestDb(t, doltPort) + status := queryReplicaStatus(t) + require.Equal(t, "13120", status["Last_IO_Errno"]) + require.Contains(t, status["Last_IO_Error"], + "Source command COM_REGISTER_REPLICA failed: unknown error: no binlog currently being recorded") +} + +// TestBinlogPrimary_GtidModeNotEnabled asserts that when @@gtid_mode is NOT enabled, +// attempting to start replication will fail with an error visible in the replica's status. +func TestBinlogPrimary_GtidModeNotEnabled(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, map[string]string{"log_bin": "1"}) + setupForDoltToMySqlReplication() + + requirePrimaryResults(t, "SHOW MASTER STATUS", [][]any{{"binlog-main.000001", "151", "", "", ""}}) + requirePrimaryResults(t, "SHOW BINARY LOG STATUS", [][]any{{"binlog-main.000001", "151", "", "", ""}}) + requirePrimaryResults(t, "SHOW BINARY LOGS", [][]any{{"binlog-main.000001", "151", "No"}}) + + startReplication(t, doltPort) + time.Sleep(500 * time.Millisecond) + status := queryReplicaStatus(t) + require.Equal(t, "13117", status["Last_IO_Errno"]) + require.Contains(t, status["Last_IO_Error"], + "The replication receiver thread cannot start because the source has GTID_MODE = OFF and this server has GTID_MODE = ON") +} + +// TestBinlogPrimary_EnforceGtidConsistencyNotEnabled asserts that when @@enforce_gtid_consistency is NOT enabled, +// attempting to start replication will fail with an error visible in the replica's status. +func TestBinlogPrimary_EnforceGtidConsistencyNotEnabled(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, map[string]string{"log_bin": "1", "gtid_mode": "ON"}) + setupForDoltToMySqlReplication() + + requirePrimaryResults(t, "SHOW MASTER STATUS", [][]any{{"binlog-main.000001", "151", "", "", ""}}) + requirePrimaryResults(t, "SHOW BINARY LOG STATUS", [][]any{{"binlog-main.000001", "151", "", "", ""}}) + requirePrimaryResults(t, "SHOW BINARY LOGS", [][]any{{"binlog-main.000001", "151", "No"}}) + + startReplication(t, doltPort) + time.Sleep(500 * time.Millisecond) + status := queryReplicaStatus(t) + require.Equal(t, "13114", status["Last_IO_Errno"]) + require.Contains(t, status["Last_IO_Error"], + "@@enforce_gtid_consistency must be enabled for binlog replication") +} + // TestBinlogPrimary runs a simple sanity check that a MySQL replica can connect to a Dolt primary and receive // binlog events from a wide variety of SQL data types. func TestBinlogPrimary(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (" + "pk int primary key, " + @@ -124,7 +187,181 @@ func TestBinlogPrimary(t *testing.T) { }) requirePrimaryResults(t, "SHOW BINARY LOG STATUS", [][]any{ - {"binlog-main.000001", "2226", "", "", uuid + ":1-3"}}) + {"binlog-main.000001", "2377", "", "", uuid + ":1-3"}}) +} + +// TestBinlogPrimary_Rotation tests how a Dolt primary server handles rotating the binary log file when the +// size threshold is reached. +func TestBinlogPrimary_Rotation(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) + setupForDoltToMySqlReplication() + startReplicationAndCreateTestDb(t, doltPort) + + // Change the binlog rotation threshold on the primary to 10KB (instead of the default 1GB) + primaryDatabase.MustExec("SET @@GLOBAL.max_binlog_size = 10240;") + + // Generate enough data to trigger a logfile rotation + primaryDatabase.MustExec("create table t (n int);") + for i := range 100 { + primaryDatabase.MustExec(fmt.Sprintf("insert into t values (%d);", i)) + } + waitForReplicaToCatchUp(t) + requireReplicaResults(t, "SELECT MAX(n) FROM t;", [][]any{{"99"}}) + + // Check the binary log file status and ensure the file has been rotated + uuid := queryPrimaryServerUuid(t) + requirePrimaryResults(t, "show binary log status;", [][]any{ + {"binlog-main.000003", "1027", "", "", uuid + ":1-102"}, + }) + + requirePrimaryResults(t, "show binary logs;", [][]any{ + {"binlog-main.000001", "10318", "No"}, + {"binlog-main.000002", "10481", "No"}, + {"binlog-main.000003", "1027", "No"}, + }) +} + +// TestBinlogPrimary_AutoPurging tests that the primary server correctly purges binary log files older than +// @@binlog_expire_logs_seconds on restart. +func TestBinlogPrimary_AutoPurging(t *testing.T) { + defer teardown(t) + mapCopy := copyMap(doltReplicationPrimarySystemVars) + mapCopy["binlog_expire_logs_seconds"] = "1" + startSqlServersWithDoltSystemVars(t, mapCopy) + setupForDoltToMySqlReplication() + + // Generate binary log content + primaryDatabase.MustExec("create database db01;") + primaryDatabase.MustExec("create table db01.t (n int);") + for i := range 100 { + primaryDatabase.MustExec(fmt.Sprintf("insert into db01.t values (%d);", i)) + } + requirePrimaryResults(t, "SHOW BINARY LOGS;", [][]any{ + {"binlog-main.000001", "21346", "No"}, + }) + + // Restart and confirm the binary log has been purged + stopDoltSqlServer(t) + time.Sleep(1 * time.Second) + mustRestartDoltPrimaryServer(t) + requirePrimaryResults(t, "SHOW BINARY LOGS;", [][]any{ + {"binlog-main.000002", "191", "No"}, + }) + + // Check the value of @@gtid_purged + requirePrimaryResults(t, "SELECT @@gtid_purged;", [][]any{ + {fmt.Sprintf("%s:1-102", queryPrimaryServerUuid(t))}, + }) + + // Verify the replica reports an error about the GTIDs not being available + startReplicationAndCreateTestDb(t, doltPort) + status := queryReplicaStatus(t) + require.Equal(t, "13114", status["Last_IO_Errno"]) + require.Contains(t, status["Last_IO_Error"], + "Got fatal error 1236 from source when reading data from binary log: "+ + "'Cannot replicate because the source purged required binary logs.") +} + +// TestBinlogPrimary_InitializeGTIDPurged asserts that @@gtid_purged is set correctly in a variety of +// scenarios, such as when a fresh server starts up, or when a server is restarted multiple times. +func TestBinlogPrimary_InitializeGTIDPurged(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) + setupForDoltToMySqlReplication() + + // On a fresh server, @@gtid_purged and @@gtid_executed should be empty + requirePrimaryResults(t, "SELECT @@gtid_executed;", [][]any{{""}}) + requirePrimaryResults(t, "SELECT @@gtid_purged;", [][]any{{""}}) + + // Create a GTID in the first binary log file, and restart the server to rotate to a new binary log file + // After the first restart, @@gtid_purged should be empty and @@gtid_executed should be the first GTID + primaryDatabase.MustExec("CREATE DATABASE db01;") + stopDoltSqlServer(t) + mustRestartDoltPrimaryServer(t) + requirePrimaryResults(t, "SELECT @@gtid_executed;", [][]any{ + {fmt.Sprintf("%s:1", queryPrimaryServerUuid(t))}, + }) + requirePrimaryResults(t, "SELECT @@gtid_purged;", [][]any{{""}}) + + // Manually remove the first binary log file, containing GTID 1 and restart the server + // When no GTID is found in any available logs, @@gtid_purged should be set to @@gtid_executed + require.NoError(t, os.Remove(filepath.Join(testDir, "dolt", ".dolt", "binlog", "binlog-main.000001"))) + stopDoltSqlServer(t) + mustRestartDoltPrimaryServer(t) + requirePrimaryResults(t, "SELECT @@gtid_executed;", [][]any{ + {fmt.Sprintf("%s:1", queryPrimaryServerUuid(t))}, + }) + requirePrimaryResults(t, "SELECT @@gtid_purged;", [][]any{ + {fmt.Sprintf("%s:1", queryPrimaryServerUuid(t))}, + }) + + // Create a new GTID in the current binary log file, restart, and test @@gtid_executed and @@gtid_purged + primaryDatabase.MustExec("CREATE DATABASE db02;") + stopDoltSqlServer(t) + mustRestartDoltPrimaryServer(t) + requirePrimaryResults(t, "SELECT @@gtid_executed;", [][]any{ + {fmt.Sprintf("%s:1-2", queryPrimaryServerUuid(t))}, + }) + requirePrimaryResults(t, "SELECT @@gtid_purged;", [][]any{ + {fmt.Sprintf("%s:1", queryPrimaryServerUuid(t))}, + }) +} + +// TestBinlogPrimary_ReplicaAndPrimaryRestart tests that a replica can disconnect and reconnect to the primary to +// restart the replication stream, even when the primary has been restarted and log files have rotated. +func TestBinlogPrimary_ReplicaAndPrimaryRestart(t *testing.T) { + defer teardown(t) + startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) + setupForDoltToMySqlReplication() + startReplicationAndCreateTestDb(t, doltPort) + + // Change the binlog rotation threshold on the primary to 10KB (instead of the default 1GB) so + // that log files will rotate more often + primaryDatabase.MustExec("SET @@GLOBAL.max_binlog_size = 10240;") + + // Create a table on the primary and assert that it gets replicated + primaryDatabase.MustExec("create table db01.t1 (pk int primary key, c1 varchar(255));") + waitForReplicaToCatchUp(t) + requireReplicaResults(t, "show tables;", [][]any{{"t1"}}) + + // Assert that the executed GTID position on the replica contains GTIDs 1 and 2 + serverUuid := queryPrimaryServerUuid(t) + status := queryReplicaStatus(t) + require.Equal(t, serverUuid+":1-2", status["Executed_Gtid_Set"]) + + // Stop the MySQL replica server and wait for a few seconds + stopMySqlServer(t) + time.Sleep(2_000 * time.Millisecond) + + // Generate enough data to trigger a logfile rotation + primaryDatabase.MustExec("create table t (n int);") + for i := range 100 { + primaryDatabase.MustExec(fmt.Sprintf("insert into t values (%d);", i)) + } + + // Stop the primary and restart it to test that it creates a new log file and + // applies a rotate event to the last log file + stopDoltSqlServer(t) + + // Restart the Dolt primary server + mustRestartDoltPrimaryServer(t) + + // Generate more data on the primary after restarting + primaryDatabase.MustExec("use db01;") + for i := range 100 { + primaryDatabase.MustExec(fmt.Sprintf("insert into t values (%d);", i+100)) + } + + // Restart the MySQL replica and reconnect to the Dolt primary + mustRestartMySqlReplicaServer(t) + startReplicationAndCreateTestDb(t, doltPort) + waitForReplicaToCatchUp(t) + + // Assert the executed GTID position now contains all GTIDs + status = queryReplicaStatus(t) + require.Equal(t, serverUuid+":1-203", status["Executed_Gtid_Set"]) + requireReplicaResults(t, "SELECT MAX(n) FROM t;", [][]any{{"199"}}) } // TestBinlogPrimary_Heartbeats tests that heartbeats sent from the primary to the replica are well-formed and @@ -138,7 +375,7 @@ func TestBinlogPrimary_Heartbeats(t *testing.T) { // Start replication, with a 45s delay before any commands are sent to the primary. // This gives enough time for the first heartbeat event to be sent, before any user // initiated binlog events, so we can test that scenario. - startReplicationWithDelay(t, doltPort, 45*time.Second) + startReplicationAndCreateTestDbWithDelay(t, doltPort, 45*time.Second) // Insert a row every second, for 70s, which gives the server a chance to send two heartbeats primaryDatabase.MustExec("create table db01.heartbeatTest(pk int);") @@ -169,7 +406,7 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Create a table on the primary and assert that it gets replicated primaryDatabase.MustExec("create table db01.t1 (pk int primary key, c1 varchar(255));") @@ -187,7 +424,6 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) { // Make a change while the replica is stopped to test that the server // doesn't error out when a registered replica is not available. - // NOTE: This won't be replicated until we start persisting the binlog to disk primaryDatabase.MustExec("insert into db01.t1 values (1, 'one');") // Restart the MySQL replica and reconnect to the Dolt primary @@ -197,21 +433,16 @@ func TestBinlogPrimary_ReplicaRestart(t *testing.T) { require.NoError(t, err) replicaDatabase = primaryDatabase primaryDatabase = prevPrimaryDatabase - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Create another table and assert that it gets replicated primaryDatabase.MustExec("create table db01.t2 (pk int primary key, c1 varchar(255));") - // We can't use waitForReplicaToCatchUp here, because it won't get the statement - // that was executed while the replica was stopped. Once we support replaying binlog - // events from a log file, we can switch this to waitForReplicaToCatchUp(t) - waitForReplicaToHaveLatestGtid(t) + waitForReplicaToCatchUp(t) requireReplicaResults(t, "show tables;", [][]any{{"t1"}, {"t2"}}) - // Assert the executed GTID position now contains GTID #2 and GTID #4 - // (#1 isn't present, because it was executed before we turned on replication, - // and #3 isn't present, because it was executed while the replica was stopped) + // Assert the executed GTID position now contains all GTIDs status = queryReplicaStatus(t) - require.Equal(t, serverUuid+":1-2:4", status["Executed_Gtid_Set"]) + require.Equal(t, serverUuid+":1-4", status["Executed_Gtid_Set"]) } // TestBinlogPrimary_PrimaryRestart tests that a Dolt primary server can be restarted and that a replica @@ -220,7 +451,12 @@ func TestBinlogPrimary_PrimaryRestart(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) + + // Only one binary log file should be present on a fresh server + requirePrimaryResults(t, "show binary logs;", [][]any{ + {"binlog-main.000001", "263", "No"}, + }) // Create a table on the primary and assert that it gets replicated primaryDatabase.MustExec("create table db01.t1 (pk int primary key, c1 varchar(255));") @@ -235,16 +471,17 @@ func TestBinlogPrimary_PrimaryRestart(t *testing.T) { // Stop the Dolt primary server stopDoltSqlServer(t) time.Sleep(2_000 * time.Millisecond) - prevReplicaDatabase := replicaDatabase // Restart the Dolt primary server - var err error - doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) - require.NoError(t, err) - primaryDatabase = replicaDatabase - replicaDatabase = prevReplicaDatabase + mustRestartDoltPrimaryServer(t) waitForReplicaToReconnect(t) + // A new binary log file is created on each server restart + requirePrimaryResults(t, "show binary logs;", [][]any{ + {"binlog-main.000001", "549", "No"}, + {"binlog-main.000002", "191", "No"}, + }) + // Create another table and assert that it gets replicated primaryDatabase.MustExec("create table db01.t2 (pk int primary key, c1 varchar(255));") waitForReplicaToCatchUp(t) @@ -255,33 +492,46 @@ func TestBinlogPrimary_PrimaryRestart(t *testing.T) { require.Equal(t, serverUuid+":1-3", status["Executed_Gtid_Set"]) } -// TestBinlogPrimary_OptIn asserts that binary logging does not work when the log_bin system variable is not set. -func TestBinlogPrimary_OptIn(t *testing.T) { +// TestBinlogPrimary_PrimaryRestartBeforeReplicaConnects tests that a MySQL replica can connect to a Dolt primary +// when the Dolt primary has multiple binlog files and the replica needs events from a non-current binlog file. +func TestBinlogPrimary_PrimaryRestartBeforeReplicaConnects(t *testing.T) { defer teardown(t) - startSqlServersWithDoltSystemVars(t, map[string]string{ - "enforce_gtid_consistency": "ON", - "gtid_mode": "ON", - }) + startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - // Ensure that log_bin is disabled before having the replica try to connect - requirePrimaryResults(t, "select @@log_bin;", [][]any{{"0"}}) + // Create a test database to trigger the first GTID binlog event + primaryDatabase.MustExec("CREATE DATABASE db02;") + + // Restart the Dolt primary server to trigger a binlog file rotation + stopDoltSqlServer(t) + mustRestartDoltPrimaryServer(t) + + // Start replication and verify the replica receives the CREATE DATABASE event from the first binlog file startReplication(t, doltPort) + waitForReplicaToCatchUp(t) + requireReplicaResults(t, "SHOW DATABASES;", [][]any{ + {"db02"}, {"information_schema"}, {"mysql"}, {"performance_schema"}, {"sys"}, + }) - // MySQL doesn't return errors directly from the START REPLICA statement; instead, - // callers must check the replica status information for errors - replicaStatus := queryReplicaStatus(t) - require.Equal(t, "Source command COM_REGISTER_REPLICA failed: unknown error: no binlog currently being recorded; "+ - "make sure the server is started with @@log_bin enabled (Errno: 1105)", replicaStatus["Last_IO_Error"]) + // Verify that the Dolt primary server has two binary log files + requirePrimaryResults(t, "SHOW BINARY LOGS;", [][]any{ + {"binlog-main.000001", "312", "No"}, + {"binlog-main.000002", "191", "No"}, + }) +} - // Create a database and assert that it does not get replicated - primaryDatabase.MustExec("create database newDb;") +// TestBinlogPrimary_DisallowBranchesWithSlashes asserts that trying to set @@log_bin_branch to +// a branch name containing a slash results in an error. +func TestBinlogPrimary_DisallowBranchesWithSlashes(t *testing.T) { + defer teardown(t) + mapCopy := copyMap(doltReplicationPrimarySystemVars) + mapCopy["log_bin_branch"] = "'branch/withslash'" + startSqlServersWithDoltSystemVars(t, mapCopy) + setupForDoltToMySqlReplication() - // Note that we use a sleep here, instead of waitForReplicaToCatchUp, since - // replication is not enabled in this test - time.Sleep(200 * time.Millisecond) - requireReplicaResults(t, "show databases;", - [][]any{{"information_schema"}, {"mysql"}, {"performance_schema"}, {"sys"}}) + // Because the replication branch was invalid, the binary log status should be + // empty, indicating that no binary logs are being recorded. + requirePrimaryResults(t, "SHOW BINARY LOG STATUS;", [][]any{}) } // TestBinlogPrimary_ChangeReplicationBranch asserts that the log_bin_branch system variable can @@ -292,7 +542,7 @@ func TestBinlogPrimary_ChangeReplicationBranch(t *testing.T) { mapCopy["log_bin_branch"] = "branch1" startSqlServersWithDoltSystemVars(t, mapCopy) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // No events should be generated when we're not updating the configured replication branch primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int, c2 year);") @@ -327,7 +577,7 @@ func TestBinlogPrimary_SimpleSchemaChangesWithAutocommit(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Create a table primaryDatabase.MustExec("create table db01.t1 (pk int primary key, c1 varchar(255) NOT NULL comment 'foo bar baz');") @@ -377,7 +627,7 @@ func TestBinlogPrimary_SchemaChangesWithManualCommit(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Create table primaryDatabase.MustExec("set @@autocommit=0;") @@ -410,7 +660,7 @@ func TestBinlogPrimary_Rollback(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Create a test table primaryDatabase.MustExec("set @@autocommit=0;") @@ -435,7 +685,7 @@ func TestBinlogPrimary_MultipleTablesManualCommit(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Insert to multiple tables in a single SQL transaction primaryDatabase.MustExec("set @@autocommit=0;") @@ -481,7 +731,7 @@ func TestBinlogPrimary_ReplicateCreateDropDatabase(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) // Test CREATE DATABASE primaryDatabase.MustExec("create database foobar1;") @@ -512,7 +762,7 @@ func TestBinlogPrimary_InsertUpdateDelete(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int, c2 year);") @@ -552,7 +802,7 @@ func TestBinlogPrimary_OnlyReplicateMainBranch(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int, c2 year);") primaryDatabase.MustExec("call dolt_commit('-Am', 'creating table t');") @@ -582,7 +832,7 @@ func TestBinlogPrimary_KeylessTables(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (c1 varchar(100), c2 int, c3 int unsigned);") primaryDatabase.MustExec("call dolt_commit('-Am', 'creating table t');") @@ -618,7 +868,7 @@ func TestBinlogPrimary_Merge(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int, c2 year);") primaryDatabase.MustExec("call dolt_commit('-Am', 'creating table t');") @@ -651,7 +901,7 @@ func TestBinlogPrimary_Cherrypick(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int);") primaryDatabase.MustExec("call dolt_commit('-Am', 'creating table t');") @@ -688,7 +938,7 @@ func TestBinlogPrimary_Revert(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int);") primaryDatabase.MustExec("call dolt_commit('-Am', 'creating table t');") @@ -725,7 +975,7 @@ func TestBinlogPrimary_Reset(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicationPrimarySystemVars) setupForDoltToMySqlReplication() - startReplication(t, doltPort) + startReplicationAndCreateTestDb(t, doltPort) primaryDatabase.MustExec("create table db01.t (pk varchar(100) primary key, c1 int);") primaryDatabase.MustExec("call dolt_commit('-Am', 'creating table t');") @@ -844,3 +1094,25 @@ func waitForReplicaToReconnect(t *testing.T) { } } } + +// mustRestartDoltPrimaryServer starts up the Dolt sql-server, after it has already been stopped before this function +// is called, and configures it as the primary database. +func mustRestartDoltPrimaryServer(t *testing.T) { + var err error + prevReplicaDatabase := replicaDatabase + doltPort, doltProcess, err = startDoltSqlServer(testDir, nil) + require.NoError(t, err) + primaryDatabase = replicaDatabase + replicaDatabase = prevReplicaDatabase +} + +// mustRestartMySqlReplicaServer starts up the MySQL server, after it has already been stopped before this function +// is called, and configures it as the replica database. +func mustRestartMySqlReplicaServer(t *testing.T) { + var err error + prevPrimaryDatabase := primaryDatabase + mySqlPort, mySqlProcess, err = startMySqlServer(testDir) + require.NoError(t, err) + replicaDatabase = primaryDatabase + primaryDatabase = prevPrimaryDatabase +} diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go index 8fa1fa515b5..737028c466e 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go @@ -25,12 +25,14 @@ import ( "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/vitess/go/mysql" + "github.com/sirupsen/logrus" "github.com/dolthub/dolt/go/libraries/doltcore/diff" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlfmt" + "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/val" @@ -39,11 +41,6 @@ import ( // BinlogBranch specifies the branch used for generating binlog events. var BinlogBranch = "main" -// binlogFilename is the name of the filename used in binlog events. Note that -// currently, this doesn't map to a real file on disk yet, but the filename is -// still needed for binlog messages. -var binlogFilename = "binlog-" + BinlogBranch + ".000001" - // binlogProducer implements the doltdb.DatabaseUpdateListener interface so that it can listen for updates to Dolt // databases and generate binlog events describing them. Those binlog events are sent to the binlogStreamerManager, // which is responsible for delivering them to each connected replica. @@ -51,15 +48,15 @@ var binlogFilename = "binlog-" + BinlogBranch + ".000001" // Note that the initial version of binlogProducer currently delivers the generated binlog events directly to the // connected replicas, without actually writing them to a real binlog file on disk. type binlogProducer struct { - binlogEventMeta mysql.BinlogEventMetadata binlogFormat *mysql.BinlogFormat + binlogEventMeta mysql.BinlogEventMetadata mu *sync.Mutex gtidPosition *mysql.Position gtidSequence int64 - streamerManager *binlogStreamerManager + logManager *logManager } var _ doltdb.DatabaseUpdateListener = (*binlogProducer)(nil) @@ -67,19 +64,29 @@ var _ doltdb.DatabaseUpdateListener = (*binlogProducer)(nil) // NewBinlogProducer creates and returns a new instance of BinlogProducer. Note that callers must register the // returned binlogProducer as a DatabaseUpdateListener before it will start receiving database updates and start // producing binlog events. -func NewBinlogProducer(streamerManager *binlogStreamerManager) (*binlogProducer, error) { +func NewBinlogProducer(fs filesys.Filesys) (*binlogProducer, error) { binlogFormat := createBinlogFormat() binlogEventMeta, err := createBinlogEventMetadata() if err != nil { return nil, err } - return &binlogProducer{ + b := &binlogProducer{ binlogEventMeta: *binlogEventMeta, binlogFormat: binlogFormat, - streamerManager: streamerManager, mu: &sync.Mutex{}, - }, nil + } + + if err = b.initializeGtidPosition(fs); err != nil { + return nil, err + } + + return b, nil +} + +// LogManager sets the |logManager| this producer will send events to. +func (b *binlogProducer) LogManager(logManager *logManager) { + b.logManager = logManager } // WorkingRootUpdated implements the doltdb.DatabaseUpdateListener interface. When a working root changes, @@ -136,8 +143,7 @@ func (b *binlogProducer) WorkingRootUpdated(ctx *sql.Context, databaseName strin binlogEvents = append(binlogEvents, b.newXIDEvent()) } - b.streamerManager.sendEvents(binlogEvents) - return nil + return b.logManager.WriteEvents(binlogEvents...) } // DatabaseCreated implements the doltdb.DatabaseUpdateListener interface. @@ -156,8 +162,7 @@ func (b *binlogProducer) DatabaseCreated(ctx *sql.Context, databaseName string) createDatabaseStatement := fmt.Sprintf("create database `%s`;", databaseName) binlogEvents = append(binlogEvents, b.newQueryEvent(databaseName, createDatabaseStatement)) - b.streamerManager.sendEvents(binlogEvents) - return nil + return b.logManager.WriteEvents(binlogEvents...) } // DatabaseDropped implements the doltdb.DatabaseUpdateListener interface. @@ -172,15 +177,14 @@ func (b *binlogProducer) DatabaseDropped(ctx *sql.Context, databaseName string) dropDatabaseStatement := fmt.Sprintf("drop database `%s`;", databaseName) binlogEvents = append(binlogEvents, b.newQueryEvent(databaseName, dropDatabaseStatement)) - b.streamerManager.sendEvents(binlogEvents) - return nil + return b.logManager.WriteEvents(binlogEvents...) } // initializeGtidPosition loads the persisted GTID position from disk and initializes it // in this binlogStreamerManager instance. If the gtidPosition has already been loaded // from disk and initialized, this method simply returns. If any problems were encountered // loading the GTID position from disk, or parsing its value, an error is returned. -func (b *binlogProducer) initializeGtidPosition(ctx *sql.Context) error { +func (b *binlogProducer) initializeGtidPosition(fs filesys.Filesys) error { if b.gtidPosition != nil { return nil } @@ -188,7 +192,7 @@ func (b *binlogProducer) initializeGtidPosition(ctx *sql.Context) error { b.mu.Lock() defer b.mu.Unlock() - position, err := positionStore.Load(ctx) + position, err := positionStore.Load(fs) if err != nil { return err } @@ -230,17 +234,15 @@ func (b *binlogProducer) initializeGtidPosition(ctx *sql.Context) error { return fmt.Errorf("unable to parse GTID position (%s): %s", gtidString, err.Error()) } b.gtidSequence = int64(i + 1) - return nil + + logrus.Tracef("setting @@gtid_executed to %s", b.gtidPosition.GTIDSet.String()) + return sql.SystemVariables.AssignValues(map[string]any{ + "gtid_executed": b.gtidPosition.GTIDSet.String()}) } // createGtidEvent creates a new GTID event for the current transaction and updates the stream's // current log position. func (b *binlogProducer) createGtidEvent(ctx *sql.Context) (mysql.BinlogEvent, error) { - err := b.initializeGtidPosition(ctx) - if err != nil { - return nil, err - } - serverUuid, err := getServerUuid(ctx) if err != nil { return nil, err @@ -255,7 +257,6 @@ func (b *binlogProducer) createGtidEvent(ctx *sql.Context) (mysql.BinlogEvent, e gtid := mysql.Mysql56GTID{Server: sid, Sequence: b.gtidSequence} binlogEvent := mysql.NewMySQLGTIDEvent(*b.binlogFormat, b.binlogEventMeta, gtid, false) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() b.gtidSequence++ // Store the latest executed GTID to disk @@ -514,73 +515,43 @@ func (b *binlogProducer) currentGtidPosition() string { // newQueryEvent creates a new Query BinlogEvent for the specified |databaseName| and |query|, and updates the // stream's log position. func (b *binlogProducer) newQueryEvent(databaseName, query string) mysql.BinlogEvent { - b.mu.Lock() - defer b.mu.Unlock() - // TODO: Charset and SQL_MODE support - binlogEvent := mysql.NewQueryEvent(*b.binlogFormat, b.binlogEventMeta, mysql.Query{ + return mysql.NewQueryEvent(*b.binlogFormat, b.binlogEventMeta, mysql.Query{ Database: databaseName, Charset: nil, SQL: query, Options: 0, SqlMode: 0, }) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() - return binlogEvent } // newXIDEvent returns a new XID BinlogEvent and updates the stream's log position. func (b *binlogProducer) newXIDEvent() mysql.BinlogEvent { - b.mu.Lock() - defer b.mu.Unlock() - - binlogEvent := mysql.NewXIDEvent(*b.binlogFormat, b.binlogEventMeta) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() - return binlogEvent + return mysql.NewXIDEvent(*b.binlogFormat, b.binlogEventMeta) } // newTableMapEvent returns a new TableMap BinlogEvent for the specified |tableId| and |tableMap|, and updates the // stream's log position. func (b *binlogProducer) newTableMapEvent(tableId uint64, tableMap *mysql.TableMap) mysql.BinlogEvent { - b.mu.Lock() - defer b.mu.Unlock() - - binlogEvent := mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() - return binlogEvent + return mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap) } // newWriteRowsEvent returns a new WriteRows BinlogEvent for the specified |tableId| and |rows|, and updates the // stream's log position. func (b *binlogProducer) newWriteRowsEvent(tableId uint64, rows mysql.Rows) mysql.BinlogEvent { - b.mu.Lock() - defer b.mu.Unlock() - - binlogEvent := mysql.NewWriteRowsEvent(*b.binlogFormat, b.binlogEventMeta, tableId, rows) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() - return binlogEvent + return mysql.NewWriteRowsEvent(*b.binlogFormat, b.binlogEventMeta, tableId, rows) } // newDeleteRowsEvent returns a new DeleteRows BinlogEvent for the specified |tableId| and |rows|, and updates the // stream's log position. func (b *binlogProducer) newDeleteRowsEvent(tableId uint64, rows mysql.Rows) mysql.BinlogEvent { - b.mu.Lock() - defer b.mu.Unlock() - - binlogEvent := mysql.NewDeleteRowsEvent(*b.binlogFormat, b.binlogEventMeta, tableId, rows) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() - return binlogEvent + return mysql.NewDeleteRowsEvent(*b.binlogFormat, b.binlogEventMeta, tableId, rows) } // newUpdateRowsEvent returns a new UpdateRows BinlogEvent for the specified |tableId| and |rows|, and updates the // stream's log position. func (b *binlogProducer) newUpdateRowsEvent(tableId uint64, rows mysql.Rows) mysql.BinlogEvent { - b.mu.Lock() - defer b.mu.Unlock() - - binlogEvent := mysql.NewUpdateRowsEvent(*b.binlogFormat, b.binlogEventMeta, tableId, rows) - b.binlogEventMeta.NextLogPosition += binlogEvent.Length() - return binlogEvent + return mysql.NewUpdateRowsEvent(*b.binlogFormat, b.binlogEventMeta, tableId, rows) } // extractRowCountAndDiffType uses |sch| and |diff| to determine how many changed rows this @@ -716,8 +687,7 @@ func createBinlogEventMetadata() (*mysql.BinlogEventMetadata, error) { } return &mysql.BinlogEventMetadata{ - ServerID: serverId, - NextLogPosition: 0, - Timestamp: uint32(time.Now().Unix()), + ServerID: serverId, + Timestamp: uint32(time.Now().Unix()), }, nil } diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go index b7bd716fe00..171dab43b88 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go @@ -190,7 +190,10 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con return err } - position, err := positionStore.Load(ctx) + doltSession := dsess.DSessFromSess(ctx.Session) + filesys := doltSession.Provider().FileSystem() + + position, err := positionStore.Load(filesys) if err != nil { return err } diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_alltypes_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_alltypes_test.go index 643bbaf946e..c62c240f10c 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_alltypes_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_alltypes_test.go @@ -29,7 +29,7 @@ import ( func TestBinlogReplicationForAllTypes(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Set the session's timezone to UTC, to avoid TIMESTAMP test values changing // when they are converted to UTC for storage. diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_filters_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_filters_test.go index ebefa64d5af..4d6adc0061e 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_filters_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_filters_test.go @@ -26,7 +26,7 @@ import ( func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Ignore replication events for db01.t2. Also tests that the first filter setting is overwritten by // the second and that db and that db and table names are case-insensitive. @@ -77,7 +77,7 @@ func TestBinlogReplicationFilters_ignoreTablesOnly(t *testing.T) { func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Do replication events for db01.t1. Also tests that the first filter setting is overwritten by // the second and that db and that db and table names are case-insensitive. @@ -128,7 +128,7 @@ func TestBinlogReplicationFilters_doTablesOnly(t *testing.T) { func TestBinlogReplicationFilters_doTablesAndIgnoreTables(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Do replication events for db01.t1, and db01.t2 replicaDatabase.MustExec("CHANGE REPLICATION FILTER REPLICATE_DO_TABLE=(db01.t1, db01.t2);") diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_multidb_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_multidb_test.go index 93c37c33cf4..61436050d82 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_multidb_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_multidb_test.go @@ -25,7 +25,7 @@ import ( func TestBinlogReplicationMultiDb(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Make changes on the primary to db01 and db02 primaryDatabase.MustExec("create database db02;") @@ -127,7 +127,7 @@ func TestBinlogReplicationMultiDb(t *testing.T) { func TestBinlogReplicationMultiDbTransactions(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Make changes on the primary to db01 and db02 primaryDatabase.MustExec("create database db02;") diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_reconnect_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_reconnect_test.go index 33a9d403ea5..30e44e33084 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_reconnect_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_reconnect_test.go @@ -39,7 +39,7 @@ func TestBinlogReplicationAutoReconnect(t *testing.T) { startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) configureToxiProxy(t) configureFastConnectionRetry(t) - startReplication(t, proxyPort) + startReplicationAndCreateTestDb(t, proxyPort) // Get the replica started up and ensure it's in sync with the primary before turning on the limit_data toxic testInitialReplicaStatus(t) diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go index 312079b1afd..70810019f7c 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_restart_test.go @@ -27,7 +27,7 @@ import ( func TestBinlogReplicationServerRestart(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) primaryDatabase.MustExec("create table t (pk int auto_increment primary key)") diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go index ba8a0e26572..8b26268de1d 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replication_test.go @@ -33,7 +33,6 @@ import ( "testing" "time" - "github.com/dolthub/vitess/go/mysql" _ "github.com/go-sql-driver/mysql" "github.com/jmoiron/sqlx" "github.com/stretchr/testify/require" @@ -80,6 +79,9 @@ func teardown(t *testing.T) { printFile(doltLogFilePath) fmt.Printf("\nMySQL server log from %s:\n", mysqlLogFilePath) printFile(mysqlLogFilePath) + mysqlErrorLogFilePath := filepath.Join(filepath.Dir(mysqlLogFilePath), "error_log.err") + fmt.Printf("\nMySQL server error log from %s:\n", mysqlErrorLogFilePath) + printFile(mysqlErrorLogFilePath) } else { // clean up temp files on clean test runs defer os.RemoveAll(testDir) @@ -99,7 +101,7 @@ func teardown(t *testing.T) { func TestBinlogReplicationSanityCheck(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Make changes on the primary and verify on the replica primaryDatabase.MustExec("create table t (pk int primary key)") @@ -124,7 +126,7 @@ func TestBinlogSystemUserIsLocked(t *testing.T) { require.ErrorContains(t, err, "User not found") // After starting replication, the system account is locked - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) err = db.Ping() require.Error(t, err) require.ErrorContains(t, err, "Access denied for user") @@ -136,7 +138,7 @@ func TestBinlogSystemUserIsLocked(t *testing.T) { func TestFlushLogs(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Make changes on the primary and verify on the replica primaryDatabase.MustExec("create table t (pk int primary key)") @@ -160,7 +162,7 @@ func TestFlushLogs(t *testing.T) { func TestResetReplica(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // RESET REPLICA returns an error if replication is running _, err := replicaDatabase.Queryx("RESET REPLICA") @@ -199,7 +201,7 @@ func TestResetReplica(t *testing.T) { require.NoError(t, rows.Close()) // Start replication again and verify that we can still query replica status - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) replicaStatus := showReplicaStatus(t) require.Equal(t, "0", replicaStatus["Last_Errno"]) require.Equal(t, "", replicaStatus["Last_Error"]) @@ -233,7 +235,7 @@ func TestStartReplicaErrors(t *testing.T) { require.Nil(t, rows) // START REPLICA logs a warning if replication is already running - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) replicaDatabase.MustExec("START REPLICA;") assertWarning(t, replicaDatabase, 3083, "Replication thread(s) for channel '' are already running.") } @@ -275,7 +277,7 @@ func TestStopReplica(t *testing.T) { require.Equal(t, "No", status["Replica_SQL_Running"]) // START REPLICA and verify status - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) time.Sleep(100 * time.Millisecond) status = showReplicaStatus(t) require.True(t, status["Replica_IO_Running"] == "Connecting" || status["Replica_IO_Running"] == "Yes") @@ -296,7 +298,7 @@ func TestStopReplica(t *testing.T) { func TestDoltCommits(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // First transaction (DDL) primaryDatabase.MustExec("create table t1 (pk int primary key);") @@ -376,7 +378,7 @@ func TestDoltCommits(t *testing.T) { func TestForeignKeyChecks(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Test that we can execute statement-based replication that requires foreign_key_checks // being turned off (referenced table doesn't exist yet). @@ -433,7 +435,7 @@ func TestForeignKeyChecks(t *testing.T) { func TestCharsetsAndCollations(t *testing.T) { defer teardown(t) startSqlServersWithDoltSystemVars(t, doltReplicaSystemVars) - startReplication(t, mySqlPort) + startReplicationAndCreateTestDb(t, mySqlPort) // Use non-default charset/collations to create data on the primary primaryDatabase.MustExec("CREATE TABLE t1 (pk int primary key, c1 varchar(255) COLLATE ascii_general_ci, c2 varchar(255) COLLATE utf16_general_ci);") @@ -460,10 +462,11 @@ func TestCharsetsAndCollations(t *testing.T) { // Test Helper Functions // -// waitForReplicaToCatchUp waits (up to 60s) for the replica to catch up with the primary database. The +// waitForReplicaToCatchUp waits (up to 30s) for the replica to catch up with the primary database. The // lag is measured by checking that gtid_executed is the same on the primary and replica. func waitForReplicaToCatchUp(t *testing.T) { - timeLimit := 60 * time.Second + timeLimit := 30 * time.Second + endTime := time.Now().Add(timeLimit) for time.Now().Before(endTime) { replicaGtid := queryGtid(t, replicaDatabase) @@ -482,45 +485,6 @@ func waitForReplicaToCatchUp(t *testing.T) { t.Fatal("primary and replica did not synchronize within " + timeLimit.String()) } -// waitForReplicaToHaveLatestGtid waits (up to 10s) for the replica to contain the -// most recent GTID executed from the primary. Both the primary and replica are queried -// for the value of @@gtid_executed to determine if the replica contains the most recent -// transaction from the primary. -func waitForReplicaToHaveLatestGtid(t *testing.T) { - timeLimit := 10 * time.Second - endTime := time.Now().Add(timeLimit) - for time.Now().Before(endTime) { - replicaGtid := queryGtid(t, replicaDatabase) - primaryGtid := queryGtid(t, primaryDatabase) - - replicaGtidSet, err := mysql.ParseMysql56GTIDSet(replicaGtid) - require.NoError(t, err) - - idx := strings.Index(primaryGtid, ":") - require.True(t, idx > 0) - uuid := primaryGtid[:idx] - sequencePortion := primaryGtid[idx+1:] - if strings.Contains(sequencePortion, ":") { - sequencePortion = sequencePortion[strings.LastIndex(sequencePortion, ":")+1:] - } - if strings.Contains(sequencePortion, "-") { - sequencePortion = sequencePortion[strings.LastIndex(sequencePortion, "-")+1:] - } - - latestGtid, err := mysql.ParseGTID("MySQL56", uuid+":"+sequencePortion) - require.NoError(t, err) - - if replicaGtidSet.ContainsGTID(latestGtid) { - return - } else { - fmt.Printf("replica does not contain latest GTID from the primary yet... (primary: %s, replica: %s)\n", primaryGtid, replicaGtid) - time.Sleep(250 * time.Millisecond) - } - } - - t.Fatal("replica did not synchronize the latest GTID from the primary within " + timeLimit.String()) -} - // waitForReplicaToReachGtid waits (up to 10s) for the replica's @@gtid_executed sys var to show that // it has executed the |target| gtid transaction number. func waitForReplicaToReachGtid(t *testing.T, target int) { @@ -686,22 +650,27 @@ func stopDoltSqlServer(t *testing.T) { } } -// startReplication starts up replication on the replica, connecting to |port| on the primary, -// creates the test database, db01, on the primary, and ensures it gets replicated to the replica. -func startReplication(t *testing.T, port int) { - startReplicationWithDelay(t, port, 100*time.Millisecond) -} - -// startReplication starts up replication on the replica, connecting to |port| on the primary, -// pauses for |delay| before creating the test database, db01, on the primary, and ensures it -// gets replicated to the replica. -func startReplicationWithDelay(t *testing.T, port int, delay time.Duration) { +// startReplication configures the replication source on the replica and runs the START REPLICA statement. +func startReplication(_ *testing.T, port int) { replicaDatabase.MustExec( fmt.Sprintf("change replication source to SOURCE_HOST='localhost', "+ "SOURCE_USER='replicator', SOURCE_PASSWORD='Zqr8_blrGm1!', "+ "SOURCE_PORT=%v, SOURCE_AUTO_POSITION=1, SOURCE_CONNECT_RETRY=5;", port)) replicaDatabase.MustExec("start replica;") +} + +// startReplicationAndCreateTestDb starts up replication on the replica, connecting to |port| on the primary, +// creates the test database, db01, on the primary, and ensures it gets replicated to the replica. +func startReplicationAndCreateTestDb(t *testing.T, port int) { + startReplicationAndCreateTestDbWithDelay(t, port, 100*time.Millisecond) +} + +// startReplicationAndCreateTestDbWithDelay starts up replication on the replica, connecting to |port| on the primary, +// pauses for |delay| before creating the test database, db01, on the primary, and ensures it +// gets replicated to the replica. +func startReplicationAndCreateTestDbWithDelay(t *testing.T, port int, delay time.Duration) { + startReplication(t, port) time.Sleep(delay) // Look to see if the test database, db01, has been created yet. If not, create it and wait for it to @@ -824,9 +793,8 @@ func startMySqlServer(dir string) (int, *os.Process, error) { "--server-id=11223344", fmt.Sprintf("--socket=mysql-%v.sock", mySqlPort), "--general_log_file="+dir+"general_log", - "--log-bin="+dir+"log_bin", "--slow_query_log_file="+dir+"slow_query_log", - "--log-error="+dir+"log_error", + "--log-error="+dir+"error_log", fmt.Sprintf("--pid-file="+dir+"pid-%v.pid", mySqlPort)) mysqlLogFilePath = filepath.Join(dir, fmt.Sprintf("mysql-%d.out.log", time.Now().Unix()))