Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] main from dolthub:main #15

Merged
merged 17 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 2 additions & 37 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/servercfg"
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler"
drowexec "github.com/dolthub/dolt/go/libraries/doltcore/sqle/rowexec"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/statsnoms"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/statspro"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
Expand Down Expand Up @@ -154,8 +153,6 @@ func NewSqlEngine(
if err := configureBinlogPrimaryController(engine); err != nil {
return nil, err
}
pro.AddInitDatabaseHook(dblr.NewBinlogInitDatabaseHook(ctx, doltdb.DatabaseUpdateListeners))
pro.AddDropDatabaseHook(dblr.NewBinlogDropDatabaseHook(ctx, doltdb.DatabaseUpdateListeners))

config.ClusterController.SetIsStandbyCallback(func(isStandby bool) {
pro.SetIsStandby(isStandby)
Expand Down Expand Up @@ -195,7 +192,7 @@ func NewSqlEngine(
statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider()))
engine.Analyzer.Catalog.StatsProvider = statsPro

engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(drowexec.Builder{})
engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{})
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, config.Autocommit)
sqlEngine.provider = pro
sqlEngine.contextFactory = sqlContextFactory()
Expand Down Expand Up @@ -351,38 +348,6 @@ func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engin
func configureBinlogPrimaryController(engine *gms.Engine) error {
primaryController := dblr.NewDoltBinlogPrimaryController()
engine.Analyzer.Catalog.BinlogPrimaryController = primaryController

_, logBinValue, ok := sql.SystemVariables.GetGlobal("log_bin")
if !ok {
return fmt.Errorf("unable to load @@log_bin system variable")
}
logBin, ok := logBinValue.(int8)
if !ok {
return fmt.Errorf("unexpected type for @@log_bin system variable: %T", logBinValue)
}
if logBin == 1 {
logrus.Debug("Enabling binary logging")
binlogProducer, err := dblr.NewBinlogProducer(primaryController.StreamerManager())
if err != nil {
return err
}
doltdb.RegisterDatabaseUpdateListener(binlogProducer)
primaryController.BinlogProducer = binlogProducer
}

_, logBinBranchValue, ok := sql.SystemVariables.GetGlobal("log_bin_branch")
if !ok {
return fmt.Errorf("unable to load @@log_bin_branch system variable")
}
logBinBranch, ok := logBinBranchValue.(string)
if !ok {
return fmt.Errorf("unexpected type for @@log_bin_branch system variable: %T", logBinBranchValue)
}
if logBinBranch != "" {
logrus.Debugf("Setting binary logging branch to %s", logBinBranch)
dblr.BinlogBranch = logBinBranch
}

return nil
}

Expand Down
67 changes: 67 additions & 0 deletions go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,73 @@ func ConfigureServices(
}
controller.Register(PersistNondeterministicSystemVarDefaults)

InitBinlogging := &svcs.AnonService{
InitF: func(context.Context) error {
primaryController := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.BinlogPrimaryController
doltBinlogPrimaryController, ok := primaryController.(*binlogreplication.DoltBinlogPrimaryController)
if !ok {
return fmt.Errorf("unexpected type of binlog controller: %T", primaryController)
}

_, logBinValue, ok := sql.SystemVariables.GetGlobal("log_bin")
if !ok {
return fmt.Errorf("unable to load @@log_bin system variable")
}
logBin, ok := logBinValue.(int8)
if !ok {
return fmt.Errorf("unexpected type for @@log_bin system variable: %T", logBinValue)
}

_, logBinBranchValue, ok := sql.SystemVariables.GetGlobal("log_bin_branch")
if !ok {
return fmt.Errorf("unable to load @@log_bin_branch system variable")
}
logBinBranch, ok := logBinBranchValue.(string)
if !ok {
return fmt.Errorf("unexpected type for @@log_bin_branch system variable: %T", logBinBranchValue)
}
if logBinBranch != "" {
// If an invalid branch has been configured, let the server start up so that it's
// easier for customers to correct the value, but log a warning and don't enable
// binlog replication.
if strings.Contains(logBinBranch, "/") {
logrus.Warnf("branch names containing '/' are not supported "+
"for binlog replication. Not enabling binlog replication; fix "+
"@@log_bin_branch value and restart Dolt (current value: %s)", logBinBranch)
return nil
}

binlogreplication.BinlogBranch = logBinBranch
}

if logBin == 1 {
logrus.Infof("Enabling binary logging for branch %s", logBinBranch)
binlogProducer, err := binlogreplication.NewBinlogProducer(dEnv.FS)
if err != nil {
return err
}

logManager, err := binlogreplication.NewLogManager(fs)
if err != nil {
return err
}
binlogProducer.LogManager(logManager)
doltdb.RegisterDatabaseUpdateListener(binlogProducer)
doltBinlogPrimaryController.BinlogProducer(binlogProducer)

// Register binlog hooks for database creation/deletion
provider := sqlEngine.GetUnderlyingEngine().Analyzer.Catalog.DbProvider
if doltProvider, ok := provider.(*sqle.DoltDatabaseProvider); ok {
doltProvider.AddInitDatabaseHook(binlogreplication.NewBinlogInitDatabaseHook(nil, doltdb.DatabaseUpdateListeners))
doltProvider.AddDropDatabaseHook(binlogreplication.NewBinlogDropDatabaseHook(nil, doltdb.DatabaseUpdateListeners))
}
}

return nil
},
}
controller.Register(InitBinlogging)

