Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc: refactor to replace map of changefeed targets with array #75430

Merged
merged 1 commit into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ func alterChangefeedPlanHook(
targetDescs = append(targetDescs, descs...)
}

newTargets, err := getTargets(ctx, p, targetDescs, details.Opts)
newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
if err != nil {
return err
}
// add old targets
for id, target := range details.Targets {
newTargets[id] = target
for id, table := range details.Tables {
newTables[id] = table
}
details.Targets = newTargets
details.Tables = newTables
details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)

}

if opts.DropTargets != nil {
Expand All @@ -129,12 +131,21 @@ func alterChangefeedPlanHook(
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
}
delete(details.Targets, table.GetID())
delete(details.Tables, table.GetID())
}
}

newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets))
for _, ts := range details.TargetSpecifications {
if _, stillThere := details.Tables[ts.TableID]; stillThere {
newTargetSpecifications = append(newTargetSpecifications, ts)
}
}
details.TargetSpecifications = newTargetSpecifications

}

if len(details.Targets) == 0 {
if len(details.Tables) == 0 {
return errors.Errorf("cannot drop all targets for changefeed job %d", jobID)
}

Expand All @@ -152,7 +163,7 @@ func alterChangefeedPlanHook(
}

var targets tree.TargetList
for _, target := range details.Targets {
for _, target := range details.Tables {
targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName))
targets.Tables = append(targets.Tables, &targetName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ func createBenchmarkChangefeed(
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{
StatementTimeName: tableDesc.GetName(),
}},
Opts: map[string]string{
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeRow),
},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts, details.Targets)
encoder, err := makeJSONEncoder(details.Opts, AllTargets(details))
if err != nil {
return nil, nil, err
}
Expand All @@ -228,7 +228,7 @@ func createBenchmarkChangefeed(
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Targets: AllTargets(details),
Writer: buf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) *ptpb.Record {
Expand All @@ -69,19 +69,21 @@ func createProtectedTimestampRecord(
jobsprotectedts.Jobs, targetToProtect)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
for _, t := range targets {
tablesToProtect = append(tablesToProtect, t.TableID)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
func makeSpansToProtect(
codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification,
) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
Expand All @@ -93,8 +95,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [
EndKey: tablePrefix.PrefixEnd(),
})
}
for t := range targets {
addTablePrefix(uint32(t))
for _, t := range targets {
addTablePrefix(uint32(t.TableID))
}
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func distChangefeedFlow(
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS)
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS)
if err != nil {
return err
}
Expand All @@ -121,7 +121,7 @@ func distChangefeedFlow(
func fetchSpansForTargets(
ctx context.Context,
execCfg *sql.ExecutorConfig,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
Expand All @@ -133,10 +133,10 @@ func fetchSpansForTargets(
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
for _, table := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidLeased = true
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags)
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func newChangeAggregatorProcessor(
}

var err error
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil {
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets,
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed),
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

Expand All @@ -358,7 +358,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Gossip: cfg.Gossip,
Spans: spans,
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
MM: ca.kvFeedMemMon,
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func newChangeFrontierProcessor(
cf.freqEmitResolved = emitNoResolved
}

if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil {
if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1454,7 +1454,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
Expand Down
66 changes: 49 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,17 @@ func changefeedPlanHook(
return err
}

targets, err := getTargets(ctx, p, targetDescs, opts)
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts)
if err != nil {
return err
}

details := jobspb.ChangefeedDetails{
Targets: targets,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
Tables: tables,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
TargetSpecifications: targets,
}
progress := jobspb.Progress{
Progress: &jobspb.Progress_HighWater{},
Expand Down Expand Up @@ -243,7 +244,7 @@ func changefeedPlanHook(
return err
}

if _, err := getEncoder(details.Opts, details.Targets); err != nil {
if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil {
return err
}

Expand All @@ -269,7 +270,7 @@ func changefeedPlanHook(
}
telemetry.Count(`changefeed.create.sink.` + telemetrySink)
telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables)))

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
Expand Down Expand Up @@ -332,7 +333,7 @@ func changefeedPlanHook(
var protectedTimestampID uuid.UUID
codec := p.ExecCfg().Codec
if shouldProtectTimestamps(codec) {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed())
protectedTimestampID = ptr.ID.GetUUID()
}

Expand Down Expand Up @@ -470,35 +471,41 @@ func getTableDescriptors(
return targetDescs, err
}

func getTargets(
func getTargetsAndTables(
ctx context.Context,
p sql.PlanHookState,
targetDescs []catalog.Descriptor,
opts map[string]string,
) (jobspb.ChangefeedTargets, error) {
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
tables := make(jobspb.ChangefeedTargets, len(targetDescs))
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs))
for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return nil, err
return nil, nil, err
}
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, table, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
if err != nil {
return nil, err
return nil, nil, err
}
targets[table.GetID()] = jobspb.ChangefeedTarget{
tables[table.GetID()] = jobspb.ChangefeedTargetTable{
StatementTimeName: name,
}
ts := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: table.GetID(),
}
targets = append(targets, ts)
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return nil, err
return nil, nil, err
}
for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) {
for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) {
p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning))
}
}
}
return targets, nil
return targets, tables, nil
}

func validateSink(
Expand Down Expand Up @@ -953,3 +960,28 @@ func getChangefeedTargetName(
}
return desc.GetName(), nil
}

// AllTargets gets all the targets listed in a ChangefeedDetails,
// from the statement time name map in old protos
// or the TargetSpecifications in new ones.
func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) {
//TODO: Use a version gate for this once we have CDC version gates
if len(cd.TargetSpecifications) > 0 {
for _, ts := range cd.TargetSpecifications {
if ts.TableID > 0 {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
targets = append(targets, ts)
}
}
} else {
for id, t := range cd.Tables {
ct := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: id,
StatementTimeName: t.StatementTimeName,
}
targets = append(targets, ct)
}
}
return
}
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ import (
)

// ValidateTable validates that a table descriptor can be watched by a CHANGEFEED.
func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error {
t, ok := targets[tableDesc.GetID()]
if !ok {
func ValidateTable(
targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor,
) error {
var t jobspb.ChangefeedTargetSpecification
var found bool
for _, cts := range targets {
if cts.TableID == tableDesc.GetID() {
t = cts
found = true
break
}
}
if !found {
return errors.Errorf(`unwatched table: %s`, tableDesc.GetName())
}

Expand Down
Loading