Skip to content

Commit

Permalink
Fix workflow update bug
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 3, 2023
1 parent 1045dd5 commit fcc631d
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 270 deletions.
184 changes: 2 additions & 182 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strings"
"sync"
"text/template"
"time"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -142,144 +141,6 @@ func (mz *materializer) createWorkflowStreams(req *tabletmanagerdatapb.CreateVRe
})
}

// createMaterializerStreams creates the vreplication streams for Materialize
// and LookupVindex workflows.
func (mz *materializer) createMaterializerStreams() error {
if err := validateNewWorkflow(mz.ctx, mz.ts, mz.tmc, mz.ms.TargetKeyspace, mz.ms.Workflow); err != nil {
return err
}
err := mz.buildMaterializer()
if err != nil {
return err
}
if err := mz.deploySchema(); err != nil {
return err
}
insertMap := make(map[string]string, len(mz.targetShards))
for _, targetShard := range mz.targetShards {
sourceShards := mz.filterSourceShards(targetShard)
// streamKeyRangesEqual allows us to optimize the stream for the cases
// where while the target keyspace may be sharded, the target shard has
// a single source shard to stream data from and the target and source
// shard have equal key ranges. This can be done, for example, when doing
// shard by shard migrations -- migrating a single shard at a time between
// sharded source and sharded target keyspaces.
streamKeyRangesEqual := false
if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) {
streamKeyRangesEqual = true
}
inserts, err := mz.generateInserts(mz.ctx, sourceShards, streamKeyRangesEqual)
if err != nil {
return err
}
insertMap[key.KeyRangeString(targetShard.KeyRange)] = inserts
}
if err := mz.createStreams(mz.ctx, insertMap); err != nil {
return err
}
return nil
}

func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo, keyRangesEqual bool) (string, error) {
ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}")

for _, sourceShard := range sourceShards {
bls := &binlogdatapb.BinlogSource{
Keyspace: mz.ms.SourceKeyspace,
Shard: sourceShard.ShardName(),
Filter: &binlogdatapb.Filter{},
StopAfterCopy: mz.ms.StopAfterCopy,
ExternalCluster: mz.ms.ExternalCluster,
SourceTimeZone: mz.ms.SourceTimeZone,
TargetTimeZone: mz.ms.TargetTimeZone,
OnDdl: binlogdatapb.OnDDLAction(binlogdatapb.OnDDLAction_value[mz.ms.OnDdl]),
}
for _, ts := range mz.ms.TableSettings {
rule := &binlogdatapb.Rule{
Match: ts.TargetTable,
}

if ts.SourceExpression == "" {
bls.Filter.Rules = append(bls.Filter.Rules, rule)
continue
}

// Validate non-empty query.
stmt, err := sqlparser.Parse(ts.SourceExpression)
if err != nil {
return "", err
}
sel, ok := stmt.(*sqlparser.Select)
if !ok {
return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression)
}
filter := ts.SourceExpression
if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference {
cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable])
if err != nil {
return "", err
}
mappedCols := make([]*sqlparser.ColName, 0, len(cv.Columns))
for _, col := range cv.Columns {
colName, err := matchColInSelect(col, sel)
if err != nil {
return "", err
}
mappedCols = append(mappedCols, colName)
}
subExprs := make(sqlparser.SelectExprs, 0, len(mappedCols)+2)
for _, mappedCol := range mappedCols {
subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: mappedCol})
}
vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name)
subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral(vindexName)})
subExprs = append(subExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewStrLiteral("{{.keyrange}}")})
inKeyRange := &sqlparser.FuncExpr{
Name: sqlparser.NewIdentifierCI("in_keyrange"),
Exprs: subExprs,
}
if sel.Where != nil {
sel.Where = &sqlparser.Where{
Type: sqlparser.WhereClause,
Expr: &sqlparser.AndExpr{
Left: inKeyRange,
Right: sel.Where.Expr,
},
}
} else {
sel.Where = &sqlparser.Where{
Type: sqlparser.WhereClause,
Expr: inKeyRange,
}
}

filter = sqlparser.String(sel)
}

rule.Filter = filter

