Skip to content

Commit

Permalink
VtctldClient Reshard: add e2e tests to confirm CLI options and fix di…
Browse files Browse the repository at this point in the history
…scovered issues. (#15353)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Feb 26, 2024
1 parent 24b0579 commit 491e416
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 44 deletions.
12 changes: 5 additions & 7 deletions go/cmd/vtctldclient/command/vreplication/reshard/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,18 @@ func commandReshardCreate(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.ReshardCreateRequest{
Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,

Workflow: common.BaseOptions.Workflow,
Keyspace: common.BaseOptions.TargetKeyspace,
TabletTypes: common.CreateOptions.TabletTypes,
TabletSelectionPreference: tsp,
Cells: common.CreateOptions.Cells,
OnDdl: common.CreateOptions.OnDDL,
DeferSecondaryKeys: common.CreateOptions.DeferSecondaryKeys,
AutoStart: common.CreateOptions.AutoStart,
StopAfterCopy: common.CreateOptions.StopAfterCopy,

SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
SourceShards: reshardCreateOptions.sourceShards,
TargetShards: reshardCreateOptions.targetShards,
SkipSchemaCopy: reshardCreateOptions.skipSchemaCopy,
}
resp, err := common.GetClient().ReshardCreate(common.GetCommandCtx(), req)
if err != nil {
Expand Down
191 changes: 165 additions & 26 deletions go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/test/endtoend/cluster"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand All @@ -54,20 +53,33 @@ func TestVtctldclientCLI(t *testing.T) {
require.NotNil(t, zone2)
defer vc.TearDown()

sourceKeyspace := "product"
targetKeyspace := "customer"
sourceKeyspaceName := "product"
targetKeyspaceName := "customer"
var mt iMoveTables
workflowName := "wf1"
targetTabs := setupMinimalCustomerKeyspace(t)

t.Run("MoveTablesCreateFlags1", func(t *testing.T) {
testMoveTablesFlags1(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs)
testMoveTablesFlags1(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
t.Run("MoveTablesCreateFlags2", func(t *testing.T) {
testMoveTablesFlags2(t, &mt, sourceKeyspace, targetKeyspace, workflowName, targetTabs)
testMoveTablesFlags2(t, &mt, sourceKeyspaceName, targetKeyspaceName, workflowName, targetTabs)
})
t.Run("MoveTablesCompleteFlags3", func(t *testing.T) {
testMoveTablesFlags3(t, sourceKeyspaceName, targetKeyspaceName, targetTabs)
})
t.Run("MoveTablesCompleteFlags", func(t *testing.T) {
testMoveTablesFlags3(t, sourceKeyspace, targetKeyspace, targetTabs)
t.Run("Reshard", func(t *testing.T) {
cell := vc.Cells["zone1"]
targetKeyspace := cell.Keyspaces[targetKeyspaceName]
sourceShard := "-80"
newShards := "-40,40-80"
require.NoError(t, vc.AddShards(t, []*Cell{cell}, targetKeyspace, newShards, 1, 0, 400, nil))
reshardWorkflowName := "reshard"
tablets := map[string]*cluster.VttabletProcess{
"-40": targetKeyspace.Shards["-40"].Tablets["zone1-400"].Vttablet,
"40-80": targetKeyspace.Shards["40-80"].Tablets["zone1-500"].Vttablet,
}
splitShard(t, targetKeyspaceName, reshardWorkflowName, sourceShard, newShards, tablets)
})
}

Expand All @@ -81,34 +93,31 @@ func testMoveTablesFlags1(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK
}
completeFlags := []string{"--keep-routing-rules", "--keep-data"}
switchFlags := []string{}
// Test one set of MoveTable flags.
*mt = createMoveTables(t, sourceKeyspace, targetKeyspace, workflowName, tables, createFlags, completeFlags, switchFlags)
(*mt).Show()
moveTablesOutput := (*mt).GetLastOutput()
// Test one set of MoveTable flags.
moveTablesResponse := getMoveTablesShowResponse(mt)
workflowResponse := getWorkflow(targetKeyspace, workflowName)

workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", "customer", "show", "--workflow", "wf1")
require.NoError(t, err)
var moveTablesResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse)
require.NoError(t, err)

var workflowResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse)
require.NoError(t, err)

moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0
workflowResponse.Workflows[0].MaxVReplicationLag = 0
// also validates that MoveTables Show and Workflow Show return the same output.
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse.CloneVT())
require.EqualValues(t, moveTablesResponse.CloneVT(), workflowResponse)

// Validate that the flags are set correctly in the database.
validateWorkflow1(t, workflowResponse.Workflows)
validateMoveTablesWorkflow(t, workflowResponse.Workflows)
// Since we used --no-routing-rules, there should be no routing rules.
confirmNoRoutingRules(t)
}

func getMoveTablesShowResponse(mt *iMoveTables) *vtctldatapb.GetWorkflowsResponse {
moveTablesOutput := (*mt).GetLastOutput()
var moveTablesResponse vtctldatapb.GetWorkflowsResponse
err := protojson.Unmarshal([]byte(moveTablesOutput), &moveTablesResponse)
require.NoError(vc.t, err)
moveTablesResponse.Workflows[0].MaxVReplicationTransactionLag = 0
moveTablesResponse.Workflows[0].MaxVReplicationLag = 0
return moveTablesResponse.CloneVT()
}

// Validates some of the flags created from the previous test.
func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetKeyspace, workflowName string, targetTabs map[string]*cluster.VttabletProcess) {
ksWorkflow := fmt.Sprintf("%s.%s", targetKeyspace, workflowName)
Expand Down Expand Up @@ -184,6 +193,135 @@ func createMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName
return mt
}

// reshard helpers

func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards string, targetTabs map[string]*cluster.VttabletProcess) {
createFlags := []string{"--auto-start=false", "--defer-secondary-keys=false", "--stop-after-copy",
"--on-ddl", "STOP", "--tablet-types", "primary,rdonly", "--tablet-types-in-preference-order=true",
"--all-cells", "--format=json",
}
rs := newReshard(vc, &reshardWorkflow{
workflowInfo: &workflowInfo{
vc: vc,
workflowName: workflowName,
targetKeyspace: keyspace,
},
sourceShards: sourceShards,
targetShards: targetShards,
createFlags: createFlags,
}, workflowFlavorVtctld)

ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflowName)
rs.Create()
validateReshardResponse(rs)
workflowResponse := getWorkflow(keyspace, workflowName)
reshardShowResponse := getReshardShowResponse(&rs)
require.EqualValues(t, reshardShowResponse, workflowResponse)
validateReshardWorkflow(t, workflowResponse.Workflows)
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
for _, tab := range targetTabs {
alias := fmt.Sprintf("zone1-%d", tab.TabletUID)
query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = '" + workflowName + "'"
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query)
require.NoError(t, err, output)
}
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
rs.Stop()
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())
for _, targetTab := range targetTabs {
catchup(t, targetTab, workflowName, "Reshard")
}
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)

