Skip to content

Commit

Permalink
cdc: refactor to replace map of changefeed targets with array
Browse files Browse the repository at this point in the history
Necessary intermediate step towards changefeeds on things other than full tables.
Previously, changefeed targets were serialized as a map of TableID->StatementTimeName.
This keeps that for now for compatibility but adds in a list of target specifications
that's more extendable, and changes the API for everything that will eventually need
to care (for example, an encoder will need to know that it's encoding a column family).

Still needs a test with the old-style payload.

Release note: None
  • Loading branch information
HonoreDB committed Feb 21, 2022
1 parent c875359 commit 11594ba
Show file tree
Hide file tree
Showing 21 changed files with 206 additions and 115 deletions.
25 changes: 18 additions & 7 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ func alterChangefeedPlanHook(
targetDescs = append(targetDescs, descs...)
}

newTargets, err := getTargets(ctx, p, targetDescs, details.Opts)
newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
if err != nil {
return err
}
// add old targets
for id, target := range details.Targets {
newTargets[id] = target
for id, table := range details.Tables {
newTables[id] = table
}
details.Targets = newTargets
details.Tables = newTables
details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)

}

if opts.DropTargets != nil {
Expand All @@ -129,12 +131,21 @@ func alterChangefeedPlanHook(
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
}
delete(details.Targets, table.GetID())
delete(details.Tables, table.GetID())
}
}

newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets))
for _, ts := range details.TargetSpecifications {
if _, ok := details.Tables[ts.TableID]; ok {
newTargetSpecifications = append(newTargetSpecifications, ts)
}
}
details.TargetSpecifications = newTargetSpecifications

}

if len(details.Targets) == 0 {
if len(details.Tables) == 0 {
return errors.Errorf("cannot drop all targets for changefeed job %d", jobID)
}

Expand All @@ -152,7 +163,7 @@ func alterChangefeedPlanHook(
}

var targets tree.TargetList
for _, target := range details.Targets {
for _, target := range details.Tables {
targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName))
targets.Tables = append(targets.Tables, &targetName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ func createBenchmarkChangefeed(
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{
StatementTimeName: tableDesc.GetName(),
}},
Opts: map[string]string{
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeRow),
},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts, details.Targets)
encoder, err := makeJSONEncoder(details.Opts, AllTargets(details))
if err != nil {
return nil, nil, err
}
Expand All @@ -228,7 +228,7 @@ func createBenchmarkChangefeed(
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Targets: AllTargets(details),
Writer: buf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) *ptpb.Record {
Expand All @@ -69,19 +69,21 @@ func createProtectedTimestampRecord(
jobsprotectedts.Jobs, targetToProtect)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
for _, t := range targets {
tablesToProtect = append(tablesToProtect, t.TableID)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
func makeSpansToProtect(
codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification,
) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
Expand All @@ -93,8 +95,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [
EndKey: tablePrefix.PrefixEnd(),
})
}
for t := range targets {
addTablePrefix(uint32(t))
for _, t := range targets {
addTablePrefix(uint32(t.TableID))
}
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func distChangefeedFlow(
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS)
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS)
if err != nil {
return err
}
Expand All @@ -121,7 +121,7 @@ func distChangefeedFlow(
func fetchSpansForTargets(
ctx context.Context,
execCfg *sql.ExecutorConfig,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
Expand All @@ -133,10 +133,10 @@ func fetchSpansForTargets(
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
for _, table := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidLeased = true
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags)
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func newChangeAggregatorProcessor(
}

var err error
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil {
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets,
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed),
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

Expand All @@ -358,7 +358,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Gossip: cfg.Gossip,
Spans: spans,
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
MM: ca.kvFeedMemMon,
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func newChangeFrontierProcessor(
cf.freqEmitResolved = emitNoResolved
}

if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil {
if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1454,7 +1454,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
Expand Down
65 changes: 48 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,17 @@ func changefeedPlanHook(
return err
}

targets, err := getTargets(ctx, p, targetDescs, opts)
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts)
if err != nil {
return err
}

details := jobspb.ChangefeedDetails{
Targets: targets,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
Tables: tables,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
TargetSpecifications: targets,
}
progress := jobspb.Progress{
Progress: &jobspb.Progress_HighWater{},
Expand Down Expand Up @@ -243,7 +244,7 @@ func changefeedPlanHook(
return err
}

if _, err := getEncoder(details.Opts, details.Targets); err != nil {
if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil {
return err
}

Expand All @@ -269,7 +270,7 @@ func changefeedPlanHook(
}
telemetry.Count(`changefeed.create.sink.` + telemetrySink)
telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables)))

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
Expand Down Expand Up @@ -332,7 +333,7 @@ func changefeedPlanHook(
var protectedTimestampID uuid.UUID
codec := p.ExecCfg().Codec
if shouldProtectTimestamps(codec) {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed())
protectedTimestampID = ptr.ID.GetUUID()
}

Expand Down Expand Up @@ -470,35 +471,41 @@ func getTableDescriptors(
return targetDescs, err
}

func getTargets(
func getTargetsAndTables(
ctx context.Context,
p sql.PlanHookState,
targetDescs []catalog.Descriptor,
opts map[string]string,
) (jobspb.ChangefeedTargets, error) {
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
tables := make(jobspb.ChangefeedTargets, len(targetDescs))
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs))
for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return nil, err
return nil, nil, err
}
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, table, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
if err != nil {
return nil, err
return nil, nil, err
}
targets[table.GetID()] = jobspb.ChangefeedTarget{
tables[table.GetID()] = jobspb.ChangefeedTargetTable{
StatementTimeName: name,
}
ts := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: table.GetID(),
}
targets = append(targets, ts)
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return nil, err
return nil, nil, err
}
for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) {
for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) {
p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning))
}
}
}
return targets, nil
return targets, tables, nil
}

func validateSink(
Expand Down Expand Up @@ -953,3 +960,27 @@ func getChangefeedTargetName(
}
return desc.GetName(), nil
}

// AllTargets gets all the targets listed in a ChangefeedDetails,
// from the statement time name map in old protos
// or the TargetSpecifications in new ones.
func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) {
if len(cd.TargetSpecifications) > 0 {
for _, ts := range cd.TargetSpecifications {
if ts.TableID > 0 {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
targets = append(targets, ts)
}
}
} else {
for id, t := range cd.Tables {
ct := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: id,
StatementTimeName: t.StatementTimeName,
}
targets = append(targets, ct)
}
}
return
}
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ import (
)

// ValidateTable validates that a table descriptor can be watched by a CHANGEFEED.
func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error {
t, ok := targets[tableDesc.GetID()]
if !ok {
func ValidateTable(
targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor,
) error {
var t jobspb.ChangefeedTargetSpecification
var found bool
for _, cts := range targets {
if cts.TableID == tableDesc.GetID() {
t = cts
found = true
break
}
}
if !found {
return errors.Errorf(`unwatched table: %s`, tableDesc.GetName())
}

Expand Down
Loading

0 comments on commit 11594ba

Please sign in to comment.