Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Usage of VReplicationExec For _vt.vreplication Reads #14424

Merged
merged 46 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
52daf18
Generalize new workflow creation path
mattlord Nov 2, 2023
e9b94d8
Move first VReplication usage
mattlord Nov 2, 2023
12e1a60
Correct generalized create flow
mattlord Nov 2, 2023
1045dd5
Get Materialize creation working w/o VReplicationExec
mattlord Nov 2, 2023
fcc631d
Fix workflow update bug
mattlord Nov 2, 2023
c7637e7
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Jan 3, 2024
4e0a542
Add HasVReplicationWorkflows and ReadVReplicationWorkflows RPCs
mattlord Jan 3, 2024
e7c3f00
Bug fixes
mattlord Jan 3, 2024
5d2f427
Begin moving GetWorkflows away from VReplicationExec
mattlord Jan 5, 2024
e764623
Fix concurrency for scanWorkflow
mattlord Jan 8, 2024
b17dfee
Re-add lost workflow data
mattlord Jan 8, 2024
98b6ace
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Jan 8, 2024
cd33853
Add unit tests for new RPCs
mattlord Jan 9, 2024
2e095ca
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Jan 9, 2024
bb37806
WiP unit test adjustments
mattlord Jan 9, 2024
7d251c4
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Jan 23, 2024
5adc834
Add back tablet type and cell info
mattlord Jan 23, 2024
1ceee08
Fixup TestMoveTables unit test
mattlord Jan 23, 2024
f76b42b
Leave wrangler / vtctlclient implementation unchanged
mattlord Jan 24, 2024
812960e
Fixup TestSourceShardSelection unit test
mattlord Jan 24, 2024
ee28bc1
Fixup the TestFailedMoveTablesCreateCleanup unit test
mattlord Jan 24, 2024
b3371a5
Fixup TestMigrateVSchema unit test
mattlord Jan 24, 2024
c2a00b1
Fixup other materializer unit tests
mattlord Jan 24, 2024
4056b30
Migrate materializer unit tests from workflow to tabletmanager
mattlord Jan 25, 2024
d0735cd
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Jan 28, 2024
1af708f
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Feb 2, 2024
c7baccd
Changes after self review
mattlord Feb 6, 2024
5138001
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Feb 6, 2024
c8b264d
Re-correct scanWorkflow concurrency
mattlord Feb 6, 2024
47c04f6
Minor tweaks from self review
mattlord Feb 6, 2024
7e50a5b
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Feb 9, 2024
561a154
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Feb 12, 2024
9162c6f
Modify stream check to catch unmigrated streams
mattlord Feb 12, 2024
9831ee5
Add RPC to update the state for multiple workflows and address review…
mattlord Mar 5, 2024
c2ae9a4
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Mar 5, 2024
c82c8e5
Fix db_name usage in new RPC
mattlord Mar 5, 2024
292c4c6
Several things...
mattlord Mar 6, 2024
798c509
Minor changes after self review of new code
mattlord Mar 6, 2024
bb1219c
Address review comments
mattlord Mar 10, 2024
7a1b5d9
Address more review comments
mattlord Mar 10, 2024
a97f7ae
Merge remote-tracking branch 'origin/main' into rm_vrepl_exec
mattlord Mar 10, 2024
94d59b3
Fixup uncaught merge conflict
mattlord Mar 10, 2024
c910193
Comment nit
mattlord Mar 10, 2024
f2d64ea
Also split out query building for ReadVReplicationWorkflows
mattlord Mar 10, 2024
35492ed
Stop needlessly passing db_name from vtctld to vttablet
mattlord Mar 10, 2024
60f7d05
Minor changes after self review of new code
mattlord Mar 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@

ms := &vtctldatapb.MaterializeSettings{
Workflow: common.BaseOptions.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_CUSTOM,

Check warning on line 96 in go/cmd/vtctldclient/command/vreplication/materialize/create.go

View check run for this annotation

Codecov / codecov/patch

go/cmd/vtctldclient/command/vreplication/materialize/create.go#L96

Added line #L96 was not covered by tests
TargetKeyspace: common.BaseOptions.TargetKeyspace,
SourceKeyspace: createOptions.SourceKeyspace,
TableSettings: createOptions.TableSettings.val,
Expand Down
20 changes: 18 additions & 2 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,27 @@ func isTableInDenyList(t *testing.T, vc *VitessCluster, ksShard string, table st
return found, nil
}

func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, workflow string, database string, want int) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where workflow='%s'", sidecarDBIdentifier, workflow).Query
// expectNumberOfStreams waits for the given number of streams to be present and
// by default RUNNING. If you want to wait for different states, then you can
// pass in the state(s) you want to wait for.
func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, workflow string, database string, want int, states ...string) {
var query string
if len(states) == 0 {
states = append(states, binlogdatapb.VReplicationWorkflowState_Running.String())
}
query = sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where workflow='%s' and state in ('%s')",
sidecarDBIdentifier, workflow, strings.Join(states, "','")).Query
waitForQueryResult(t, vtgateConn, database, query, fmt.Sprintf(`[[INT64(%d)]]`, want))
}

