Skip to content

Commit

Permalink
Merge commit 'd6d191b88b896c98cbbbeeca7005dc9bfd22c816'
Browse files Browse the repository at this point in the history
  • Loading branch information
alainjobart committed Oct 29, 2014
2 parents fe19f4b + d6d191b commit 3f2be6e
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 12 deletions.
29 changes: 27 additions & 2 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package mysqlctl
import (
"fmt"

"github.com/youtube/vitess/go/vt/dbconnpool"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)

Expand All @@ -19,13 +20,17 @@ type MysqlDaemon interface {
// GetMysqlPort returns the current port mysql is listening on.
GetMysqlPort() (int, error)

// StartSlave / StopSlave allows the caller to start or stop
// mysql replication
// replication related methods
StartSlave(hookExtraEnv map[string]string) error
StopSlave(hookExtraEnv map[string]string) error
SlaveStatus() (*proto.ReplicationStatus, error)

// Schema related methods
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error)

// GetDbaConnection returns a connection to be able to talk
// to the database as the admin user.
GetDbaConnection() (dbconnpool.PoolConnection, error)
}

// FakeMysqlDaemon implements MysqlDaemon and allows the user to fake
Expand All @@ -42,9 +47,15 @@ type FakeMysqlDaemon struct {
// Replicating is updated when calling StopSlave
Replicating bool

// CurrentSlaveStatus is returned by SlaveStatus
CurrentSlaveStatus *proto.ReplicationStatus

// Schema that will be returned by GetSchema. If nil we'll
// return an error.
Schema *proto.SchemaDefinition

// DbaConnectionFactory is the factory for making fake dba connection
DbaConnectionFactory func() (dbconnpool.PoolConnection, error)
}

func (fmd *FakeMysqlDaemon) GetMasterAddr() (string, error) {
Expand Down Expand Up @@ -74,9 +85,23 @@ func (fmd *FakeMysqlDaemon) StopSlave(hookExtraEnv map[string]string) error {
return nil
}

func (fmd *FakeMysqlDaemon) SlaveStatus() (*proto.ReplicationStatus, error) {
if fmd.CurrentSlaveStatus == nil {
return nil, fmt.Errorf("no slave status defined")
}
return fmd.CurrentSlaveStatus, nil
}

func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*proto.SchemaDefinition, error) {
if fmd.Schema == nil {
return nil, fmt.Errorf("no schema defined")
}
return fmd.Schema, nil
}

