diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go
index 2a3b18566b29..4d12390f4bf3 100644
--- a/pkg/ccl/changefeedccl/alter_changefeed_test.go
+++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -444,7 +444,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	bucket, accessKey, secretKey := checkS3Credentials(t)
+	const unredactedSinkURI = "null://blah?AWS_ACCESS_KEY_ID=the_secret"
 	params, _ := tests.CreateTestServerParams()
 	s, rawSQLDB, _ := serverutils.StartServer(t, params)
@@ -475,9 +475,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
 		},
 	}
-	query = fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo, bar INTO
-		's3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s'`, bucket, accessKey, secretKey)
-	sqlDB.QueryRow(t, query).Scan(&changefeedID)
+	sqlDB.QueryRow(t, `CREATE CHANGEFEED FOR TABLE foo, bar INTO $1`, unredactedSinkURI).Scan(&changefeedID)
 	sqlDB.Exec(t, `PAUSE JOB $1`, changefeedID)
 	waitForJobStatus(sqlDB, t, changefeedID, `paused`)
@@ -492,16 +490,13 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
 	details, ok := job.Details().(jobspb.ChangefeedDetails)
 	require.True(t, ok)
-	require.Equal(t, details.SinkURI,
-		fmt.Sprintf(`s3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s`, bucket, accessKey, secretKey))
+	require.Equal(t, unredactedSinkURI, details.SinkURI)
 }
 func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	bucket, accessKey, secretKey := checkS3Credentials(t)
-
 	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(db)
 		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -516,8 +511,8 @@ func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
 		waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)
 		sqlDB.ExpectErr(t,
-			`pq: New sink type "s3" does not match original sink type "kafka". Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
-			fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 's3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s'`, feed.JobID(), bucket, accessKey, secretKey),
+			`pq: New sink type "null" does not match original sink type "kafka". Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
+			fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 'null://'`, feed.JobID()),
 		)
 	}
diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index f8b950f9094b..49584cccb1a3 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -28,8 +28,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -87,7 +85,7 @@ type changeAggregator struct {
 	// eventProducer produces the next event from the kv feed.
 	eventProducer kvevent.Reader
 	// eventConsumer consumes the event.
-	eventConsumer kvEventConsumer
+	eventConsumer *kvEventToRowConsumer
 	// lastFlush and flushFrequency keep track of the flush frequency.
 	lastFlush time.Time
@@ -294,7 +292,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		kvFeedHighWater = ca.spec.Feed.StatementTime
 	}
-	ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime, ca.sliMetrics)
+	ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime)
 	if err != nil {
 		// Early abort in the case that there is an error creating the sink.
 		ca.MoveToDraining(err)
@@ -302,13 +300,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		return
 	}
-	if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) {
-		ca.eventConsumer = newNativeKVConsumer(ca.sink)
-	} else {
-		ca.eventConsumer = newKVEventToRowConsumer(
-			ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
-			ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
-	}
+	ca.eventConsumer = newKVEventToRowConsumer(
+		ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
+		ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
 }
 func (ca *changeAggregator) startKVFeed(
@@ -317,7 +311,6 @@
 	initialHighWater hlc.Timestamp,
 	needsInitialScan bool,
 	endTime hlc.Timestamp,
-	sm *sliMetrics,
 ) (kvevent.Reader, error) {
 	cfg := ca.flowCtx.Cfg
 	buf := kvevent.NewThrottlingBuffer(
@@ -326,7 +319,7 @@
 	// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
 	// we return the kvevent.Reader part to the caller.
-	kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime, sm)
+	kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime)
 	// Give errCh enough buffer both possible errors from supporting goroutines,
 	// but only the first one is ever used.
@@ -358,7 +351,6 @@ func (ca *changeAggregator) makeKVFeedCfg(
 	initialHighWater hlc.Timestamp,
 	needsInitialScan bool,
 	endTime hlc.Timestamp,
-	sm *sliMetrics,
 ) kvfeed.Config {
 	schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
 		ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
@@ -657,11 +649,6 @@ func (ca *changeAggregator) ConsumerClosed() {
 	ca.close()
 }
-type kvEventConsumer interface {
-	// ConsumeEvent responsible for consuming kv event.
-	ConsumeEvent(ctx context.Context, event kvevent.Event) error
-}
-
 type kvEventToRowConsumer struct {
 	frontier *span.Frontier
 	encoder Encoder
@@ -676,8 +663,6 @@ type kvEventToRowConsumer struct {
 	topicNamer *TopicNamer
 }
-var _ kvEventConsumer = &kvEventToRowConsumer{}
-
 func newKVEventToRowConsumer(
 	ctx context.Context,
 	cfg *execinfra.ServerConfig,
@@ -688,7 +673,7 @@ func newKVEventToRowConsumer(
 	details jobspb.ChangefeedDetails,
 	knobs TestingKnobs,
 	topicNamer *TopicNamer,
-) kvEventConsumer {
+) *kvEventToRowConsumer {
 	rfCache := newRowFetcherCache(
 		ctx,
 		cfg.Codec,
@@ -711,129 +696,6 @@ func newKVEventToRowConsumer(
 	}
 }
-type tableDescriptorTopic struct {
-	tableDesc catalog.TableDescriptor
-	spec jobspb.ChangefeedTargetSpecification
-	nameComponentsCache []string
-	identifierCache TopicIdentifier
-}
-
-// GetNameComponents implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetNameComponents() []string {
-	if len(tdt.nameComponentsCache) == 0 {
-		tdt.nameComponentsCache = []string{tdt.spec.StatementTimeName}
-	}
-	return tdt.nameComponentsCache
-}
-
-// GetTopicIdentifier implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetTopicIdentifier() TopicIdentifier {
-	if tdt.identifierCache.TableID == 0 {
-		tdt.identifierCache = TopicIdentifier{
-			TableID: tdt.tableDesc.GetID(),
-		}
-	}
-	return tdt.identifierCache
-}
-
-// GetVersion implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetVersion() descpb.DescriptorVersion {
-	return tdt.tableDesc.GetVersion()
-}
-
-// GetTargetSpecification implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
-	return tdt.spec
-}
-
-var _ TopicDescriptor = &tableDescriptorTopic{}
-
-type columnFamilyTopic struct {
-	tableDesc catalog.TableDescriptor
-	familyDesc descpb.ColumnFamilyDescriptor
-	spec jobspb.ChangefeedTargetSpecification
-	nameComponentsCache []string
-	identifierCache TopicIdentifier
-}
-
-// GetNameComponents implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetNameComponents() []string {
-	if len(cft.nameComponentsCache) == 0 {
-		cft.nameComponentsCache = []string{
-			cft.spec.StatementTimeName,
-			cft.familyDesc.Name,
-		}
-	}
-	return cft.nameComponentsCache
-}
-
-// GetTopicIdentifier implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetTopicIdentifier() TopicIdentifier {
-	if cft.identifierCache.TableID == 0 {
-		cft.identifierCache = TopicIdentifier{
-			TableID: cft.tableDesc.GetID(),
-			FamilyID: cft.familyDesc.ID,
-		}
-	}
-	return cft.identifierCache
-}
-
-// GetVersion implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetVersion() descpb.DescriptorVersion {
-	return cft.tableDesc.GetVersion()
-}
-
-// GetTargetSpecification implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
-	return cft.spec
-}
-
-var _ TopicDescriptor = &columnFamilyTopic{}
-
-type noTopic struct{}
-
-func (n noTopic) GetNameComponents() []string {
-	return []string{}
-}
-
-func (n noTopic) GetTopicIdentifier() TopicIdentifier {
-	return TopicIdentifier{}
-}
-
-func (n noTopic) GetVersion() descpb.DescriptorVersion {
-	return 0
-}
-
-func (n noTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
-	return jobspb.ChangefeedTargetSpecification{}
-}
-
-var _ TopicDescriptor = &noTopic{}
-
-func makeTopicDescriptorFromSpecForRow(
-	s jobspb.ChangefeedTargetSpecification, r encodeRow,
-) (TopicDescriptor, error) {
-	switch s.Type {
-	case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY:
-		return &tableDescriptorTopic{
-			tableDesc: r.tableDesc,
-			spec: s,
-		}, nil
-	case jobspb.ChangefeedTargetSpecification_EACH_FAMILY, jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY:
-		familyDesc, err := r.tableDesc.FindFamilyByID(r.familyID)
-		if err != nil {
-			return noTopic{}, err
-		}
-		return &columnFamilyTopic{
-			tableDesc: r.tableDesc,
-			spec: s,
-			familyDesc: *familyDesc,
-		}, nil
-	default:
-		return noTopic{}, errors.AssertionFailedf("Unsupported target type %s", s.Type)
-	}
-}
-
 func (c *kvEventToRowConsumer) topicForRow(r encodeRow) (TopicDescriptor, error) {
 	if topic, ok := c.topicDescriptorCache[TopicIdentifier{TableID: r.tableDesc.GetID(), FamilyID: r.familyID}]; ok {
 		if topic.GetVersion() == r.tableDesc.GetVersion() {
@@ -1043,32 +905,6 @@ func (c *kvEventToRowConsumer) eventToRow(
 	return r, nil
 }
-type nativeKVConsumer struct {
-	sink Sink
-}
-
-var _ kvEventConsumer = &nativeKVConsumer{}
-
-func newNativeKVConsumer(sink Sink) kvEventConsumer {
-	return &nativeKVConsumer{sink: sink}
-}
-
-// ConsumeEvent implements kvEventConsumer interface.
-func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Event) error {
-	if ev.Type() != kvevent.TypeKV {
-		return errors.AssertionFailedf("expected kv ev, got %v", ev.Type())
-	}
-	keyBytes := []byte(ev.KV().Key)
-	val := ev.KV().Value
-	valBytes, err := protoutil.Marshal(&val)
-	if err != nil {
-		return err
-	}
-
-	return c.sink.EmitRow(
-		ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp, val.Timestamp, ev.DetachAlloc())
-}
-
 const (
 	emitAllResolved = 0
 	emitNoResolved = -1
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 62fdbe7ccdcc..95c9b22034fd 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -5831,39 +5831,35 @@ func TestChangefeedEndTime(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
+		knobs := f.Server().TestingKnobs().
+			DistSQL.(*execinfra.TestingKnobs).
+			Changefeed.(*TestingKnobs)
+		endTimeReached := make(chan struct{})
+		knobs.FeedKnobs.EndTimeReached = func() bool {
+			select {
+			case <-endTimeReached:
+				return true
+			default:
+				return false
+			}
+		}
+
 		sqlDB := sqlutils.MakeSQLRunner(db)
 		sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
 		sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")
-		var tsAfterInitialInsert string
-		sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp() + 10000000000)").Scan(&tsAfterInitialInsert)
-		feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", tsAfterInitialInsert)
-
-		time.Sleep(10 * time.Second)
-		sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)")
+		fakeEndTime := f.Server().Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
+		feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH end_time = $1", fakeEndTime)
+		defer closeFeed(t, feed)
-		seenMoreMessages := false
-		g := ctxgroup.WithContext(context.Background())
-		g.Go(func() error {
-			assertPayloads(t, feed, []string{
-				`foo: [1]->{"after": {"a": 1}}`,
-				`foo: [2]->{"after": {"a": 2}}`,
-				`foo: [3]->{"after": {"a": 3}}`,
-			})
-			for {
-				_, err := feed.Next()
-				if err != nil {
-					return err
-				}
-				seenMoreMessages = true
-			}
+		assertPayloads(t, feed, []string{
+			`foo: [1]->{"after": {"a": 1}}`,
+			`foo: [2]->{"after": {"a": 2}}`,
+			`foo: [3]->{"after": {"a": 3}}`,
 		})
-		defer func() {
-			closeFeed(t, feed)
-			_ = g.Wait()
-			require.False(t, seenMoreMessages)
-		}()
+
+		close(endTimeReached)
 		testFeed := feed.(cdctest.EnterpriseTestFeed)
 		require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool {
@@ -5880,6 +5876,19 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) {
 	defer log.Scope(t).Close(t)
 	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
+		knobs := f.Server().TestingKnobs().
+			DistSQL.(*execinfra.TestingKnobs).
+			Changefeed.(*TestingKnobs)
+		endTimeReached := make(chan struct{})
+		knobs.FeedKnobs.EndTimeReached = func() bool {
+			select {
+			case <-endTimeReached:
+				return true
+			default:
+				return false
+			}
+		}
+
 		sqlDB := sqlutils.MakeSQLRunner(db)
 		sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
@@ -5889,34 +5898,16 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) {
 		sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp())").Scan(&tsCursor)
 		sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)")
-		var tsEndTime string
-		sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp() + 10000000000)").Scan(&tsEndTime)
-		feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, tsEndTime)
-
-		time.Sleep(10 * time.Second)
-		sqlDB.Exec(t, "INSERT INTO foo VALUES (7), (8), (9)")
+		fakeEndTime := f.Server().Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
+		feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, fakeEndTime)
+		defer closeFeed(t, feed)
-		seenMoreMessages := false
-		g := ctxgroup.WithContext(context.Background())
-		g.Go(func() error {
-			assertPayloads(t, feed, []string{
-				`foo: [4]->{"after": {"a": 4}}`,
-				`foo: [5]->{"after": {"a": 5}}`,
-				`foo: [6]->{"after": {"a": 6}}`,
-			})
-			for {
-				_, err := feed.Next()
-				if err != nil {
-					return err
-				}
-				seenMoreMessages = true
-			}
+		assertPayloads(t, feed, []string{
+			`foo: [4]->{"after": {"a": 4}}`,
+			`foo: [5]->{"after": {"a": 5}}`,
+			`foo: [6]->{"after": {"a": 6}}`,
 		})
-		defer func() {
-			closeFeed(t, feed)
-			_ = g.Wait()
-			require.False(t, seenMoreMessages)
-		}()
+		close(endTimeReached)
 		testFeed := feed.(cdctest.EnterpriseTestFeed)
 		require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool {
diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go
index 1cafed810c75..e969f9cb3273 100644
--- a/pkg/ccl/changefeedccl/changefeedbase/options.go
+++ b/pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -121,8 +121,6 @@ const (
 	OptFormatAvro FormatType = `avro`
 	OptFormatCSV FormatType = `csv`
-	OptFormatNative FormatType = `native`
-
 	OptOnErrorFail OnErrorType = `fail`
 	OptOnErrorPause OnErrorType = `pause`
diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go
index 5c046c59d445..22299c43a00d 100644
--- a/pkg/ccl/changefeedccl/encoder.go
+++ b/pkg/ccl/changefeedccl/encoder.go
@@ -17,7 +17,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
-	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/errors"
 )
@@ -83,35 +82,9 @@ func getEncoder(
 		return makeJSONEncoder(opts, targets)
 	case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
 		return newConfluentAvroEncoder(opts, targets)
-	case changefeedbase.OptFormatNative:
-		return &nativeEncoder{}, nil
 	case changefeedbase.OptFormatCSV:
 		return newCSVEncoder(opts), nil
 	default:
 		return nil, errors.Errorf(`unknown %s: %s`, changefeedbase.OptFormat, opts[changefeedbase.OptFormat])
 	}
 }
-
-// nativeEncoder only implements EncodeResolvedTimestamp.
-// Unfortunately, the encoder assumes that it operates with encodeRow -- something
-// that's just not the case when emitting raw KVs.
-// In addition, there is a kafka specific concept (topic) that's exposed at the Encoder level.
-// TODO(yevgeniy): Refactor encoder interface so that it operates on kvfeed events.
-// In addition, decouple the concept of topic from the Encoder.
-type nativeEncoder struct{}
-
-func (e *nativeEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]byte, error) {
-	return nil, errors.New("EncodeKey unexpectedly called on nativeEncoder")
-}
-
-func (e *nativeEncoder) EncodeValue(ctx context.Context, row encodeRow) ([]byte, error) {
-	return nil, errors.New("EncodeValue unexpectedly called on nativeEncoder")
-}
-
-func (e *nativeEncoder) EncodeResolvedTimestamp(
-	ctx context.Context, s string, ts hlc.Timestamp,
-) ([]byte, error) {
-	return protoutil.Marshal(&ts)
-}
-
-var _ Encoder = &nativeEncoder{}
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 2d43827161d1..e50d3730e413 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -445,7 +445,7 @@ func (f *kvFeed) runUntilTableEvent(
 	}
 	g.GoCtx(func(ctx context.Context) error {
-		return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime)
+		return copyFromSourceToDestUntilTableEvent(ctx, f.writer, memBuf, resumeFrontier, f.tableFeed, f.endTime, f.knobs)
 	})
 	g.GoCtx(func(ctx context.Context) error {
 		return f.physicalFeed.Run(ctx, memBuf, physicalCfg)
@@ -524,6 +524,7 @@ func copyFromSourceToDestUntilTableEvent(
 	frontier *span.Frontier,
 	tables schemafeed.SchemaFeed,
 	endTime hlc.Timestamp,
+	knobs TestingKnobs,
 ) error {
 	var (
 		scanBoundary errBoundaryReached
@@ -557,6 +558,9 @@ func copyFromSourceToDestUntilTableEvent(
 		if scanBoundary == nil {
 			return false, false, nil
 		}
+		if knobs.EndTimeReached != nil && knobs.EndTimeReached() {
+			return true, true, nil
+		}
 		if e.Timestamp().Less(scanBoundary.Timestamp()) {
 			return false, false, nil
 		}
diff --git a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
index 4c795f6ed0d5..1d6a7e5eceb0 100644
--- a/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
+++ b/pkg/ccl/changefeedccl/kvfeed/testing_knobs.go
@@ -26,6 +26,9 @@ type TestingKnobs struct {
 	// OnRangeFeedStart invoked when rangefeed starts. It is given
 	// the list of SpanTimePairs.
 	OnRangeFeedStart func(spans []kvcoord.SpanTimePair)
+	// EndTimeReached is a callback that may return true to indicate the
+	// feed should exit because its end time has been reached.
+	EndTimeReached func() bool
 }
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
diff --git a/pkg/ccl/changefeedccl/topic.go b/pkg/ccl/changefeedccl/topic.go
index a44cdceff027..373c3dce4cee 100644
--- a/pkg/ccl/changefeedccl/topic.go
+++ b/pkg/ccl/changefeedccl/topic.go
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/errors"
 )
@@ -222,3 +223,126 @@ func (tn *TopicNamer) nameFromComponents(components ...string) string {
 	return str
 }
+
+type tableDescriptorTopic struct {
+	tableDesc catalog.TableDescriptor
+	spec jobspb.ChangefeedTargetSpecification
+	nameComponentsCache []string
+	identifierCache TopicIdentifier
+}
+
+// GetNameComponents implements the TopicDescriptor interface
+func (tdt *tableDescriptorTopic) GetNameComponents() []string {
+	if len(tdt.nameComponentsCache) == 0 {
+		tdt.nameComponentsCache = []string{tdt.spec.StatementTimeName}
+	}
+	return tdt.nameComponentsCache
+}
+
+// GetTopicIdentifier implements the TopicDescriptor interface
+func (tdt *tableDescriptorTopic) GetTopicIdentifier() TopicIdentifier {
+	if tdt.identifierCache.TableID == 0 {
+		tdt.identifierCache = TopicIdentifier{
+			TableID: tdt.tableDesc.GetID(),
+		}
+	}
+	return tdt.identifierCache
+}
+
+// GetVersion implements the TopicDescriptor interface
+func (tdt *tableDescriptorTopic) GetVersion() descpb.DescriptorVersion {
+	return tdt.tableDesc.GetVersion()
+}
+
+// GetTargetSpecification implements the TopicDescriptor interface
+func (tdt *tableDescriptorTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
+	return tdt.spec
+}
+
+var _ TopicDescriptor = &tableDescriptorTopic{}
+
+type columnFamilyTopic struct {
+	tableDesc catalog.TableDescriptor
+	familyDesc descpb.ColumnFamilyDescriptor
+	spec jobspb.ChangefeedTargetSpecification
+	nameComponentsCache []string
+	identifierCache TopicIdentifier
+}
+
+// GetNameComponents implements the TopicDescriptor interface
+func (cft *columnFamilyTopic) GetNameComponents() []string {
+	if len(cft.nameComponentsCache) == 0 {
+		cft.nameComponentsCache = []string{
+			cft.spec.StatementTimeName,
+			cft.familyDesc.Name,
+		}
+	}
+	return cft.nameComponentsCache
+}
+
+// GetTopicIdentifier implements the TopicDescriptor interface
+func (cft *columnFamilyTopic) GetTopicIdentifier() TopicIdentifier {
+	if cft.identifierCache.TableID == 0 {
+		cft.identifierCache = TopicIdentifier{
+			TableID: cft.tableDesc.GetID(),
+			FamilyID: cft.familyDesc.ID,
+		}
+	}
+	return cft.identifierCache
+}
+
+// GetVersion implements the TopicDescriptor interface
+func (cft *columnFamilyTopic) GetVersion() descpb.DescriptorVersion {
+	return cft.tableDesc.GetVersion()
+}
+
+// GetTargetSpecification implements the TopicDescriptor interface
+func (cft *columnFamilyTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
+	return cft.spec
+}
+
+var _ TopicDescriptor = &columnFamilyTopic{}
+
+type noTopic struct{}
+
+func (n noTopic) GetNameComponents() []string {
+	return []string{}
+}
+
+func (n noTopic) GetTopicIdentifier() TopicIdentifier {
+	return TopicIdentifier{}
+}
+
+func (n noTopic) GetVersion() descpb.DescriptorVersion {
+	return 0
+}
+
+func (n noTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
+	return jobspb.ChangefeedTargetSpecification{}
+}
+
+var _ TopicDescriptor = &noTopic{}
+
+func makeTopicDescriptorFromSpecForRow(
+	s jobspb.ChangefeedTargetSpecification, r encodeRow,
+) (TopicDescriptor, error) {
+	switch s.Type {
+	case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY:
+		return &tableDescriptorTopic{
+			tableDesc: r.tableDesc,
+			spec: s,
+		}, nil
+	case jobspb.ChangefeedTargetSpecification_EACH_FAMILY, jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY:
+		familyDesc, err := r.tableDesc.FindFamilyByID(r.familyID)
+		if err != nil {
+			return noTopic{}, err
+		}
+		return &columnFamilyTopic{
+			tableDesc: r.tableDesc,
+			spec: s,
+			familyDesc: *familyDesc,
+		}, nil
+	default:
+		return noTopic{}, errors.AssertionFailedf("Unsupported target type %s", s.Type)
+	}
+}