rs.SwitchReadsAndWrites()
waitForLowLag(t, keyspace, workflowName+"_reverse")
vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil)

rs.ReverseReadsAndWrites()
waitForLowLag(t, keyspace, workflowName)
vdiff(t, keyspace, workflowName, "zone1", false, true, nil)
rs.SwitchReadsAndWrites()
rs.Complete()
}

func getReshardShowResponse(rs *iReshard) *vtctldatapb.GetWorkflowsResponse {
(*rs).Show()
reshardOutput := (*rs).GetLastOutput()
var reshardResponse vtctldatapb.GetWorkflowsResponse
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse)
require.NoError(vc.t, err)
reshardResponse.Workflows[0].MaxVReplicationTransactionLag = 0
reshardResponse.Workflows[0].MaxVReplicationLag = 0
return reshardResponse.CloneVT()
}

func validateReshardResponse(rs iReshard) {
resp := getReshardResponse(rs)
require.NotNil(vc.t, resp)
require.NotNil(vc.t, resp.ShardStreams)
require.Equal(vc.t, len(resp.ShardStreams), 2)
keyspace := "customer"
for _, shard := range []string{"-40", "40-80"} {
streams := resp.ShardStreams[fmt.Sprintf("%s/%s", keyspace, shard)]
require.Equal(vc.t, 1, len(streams.Streams))
require.Equal(vc.t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), streams.Streams[0].Status)
}
}

func validateReshardWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) {
require.Equal(t, 1, len(workflows))
wf := workflows[0]
require.Equal(t, "reshard", wf.Name)
require.Equal(t, binlogdatapb.VReplicationWorkflowType_Reshard.String(), wf.WorkflowType)
require.Equal(t, "None", wf.WorkflowSubType)
require.Equal(t, "customer", wf.Target.Keyspace)
require.Equal(t, 2, len(wf.Target.Shards))
require.Equal(t, "customer", wf.Source.Keyspace)
require.Equal(t, 1, len(wf.Source.Shards))
require.False(t, wf.DeferSecondaryKeys)

require.GreaterOrEqual(t, len(wf.ShardStreams), int(1))
oneStream := maps.Values(wf.ShardStreams)[0]
require.NotNil(t, oneStream)

stream := oneStream.Streams[0]
require.Equal(t, binlogdatapb.VReplicationWorkflowState_Stopped.String(), stream.State)
require.Equal(t, stream.TabletSelectionPreference, tabletmanagerdatapb.TabletSelectionPreference_INORDER)
require.True(t, slices.Equal([]topodatapb.TabletType{topodatapb.TabletType_PRIMARY, topodatapb.TabletType_RDONLY}, stream.TabletTypes))
require.True(t, slices.Equal([]string{"zone1", "zone2"}, stream.Cells))

bls := stream.BinlogSource
require.Equal(t, binlogdatapb.OnDDLAction_STOP, bls.OnDdl)
require.True(t, bls.StopAfterCopy)

}

func getReshardResponse(rs iReshard) *vtctldatapb.WorkflowStatusResponse {
reshardOutput := rs.GetLastOutput()
var reshardResponse vtctldatapb.WorkflowStatusResponse
err := protojson.Unmarshal([]byte(reshardOutput), &reshardResponse)
require.NoError(vc.t, err)
return reshardResponse.CloneVT()
}

// helper functions

func getWorkflow(targetKeyspace, workflow string) *vtctldatapb.GetWorkflowsResponse {
workflowOutput, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKeyspace, "show", "--workflow", workflow)
require.NoError(vc.t, err)
var workflowResponse vtctldatapb.GetWorkflowsResponse
err = protojson.Unmarshal([]byte(workflowOutput), &workflowResponse)
require.NoError(vc.t, err)
workflowResponse.Workflows[0].MaxVReplicationTransactionLag = 0
workflowResponse.Workflows[0].MaxVReplicationLag = 0
return workflowResponse.CloneVT()
}

