Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-16.0] VDiff: properly split cell values in record when using TabletPicker (#14099) #14102

Merged
merged 2 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,27 @@ var testCases = []*testCase{
}

func TestVDiff2(t *testing.T) {
allCellNames = "zone1"
defaultCellName := "zone1"
allCellNames = "zone5,zone1,zone2,zone3,zone4"
sourceKs := "product"
sourceShards := []string{"0"}
targetKs := "customer"
targetShards := []string{"-80", "80-"}
// This forces us to use multiple vstream packets even with small test tables
extraVTTabletArgs = []string{"--vstream_packet_size=1"}

vc = NewVitessCluster(t, "TestVDiff2", []string{allCellNames}, mainClusterConfig)
vc = NewVitessCluster(t, "TestVDiff2", strings.Split(allCellNames, ","), mainClusterConfig)
require.NotNil(t, vc)
defaultCell = vc.Cells[defaultCellName]
cells := []*Cell{defaultCell}
zone1 := vc.Cells["zone1"]
zone2 := vc.Cells["zone2"]
zone3 := vc.Cells["zone3"]
defaultCell = zone1

defer vc.TearDown(t)

vc.AddKeyspace(t, cells, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
_, err := vc.AddKeyspace(t, []*Cell{zone2, zone1, zone3}, sourceKs, strings.Join(sourceShards, ","), initialProductVSchema, initialProductSchema, 0, 0, 100, sourceKsOpts)
require.NoError(t, err)

vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
Expand All @@ -139,23 +143,25 @@ func TestVDiff2(t *testing.T) {

generateMoreCustomers(t, sourceKs, 100)

_, err := vc.AddKeyspace(t, cells, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
// The primary tablet is only added in the first cell.
// We ONLY add primary tablets in this test.
tks, err := vc.AddKeyspace(t, []*Cell{zone3, zone1, zone2}, targetKs, strings.Join(targetShards, ","), customerVSchema, customerSchema, 0, 0, 200, targetKsOpts)
require.NoError(t, err)
for _, shard := range targetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, targetKs, shard))
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
testWorkflow(t, vc, tc, cells)
// Primary tablets for any new shards are added in the first cell.
testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1})
})
}
}