func (fmd *FakeMysqlDaemon) GetDbaConnection() (dbconnpool.PoolConnection, error) {
if fmd.DbaConnectionFactory == nil {
return nil, fmt.Errorf("no DbaConnectionFactory set in this FakeMysqlDaemon")
}
return fmd.DbaConnectionFactory()
}
9 changes: 7 additions & 2 deletions go/vt/tabletmanager/agent_rpc_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ func (agent *ActionAgent) RefreshState() {
// ReloadSchema will reload the schema
// Should be called under RpcWrapLockAction.
func (agent *ActionAgent) ReloadSchema() {
if agent.DBConfigs == nil {
// we skip this for test instances that can't connect to the DB anyway
return
}

// This adds a dependency between tabletmanager and tabletserver,
// so it's not ideal. But I (alainjobart) think it's better
// to have up to date schema in vttablet.
Expand Down Expand Up @@ -249,7 +254,7 @@ func (agent *ActionAgent) ApplySchema(change *myproto.SchemaChange) (*myproto.Sc
// Should be called under RpcWrap.
func (agent *ActionAgent) ExecuteFetch(query string, maxrows int, wantFields, disableBinlogs bool) (*proto.QueryResult, error) {
// get a connection
conn, err := agent.Mysqld.GetDbaConnection()
conn, err := agent.MysqlDaemon.GetDbaConnection()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -282,7 +287,7 @@ func (agent *ActionAgent) ExecuteFetch(query string, maxrows int, wantFields, di
// SlaveStatus returns the replication status
// Should be called under RpcWrap.
func (agent *ActionAgent) SlaveStatus() (*myproto.ReplicationStatus, error) {
return agent.Mysqld.SlaveStatus()
return agent.MysqlDaemon.SlaveStatus()
}

// WaitSlavePosition waits until we reach the provided position,
Expand Down
257 changes: 255 additions & 2 deletions go/vt/worker/split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,258 @@
package worker

import (
"fmt"
"strconv"
"strings"
"testing"
"time"

mproto "github.com/youtube/vitess/go/mysql/proto"
rpcproto "github.com/youtube/vitess/go/rpcwrap/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/dbconnpool"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/logutil"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/wrangler"
"github.com/youtube/vitess/go/vt/wrangler/testlib"
"github.com/youtube/vitess/go/vt/zktopo"
)

// This is a local SqlQuery RCP implementation to support the tests
type SqlQuery struct {
t *testing.T
}

func (sq *SqlQuery) GetSessionId(sessionParams *proto.SessionParams, sessionInfo *proto.SessionInfo) error {
return nil
}

func (sq *SqlQuery) StreamExecute(ctx *rpcproto.Context, query *proto.Query, sendReply func(reply interface{}) error) error {
// Custom parsing of the query we expect
min := 100
max := 200
var err error
parts := strings.Split(query.Sql, " ")
for _, part := range parts {
if strings.HasPrefix(part, "id>=") {
min, err = strconv.Atoi(part[4:])
if err != nil {
return err
}
} else if strings.HasPrefix(part, "id<") {
max, err = strconv.Atoi(part[3:])
}
}
sq.t.Logf("SqlQuery: got query: %v with min %v max %v", *query, min, max)

// Send the headers
if err := sendReply(&mproto.QueryResult{
Fields: []mproto.Field{
mproto.Field{
Name: "id",
Type: mproto.VT_LONGLONG,
},
mproto.Field{
Name: "msg",
Type: mproto.VT_VARCHAR,
},
mproto.Field{
Name: "keyspace_id",
Type: mproto.VT_LONGLONG,
},
},
}); err != nil {
return err
}

// Send the values
ksids := []uint64{0x2000000000000000, 0x6000000000000000}
for i := min; i < max; i++ {
if err := sendReply(&mproto.QueryResult{
Rows: [][]sqltypes.Value{
[]sqltypes.Value{
sqltypes.MakeString([]byte(fmt.Sprintf("%v", i))),
sqltypes.MakeString([]byte(fmt.Sprintf("Text for %v", i))),
sqltypes.MakeString([]byte(fmt.Sprintf("%v", ksids[i%2]))),
},
},
}); err != nil {
return err
}
}
// SELECT id, msg, keyspace_id FROM table1 WHERE id>=180 AND id<190 ORDER BY id
return nil
}

type ExpectedExecuteFetch struct {
Query string
MaxRows int
WantFields bool
QueryResult *mproto.QueryResult
Error error
}

// FakePoolConnection implements dbconnpool.PoolConnection
type FakePoolConnection struct {
t *testing.T
Closed bool

ExpectedExecuteFetch []ExpectedExecuteFetch
ExpectedExecuteFetchIndex int
}

func (fpc *FakePoolConnection) ExecuteFetch(query string, maxrows int, wantfields bool) (*mproto.QueryResult, error) {
if fpc.ExpectedExecuteFetchIndex >= len(fpc.ExpectedExecuteFetch) {
fpc.t.Errorf("got unexpected out of bound fetch: %v >= %v", fpc.ExpectedExecuteFetchIndex, len(fpc.ExpectedExecuteFetch))
return nil, fmt.Errorf("unexpected out of bound fetch")
}
expected := fpc.ExpectedExecuteFetch[fpc.ExpectedExecuteFetchIndex].Query
if strings.HasSuffix(expected, "*") {
if !strings.HasPrefix(query, expected[0:len(expected)-2]) {
fpc.t.Errorf("got unexpected query start: %v != %v", query, expected)
return nil, fmt.Errorf("unexpected query")
}
} else {
if query != expected {
fpc.t.Errorf("got unexpected query: %v != %v", query, expected)
return nil, fmt.Errorf("unexpected query")
}
}
fpc.t.Logf("ExecuteFetch: %v", query)
defer func() {
fpc.ExpectedExecuteFetchIndex++
}()
return fpc.ExpectedExecuteFetch[fpc.ExpectedExecuteFetchIndex].QueryResult, nil
}

func (fpc *FakePoolConnection) ExecuteStreamFetch(query string, callback func(*mproto.QueryResult) error, streamBufferSize int) error {
return nil
}

func (fpc *FakePoolConnection) Id() int64 {
return 1
}

func (fpc *FakePoolConnection) Close() {
fpc.Closed = true
}

func (fpc *FakePoolConnection) IsClosed() bool {
return fpc.Closed
}

func (fpc *FakePoolConnection) Recycle() {
}

// on the source rdonly guy, should only have one query to find min & max
func SourceRdonlyFactory(t *testing.T) func() (dbconnpool.PoolConnection, error) {
return func() (dbconnpool.PoolConnection, error) {
return &FakePoolConnection{
t: t,
ExpectedExecuteFetch: []ExpectedExecuteFetch{
ExpectedExecuteFetch{
Query: "SELECT MIN(id), MAX(id) FROM vt_ks.table1",
QueryResult: &mproto.QueryResult{
Fields: []mproto.Field{
mproto.Field{
Name: "min",
Type: mproto.VT_LONGLONG,
},
mproto.Field{
Name: "max",
Type: mproto.VT_LONGLONG,
},
},
Rows: [][]sqltypes.Value{
[]sqltypes.Value{
sqltypes.MakeString([]byte("100")),
sqltypes.MakeString([]byte("200")),
},
},
},
},
},
}, nil
}
}

// on the destinations
func DestinationsFactory(t *testing.T) func() (dbconnpool.PoolConnection, error) {
queryIndex := 0
return func() (dbconnpool.PoolConnection, error) {
defer func() {
queryIndex++
}()
switch queryIndex {
case 0:
return &FakePoolConnection{
t: t,
ExpectedExecuteFetch: []ExpectedExecuteFetch{
ExpectedExecuteFetch{
Query: "SET sql_log_bin = OFF",
QueryResult: &mproto.QueryResult{},
},
ExpectedExecuteFetch{
Query: "CREATE DATABASE `vt_ks` /*!40100 DEFAULT CHARACTER SET utf8 */",
QueryResult: &mproto.QueryResult{},
},
ExpectedExecuteFetch{
Query: "SET sql_log_bin = ON",
QueryResult: &mproto.QueryResult{},
},
},
}, nil
case 1:
return &FakePoolConnection{
t: t,
ExpectedExecuteFetch: []ExpectedExecuteFetch{
ExpectedExecuteFetch{
Query: "SET sql_log_bin = OFF",
QueryResult: &mproto.QueryResult{},
},
ExpectedExecuteFetch{
Query: "CREATE TABLE `vt_ks`.`resharding1` (\n" +
" `id` bigint(20) NOT NULL,\n" +
" `msg` varchar(64) DEFAULT NULL,\n" +
" `keyspace_id` bigint(20) unsigned NOT NULL,\n" +
" PRIMARY KEY (`id`),\n" +
" KEY `by_msg` (`msg`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8",
QueryResult: &mproto.QueryResult{},
},
ExpectedExecuteFetch{
Query: "SET sql_log_bin = ON",
QueryResult: &mproto.QueryResult{},
},
},
}, nil
default:
return &FakePoolConnection{
t: t,
ExpectedExecuteFetch: []ExpectedExecuteFetch{
ExpectedExecuteFetch{
Query: "SET sql_log_bin = OFF",
QueryResult: &mproto.QueryResult{},
},
ExpectedExecuteFetch{
Query: "INSERT INTO*",
QueryResult: &mproto.QueryResult{},
},
ExpectedExecuteFetch{
Query: "SET sql_log_bin = ON",
QueryResult: &mproto.QueryResult{},
},
},
}, nil
}
//return nil, fmt.Errorf("Unexpected connection")
}
}

func TestSplitClone(t *testing.T) {
ts := zktopo.NewTestServer(t, []string{"cell1", "cell2"})
wr := wrangler.New(logutil.NewConsoleLogger(), ts, time.Minute, time.Second)
Expand Down Expand Up @@ -65,14 +304,28 @@ func TestSplitClone(t *testing.T) {
PrimaryKeyColumns: []string{"id"},
Type: myproto.TABLE_BASE_TABLE,
DataLength: 2048,
RowCount: 10,
RowCount: 100,
},
},
Version: "unused",
}
sourceRdonly.FakeMysqlDaemon.DbaConnectionFactory = SourceRdonlyFactory(t)
sourceRdonly.FakeMysqlDaemon.CurrentSlaveStatus = &myproto.ReplicationStatus{
Position: myproto.ReplicationPosition{
GTIDSet: myproto.MariadbGTID{Domain: 12, Server: 34, Sequence: 5678},
},
}
sourceRdonly.RpcServer.Register(&SqlQuery{t: t})
leftMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t)
leftRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t)
rightMaster.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t)
rightRdonly.FakeMysqlDaemon.DbaConnectionFactory = DestinationsFactory(t)

wrk := NewSplitCloneWorker(wr, "cell1", "ks", "-80", nil, "populateBlpCheckpoint", 10, 1, 10)
wrk := NewSplitCloneWorker(wr, "cell1", "ks", "-80", nil, "skipAutoIncrement(table1)", 10, 1, 10).(*SplitCloneWorker)
wrk.Run()
status := wrk.StatusAsText()
t.Logf("Got status: %v", status)
if wrk.err != nil || wrk.state != stateSCDone {
t.Errorf("Worker run failed")
}
}
Loading

0 comments on commit 3f2be6e

Please sign in to comment.