func checkTablesExist(t *testing.T, tabletAlias string, tables []string) bool {
tablesResponse, err := vc.VtctldClient.ExecuteCommandWithOutput("GetSchema", tabletAlias, "--tables", strings.Join(tables, ","), "--table-names-only")
require.NoError(t, err)
Expand Down Expand Up @@ -211,6 +349,7 @@ func getRoutingRules(t *testing.T) *vschemapb.RoutingRules {
require.NoError(t, err)
return &routingRulesResponse
}

func confirmNoRoutingRules(t *testing.T) {
routingRulesResponse := getRoutingRules(t)
require.Zero(t, len(routingRulesResponse.Rules))
Expand All @@ -223,7 +362,7 @@ func confirmRoutingRulesExist(t *testing.T) {

// We only want to validate non-standard attributes that are set by the CLI. The other end-to-end tests validate the rest.
// We also check some of the standard attributes to make sure they are set correctly.
func validateWorkflow1(t *testing.T, workflows []*vtctldatapb.Workflow) {
func validateMoveTablesWorkflow(t *testing.T, workflows []*vtctldatapb.Workflow) {
require.Equal(t, 1, len(workflows))
wf := workflows[0]
require.Equal(t, "wf1", wf.Name)
Expand Down
31 changes: 22 additions & 9 deletions go/test/endtoend/vreplication/wrappers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ type moveTablesWorkflow struct {
tables string
atomicCopy bool
sourceShards string
createFlags []string // currently only used by vtctld

// currently only used by vtctld
lastOutput string
createFlags []string
completeFlags []string
switchFlags []string
}
Expand Down Expand Up @@ -270,7 +271,12 @@ type reshardWorkflow struct {
targetShards string
skipSchemaCopy bool

lastOutput string
// currently only used by vtctld
lastOutput string
createFlags []string
completeFlags []string
cancelFlags []string
switchFlags []string
}

type iReshard interface {
Expand Down Expand Up @@ -379,8 +385,9 @@ func (v VtctldReshard) Flavor() string {
func (v VtctldReshard) exec(args ...string) {
args2 := []string{"Reshard", "--workflow=" + v.workflowName, "--target-keyspace=" + v.targetKeyspace}
args2 = append(args2, args...)
if err := vc.VtctldClient.ExecuteCommand(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v", err)
var err error
if v.lastOutput, err = vc.VtctldClient.ExecuteCommandWithOutput(args2...); err != nil {
v.vc.t.Fatalf("failed to create Reshard workflow: %v: %s", err, v.lastOutput)
}
}

Expand All @@ -395,20 +402,22 @@ func (v VtctldReshard) Create() {
if v.skipSchemaCopy {
args = append(args, "--skip-schema-copy="+strconv.FormatBool(v.skipSchemaCopy))
}
args = append(args, v.createFlags...)
v.exec(args...)
}

func (v VtctldReshard) SwitchReadsAndWrites() {
v.exec("SwitchTraffic")
args := []string{"SwitchTraffic"}
args = append(args, v.switchFlags...)
v.exec(args...)
}

func (v VtctldReshard) ReverseReadsAndWrites() {
v.exec("ReverseTraffic")
}

func (v VtctldReshard) Show() {
//TODO implement me
panic("implement me")
v.exec("Show")
}

func (v VtctldReshard) SwitchReads() {
Expand All @@ -422,11 +431,15 @@ func (v VtctldReshard) SwitchWrites() {
}

func (v VtctldReshard) Cancel() {
v.exec("Cancel")
args := []string{"Cancel"}
args = append(args, v.cancelFlags...)
v.exec(args...)
}

func (v VtctldReshard) Complete() {
v.exec("Complete")
args := []string{"Complete"}
args = append(args, v.completeFlags...)
v.exec(args...)
}

func (v VtctldReshard) GetLastOutput() string {
Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1672,7 +1672,11 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
log.Errorf("%w", err2)
return nil, err
}
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), "")
tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes)
if req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = discovery.InOrderHint + tabletTypesStr
}
rs, err := s.buildResharder(ctx, keyspace, req.Workflow, req.SourceShards, req.TargetShards, strings.Join(cells, ","), tabletTypesStr)
if err != nil {
return nil, vterrors.Wrap(err, "buildResharder")
}
Expand All @@ -1695,7 +1699,10 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
} else {
log.Warningf("Streams will not be started since --auto-start is set to false")
}
return nil, nil
return s.WorkflowStatus(ctx, &vtctldatapb.WorkflowStatusRequest{
Keyspace: keyspace,
Workflow: req.Workflow,
})
}

// VDiffCreate is part of the vtctlservicepb.VtctldServer interface.
Expand Down

0 comments on commit 491e416

Please sign in to comment.