func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell) {
func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) {
arrTargetShards := strings.Split(tc.targetShards, ",")
if tc.typ == "Reshard" {
tks := vc.Cells[cells[0].Name].Keyspaces[tc.targetKs]
require.NoError(t, vc.AddShards(t, cells, tks, tc.targetShards, 0, 0, tc.tabletBaseID, targetKsOpts))
for _, shard := range arrTargetShards {
require.NoError(t, cluster.WaitForHealthyShard(vc.VtctldClient, tc.targetKs, shard))
Expand All @@ -168,6 +174,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
if tc.typ == "Reshard" {
args = append(args, "--source_shards", tc.sourceShards, "--target_shards", tc.targetShards)
}
args = append(args, "--cells", allCellNames)
args = append(args, "--tables", tc.tables)
args = append(args, "Create")
args = append(args, ksWorkflow)
Expand All @@ -180,14 +187,14 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
updateTableStats(t, tab, tc.tables) // need to do this in order to test progress reports
}

vdiff(t, tc.targetKs, tc.workflow, cells[0].Name, true, true, nil)
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)

if tc.autoRetryError {
testAutoRetryError(t, tc, cells[0].Name)
testAutoRetryError(t, tc, allCellNames)
}

if tc.resume {
testResume(t, tc, cells[0].Name)
testResume(t, tc, allCellNames)
}

// These are done here so that we have a valid workflow to test the commands against
Expand Down
6 changes: 5 additions & 1 deletion go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ func doVdiff2(t *testing.T, keyspace, workflow, cells string, want *expectedVDif

func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) {
var err error
args := []string{"VDiff", "--", "--tablet_types=primary", "--source_cell=" + cells, "--format=json"}
// This will always result in us using a PRIMARY tablet, which is all
// we start in many e2e tests, but it avoids the tablet picker logic
// where when you ONLY specify the PRIMARY type it then picks the
// shard's primary and ignores any cell settings.
args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"}
if len(extraFlags) > 0 {
args = append(args, extraFlags...)
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vdiff
import (
"context"
"fmt"
"strings"
"testing"

"github.com/google/uuid"
Expand Down Expand Up @@ -106,8 +107,8 @@ func TestVDiff(t *testing.T) {
MaxRows: 100,
},
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
SourceCell: tstenv.Cells[0],
TargetCell: tstenv.Cells[0],
SourceCell: strings.Join(tstenv.Cells, ","),
TargetCell: strings.Join(tstenv.Cells, ","),
TabletTypes: "primary",
},
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
Expand Down
51 changes: 29 additions & 22 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -121,7 +122,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

if err := td.selectTablets(ctx, td.wd.opts.PickerOptions.SourceCell, td.wd.opts.PickerOptions.TabletTypes); err != nil {
if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
Expand Down Expand Up @@ -199,16 +200,22 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err
return allErrors.AggrError(vterrors.Aggregate)
}

func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes string) error {
var wg sync.WaitGroup
ct := td.wd.ct
var err1, err2 error
func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
)

// The cells from the vdiff record are a comma separated list.
sourceCells := strings.Split(td.wd.opts.PickerOptions.SourceCell, ",")
targetCells := strings.Split(td.wd.opts.PickerOptions.TargetCell, ",")

// For Mount+Migrate, the source tablets will be in a different
// Vitess cluster with its own TopoServer.
sourceTopoServer := ct.ts
if ct.externalCluster != "" {
extTS, err := ct.ts.OpenExternalVitessClusterServer(ctx, ct.externalCluster)
sourceTopoServer := td.wd.ct.ts
if td.wd.ct.externalCluster != "" {
extTS, err := td.wd.ct.ts.OpenExternalVitessClusterServer(ctx, td.wd.ct.externalCluster)
if err != nil {
return err
}
Expand All @@ -217,39 +224,39 @@ func (td *tableDiffer) selectTablets(ctx context.Context, cell, tabletTypes stri
wg.Add(1)
go func() {
defer wg.Done()
err1 = td.forEachSource(func(source *migrationSource) error {
tablet, err := pickTablet(ctx, sourceTopoServer, cell, ct.sourceKeyspace, source.shard, tabletTypes)
sourceErr = td.forEachSource(func(source *migrationSource) error {
sourceTablet, err := pickTablet(ctx, sourceTopoServer, sourceCells, td.wd.ct.sourceKeyspace, source.shard, td.wd.opts.PickerOptions.TabletTypes)
if err != nil {
return err
}
source.tablet = tablet
source.tablet = sourceTablet
return nil
})
}()

wg.Add(1)
go func() {
defer wg.Done()
tablet, err2 := pickTablet(ctx, ct.ts, td.wd.opts.PickerOptions.TargetCell, ct.vde.thisTablet.Keyspace,
ct.vde.thisTablet.Shard, td.wd.opts.PickerOptions.TabletTypes)
if err2 != nil {
targetTablet, targetErr = pickTablet(ctx, td.wd.ct.ts, targetCells, td.wd.ct.vde.thisTablet.Keyspace, td.wd.ct.vde.thisTablet.Shard,
td.wd.opts.PickerOptions.TabletTypes)
if targetErr != nil {
return
}
ct.targetShardStreamer = &shardStreamer{
tablet: tablet,
shard: tablet.Shard,
td.wd.ct.targetShardStreamer = &shardStreamer{
tablet: targetTablet,
shard: targetTablet.Shard,
}
}()

wg.Wait()
if err1 != nil {
return err1
if sourceErr != nil {
return sourceErr
}
return err2
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ts, []string{cell}, keyspace, shard, tabletTypes)
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
tp, err := discovery.NewTabletPicker(ts, cells, keyspace, shard, tabletTypes)
if err != nil {
return nil, err
}
Expand Down