Skip to content

Commit

Permalink
Add mutex and lock unlock for all read and writes.
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Farr <[email protected]>
  • Loading branch information
PrismaPhonic committed Aug 21, 2020
1 parent f66c2ff commit 4cc7ed6
Showing 1 changed file with 75 additions and 0 deletions.
75 changes: 75 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"

"golang.org/x/net/context"
Expand All @@ -39,6 +40,8 @@ import (
// FakeMysqlDaemon implements MysqlDaemon and allows the user to fake
// everything.
type FakeMysqlDaemon struct {
mu sync.Mutex

// db is the fake SQL DB we may use for some queries.
db *fakesqldb.DB

Expand Down Expand Up @@ -169,6 +172,8 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
Running: true,
IOThreadRunning: true,
}
result.mu.Lock()
defer result.mu.Unlock()
if db != nil {
result.appPool = dbconnpool.NewConnectionPool("AppConnPool", 5, time.Minute, 0)
result.appPool.Open(db.ConnParams())
Expand All @@ -178,6 +183,8 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {

// Start is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysqldArgs ...string) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.Running {
return fmt.Errorf("fake mysql daemon already running")
}
Expand All @@ -187,6 +194,8 @@ func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysq

// Shutdown is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *mysqlctl.Mycnf, waitForMysqld bool) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if !fmd.Running {
return fmt.Errorf("fake mysql daemon not running")
}
Expand Down Expand Up @@ -216,6 +225,8 @@ func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *mysqlctl.Mycnf) error

// GetMysqlPort is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetMysqlPort() (int32, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.MysqlPort.Get() == -1 {
return 0, fmt.Errorf("FakeMysqlDaemon.GetMysqlPort returns an error")
}
Expand All @@ -224,6 +235,8 @@ func (fmd *FakeMysqlDaemon) GetMysqlPort() (int32, error) {

// ReplicationStatus is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ReplicationStatus() (mysql.ReplicationStatus, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.ReplicationStatusError != nil {
return mysql.ReplicationStatus{}, fmd.ReplicationStatusError
}
Expand All @@ -243,6 +256,8 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus() (mysql.ReplicationStatus, error)

// MasterStatus is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) MasterStatus(ctx context.Context) (mysql.MasterStatus, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.MasterStatusError != nil {
return mysql.MasterStatus{}, fmd.MasterStatusError
}
Expand All @@ -254,36 +269,48 @@ func (fmd *FakeMysqlDaemon) MasterStatus(ctx context.Context) (mysql.MasterStatu

// ResetReplication is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) ResetReplication(ctx context.Context) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.ExecuteSuperQueryList(ctx, []string{
"FAKE RESET ALL REPLICATION",
})
}

// MasterPosition is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) MasterPosition() (mysql.Position, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.CurrentMasterPosition, nil
}

// IsReadOnly is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) IsReadOnly() (bool, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.ReadOnly, nil
}

// SetReadOnly is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) SetReadOnly(on bool) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.ReadOnly = on
return nil
}

// SetSuperReadOnly is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) SetSuperReadOnly(on bool) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.SuperReadOnly = on
fmd.ReadOnly = on
return nil
}

// StartReplication is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StartReplication(hookExtraEnv map[string]string) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.StartReplicationError != nil {
return fmd.StartReplicationError
}
Expand All @@ -294,6 +321,8 @@ func (fmd *FakeMysqlDaemon) StartReplication(hookExtraEnv map[string]string) err

// RestartReplication is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) RestartReplication(hookExtraEnv map[string]string) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.ExecuteSuperQueryList(context.Background(), []string{
"STOP SLAVE",
"RESET SLAVE",
Expand All @@ -303,6 +332,8 @@ func (fmd *FakeMysqlDaemon) RestartReplication(hookExtraEnv map[string]string) e

// StartReplicationUntilAfter is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StartReplicationUntilAfter(ctx context.Context, pos mysql.Position) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if !reflect.DeepEqual(fmd.StartReplicationUntilAfterPos, pos) {
return fmt.Errorf("wrong pos for StartReplicationUntilAfter: expected %v got %v", fmd.SetReplicationPositionPos, pos)
}
Expand All @@ -314,20 +345,26 @@ func (fmd *FakeMysqlDaemon) StartReplicationUntilAfter(ctx context.Context, pos

// StopReplication is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StopReplication(hookExtraEnv map[string]string) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.ExecuteSuperQueryList(context.Background(), []string{
"STOP SLAVE",
})
}

// StopIOThread is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) StopIOThread(ctx context.Context) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.ExecuteSuperQueryList(context.Background(), []string{
"STOP SLAVE IO_THREAD",
})
}

