Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Add ability to tag workflows #8388

Merged
merged 7 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ var AlterVReplicationTable = []string{
"ALTER TABLE _vt.vreplication MODIFY source BLOB NOT NULL",
"ALTER TABLE _vt.vreplication ADD KEY workflow_idx (workflow(64))",
"ALTER TABLE _vt.vreplication ADD COLUMN rows_copied BIGINT(20) NOT NULL DEFAULT 0",
"ALTER TABLE _vt.vreplication ADD COLUMN tags VARBINARY(1024) NOT NULL DEFAULT ''",
}

// WithDDLInitialQueries contains the queries to be expected by the mock db client during tests
Expand Down
1,130 changes: 570 additions & 560 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 26 additions & 10 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ var commands = []commandGroup{
"Workflow", []command{
{"Workflow", commandWorkflow,
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
"<ks.workflow> <action> --dry-run",
"Start/Stop/Delete/Show/ListAll Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start",
"Start/Stop/Delete/Show/ListAll/Tags Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start",
},
},
},
Expand Down Expand Up @@ -3491,8 +3491,8 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("usage: Workflow --dry-run keyspace[.workflow] start/stop/delete/list/listall")
if subFlags.NArg() < 2 {
return fmt.Errorf("usage: Workflow --dry-run keyspace[.workflow] start/stop/delete/list/listall/tags [<tags>]")
}
keyspace := subFlags.Arg(0)
action := strings.ToLower(subFlags.Arg(1))
Expand All @@ -3512,14 +3512,30 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.
if err != nil {
wr.Logger().Errorf("Keyspace %s not found", keyspace)
}

results, err := wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun)
if err != nil {
return err
}
if action == "show" || action == "listall" {
return nil
var results map[*topo.TabletInfo]*sqltypes.Result
if action == "tags" {
tags := ""
if subFlags.NArg() != 3 {
return fmt.Errorf("tags incorrectly specified, usage: Workflow keyspace.workflow tags <tags>")
}
tags = strings.ToLower(subFlags.Arg(2))
results, err = wr.WorkflowTagAction(ctx, keyspace, workflow, tags)
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
} else {
if subFlags.NArg() != 2 {
return fmt.Errorf("usage: Workflow --dry-run keyspace[.workflow] start/stop/delete/list/listall")
}
results, err = wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun)
if err != nil {
return err
}
if action == "show" || action == "listall" {
return nil
}
}

