Skip to content

Commit

Permalink
Merge pull request dolthub#8139 from dolthub/fulghum/binlog_logfiles
Browse files Browse the repository at this point in the history
Feature: Log binlog events to disk
  • Loading branch information
fulghum authored Jul 29, 2024
2 parents c563747 + 8b29daf commit 7834f6a
Show file tree
Hide file tree
Showing 16 changed files with 1,472 additions and 327 deletions.
35 changes: 0 additions & 35 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/servercfg"
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
Expand Down Expand Up @@ -154,8 +153,6 @@ func NewSqlEngine(
if err := configureBinlogPrimaryController(engine); err != nil {
return nil, err
}
pro.AddInitDatabaseHook(dblr.NewBinlogInitDatabaseHook(ctx, doltdb.DatabaseUpdateListeners))
pro.AddDropDatabaseHook(dblr.NewBinlogDropDatabaseHook(ctx, doltdb.DatabaseUpdateListeners))

config.ClusterController.SetIsStandbyCallback(func(isStandby bool) {
pro.SetIsStandby(isStandby)
Expand Down Expand Up @@ -351,38 +348,6 @@ func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engin
// configureBinlogPrimaryController creates a Dolt binlog primary controller and installs it on the
// catalog of |engine|. It then reads the global @@log_bin and @@log_bin_branch system variables: when
// @@log_bin is 1, a binlog producer is created, registered as a database update listener, and attached
// to the controller; when @@log_bin_branch is non-empty, it is stored as the binlog branch. An error is
// returned if either system variable cannot be loaded or has an unexpected type, or if the binlog
// producer cannot be created.
func configureBinlogPrimaryController(engine *gms.Engine) error {
	controller := dblr.NewDoltBinlogPrimaryController()
	engine.Analyzer.Catalog.BinlogPrimaryController = controller

	// @@log_bin controls whether binlog events are produced at all.
	_, rawLogBin, found := sql.SystemVariables.GetGlobal("log_bin")
	if !found {
		return fmt.Errorf("unable to load @@log_bin system variable")
	}
	logBinFlag, isInt8 := rawLogBin.(int8)
	if !isInt8 {
		return fmt.Errorf("unexpected type for @@log_bin system variable: %T", rawLogBin)
	}
	if logBinFlag == 1 {
		logrus.Debug("Enabling binary logging")
		producer, err := dblr.NewBinlogProducer(controller.StreamerManager())
		if err != nil {
			return err
		}
		doltdb.RegisterDatabaseUpdateListener(producer)
		controller.BinlogProducer = producer
	}

	// @@log_bin_branch optionally overrides the branch used for binary logging.
	_, rawBranch, found := sql.SystemVariables.GetGlobal("log_bin_branch")
	if !found {
		return fmt.Errorf("unable to load @@log_bin_branch system variable")
	}
	branchName, isString := rawBranch.(string)
	if !isString {
		return fmt.Errorf("unexpected type for @@log_bin_branch system variable: %T", rawBranch)
	}
	if branchName != "" {
		logrus.Debugf("Setting binary logging branch to %s", branchName)
		dblr.BinlogBranch = branchName
	}

	return nil
}

Expand Down
67 changes: 67 additions & 0 deletions go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,73 @@ func ConfigureServices(
}
controller.Register(PersistNondeterministicSystemVarDefaults)

// InitBinlogging is a service whose init step configures binary logging from the global @@log_bin
// and @@log_bin_branch system variables. It fails if the catalog's binlog primary controller is not
// a *DoltBinlogPrimaryController, if either system variable cannot be loaded or has an unexpected
// type, or if the binlog producer or log manager cannot be created.
InitBinlogging := &svcs.AnonService{
	InitF: func(context.Context) error {
		rawController := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.BinlogPrimaryController
		binlogController, isDoltController := rawController.(*binlogreplication.DoltBinlogPrimaryController)
		if !isDoltController {
			return fmt.Errorf("unexpected type of binlog controller: %T", rawController)
		}

		_, rawLogBin, found := sql.SystemVariables.GetGlobal("log_bin")
		if !found {
			return fmt.Errorf("unable to load @@log_bin system variable")
		}
		logBinFlag, isInt8 := rawLogBin.(int8)
		if !isInt8 {
			return fmt.Errorf("unexpected type for @@log_bin system variable: %T", rawLogBin)
		}

		_, rawBranch, found := sql.SystemVariables.GetGlobal("log_bin_branch")
		if !found {
			return fmt.Errorf("unable to load @@log_bin_branch system variable")
		}
		branchName, isString := rawBranch.(string)
		if !isString {
			return fmt.Errorf("unexpected type for @@log_bin_branch system variable: %T", rawBranch)
		}

		switch {
		case branchName == "":
			// No branch override configured; leave binlogreplication.BinlogBranch untouched.
		case strings.Contains(branchName, "/"):
			// An invalid branch must not prevent the server from starting, since a running
			// server makes it easier for customers to correct the value. Log a warning and
			// return without enabling binlog replication.
			logrus.Warnf("branch names containing '/' are not supported "+
				"for binlog replication. Not enabling binlog replication; fix "+
				"@@log_bin_branch value and restart Dolt (current value: %s)", branchName)
			return nil
		default:
			binlogreplication.BinlogBranch = branchName
		}

		if logBinFlag != 1 {
			return nil
		}

		logrus.Infof("Enabling binary logging for branch %s", branchName)
		producer, err := binlogreplication.NewBinlogProducer(dEnv.FS)
		if err != nil {
			return err
		}

		manager, err := binlogreplication.NewLogManager(fs)
		if err != nil {
			return err
		}
		producer.LogManager(manager)
		doltdb.RegisterDatabaseUpdateListener(producer)
		binlogController.BinlogProducer(producer)

		// Register binlog hooks for database creation/deletion. The hooks are only added when
		// the catalog's provider is a *sqle.DoltDatabaseProvider; any other provider is skipped.
		dbProvider := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.DbProvider
		if doltProvider, isDoltProvider := dbProvider.(*sqle.DoltDatabaseProvider); isDoltProvider {
			doltProvider.AddInitDatabaseHook(binlogreplication.NewBinlogInitDatabaseHook(nil, doltdb.DatabaseUpdateListeners))
			doltProvider.AddDropDatabaseHook(binlogreplication.NewBinlogDropDatabaseHook(nil, doltdb.DatabaseUpdateListeners))
		}

		return nil
	},
}
controller.Register(InitBinlogging)

// Add superuser if specified user exists; add root superuser if no user specified and no existing privileges
InitSuperUser := &svcs.AnonService{
InitF: func(context.Context) error {
Expand Down
132 changes: 132 additions & 0 deletions go/libraries/doltcore/sqle/binlogreplication/binlog_file_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogreplication

import (
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"strconv"
	"strings"

	"github.com/dolthub/vitess/go/mysql"
)

// binlogFileMagicNumber is the four-byte signature found at the very start of every
// MySQL binlog file: the byte 0xfe followed by the ASCII characters "bin"
// (0x62, 0x69, 0x6e). It identifies the file as a MySQL binlog.
var binlogFileMagicNumber = []byte("\xfebin")

// fileExists returns true if the specified |filepath| exists on disk and is not a directory,
// otherwise returns false. |filepath| is a fully specified path to a file on disk. If the file
// cannot be stat'ed for any reason (it does not exist, a path component is not a directory,
// permission is denied, the path is malformed, ...), false is returned.
func fileExists(filepath string) bool {
	info, err := os.Stat(filepath)
	if err != nil {
		// os.Stat returns a nil FileInfo alongside any error, not only "not exist" errors,
		// so every error must be handled here before |info| is dereferenced.
		return false
	}
	return !info.IsDir()
}

// openBinlogFileForReading opens the specified |logfile| for reading and reads the first four bytes to make sure they
// are the expected binlog file magic numbers. On success, the returned file is positioned immediately after the
// magic number and the caller is responsible for closing it. If any problems are encountered opening the file or
// reading the first four bytes, the file is closed and an error is returned: io.EOF for an empty file, and an
// "invalid magic number" error when the file is too short or starts with different bytes.
func openBinlogFileForReading(logfile string) (*os.File, error) {
	file, err := os.Open(logfile)
	if err != nil {
		return nil, err
	}

	// io.ReadFull keeps reading until the buffer is full, so a short read from the
	// underlying file can't be mistaken for a bad magic number.
	buffer := make([]byte, len(binlogFileMagicNumber))
	_, err = io.ReadFull(file, buffer)
	if err == io.ErrUnexpectedEOF || (err == nil && string(buffer) != string(binlogFileMagicNumber)) {
		// Either the file is shorter than the magic number, or the bytes don't match.
		err = fmt.Errorf("invalid magic number in binlog file!")
	}
	if err != nil {
		// Don't leak the file handle on failure. The validation/read error is the one
		// worth reporting, so an error from Close is intentionally ignored.
		_ = file.Close()
		return nil, err
	}

	return file, nil
}

// readBinlogEventFromFile reads the next binlog event from the specified, open |file| and
// returns it. If no more events are available in the file, then io.EOF is returned. If the
// file ends in the middle of an event (a partial header, or a payload shorter than the size
// recorded in the header), io.ErrUnexpectedEOF is returned. If the header records an event
// size smaller than the header itself, an error is returned.
func readBinlogEventFromFile(file *os.File) (mysql.BinlogEvent, error) {
	// Event Header layout, as far as this function is concerned:
	//   [0:4]  timestamp
	//   [4]    event type
	//   [5:9]  server ID
	//   [9:13] total event size (header + payload), little endian
	// The remaining six header bytes are not inspected here.
	const headerSize = 4 + 1 + 4 + 4 + 4 + 2

	// io.ReadFull returns io.EOF only when zero bytes could be read (i.e. a clean end of
	// file between events) and io.ErrUnexpectedEOF for a partially read header.
	headerBuffer := make([]byte, headerSize)
	if _, err := io.ReadFull(file, headerBuffer); err != nil {
		return nil, err
	}

	eventSize := binary.LittleEndian.Uint32(headerBuffer[9 : 9+4])
	if eventSize < headerSize {
		// Without this check, the unsigned subtraction below would wrap around and
		// request a multi-gigabyte payload buffer.
		return nil, fmt.Errorf("invalid binlog event size %d; must be at least the %d byte event header",
			eventSize, headerSize)
	}

	// Allocate the full event once and read the payload directly after the header.
	eventBuffer := make([]byte, eventSize)
	copy(eventBuffer, headerBuffer)
	if _, err := io.ReadFull(file, eventBuffer[headerSize:]); err != nil {
		if err == io.EOF {
			// A header with no payload behind it is a truncated event, not a clean
			// end of file.
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}

	return mysql.NewMysql56BinlogEvent(eventBuffer), nil
}

// readFirstGtidEventFromFile scans forward through |file|, one binlog event at a time, and
// returns the first event that reports itself as a GTID event. Any error from reading an
// event ends the scan and is returned as-is; in particular, when |file| is exhausted without
// a GTID event being found, the error from the underlying read (io.EOF) is returned.
func readFirstGtidEventFromFile(file *os.File) (mysql.BinlogEvent, error) {
	event, err := readBinlogEventFromFile(file)
	for ; err == nil; event, err = readBinlogEventFromFile(file) {
		if event.IsGTID() {
			return event, nil
		}
	}
	return nil, err
}

// formatBinlogFilename builds the name of a binlog file from a |branch| name and a |sequence|
// number. The sequence number is zero-padded to at least six digits, so branch "main" and
// sequence 1 yield "binlog-main.000001".
func formatBinlogFilename(branch string, sequence int) string {
	paddedSequence := fmt.Sprintf("%06d", sequence)
	return "binlog-" + branch + "." + paddedSequence
}

// parseBinlogFilename parses a binlog filename, of the form "binlog-main.000001", into its branch and
// sequence number. The sequence number is everything after the LAST "." in the filename, so branch
// names that themselves contain dots (e.g. "binlog-release-1.0.000042") round-trip correctly with
// formatBinlogFilename. An error is returned if |filename| does not start with "binlog-", does not
// contain a "." separating the branch from the sequence, or has a sequence that is not an integer.
func parseBinlogFilename(filename string) (branch string, sequence int, err error) {
	if !strings.HasPrefix(filename, "binlog-") {
		return "", 0, fmt.Errorf("invalid binlog filename: %s; must start with 'binlog-'", filename)
	}

	// Keep |filename| intact so that error messages report the name the caller passed in.
	remainder := strings.TrimPrefix(filename, "binlog-")

	separator := strings.LastIndex(remainder, ".")
	if separator < 0 {
		return "", 0, fmt.Errorf(
			"unable to parse binlog filename: %s; expected format 'binlog-branch.sequence'", filename)
	}

	branch = remainder[:separator]
	sequenceString := remainder[separator+1:]

	sequence, err = strconv.Atoi(sequenceString)
	if err != nil {
		return "", 0, fmt.Errorf(
			"unable to parse binlog sequence number: %s; %w", sequenceString, err)
	}

	return branch, sequence, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dolthub/vitess/go/mysql"

"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)

const binlogPositionDirectory = ".doltcfg"
Expand All @@ -38,19 +39,16 @@ type binlogPositionStore struct {
mu sync.Mutex
}

// Load loads a mysql.Position instance from the .doltcfg/binlog-position file at the root of the provider's filesystem.
// Load loads a mysql.Position instance from the .doltcfg/binlog-position file at the root of the specified |filesystem|.
// This file MUST be stored at the root of the provider's filesystem, and NOT inside a nested database's .doltcfg directory,
// since the binlog position contains events that cover all databases in a SQL server. The returned mysql.Position
// represents the set of GTIDs that have been successfully executed and applied on this replica. Currently only the
// default binlog channel ("") is supported. If no .doltcfg/binlog-position file is stored, this method returns a nil
// mysql.Position and a nil error. If any errors are encountered, a nil mysql.Position and an error are returned.
func (store *binlogPositionStore) Load(ctx *sql.Context) (*mysql.Position, error) {
func (store *binlogPositionStore) Load(filesys filesys.Filesys) (*mysql.Position, error) {
store.mu.Lock()
defer store.mu.Unlock()

doltSession := dsess.DSessFromSess(ctx.Session)
filesys := doltSession.Provider().FileSystem()

doltDirExists, _ := filesys.Exists(binlogPositionDirectory)
if !doltDirExists {
return nil, nil
Expand Down
Loading

0 comments on commit 7834f6a

Please sign in to comment.