// SetReplicationPosition is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetReplicationPosition(ctx context.Context, pos mysql.Position) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if !reflect.DeepEqual(fmd.SetReplicationPositionPos, pos) {
return fmt.Errorf("wrong pos for SetReplicationPosition: expected %v got %v", fmd.SetReplicationPositionPos, pos)
}
Expand All @@ -338,6 +375,8 @@ func (fmd *FakeMysqlDaemon) SetReplicationPosition(ctx context.Context, pos mysq

// SetMaster is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetMaster(ctx context.Context, masterHost string, masterPort int, stopReplicationBefore bool, startReplicationAfter bool) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
input := fmt.Sprintf("%v:%v", masterHost, masterPort)
if fmd.SetMasterInput != input {
return fmt.Errorf("wrong input for SetMasterCommands: expected %v got %v", fmd.SetMasterInput, input)
Expand All @@ -363,11 +402,15 @@ func (fmd *FakeMysqlDaemon) WaitForReparentJournal(ctx context.Context, timeCrea

// DemoteMaster is deprecated: use mysqld.MasterPosition() instead
func (fmd *FakeMysqlDaemon) DemoteMaster() (mysql.Position, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.CurrentMasterPosition, nil
}

// WaitMasterPos is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) WaitMasterPos(_ context.Context, pos mysql.Position) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.TimeoutHook != nil {
return fmd.TimeoutHook()
}
Expand All @@ -379,6 +422,8 @@ func (fmd *FakeMysqlDaemon) WaitMasterPos(_ context.Context, pos mysql.Position)

// Promote is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Promote(hookExtraEnv map[string]string) (mysql.Position, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.PromoteError != nil {
return mysql.Position{}, fmd.PromoteError
}
Expand All @@ -387,6 +432,8 @@ func (fmd *FakeMysqlDaemon) Promote(hookExtraEnv map[string]string) (mysql.Posit

// ExecuteSuperQueryList is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ExecuteSuperQueryList(ctx context.Context, queryList []string) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
for _, query := range queryList {
// test we still have a query to compare
if fmd.ExpectedExecuteSuperQueryCurrent >= len(fmd.ExpectedExecuteSuperQueryList) {
Expand Down Expand Up @@ -421,6 +468,8 @@ func (fmd *FakeMysqlDaemon) ExecuteSuperQueryList(ctx context.Context, queryList

// FetchSuperQuery returns the results from the map, if any
func (fmd *FakeMysqlDaemon) FetchSuperQuery(ctx context.Context, query string) (*sqltypes.Result, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.FetchSuperQueryMap == nil {
return nil, fmt.Errorf("unexpected query: %v", query)
}
Expand All @@ -434,18 +483,24 @@ func (fmd *FakeMysqlDaemon) FetchSuperQuery(ctx context.Context, query string) (

// EnableBinlogPlayback is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) EnableBinlogPlayback() error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.BinlogPlayerEnabled.Set(true)
return nil
}

// DisableBinlogPlayback disable playback of binlog events
func (fmd *FakeMysqlDaemon) DisableBinlogPlayback() error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.BinlogPlayerEnabled.Set(false)
return nil
}

// Close is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) Close() {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.appPool != nil {
fmd.appPool.Close()
}
Expand All @@ -454,6 +509,8 @@ func (fmd *FakeMysqlDaemon) Close() {
// CheckSuperQueryList returns an error if all the queries we expected
// haven't been seen.
func (fmd *FakeMysqlDaemon) CheckSuperQueryList() error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.ExpectedExecuteSuperQueryCurrent != len(fmd.ExpectedExecuteSuperQueryList) {
return fmt.Errorf("SuperQueryList wasn't consumed, saw %v queries, was expecting %v", fmd.ExpectedExecuteSuperQueryCurrent, len(fmd.ExpectedExecuteSuperQueryList))
}
Expand All @@ -462,6 +519,8 @@ func (fmd *FakeMysqlDaemon) CheckSuperQueryList() error {

// GetSchema is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetSchema(ctx context.Context, dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.SchemaFunc != nil {
return fmd.SchemaFunc()
}
Expand All @@ -483,6 +542,8 @@ func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(ctx context.Context, dbName, ta

// PreflightSchemaChange is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) PreflightSchemaChange(ctx context.Context, dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.PreflightSchemaChangeResult == nil {
return nil, fmt.Errorf("no preflight result defined")
}
Expand All @@ -491,6 +552,8 @@ func (fmd *FakeMysqlDaemon) PreflightSchemaChange(ctx context.Context, dbName st

// ApplySchemaChange is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) ApplySchemaChange(ctx context.Context, dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
if fmd.ApplySchemaChangeResult == nil {
return nil, fmt.Errorf("no apply schema defined")
}
Expand All @@ -499,33 +562,45 @@ func (fmd *FakeMysqlDaemon) ApplySchemaChange(ctx context.Context, dbName string

// GetAppConnection is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetAppConnection(ctx context.Context) (*dbconnpool.PooledDBConnection, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.appPool.Get(ctx)
}

// GetDbaConnection is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetDbaConnection(ctx context.Context) (*dbconnpool.DBConnection, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return dbconnpool.NewDBConnection(ctx, fmd.db.ConnParams())
}

// GetAllPrivsConnection is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetAllPrivsConnection(ctx context.Context) (*dbconnpool.DBConnection, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return dbconnpool.NewDBConnection(ctx, fmd.db.ConnParams())
}

// SetSemiSyncEnabled is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SetSemiSyncEnabled(master, replica bool) error {
fmd.mu.Lock()
defer fmd.mu.Unlock()
fmd.SemiSyncMasterEnabled = master
fmd.SemiSyncReplicaEnabled = replica
return nil
}

// SemiSyncEnabled is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SemiSyncEnabled() (master, replica bool) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
return fmd.SemiSyncMasterEnabled, fmd.SemiSyncReplicaEnabled
}

// SemiSyncReplicationStatus is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) SemiSyncReplicationStatus() (bool, error) {
fmd.mu.Lock()
defer fmd.mu.Unlock()
// The fake assumes the status worked.
return fmd.SemiSyncReplicaEnabled, nil
}

0 comments on commit 4cc7ed6

Please sign in to comment.