if len(results) == 0 {
wr.Logger().Printf("no result returned\n")
return nil
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
db_name,
time_updated,
transaction_timestamp,
message
message,
tags
FROM
_vt.vreplication
%s`,
Expand Down Expand Up @@ -354,6 +355,11 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows

message := row[10].ToString()

tags := row[11].ToString()
var tagArray []string
if tags != "" {
tagArray = strings.Split(tags, ",")
}
stream := &vtctldatapb.Workflow_Stream{
Id: id,
Shard: tablet.Shard,
Expand All @@ -370,6 +376,7 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows
Seconds: timeUpdatedSeconds,
},
Message: message,
Tags: tagArray,
}

stream.CopyStates, err = s.getWorkflowCopyStates(ctx, tablet, id)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func init() {
allddls := append([]string{}, binlogplayer.CreateVReplicationTable()...)
allddls = append(allddls, binlogplayer.AlterVReplicationTable...)
allddls = append(allddls, createReshardingJournalTable, createCopyState)
allddls = append(allddls, createVReplicationLog)
allddls = append(allddls, createVReplicationLogTable)
withDDL = withddl.New(allddls)

withDDLInitialQueries = append(withDDLInitialQueries, binlogplayer.WithDDLInitialQueries...)
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ func TestCreateDBAndTable(t *testing.T) {
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication MODIFY source.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD KEY.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN rows_copied.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("ALTER TABLE _vt.vreplication ADD COLUMN tags.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.resharding_journal.*", &sqltypes.Result{}, nil)
dbClient.ExpectRequestRE("create table if not exists _vt.copy_state.*", &sqltypes.Result{}, nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestMain(m *testing.M) {
return 1
}

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createVReplicationLog); err != nil {
if err := env.Mysqld.ExecuteSuperQuery(context.Background(), createVReplicationLogTable); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
return 1
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
)

const (
vreplicationLogTableName = "_vt.vreplication_log"
createVReplicationLog = `CREATE TABLE IF NOT EXISTS _vt.vreplication_log (
vreplicationLogTableName = "_vt.vreplication_log"
createVReplicationLogTable = `CREATE TABLE IF NOT EXISTS _vt.vreplication_log (
id BIGINT(20) AUTO_INCREMENT,
vrepl_id INT NOT NULL,
type VARBINARY(256) NOT NULL,
Expand Down
29 changes: 22 additions & 7 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,14 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) {
return master, nil
}

func (wr *Wrangler) convertQueryResultToSQLTypesResult(results map[*topo.TabletInfo]*querypb.QueryResult) map[*topo.TabletInfo]*sqltypes.Result {
retResults := make(map[*topo.TabletInfo]*sqltypes.Result)
for tablet, result := range results {
retResults[tablet] = sqltypes.Proto3ToResult(result)
}
return retResults
}

// WorkflowAction can start/stop/delete or list streams in _vt.vreplication on all masters in the target keyspace of the workflow.
func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) {

Expand All @@ -294,11 +302,7 @@ func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, acti
return nil, err
}
results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun)
retResults := make(map[*topo.TabletInfo]*sqltypes.Result)
for tablet, result := range results {
retResults[tablet] = sqltypes.Proto3ToResult(result)
}
return retResults, err
return wr.convertQueryResultToSQLTypesResult(results), err
}

func (wr *Wrangler) getWorkflowActionQuery(action string) (string, error) {
Expand All @@ -325,6 +329,13 @@ func (wr *Wrangler) execWorkflowAction(ctx context.Context, workflow, keyspace,
return wr.runVexec(ctx, workflow, keyspace, query, dryRun)
}

// WorkflowTagAction sets or clears the tags for a workflow in a keyspace
func (wr *Wrangler) WorkflowTagAction(ctx context.Context, keyspace string, workflow string, tags string) (map[*topo.TabletInfo]*sqltypes.Result, error) {
query := fmt.Sprintf("update _vt.vreplication set tags = %s", encodeString(tags))
results, err := wr.runVexec(ctx, workflow, keyspace, query, false)
return wr.convertQueryResultToSQLTypesResult(results), err
}

// ReplicationStatusResult represents the result of trying to get the replication status for a given workflow.
type ReplicationStatusResult struct {
// Workflow represents the name of the workflow relevant to the related replication statuses.
Expand Down Expand Up @@ -385,6 +396,8 @@ type ReplicationStatus struct {
TimeUpdated int64
// Message represents the message column from the _vt.vreplication table.
Message string
// Tags contain the tags specified for this stream
Tags string
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved

// CopyState represents the rows from the _vt.copy_state table.
CopyState []copyState
Expand All @@ -393,7 +406,7 @@ type ReplicationStatus struct {
func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqltypes.Value, master *topo.TabletInfo) (*ReplicationStatus, string, error) {
var err error
var id, timeUpdated, transactionTimestamp int64
var state, dbName, pos, stopPos, message string
var state, dbName, pos, stopPos, message, tags string
var bls binlogdatapb.BinlogSource
var mpos mysql.Position

Expand Down Expand Up @@ -426,6 +439,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
return nil, "", err
}
message = row[9].ToString()
tags = row[10].ToString()
status := &ReplicationStatus{
Shard: master.Shard,
Tablet: master.AliasString(),
Expand All @@ -438,6 +452,7 @@ func (wr *Wrangler) getReplicationStatusFromRow(ctx context.Context, row []sqlty
TransactionTimestamp: transactionTimestamp,
TimeUpdated: timeUpdated,
Message: message,
Tags: tags,
}
status.CopyState, err = wr.getCopyState(ctx, master, id)
if err != nil {
Expand All @@ -453,7 +468,7 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
rsr.ShardStatuses = make(map[string]*ShardReplicationStatus)
rsr.Workflow = workflow
var results map[*topo.TabletInfo]*querypb.QueryResult
query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication"
query := "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags from _vt.vreplication"
results, err := wr.runVexec(ctx, workflow, keyspace, query, false)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func TestWorkflowListStreams(t *testing.T) {
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"Message": "",
"Tags": "",
"CopyState": [
{
"Table": "t1",
Expand Down Expand Up @@ -271,6 +272,7 @@ func TestWorkflowListStreams(t *testing.T) {
"TransactionTimestamp": 0,
"TimeUpdated": 1234,
"Message": "",
"Tags": "",
"CopyState": [
{
"Table": "t1",
Expand Down
12 changes: 6 additions & 6 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
env.tmc.setVRResults(master.tablet, "insert into _vt.vreplication(state, workflow, db_name) values ('Running', 'wk1', 'ks1'), ('Stopped', 'wk1', 'ks1')", &sqltypes.Result{RowsAffected: 2})

result := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message",
"int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar"),
fmt.Sprintf("1|%v|MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-3||0|Running|vt_target|%d|0|", bls, timeUpdated),
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|message|tags",
"int64|varchar|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar"),
fmt.Sprintf("1|%v|MySQL56/14b68925-696a-11ea-aee7-fec597a91f5e:1-3||0|Running|vt_target|%d|0||", bls, timeUpdated),
)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result)
env.tmc.setVRResults(
master.tablet,
"select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'",
Expand All @@ -173,9 +173,9 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit

env.tmc.setVRResults(master.tablet, "select table_name, lastpk from _vt.copy_state where vrepl_id = 1", result)

env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", result)
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", result)

env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'badwf'", &sqltypes.Result{})
env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags from _vt.vreplication where db_name = 'vt_target' and workflow = 'badwf'", &sqltypes.Result{})
env.tmc.vrpos[tabletID] = testSourceGtid
env.tmc.pos[tabletID] = testTargetMasterPosition

Expand Down
1 change: 1 addition & 0 deletions proto/vtctldata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ message Workflow {
// if there are N logs that exist for the stream, and we fail to fetch the
// ith log, we will still return logs in [0, i) + (i, N].
string log_fetch_error = 14;
repeated string tags = 15;

message CopyState {
string table = 1;
Expand Down