// confirmAllStreamsRunning confirms that all of the migrated streams are running
// after a Reshard.
func confirmAllStreamsRunning(t *testing.T, vtgateConn *mysql.Conn, database string) {
query := sqlparser.BuildParsedQuery("select count(*) from %s.vreplication where state != '%s'",
sidecarDBIdentifier, binlogdatapb.VReplicationWorkflowState_Running.String()).Query
waitForQueryResult(t, vtgateConn, database, query, `[[INT64(0)]]`)
}

func printShardPositions(vc *VitessCluster, ksShards []string) {
for _, ksShard := range ksShards {
output, err := vc.VtctlClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard)
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestVtctlMigrate(t *testing.T) {
"--source=ext1.rating", "create", ksWorkflow); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1, binlogdatapb.VReplicationWorkflowState_Stopped.String())
waitForRowCount(t, vtgateConn, "product:0", "rating", 0)
waitForRowCount(t, vtgateConn, "product:0", "review", 0)
if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "cancel", ksWorkflow); err != nil {
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestVtctldMigrate(t *testing.T) {
"--mount-name", "ext1", "--all-tables", "--auto-start=false", "--cells=extcell1")
require.NoError(t, err, "Migrate command failed with %s", output)

expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1, binlogdatapb.VReplicationWorkflowState_Stopped.String())
waitForRowCount(t, vtgateConn, "product:0", "rating", 0)
waitForRowCount(t, vtgateConn, "product:0", "review", 0)
output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate",
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,13 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string

insertMoreCustomers(t, 16)
reshardCustomer2to4Split(t, nil, "")
confirmAllStreamsRunning(t, vtgateConn, "customer:-40")
expectNumberOfStreams(t, vtgateConn, "Customer2to4", "sales", "product:0", 4)
reshardCustomer3to2SplitMerge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:-60")
expectNumberOfStreams(t, vtgateConn, "Customer3to2", "sales", "product:0", 3)
reshardCustomer3to1Merge(t)
confirmAllStreamsRunning(t, vtgateConn, "customer:0")
expectNumberOfStreams(t, vtgateConn, "Customer3to1", "sales", "product:0", 1)

t.Run("Verify CopyState Is Optimized Afterwards", func(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,8 +558,7 @@ type VRSettings struct {
DeferSecondaryKeys bool
}

// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
// ReadVRSettings retrieves the settings for a vreplication stream.
func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag, state, workflow_type, workflow, workflow_sub_type, defer_secondary_keys from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
Expand Down
6 changes: 4 additions & 2 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func (dc *MockDBClient) Close() {

// ExecuteFetch is part of the DBClient interface
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) {
// Serialize ExecuteFetch to enforce a strict order on shared dbClients.
dc.expectMu.Lock()
defer dc.expectMu.Unlock()

dc.t.Helper()
msg := "DBClient query: %v"
if dc.Tag != "" {
Expand All @@ -195,8 +199,6 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
}

dc.expectMu.Lock()
defer dc.expectMu.Unlock()
if dc.currentResult >= len(dc.expect) {
msg := "DBClientMock: query: %s, no more requests are expected"
if dc.Tag != "" {
Expand Down
11 changes: 11 additions & 0 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"vitess.io/vitess/go/vt/vttablet/tabletconn"

querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -75,6 +76,16 @@
}
)

// BuildTabletTypesString is a helper to build a serialized string representation of
// the tablet type(s) and optional in order clause for later use with the TabletPicker.
func BuildTabletTypesString(tabletTypes []topodatapb.TabletType, tabletSelectionPreference tabletmanagerdatapb.TabletSelectionPreference) string {
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if tabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER {
tabletTypesStr = InOrderHint + tabletTypesStr

Check warning on line 84 in go/vt/discovery/tablet_picker.go

View check run for this annotation

Codecov / codecov/patch

go/vt/discovery/tablet_picker.go#L84

Added line #L84 was not covered by tests
}
return tabletTypesStr
}

// GetTabletPickerRetryDelay synchronizes changes to tabletPickerRetryDelay. Used in tests only at the moment
func GetTabletPickerRetryDelay() time.Duration {
muTabletPickerRetryDelay.Lock()
Expand Down
Loading
Loading