Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vtctld Workflow command: minor fixes #7008

Merged
merged 3 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions go/vt/wrangler/vexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) {

// WorkflowAction can start/stop/delete or list streams in _vt.vreplication on all masters in the target keyspace of the workflow.
func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) {

if action == "show" {
replStatus, err := wr.ShowWorkflow(ctx, workflow, keyspace)
if err != nil {
Expand All @@ -286,11 +287,11 @@ func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, acti
err = dumpStreamListAsJSON(replStatus, wr)
return nil, err
} else if action == "listall" {
workflows, err := wr.ListAllWorkflows(ctx, keyspace)
workflows, err := wr.ListAllWorkflows(ctx, keyspace, false)
if err != nil {
return nil, err
}
wr.printWorkflowList(workflows)
wr.printWorkflowList(keyspace, workflows)
return nil, err
}
results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun)
Expand Down Expand Up @@ -464,6 +465,9 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
for master, result := range results {
var rsrStatus []*ReplicationStatus
qr := sqltypes.Proto3ToResult(result)
if len(qr.Rows) == 0 {
continue
}
for _, row := range qr.Rows {
status, sk, err := wr.getReplicationStatusFromRow(ctx, row, master)
if err != nil {
Expand Down Expand Up @@ -502,9 +506,18 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) (
return &rsr, nil
}

// ListAllWorkflows will return a list of all active workflows for the given keyspace.
func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string) ([]string, error) {
query := "select distinct workflow from _vt.vreplication where state <> 'Stopped'"
// ListActiveWorkflows will return a list of all active workflows for the given keyspace.
func (wr *Wrangler) ListActiveWorkflows(ctx context.Context, keyspace string) ([]string, error) {
return wr.ListAllWorkflows(ctx, keyspace, true)
}

// ListAllWorkflows will return a list of all workflows (Running and Stopped) for the given keyspace.
func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string, active bool) ([]string, error) {
where := ""
if active {
where = " where state <> 'Stopped'"
}
query := "select distinct workflow from _vt.vreplication" + where
results, err := wr.runVexec(ctx, "", keyspace, query, false)
if err != nil {
return nil, err
Expand Down Expand Up @@ -559,12 +572,13 @@ func dumpStreamListAsJSON(replStatus *ReplicationStatusResult, wr *Wrangler) err
return nil
}

func (wr *Wrangler) printWorkflowList(workflows []string) {
func (wr *Wrangler) printWorkflowList(keyspace string, workflows []string) {
list := strings.Join(workflows, ", ")
if list == "" {
wr.Logger().Printf("No workflows found in keyspace %s", keyspace)
return
}
wr.Logger().Printf("Workflows: %v", list)
wr.Logger().Printf("Following workflow(s) found in keyspace %s: %v\n", keyspace, list)
}

func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]copyState, error) {
Expand Down
21 changes: 15 additions & 6 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -180,11 +179,17 @@ func TestWorkflowListStreams(t *testing.T) {
wr := New(logger, env.topoServ, env.tmc)

_, err := wr.WorkflowAction(ctx, workflow, keyspace, "listall", false)
require.Nil(t, err)
require.NoError(t, err)

_, err = wr.WorkflowAction(ctx, workflow, "badks", "show", false)
require.Errorf(t, err, "node doesn't exist: keyspaces/badks/shards")

_, err = wr.WorkflowAction(ctx, "badwf", keyspace, "show", false)
require.Errorf(t, err, "no streams found for workflow badwf in keyspace target")
logger.Clear()
_, err = wr.WorkflowAction(ctx, workflow, keyspace, "show", false)
require.Nil(t, err)
want := `Workflows: wrWorkflow
{
require.NoError(t, err)
want := `{
"Workflow": "wrWorkflow",
"SourceLocation": {
"Keyspace": "source",
Expand Down Expand Up @@ -330,9 +335,13 @@ func TestWorkflowListAll(t *testing.T) {
logger := logutil.NewMemoryLogger()
wr := New(logger, env.topoServ, env.tmc)

workflows, err := wr.ListAllWorkflows(ctx, keyspace)
workflows, err := wr.ListAllWorkflows(ctx, keyspace, true)
require.Nil(t, err)
require.Equal(t, []string{workflow}, workflows)

workflows, err = wr.ListAllWorkflows(ctx, keyspace, false)
require.Nil(t, err)
require.Equal(t, []string{workflow, "wrWorkflow2"}, workflows)
}

func TestVExecValidations(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,31 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit

env.tmc.setVRResults(master.tablet, "select table_name, lastpk from _vt.copy_state where vrepl_id = 1", result)

env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'bad'", result)

env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'badwf'", &sqltypes.Result{})
env.tmc.vrpos[tabletID] = testSourceGtid
env.tmc.pos[tabletID] = testTargetMasterPosition

env.tmc.waitpos[tabletID+1] = testTargetMasterPosition

env.tmc.setVRResults(master.tablet, "update _vt.vreplication set state='Running', message='', stop_pos='' where db_name='vt_target' and workflow='wrWorkflow'", &sqltypes.Result{})

result = sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"workflow",
"varchar"),
"wrWorkflow", "wrWorkflow2",
)
env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target'", result)
tabletID += 10
}
master := env.addTablet(300, "target2", "0", topodatapb.TabletType_MASTER)
result := sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"workflow",
"varchar"),
"wrWorkflow", "wrWorkflow2",
)
env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where db_name = 'vt_target2'", result)
wranglerEnv = env
return env
}
Expand Down