Skip to content

Commit

Permalink
VReplication: Add traffic state to vtctldclient workflow status output (
Browse files Browse the repository at this point in the history
#14280)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Oct 15, 2023
1 parent 314ebcf commit 2663df2
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 139 deletions.
6 changes: 5 additions & 1 deletion go/cmd/vtctldclient/command/vreplication/common/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func GetStatusCommand(opts *SubCommandsOpts) *cobra.Command {
}

func commandStatus(cmd *cobra.Command, args []string) error {
format, err := GetOutputFormat(cmd)
if err != nil {
return err
}
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowStatusRequest{
Expand All @@ -51,7 +55,7 @@ func commandStatus(cmd *cobra.Command, args []string) error {
return err
}

if err = OutputStatusResponse(resp, "json"); err != nil {
if err = OutputStatusResponse(resp, format); err != nil {
return err
}

Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtctldclient/command/vreplication/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ func OutputStatusResponse(resp *vtctldatapb.WorkflowStatusResponse, format strin
shardstream.Id, BaseOptions.TargetKeyspace, tablet, shardstream.Status, shardstream.Info))
}
}
tout.WriteString("\nTraffic State: ")
tout.WriteString(resp.TrafficState)
output = tout.Bytes()
}
fmt.Printf("%s\n", output)
fmt.Println(string(output))
return nil
}

Expand Down
22 changes: 11 additions & 11 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,17 @@ func TestVtctldMigrate(t *testing.T) {
var output, expected string

t.Run("mount external cluster", func(t *testing.T) {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Mount", "Register", "--topo-type=etcd2",
output, err := vc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--topo-type=etcd2",
fmt.Sprintf("--topo-server=localhost:%d", extVc.ClusterConfig.topoPort), "--topo-root=/vitess/global", "ext1")
require.NoError(t, err, "Mount Register command failed with %s", output)

output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "List")
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "list")
require.NoError(t, err, "Mount List command failed with %s", output)

names := gjson.Get(output, "names")
require.Equal(t, 1, len(names.Array()))
require.Equal(t, "ext1", names.Array()[0].String())
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "Show", "ext1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "show", "ext1")
require.NoError(t, err, "Mount command failed with %s\n", output)

require.Equal(t, "etcd2", gjson.Get(output, "topo_type").String())
Expand All @@ -244,7 +244,7 @@ func TestVtctldMigrate(t *testing.T) {
t.Run("migrate from external cluster", func(t *testing.T) {
if output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1",
"Create", "--source-keyspace", "rating", "--mount-name", "ext1", "--all-tables", "--cells=extcell1", "--tablet-types=primary,replica"); err != nil {
"create", "--source-keyspace", "rating", "--mount-name", "ext1", "--all-tables", "--cells=extcell1", "--tablet-types=primary,replica"); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
Expand All @@ -258,21 +258,21 @@ func TestVtctldMigrate(t *testing.T) {
vdiffSideBySide(t, ksWorkflow, "extcell1")

output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Show")
"--target-keyspace", "product", "--workflow", "e1", "show")
require.NoError(t, err, "Migrate command failed with %s", output)

wf := gjson.Get(output, "workflows").Array()[0]
require.Equal(t, "e1", wf.Get("name").String())
require.Equal(t, "Migrate", wf.Get("workflow_type").String())

output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Progress")
"--target-keyspace", "product", "--workflow", "e1", "status", "--format=json")
require.NoError(t, err, "Migrate command failed with %s", output)

require.Equal(t, "Running", gjson.Get(output, "shard_streams.product/0.streams.0.status").String())

output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Complete")
"--target-keyspace", "product", "--workflow", "e1", "complete")
require.NoError(t, err, "Migrate command failed with %s", output)

expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 0)
Expand All @@ -288,7 +288,7 @@ func TestVtctldMigrate(t *testing.T) {
waitForRowCount(t, vtgateConn, "product:0", "rating", 0)
waitForRowCount(t, vtgateConn, "product:0", "review", 0)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
"--target-keyspace", "product", "--workflow", "e1", "Cancel")
"--target-keyspace", "product", "--workflow", "e1", "cancel")
require.NoError(t, err, "Migrate command failed with %s", output)

expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 0)
Expand All @@ -302,15 +302,15 @@ func TestVtctldMigrate(t *testing.T) {
})

t.Run("unmount external cluster", func(t *testing.T) {
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "Unregister", "ext1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "unregister", "ext1")
require.NoError(t, err, "Mount command failed with %s\n", output)

output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "List")
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "list")
require.NoError(t, err, "Mount command failed with %+v : %s\n", output)
expected = "{}\n"
require.Equal(t, expected, output)

output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "Show", "ext1")
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Mount", "show", "ext1")
require.Errorf(t, err, "there is no vitess cluster named ext1")
})
}
Loading

0 comments on commit 2663df2

Please sign in to comment.