Skip to content

Commit

Permalink
cdc: refactor to replace map of changefeed targets with array
Browse files Browse the repository at this point in the history
Necessary intermediate step towards changefeeds on things other than full tables.
Previously, changefeed targets were serialized as a map of TableID->StatementTimeName.
This keeps that for now for compatibility but adds in a list of target specifications
that's more extendable, and changes the API for everything that will eventually need
to care (for example, an encoder will need to know that it's encoding a column family).

Still needs a test with the old-style payload.

Release note: None
  • Loading branch information
HonoreDB committed Jan 28, 2022
1 parent 8cd2808 commit 6b2d95f
Show file tree
Hide file tree
Showing 20 changed files with 176 additions and 102 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ func createBenchmarkChangefeed(
tableDesc := catalogkv.TestingGetTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{
StatementTimeName: tableDesc.GetName(),
}},
Opts: map[string]string{
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeRow),
},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts, details.Targets)
encoder, err := makeJSONEncoder(details.Opts, AllTargets(details))
if err != nil {
return nil, nil, err
}
Expand All @@ -228,7 +228,7 @@ func createBenchmarkChangefeed(
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Targets: AllTargets(details),
Writer: buf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createProtectedTimestampRecord(
pts protectedts.Storage,
txn *kv.Txn,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) error {
Expand All @@ -74,19 +74,21 @@ func createProtectedTimestampRecord(
return pts.Protect(ctx, txn, rec)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
for _, t := range targets {
tablesToProtect = append(tablesToProtect, t.TableID)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
func makeSpansToProtect(
codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification,
) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
Expand All @@ -98,8 +100,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [
EndKey: tablePrefix.PrefixEnd(),
})
}
for t := range targets {
addTablePrefix(uint32(t))
for _, t := range targets {
addTablePrefix(uint32(t.TableID))
}
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func distChangefeedFlow(
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS)
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS)
if err != nil {
return err
}
Expand All @@ -121,7 +121,7 @@ func distChangefeedFlow(
func fetchSpansForTargets(
ctx context.Context,
execCfg *sql.ExecutorConfig,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
Expand All @@ -133,10 +133,10 @@ func fetchSpansForTargets(
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
for _, table := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidLeased = true
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags)
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func newChangeAggregatorProcessor(
}

var err error
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil {
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -346,7 +346,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets,
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed),
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

Expand All @@ -359,7 +359,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Gossip: cfg.Gossip,
Spans: spans,
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
MM: ca.kvFeedMemMon,
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func newChangeFrontierProcessor(
cf.freqEmitResolved = emitNoResolved
}

if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil {
if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1515,7 +1515,7 @@ func (cf *changeFrontier) maybeProtectTimestamp(
}

jobID := cf.spec.JobID
targets := cf.spec.Feed.Targets
targets := AllTargets(cf.spec.Feed)
return createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), pts, txn, jobID, targets, resolved, progress)
}

Expand Down
55 changes: 43 additions & 12 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ func changefeedPlanHook(
return err
}

targets := make(jobspb.ChangefeedTargets, len(targetDescs))
tables := make(jobspb.ChangefeedTargets, len(targetDescs))
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs))
for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
Expand All @@ -244,23 +245,29 @@ func changefeedPlanHook(
if err != nil {
return err
}
targets[table.GetID()] = jobspb.ChangefeedTarget{
tables[table.GetID()] = jobspb.ChangefeedTargetTable{
StatementTimeName: name,
}
ts := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_FULL_TABLE,
TableID: table.GetID(),
}
targets = append(targets, ts)
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return err
}
for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) {
for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) {
p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning))
}
}
}

details := jobspb.ChangefeedDetails{
Targets: targets,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
Tables: tables,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
TargetSpecifications: targets,
}
progress := jobspb.Progress{
Progress: &jobspb.Progress_HighWater{},
Expand Down Expand Up @@ -313,7 +320,7 @@ func changefeedPlanHook(
return err
}

if _, err := getEncoder(details.Opts, details.Targets); err != nil {
if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil {
return err
}

Expand All @@ -339,7 +346,7 @@ func changefeedPlanHook(
}
telemetry.Count(`changefeed.create.sink.` + telemetrySink)
telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables)))

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
Expand Down Expand Up @@ -418,8 +425,8 @@ func changefeedPlanHook(
shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant()
if shouldProtectTimestamp {
protectedTimestampID = uuid.MakeV4()
deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
targetToProtect := makeTargetToProtect(details.Targets)
deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, AllTargets(details))
targetToProtect := makeTargetToProtect(AllTargets(details))
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime,
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
Expand Down Expand Up @@ -880,7 +887,7 @@ func (b *changefeedResumer) OnPauseRequest(
execCfg := jobExec.(sql.JobExecContext).ExecCfg()
pts := execCfg.ProtectedTimestampProvider
return createProtectedTimestampRecord(ctx, execCfg.Codec, pts, txn, b.job.ID(),
details.Targets, *resolved, cp)
AllTargets(details), *resolved, cp)
}

// getQualifiedTableName returns the database-qualified name of the table
Expand Down Expand Up @@ -918,3 +925,27 @@ func getChangefeedTargetName(
}
return desc.GetName(), nil
}

