From fae286d34191d764476947aaf7d29a9f35b79357 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 9 Nov 2020 18:40:04 +0100 Subject: [PATCH 1/3] Workflow ListAll also lists stopped workflows Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 17 +++++++++++++---- go/vt/wrangler/vexec_test.go | 6 +++++- go/vt/wrangler/wrangler_env_test.go | 7 +++++++ 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 20d3506fd16..7271567dc86 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -286,7 +286,7 @@ func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, acti err = dumpStreamListAsJSON(replStatus, wr) return nil, err } else if action == "listall" { - workflows, err := wr.ListAllWorkflows(ctx, keyspace) + workflows, err := wr.ListAllWorkflows(ctx, keyspace, true) if err != nil { return nil, err } @@ -502,9 +502,18 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( return &rsr, nil } -// ListAllWorkflows will return a list of all active workflows for the given keyspace. -func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string) ([]string, error) { - query := "select distinct workflow from _vt.vreplication where state <> 'Stopped'" +// ListActiveWorkflows will return a list of all active workflows for the given keyspace. +func (wr *Wrangler) ListActiveWorkflows(ctx context.Context, keyspace string) ([]string, error) { + return wr.ListAllWorkflows(ctx, keyspace, true) +} + +// ListAllWorkflows will return a list of all workflows (Running and Stopped) for the given keyspace. +func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string, active bool) ([]string, error) { + where := "" + if active { + where = " where state <> 'Stopped'" + } + query := "select distinct workflow from _vt.vreplication" + where results, err := wr.runVexec(ctx, "", keyspace, query, false) if err != nil { return nil, err diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 9e6284aec09..00849365dd8 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -330,9 +330,13 @@ func TestWorkflowListAll(t *testing.T) { logger := logutil.NewMemoryLogger() wr := New(logger, env.topoServ, env.tmc) - workflows, err := wr.ListAllWorkflows(ctx, keyspace) + workflows, err := wr.ListAllWorkflows(ctx, keyspace, true) require.Nil(t, err) require.Equal(t, []string{workflow}, workflows) + + workflows, err = wr.ListAllWorkflows(ctx, keyspace, false) + require.Nil(t, err) + require.Equal(t, []string{workflow, "wrWorkflow2"}, workflows) } func TestVExecValidations(t *testing.T) { diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index 16116f85412..9adaba6f62b 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -165,6 +165,13 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit ) env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where state != 'Stopped' and db_name = 'vt_target'", result) + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow", + "varchar"), + "wrWorkflow", "wrWorkflow2", + ) + env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target'", result) + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( "table|lastpk", "varchar|varchar"), From 894626e134df26ea37573fbf8fdb835e38dab68b Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 9 Nov 2020 19:10:15 +0100 Subject: [PATCH 2/3] Improve listall output Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 9 +++++---- go/vt/wrangler/vexec_test.go | 5 +++-- go/vt/wrangler/wrangler_env_test.go | 2 ++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 7271567dc86..81a4ef81e14 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -286,11 +286,11 @@ func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, acti err = dumpStreamListAsJSON(replStatus, wr) return nil, err } else if action == "listall" { - workflows, err := wr.ListAllWorkflows(ctx, keyspace, true) + workflows, err := wr.ListAllWorkflows(ctx, keyspace, false) if err != nil { return nil, err } - wr.printWorkflowList(workflows) + wr.printWorkflowList(keyspace, workflows) return nil, err } results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun) @@ -568,12 +568,13 @@ func dumpStreamListAsJSON(replStatus *ReplicationStatusResult, wr *Wrangler) err return nil } -func (wr *Wrangler) printWorkflowList(workflows []string) { +func (wr *Wrangler) printWorkflowList(keyspace string, workflows []string) { list := strings.Join(workflows, ", ") if list == "" { + wr.Logger().Printf("No workflows found in keyspace %s", keyspace) return } - wr.Logger().Printf("Workflows: %v", list) + wr.Logger().Printf("Following workflow(s) found in keyspace %s: %v\n", keyspace, list) } func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]copyState, error) { diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index 00849365dd8..cfd004e255e 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -180,9 +180,10 @@ func TestWorkflowListStreams(t *testing.T) { wr := New(logger, env.topoServ, env.tmc) _, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false) - require.Nil(t, err) + require.NoError(t, err) + _, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false) - require.Nil(t, err) + require.NoError(t, err) want := `Workflows: wrWorkflow { "Workflow": "wrWorkflow", diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index 9adaba6f62b..605dcc406ac 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -180,6 +180,8 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit env.tmc.setVRResults(master.tablet, "select table_name, lastpk from _vt.copy_state where vrepl_id = 1", result) + env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", result) + env.tmc.vrpos[tabletID] = testSourceGtid env.tmc.pos[tabletID] = testTargetMasterPosition From 1215cca9eab8525cf385c063c58f2eaa42293e00 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Mon, 9 Nov 2020 22:44:31 +0100 Subject: [PATCH 3/3] Show error if invalid workflow is provided Signed-off-by: Rohit Nayak --- go/vt/wrangler/vexec.go | 4 ++++ go/vt/wrangler/vexec_test.go | 10 +++++++--- go/vt/wrangler/wrangler_env_test.go | 21 ++++++++++++++------- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index 81a4ef81e14..eeaa9e3a822 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -278,6 +278,7 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) { // WorkflowAction can start/stop/delete or list streams in _vt.vreplication on all masters in the target keyspace of the workflow. func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { + if action == "show" { replStatus, err := wr.ShowWorkflow(ctx, workflow, keyspace) if err != nil { @@ -464,6 +465,9 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( for master, result := range results { var rsrStatus []*ReplicationStatus qr := sqltypes.Proto3ToResult(result) + if len(qr.Rows) == 0 { + continue + } for _, row := range qr.Rows { status, sk, err := wr.getReplicationStatusFromRow(ctx, row, master) if err != nil { diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index cfd004e255e..36bc266cfd8 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -26,7 +26,6 @@ import ( "time" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/logutil" @@ -182,10 +181,15 @@ func TestWorkflowListStreams(t *testing.T) { _, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false) require.NoError(t, err) + _, err = wr.WorkflowAction(ctx, workflow, "badks", "show", false) + require.Errorf(t, err, "node doesn't exist: keyspaces/badks/shards") + + _, err = wr.WorkflowAction(ctx, "badwf", keyspace, "show", false) + require.Errorf(t, err, "no streams found for workflow badwf in keyspace target") + logger.Clear() _, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false) require.NoError(t, err) - want := `Workflows: wrWorkflow -{ + want := `{ "Workflow": "wrWorkflow", "SourceLocation": { "Keyspace": "source", diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index 605dcc406ac..a9f7fc34f75 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -165,13 +165,6 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit ) env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where state != 'Stopped' and db_name = 'vt_target'", result) - result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( - "workflow", - "varchar"), - "wrWorkflow", "wrWorkflow2", - ) - env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target'", result) - result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( "table|lastpk", "varchar|varchar"), @@ -182,6 +175,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", result) + env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'badwf'", &sqltypes.Result{}) env.tmc.vrpos[tabletID] = testSourceGtid env.tmc.pos[tabletID] = testTargetMasterPosition @@ -189,8 +183,21 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state='Running', message='', stop_pos='' where db_name='vt_target' and workflow='wrWorkflow'", &sqltypes.Result{}) + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow", + "varchar"), + "wrWorkflow", "wrWorkflow2", + ) + env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target'", result) tabletID += 10 } + master := env.addTablet(300, "target2", "0", topodatapb.TabletType_MASTER) + result := sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow", + "varchar"), + "wrWorkflow", "wrWorkflow2", + ) + env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target2'", result) wranglerEnv = env return env }