// Add superuser if specified user exists; add root superuser if no user specified and no existing privileges
InitSuperUser := &svcs.AnonService{
InitF: func(context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/dolthub/fslock v0.0.3
github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20240711213744-4232e1c4edae
github.com/dolthub/vitess v0.0.0-20240723185540-9f8cd7c03829
github.com/dustin/go-humanize v1.0.1
github.com/fatih/color v1.13.0
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
Expand Down Expand Up @@ -57,7 +57,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.18.2-0.20240726192758-72470d302577
github.com/dolthub/go-mysql-server v0.18.2-0.20240729182701-b4cec29302d1
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
github.com/dolthub/swiss v0.1.0
github.com/goccy/go-json v0.10.2
Expand Down
8 changes: 4 additions & 4 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
github.com/dolthub/go-mysql-server v0.18.2-0.20240726192758-72470d302577 h1:k1zmiHU6vXi9xn5EtSRrAuys4YvRl0bOUCce4afEbsU=
github.com/dolthub/go-mysql-server v0.18.2-0.20240726192758-72470d302577/go.mod h1:P6bG0p+3mH4LS4DLo3BySh10ZJTDqgWyfWBu8gGE3eU=
github.com/dolthub/go-mysql-server v0.18.2-0.20240729182701-b4cec29302d1 h1:ydrib93syT0bpnYjgeKuLkn0R0B++lPNWMN9knWGMJY=
github.com/dolthub/go-mysql-server v0.18.2-0.20240729182701-b4cec29302d1/go.mod h1:ctH+JB+7+NeXPOfcujw/dzzPPgT9Byb/nOE44RRAztg=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718 h1:lT7hE5k+0nkBdj/1UOSFwjWpNxf+LCApbRHgnCA17XE=
Expand All @@ -197,8 +197,8 @@ github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9X
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/swiss v0.1.0 h1:EaGQct3AqeP/MjASHLiH6i4TAmgbG/c4rA6a1bzCOPc=
github.com/dolthub/swiss v0.1.0/go.mod h1:BeucyB08Vb1G9tumVN3Vp/pyY4AMUnr9p7Rz7wJ7kAQ=
github.com/dolthub/vitess v0.0.0-20240711213744-4232e1c4edae h1:SAYP6+hzkoYLWVzQTwQO0QbhVOwMsgy0dpRcL/QriS8=
github.com/dolthub/vitess v0.0.0-20240711213744-4232e1c4edae/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dolthub/vitess v0.0.0-20240723185540-9f8cd7c03829 h1:/SEo3jm8zy0EVKIUPRcfagB9drQYI4KwQlSU/DNVRAQ=
github.com/dolthub/vitess v0.0.0-20240723185540-9f8cd7c03829/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down
132 changes: 132 additions & 0 deletions go/libraries/doltcore/sqle/binlogreplication/binlog_file_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogreplication

import (
"encoding/binary"
"fmt"
"os"
"strconv"
"strings"

"github.com/dolthub/vitess/go/mysql"
)

// binlogFileMagicNumber holds the four bytes that start off every
// MySQL binlog file and identify the file as a MySQL binlog.
var binlogFileMagicNumber = []byte{0xfe, 0x62, 0x69, 0x6e}

// fileExists returns true if the specified |filepath| exists on disk and is not a directory,
// otherwise returns false. |filepath| is a fully specified path to a file on disk.
func fileExists(filepath string) bool {
info, err := os.Stat(filepath)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}

// openBinlogFileForReading opens the specified |logfile| for reading and reads the first four bytes to make sure they
// are the expected binlog file magic numbers. If any problems are encountered opening the file or reading the first
// four bytes, an error is returned.
func openBinlogFileForReading(logfile string) (*os.File, error) {
file, err := os.Open(logfile)
if err != nil {
return nil, err
}
buffer := make([]byte, len(binlogFileMagicNumber))
bytesRead, err := file.Read(buffer)
if err != nil {
return nil, err
}
if bytesRead != len(binlogFileMagicNumber) || string(buffer) != string(binlogFileMagicNumber) {
return nil, fmt.Errorf("invalid magic number in binlog file!")
}

return file, nil
}

// readBinlogEventFromFile reads the next binlog event from the specified, open |file| and
// returns it. If no more events are available in the file, then io.EOF is returned.
func readBinlogEventFromFile(file *os.File) (mysql.BinlogEvent, error) {
headerBuffer := make([]byte, 4+1+4+4+4+2)
_, err := file.Read(headerBuffer)
if err != nil {
return nil, err
}

// Event Header:
//timestamp := headerBuffer[0:4]
//eventType := headerBuffer[4]
//serverId := binary.LittleEndian.Uint32(headerBuffer[5:5+4])
eventSize := binary.LittleEndian.Uint32(headerBuffer[9 : 9+4])

payloadBuffer := make([]byte, eventSize-uint32(len(headerBuffer)))
_, err = file.Read(payloadBuffer)
if err != nil {
return nil, err
}

return mysql.NewMysql56BinlogEvent(append(headerBuffer, payloadBuffer...)), nil
}

// readFirstGtidEventFromFile reads events from |file| until a GTID event is found, then
// returns that GTID event. If |file| has been read completely and no GTID events were found,
// then io.EOF is returned.
func readFirstGtidEventFromFile(file *os.File) (mysql.BinlogEvent, error) {
for {
binlogEvent, err := readBinlogEventFromFile(file)
if err != nil {
return nil, err
}

if binlogEvent.IsGTID() {
return binlogEvent, nil
}
}
}

// formatBinlogFilename formats a binlog filename using the specified |branch| and |sequence| number. The
// returned filename will be of the form "binlog-main.000001".
func formatBinlogFilename(branch string, sequence int) string {
return fmt.Sprintf("binlog-%s.%06d", branch, sequence)
}

// parseBinlogFilename parses a binlog filename, of the form "binlog-main.000001", into its branch and
// sequence number.
func parseBinlogFilename(filename string) (branch string, sequence int, err error) {
if !strings.HasPrefix(filename, "binlog-") {
return "", 0, fmt.Errorf("invalid binlog filename: %s; must start with 'binlog-'", filename)
}

filename = strings.TrimPrefix(filename, "binlog-")

splits := strings.Split(filename, ".")
if len(splits) != 2 {
return "", 0, fmt.Errorf(
"unable to parse binlog filename: %s; expected format 'binlog-branch.sequence'", filename)
}

branch = splits[0]
sequenceString := splits[1]

sequence, err = strconv.Atoi(sequenceString)
if err != nil {
return "", 0, fmt.Errorf(
"unable to parse binlog sequence number: %s; %s", sequenceString, err.Error())
}

return branch, sequence, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/dolthub/vitess/go/mysql"

"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)

const binlogPositionDirectory = ".doltcfg"
Expand All @@ -38,19 +39,16 @@ type binlogPositionStore struct {
mu sync.Mutex
}

// Load loads a mysql.Position instance from the .doltcfg/binlog-position file at the root of the provider's filesystem.
// Load loads a mysql.Position instance from the .doltcfg/binlog-position file at the root of the specified |filesystem|.
// This file MUST be stored at the root of the provider's filesystem, and NOT inside a nested database's .doltcfg directory,
// since the binlog position contains events that cover all databases in a SQL server. The returned mysql.Position
// represents the set of GTIDs that have been successfully executed and applied on this replica. Currently only the
// default binlog channel ("") is supported. If no .doltcfg/binlog-position file is stored, this method returns a nil
// mysql.Position and a nil error. If any errors are encountered, a nil mysql.Position and an error are returned.
func (store *binlogPositionStore) Load(ctx *sql.Context) (*mysql.Position, error) {
func (store *binlogPositionStore) Load(filesys filesys.Filesys) (*mysql.Position, error) {
store.mu.Lock()
defer store.mu.Unlock()

doltSession := dsess.DSessFromSess(ctx.Session)
filesys := doltSession.Provider().FileSystem()

doltDirExists, _ := filesys.Exists(binlogPositionDirectory)
if !doltDirExists {
return nil, nil
Expand Down
Loading
Loading