bls.Filter.Rules = append(bls.Filter.Rules, rule)
}
workflowSubType := binlogdatapb.VReplicationWorkflowSubType_None
if mz.isPartial {
workflowSubType = binlogdatapb.VReplicationWorkflowSubType_Partial
}
var workflowType binlogdatapb.VReplicationWorkflowType
switch mz.ms.MaterializationIntent {
case vtctldatapb.MaterializationIntent_CUSTOM:
workflowType = binlogdatapb.VReplicationWorkflowType_Materialize
case vtctldatapb.MaterializationIntent_MOVETABLES:
workflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
case vtctldatapb.MaterializationIntent_CREATELOOKUPINDEX:
workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
}
ig.AddRow(mz.ms.Workflow, bls, "", mz.ms.Cell, mz.ms.TabletTypes,
workflowType,
workflowSubType, mz.ms.DeferSecondaryKeys)
}
return ig.String(), nil
}

func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo, keyRangesEqual bool) ([]*binlogdatapb.BinlogSource, error) {
blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards))
for _, sourceShard := range sourceShards {
Expand Down Expand Up @@ -553,37 +414,13 @@ func (mz *materializer) buildMaterializer() error {
return nil
}

func (mz *materializer) createStreams(ctx context.Context, insertsMap map[string]string) error {
return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
keyRange := key.KeyRangeString(target.KeyRange)
inserts := insertsMap[keyRange]
targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
}
buf := &strings.Builder{}
t := template.Must(template.New("").Parse(inserts))
input := map[string]string{
"keyrange": keyRange,
"dbname": targetPrimary.DbName(),
}
if err := t.Execute(buf, input); err != nil {
return err
}
if _, err := mz.tmc.VReplicationExec(ctx, targetPrimary.Tablet, buf.String()); err != nil {
return err
}
return nil
})
}

func (mz *materializer) startStreams(ctx context.Context) error {
return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias)
if err != nil {
return vterrors.Wrapf(err, "GetTablet(%v) failed", target.PrimaryAlias)
}
req := &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
if _, err := mz.tmc.UpdateVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
Workflow: mz.ms.Workflow,
State: binlogdatapb.VReplicationWorkflowState_Running,
// Don't change anything else, so pass simulated NULLs.
Expand All @@ -592,30 +429,13 @@ func (mz *materializer) startStreams(ctx context.Context) error {
topodatapb.TabletType(textutil.SimulatedNullInt),
},
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
}
if _, err := mz.tmc.UpdateVReplicationWorkflow(ctx, targetPrimary.Tablet, req); err != nil {
}); err != nil {
return vterrors.Wrap(err, "failed to update workflow")
}
return nil
})
}

func Materialize(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, ms *vtctldatapb.MaterializeSettings) error {
mz := &materializer{
ctx: ctx,
ts: ts,
sourceTs: ts,
tmc: tmc,
ms: ms,
}

err := mz.createMaterializerStreams()
if err != nil {
return err
}
return mz.startStreams(ctx)
}

func (mz *materializer) forAllTargets(f func(*topo.ShardInfo) error) error {
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
Expand Down
8 changes: 2 additions & 6 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,20 +1248,16 @@ func (s *Server) Materialize(ctx context.Context, ms *vtctldatapb.MaterializeSet
ms: ms,
}

tsp := tabletmanagerdatapb.TabletSelectionPreference_INORDER
tt, inOrder, err := discovery.ParseTabletTypesAndOrder(ms.TabletTypes)
tt, err := topoproto.ParseTabletTypes(ms.TabletTypes)
if err != nil {
return err
}
if inOrder {
tsp = tabletmanagerdatapb.TabletSelectionPreference_INORDER
}

err = mz.createWorkflowStreams(&tabletmanagerdatapb.CreateVReplicationWorkflowRequest{
Workflow: ms.Workflow,
Cells: strings.Split(ms.Cell, ","),
TabletTypes: tt,
TabletSelectionPreference: tsp,
TabletSelectionPreference: ms.TabletSelectionPreference,
WorkflowType: mz.getWorkflowType(),
DeferSecondaryKeys: ms.DeferSecondaryKeys,
AutoStart: true,
Expand Down
Loading

0 comments on commit fcc631d

Please sign in to comment.