Skip to content

Commit

Permalink
More tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jul 28, 2023
1 parent f2b7313 commit ca06b32
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -536,7 +538,10 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
// initialized properly before allowing new writes on the target.
sequenceMetadata := make(map[string]*sequenceMetadata)
// For sharded to sharded migrations the sequence must already be setup.
if ts.SourceKeyspaceSchema() != nil && ts.SourceKeyspaceSchema().Keyspace != nil && ts.SourceKeyspaceSchema().Keyspace.Sharded {
// For reshards the sequence usage is not changed.
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables &&
ts.SourceKeyspaceSchema() != nil && ts.SourceKeyspaceSchema().Keyspace != nil &&
!ts.SourceKeyspaceSchema().Keyspace.Sharded {
sequenceMetadata, err = ts.getSequenceMetadata(ctx)
if err != nil {
werr := vterrors.Wrapf(err, "getSequenceMetadata failed")
Expand Down Expand Up @@ -1961,6 +1966,7 @@ func (ts *trafficSwitcher) getSequenceMetadata(ctx context.Context) (map[string]
return nil, nil
}

targetDBName := maps.Values(ts.Targets())[0].GetPrimary().DbName()
sequencesByBackingTable := make(map[string]*sequenceMetadata)
for _, table := range ts.Tables() {
vs, ok := vschema.Tables[table]
Expand All @@ -1972,8 +1978,7 @@ func (ts *trafficSwitcher) getSequenceMetadata(ctx context.Context) (map[string]
backingTableName: vs.AutoIncrement.Sequence,
usingTableName: table,
usingTableDefinition: vs,
// TODO: get and set this properly to deal with db_name_overrides
usingTableDBName: "vt_" + ts.targetKeyspace,
usingTableDBName: targetDBName,
}
// If the sequence table is fully qualified in the vschema then
// we don't need to find it later.
Expand Down Expand Up @@ -2035,17 +2040,11 @@ func (ts *trafficSwitcher) getSequenceMetadata(ctx context.Context) (map[string]
if tableDef != nil && tableDef.Type == vindexes.TypeSequence &&
sm != nil && tableName == sm.backingTableName {
tablesFound++
// If the sequence backing table is being moved then we do not
// want to initialize it.
if keyspace == ts.targetKeyspace {
delete(sequencesByBackingTable, tableName)
continue
}
sm.backingTableKeyspace = keyspace
// TODO: get and set this properly in order to deal with db_name_overrides
sm.backingTableDBName = "vt_" + keyspace
if tablesFound == tableCount {
log.Errorf("DEBUG: sequence backing tables found: %+v", sequencesByBackingTable)
log.Errorf("DEBUG: sequence backing table found: %+v", sequencesByBackingTable)
return sequencesByBackingTable, nil
}
}
Expand Down Expand Up @@ -2086,7 +2085,7 @@ func (ts *trafficSwitcher) initializeTargetSequenceTables(ctx context.Context, s
sqlescape.EscapeID(sequenceMetadata.usingTableDBName),
sqlescape.EscapeID(sequenceMetadata.usingTableName),
)
ts.Logger().Errorf("DEBUG: query: %s on shard: %s", query.Query, target.GetShard().ShardName())
log.Errorf("DEBUG: query: %s on shard: %s", query.Query, target.GetShard().ShardName())
qr, err := ts.wr.ExecuteFetchAsApp(ctx, target.GetPrimary().GetAlias(), true, query.Query, 1)
if err != nil || len(qr.Rows) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get max used value for target table %s in order to initialize the backing sequence table %s: %v",
Expand All @@ -2097,7 +2096,7 @@ func (ts *trafficSwitcher) initializeTargetSequenceTables(ctx context.Context, s
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get max used value for target table %s in order to initialize the backing sequence table %s: %v",
sequenceMetadata.usingTableName, ts.targetKeyspace, err)
}
ts.Logger().Errorf("DEBUG: max ID seen on shard %s: %d", target.GetShard().ShardName(), maxID)
log.Errorf("DEBUG: max ID seen on shard %s: %d", target.GetShard().ShardName(), maxID)
srMu.Lock()
shardResults = append(shardResults, maxID)
srMu.Unlock()
Expand Down

0 comments on commit ca06b32

Please sign in to comment.