// AllTargets gets all the targets listed in a ChangefeedDetails,
// from the statement time name map in old protos
// or the TargetSpecifications in new ones.
func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) {
if len(cd.TargetSpecifications) > 0 {
for _, ts := range cd.TargetSpecifications {
if ts.TableID > 0 {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
targets = append(targets, ts)
}
}
} else {
for id, t := range cd.Tables {
ct := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_FULL_TABLE,
TableID: id,
StatementTimeName: t.StatementTimeName,
}
targets = append(targets, ct)
}
}
return
}
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ import (
)

// ValidateTable validates that a table descriptor can be watched by a CHANGEFEED.
func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error {
t, ok := targets[tableDesc.GetID()]
if !ok {
func ValidateTable(
targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor,
) error {
var t jobspb.ChangefeedTargetSpecification
var found bool
for _, cts := range targets {
if cts.TableID == tableDesc.GetID() {
t = cts
found = true
break
}
}
if !found {
return errors.Errorf(`unwatched table: %s`, tableDesc.GetName())
}

Expand Down
23 changes: 13 additions & 10 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ type Encoder interface {
EncodeResolvedTimestamp(context.Context, string, hlc.Timestamp) ([]byte, error)
}

func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encoder, error) {
func getEncoder(
opts map[string]string, targets []jobspb.ChangefeedTargetSpecification,
) (Encoder, error) {
switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) {
case ``, changefeedbase.OptFormatJSON:
return makeJSONEncoder(opts, targets)
Expand All @@ -102,7 +104,7 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod
type jsonEncoder struct {
updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool

targets jobspb.ChangefeedTargets
targets []jobspb.ChangefeedTargetSpecification
alloc tree.DatumAlloc
buf bytes.Buffer
virtualColumnVisibility string
Expand All @@ -111,7 +113,7 @@ type jsonEncoder struct {
var _ Encoder = &jsonEncoder{}

func makeJSONEncoder(
opts map[string]string, targets jobspb.ChangefeedTargets,
opts map[string]string, targets []jobspb.ChangefeedTargetSpecification,
) (*jsonEncoder, error) {
e := &jsonEncoder{
targets: targets,
Expand Down Expand Up @@ -184,12 +186,13 @@ func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) {
func (e *jsonEncoder) encodeTopicRaw(row encodeRow) (interface{}, error) {
descID := row.tableDesc.GetID()
// use the target list since row.tableDesc.GetName() will not have fully qualified names
topicName, ok := e.targets[descID]
if !ok {
return nil, fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list",
row.tableDesc.GetName(), descID)
for _, topic := range e.targets {
if topic.TableID == descID {
return topic.StatementTimeName, nil
}
}
return topicName.StatementTimeName, nil
return nil, fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list",
row.tableDesc.GetName(), descID)
}

// EncodeValue implements the Encoder interface.
Expand Down Expand Up @@ -328,8 +331,8 @@ type confluentAvroEncoder struct {
schemaRegistry schemaRegistry
schemaPrefix string
updatedField, beforeField, keyOnly bool
targets jobspb.ChangefeedTargets
virtualColumnVisibility string
targets []jobspb.ChangefeedTargetSpecification

keyCache *cache.UnorderedCache // [tableIDAndVersion]confluentRegisteredKeySchema
valueCache *cache.UnorderedCache // [tableIDAndVersionPair]confluentRegisteredEnvelopeSchema
Expand Down Expand Up @@ -367,7 +370,7 @@ var encoderCacheConfig = cache.Config{
}

func newConfluentAvroEncoder(
opts map[string]string, targets jobspb.ChangefeedTargets,
opts map[string]string, targets []jobspb.ChangefeedTargetSpecification,
) (*confluentAvroEncoder, error) {
e := &confluentAvroEncoder{
schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix],
Expand Down
14 changes: 8 additions & 6 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,12 @@ func TestEncoders(t *testing.T) {
t.Fatalf(`unknown format: %s`, o[changefeedbase.OptFormat])
}

target := jobspb.ChangefeedTarget{
target := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_FULL_TABLE,
TableID: tableDesc.GetID(),
StatementTimeName: tableDesc.GetName(),
}
targets := jobspb.ChangefeedTargets{}
targets[tableDesc.GetID()] = target
targets := []jobspb.ChangefeedTargetSpecification{target}

e, err := getEncoder(o, targets)
if len(expected.err) > 0 {
Expand Down Expand Up @@ -360,11 +361,12 @@ func TestAvroEncoderWithTLS(t *testing.T) {
return string(avroToJSON(t, reg, r))
}

target := jobspb.ChangefeedTarget{
target := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_FULL_TABLE,
TableID: tableDesc.GetID(),
StatementTimeName: tableDesc.GetName(),
}
targets := jobspb.ChangefeedTargets{}
targets[tableDesc.GetID()] = target
targets := []jobspb.ChangefeedTargetSpecification{target}

e, err := getEncoder(opts, targets)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Config struct {
Gossip gossip.OptionalGossip
Spans []roachpb.Span
BackfillCheckpoint []roachpb.Span
Targets jobspb.ChangefeedTargets
Targets []jobspb.ChangefeedTargetSpecification
Writer kvevent.Writer
Metrics *kvevent.Metrics
OnBackfillCallback func() func()
Expand Down
Loading

0 comments on commit 6b2d95f

Please sign in to comment.