Skip to content

Commit

Permalink
Extend support for stream message in UpdateVReplicationWorkflow
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Jan 9, 2025
1 parent f11a0a7 commit f6f42ba
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 284 deletions.
554 changes: 282 additions & 272 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go

Large diffs are not rendered by default.

48 changes: 48 additions & 0 deletions go/vt/proto/tabletmanagerdata/tabletmanagerdata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ const (
// Time to wait between LOCK TABLES cycles on the sources during SwitchWrites.
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)

SqlFreezeWorkflow = "update _vt.vreplication set message = 'FROZEN' where db_name=%s and workflow=%s"
SqlUnfreezeWorkflow = "update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s"
)

Expand Down Expand Up @@ -725,7 +724,7 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
}
resp.WorkflowDeleted = true
} else {
// Stop the workflow.
// Freeze the workflow.
err = forAllShards(targetShards, func(si *topo.ShardInfo) error {
tabletInfo, err := s.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
Expand All @@ -734,14 +733,12 @@ func (s *Server) LookupVindexExternalize(ctx context.Context, req *vtctldatapb.L
_, err = s.tmc.UpdateVReplicationWorkflow(ctx, tabletInfo.Tablet, &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: req.Name,
State: ptr.Of(binlogdatapb.VReplicationWorkflowState_Stopped),
Message: ptr.Of(Frozen),
})
if err != nil {
return vterrors.Wrapf(err, "failed to stop workflow %s on shard %s/%s", req.Name, tabletInfo.Keyspace, tabletInfo.Shard)
}
// Mark workflow as frozen.
query := fmt.Sprintf(SqlFreezeWorkflow, encodeString(tabletInfo.DbName()), encodeString(req.Name))
_, err = s.tmc.VReplicationExec(ctx, tabletInfo.Tablet, query)
return err
return nil
})
if err != nil {
return nil, err
Expand Down
8 changes: 6 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const (
// Retrieve the current configuration values for a workflow's vreplication stream(s).
sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a"
// Update the configuration values for a workflow's vreplication stream.
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a %s where id = %a"
sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a, message = %a %s where id = %a"
// Update field values for multiple workflows. The final format specifier is
// used to optionally add any additional predicates to the query.
sqlUpdateVReplicationWorkflows = "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ %s.vreplication set%s where db_name = '%s'%s"
Expand Down Expand Up @@ -568,6 +568,9 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
if !textutil.ValueIsSimulatedNull(req.TabletTypes) {
tabletTypes = req.TabletTypes
}
if req.Message != nil && *req.Message != sqltypes.Null.String() {
message = *req.Message
}
tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes)
if req.TabletSelectionPreference != nil &&
((inorder && *req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN) ||
Expand Down Expand Up @@ -609,9 +612,10 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
"sc": sqltypes.StringBindVariable(string(source)),
"cl": sqltypes.StringBindVariable(strings.Join(cells, ",")),
"tt": sqltypes.StringBindVariable(tabletTypesStr),
"ms": sqltypes.StringBindVariable(message),
"id": sqltypes.Int64BindVariable(id),
}
parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowStreamConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", options, ":id")
parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowStreamConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":ms", options, ":id")
stmt, err = parsed.GenerateQuery(bindVars, nil)
if err != nil {
return nil, err
Expand Down
5 changes: 1 addition & 4 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2062,11 +2062,8 @@ func TestExternalizeLookupVindex(t *testing.T) {
fmt.Sprintf("%d|%s||primary|Stopped|", vreplID, bls),
), nil)
targetTablet.vrdbClient.ExpectRequest(idQuery, idRes, nil)
targetTablet.vrdbClient.ExpectRequest(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:"sourceks" shard:"0" filter:{rules:{match:"t1" filter:"select * from t1"}}', cell = '', tablet_types = '' where id in (1)`, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:"sourceks" shard:"0" filter:{rules:{match:"t1" filter:"select * from t1"}}', cell = '', tablet_types = '', message = 'FROZEN' where id in (1)`, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(`select * from _vt.vreplication where id = 1`, streamsResult, nil)

freezeQuery := fmt.Sprintf(workflow.SqlFreezeWorkflow, sqltypes.EncodeStringSQL("vt_targetks"), sqltypes.EncodeStringSQL(tcase.request.Name))
tenv.tmc.setVReplicationExecResults(targetTablet.tablet, freezeQuery, &sqltypes.Result{})
}
}

Expand Down
1 change: 1 addition & 0 deletions proto/tabletmanagerdata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ message UpdateVReplicationWorkflowRequest {
optional binlogdata.VReplicationWorkflowState state = 6;
reserved 7; // unused, was: repeated string shards
map<string, string> config_overrides = 8;
optional string message = 9;
}

message UpdateVReplicationWorkflowResponse {
Expand Down
9 changes: 9 additions & 0 deletions web/vtadmin/src/proto/vtadmin.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions web/vtadmin/src/proto/vtadmin.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f6f42ba

Please sign in to comment.