changefeedccl: Various changes and fixes #81105

Merged 4 commits on May 16, 2022
15 changes: 5 additions & 10 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -444,7 +444,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

bucket, accessKey, secretKey := checkS3Credentials(t)
const unredactedSinkURI = "null://blah?AWS_ACCESS_KEY_ID=the_secret"

params, _ := tests.CreateTestServerParams()
s, rawSQLDB, _ := serverutils.StartServer(t, params)
@@ -475,9 +475,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
},
}

query = fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo, bar INTO
's3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s'`, bucket, accessKey, secretKey)
sqlDB.QueryRow(t, query).Scan(&changefeedID)
sqlDB.QueryRow(t, `CREATE CHANGEFEED FOR TABLE foo, bar INTO $1`, unredactedSinkURI).Scan(&changefeedID)

sqlDB.Exec(t, `PAUSE JOB $1`, changefeedID)
waitForJobStatus(sqlDB, t, changefeedID, `paused`)
@@ -492,16 +490,13 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
details, ok := job.Details().(jobspb.ChangefeedDetails)
require.True(t, ok)

require.Equal(t, details.SinkURI,
fmt.Sprintf(`s3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s`, bucket, accessKey, secretKey))
require.Equal(t, unredactedSinkURI, details.SinkURI)
}

func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

bucket, accessKey, secretKey := checkS3Credentials(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -516,8 +511,8 @@ func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

sqlDB.ExpectErr(t,
`pq: New sink type "s3" does not match original sink type "kafka". Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 's3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s'`, feed.JobID(), bucket, accessKey, secretKey),
`pq: New sink type "null" does not match original sink type "kafka". Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 'null://'`, feed.JobID()),
)
}

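The test changes above swap real S3 credentials for a constant null:// sink URI: the null sink accepts any URI and discards every row it is handed, so the redaction test can carry a fake AWS_ACCESS_KEY_ID without any external dependency. Below is a minimal, self-contained sketch of a discard sink in that spirit; the rowSink interface and discardSink type are illustrative stand-ins, not the actual changefeedccl Sink API.

package main

import (
	"context"
	"fmt"
)

// rowSink is a simplified emit surface, standing in for the real sink API.
type rowSink interface {
	EmitRow(ctx context.Context, topic string, key, value []byte) error
	Close() error
}

// discardSink models a "null://" sink: it accepts every row and drops it,
// which is why tests can point a changefeed at it with throwaway credentials.
type discardSink struct{}

func (discardSink) EmitRow(ctx context.Context, topic string, key, value []byte) error {
	return nil // drop the row; nothing leaves the process
}

func (discardSink) Close() error { return nil }

func main() {
	var s rowSink = discardSink{}
	_ = s.EmitRow(context.Background(), "foo", []byte("k"), []byte("v"))
	_ = s.Close()
	fmt.Println("row emitted to null sink (discarded)")
}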
178 changes: 7 additions & 171 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -28,8 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -87,7 +85,7 @@ type changeAggregator struct {
// eventProducer produces the next event from the kv feed.
eventProducer kvevent.Reader
// eventConsumer consumes the event.
eventConsumer kvEventConsumer
eventConsumer *kvEventToRowConsumer

// lastFlush and flushFrequency keep track of the flush frequency.
lastFlush time.Time
@@ -294,21 +292,17 @@ func (ca *changeAggregator) Start(ctx context.Context) {
kvFeedHighWater = ca.spec.Feed.StatementTime
}

ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime, ca.sliMetrics)
ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime)
if err != nil {
// Early abort in the case that there is an error creating the sink.
ca.MoveToDraining(err)
ca.cancel()
return
}

if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) {
ca.eventConsumer = newNativeKVConsumer(ca.sink)
} else {
ca.eventConsumer = newKVEventToRowConsumer(
ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
}
ca.eventConsumer = newKVEventToRowConsumer(
ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
}

func (ca *changeAggregator) startKVFeed(
@@ -317,7 +311,6 @@ func (ca *changeAggregator) startKVFeed(
initialHighWater hlc.Timestamp,
needsInitialScan bool,
endTime hlc.Timestamp,
sm *sliMetrics,
) (kvevent.Reader, error) {
cfg := ca.flowCtx.Cfg
buf := kvevent.NewThrottlingBuffer(
@@ -326,7 +319,7 @@

// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
// we return the kvevent.Reader part to the caller.
kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime, sm)
kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime)

// Give errCh enough buffer for both possible errors from supporting goroutines,
// but only the first one is ever used.
@@ -358,7 +351,6 @@ func (ca *changeAggregator) makeKVFeedCfg(
initialHighWater hlc.Timestamp,
needsInitialScan bool,
endTime hlc.Timestamp,
sm *sliMetrics,
) kvfeed.Config {
schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
@@ -657,11 +649,6 @@ func (ca *changeAggregator) ConsumerClosed() {
ca.close()
}

type kvEventConsumer interface {
// ConsumeEvent responsible for consuming kv event.
ConsumeEvent(ctx context.Context, event kvevent.Event) error
}

type kvEventToRowConsumer struct {
frontier *span.Frontier
encoder Encoder
@@ -676,8 +663,6 @@ type kvEventToRowConsumer struct {
topicNamer *TopicNamer
}

var _ kvEventConsumer = &kvEventToRowConsumer{}

func newKVEventToRowConsumer(
ctx context.Context,
cfg *execinfra.ServerConfig,
@@ -688,7 +673,7 @@ func newKVEventToRowConsumer(
details jobspb.ChangefeedDetails,
knobs TestingKnobs,
topicNamer *TopicNamer,
) kvEventConsumer {
) *kvEventToRowConsumer {
rfCache := newRowFetcherCache(
ctx,
cfg.Codec,
@@ -711,129 +696,6 @@ func newKVEventToRowConsumer(
}
}

type tableDescriptorTopic struct {
tableDesc catalog.TableDescriptor
spec jobspb.ChangefeedTargetSpecification
nameComponentsCache []string
identifierCache TopicIdentifier
}

// GetNameComponents implements the TopicDescriptor interface
func (tdt *tableDescriptorTopic) GetNameComponents() []string {
if len(tdt.nameComponentsCache) == 0 {
tdt.nameComponentsCache = []string{tdt.spec.StatementTimeName}
}
return tdt.nameComponentsCache
}

// GetTopicIdentifier implements the TopicDescriptor interface
func (tdt *tableDescriptorTopic) GetTopicIdentifier() TopicIdentifier {
if tdt.identifierCache.TableID == 0 {
tdt.identifierCache = TopicIdentifier{
TableID: tdt.tableDesc.GetID(),
}
}
return tdt.identifierCache
}

// GetVersion implements the TopicDescriptor interface
func (tdt *tableDescriptorTopic) GetVersion() descpb.DescriptorVersion {
return tdt.tableDesc.GetVersion()
}

// GetTargetSpecification implements the TopicDescriptor interface
func (tdt *tableDescriptorTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
return tdt.spec
}

var _ TopicDescriptor = &tableDescriptorTopic{}

type columnFamilyTopic struct {
tableDesc catalog.TableDescriptor
familyDesc descpb.ColumnFamilyDescriptor
spec jobspb.ChangefeedTargetSpecification
nameComponentsCache []string
identifierCache TopicIdentifier
}

// GetNameComponents implements the TopicDescriptor interface
func (cft *columnFamilyTopic) GetNameComponents() []string {
if len(cft.nameComponentsCache) == 0 {
cft.nameComponentsCache = []string{
cft.spec.StatementTimeName,
cft.familyDesc.Name,
}
}
return cft.nameComponentsCache
}

// GetTopicIdentifier implements the TopicDescriptor interface
func (cft *columnFamilyTopic) GetTopicIdentifier() TopicIdentifier {
if cft.identifierCache.TableID == 0 {
cft.identifierCache = TopicIdentifier{
TableID: cft.tableDesc.GetID(),
FamilyID: cft.familyDesc.ID,
}
}
return cft.identifierCache
}

// GetVersion implements the TopicDescriptor interface
func (cft *columnFamilyTopic) GetVersion() descpb.DescriptorVersion {
return cft.tableDesc.GetVersion()
}

// GetTargetSpecification implements the TopicDescriptor interface
func (cft *columnFamilyTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
return cft.spec
}

var _ TopicDescriptor = &columnFamilyTopic{}

type noTopic struct{}

func (n noTopic) GetNameComponents() []string {
return []string{}
}

func (n noTopic) GetTopicIdentifier() TopicIdentifier {
return TopicIdentifier{}
}

func (n noTopic) GetVersion() descpb.DescriptorVersion {
return 0
}

func (n noTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
return jobspb.ChangefeedTargetSpecification{}
}

var _ TopicDescriptor = &noTopic{}

func makeTopicDescriptorFromSpecForRow(
s jobspb.ChangefeedTargetSpecification, r encodeRow,
) (TopicDescriptor, error) {
switch s.Type {
case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY:
return &tableDescriptorTopic{
tableDesc: r.tableDesc,
spec: s,
}, nil
case jobspb.ChangefeedTargetSpecification_EACH_FAMILY, jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY:
familyDesc, err := r.tableDesc.FindFamilyByID(r.familyID)
if err != nil {
return noTopic{}, err
}
return &columnFamilyTopic{
tableDesc: r.tableDesc,
spec: s,
familyDesc: *familyDesc,
}, nil
default:
return noTopic{}, errors.AssertionFailedf("Unsupported target type %s", s.Type)
}
}

func (c *kvEventToRowConsumer) topicForRow(r encodeRow) (TopicDescriptor, error) {
if topic, ok := c.topicDescriptorCache[TopicIdentifier{TableID: r.tableDesc.GetID(), FamilyID: r.familyID}]; ok {
if topic.GetVersion() == r.tableDesc.GetVersion() {
@@ -1043,32 +905,6 @@ func (c *kvEventToRowConsumer) eventToRow(
return r, nil
}

type nativeKVConsumer struct {
sink Sink
}

var _ kvEventConsumer = &nativeKVConsumer{}

func newNativeKVConsumer(sink Sink) kvEventConsumer {
return &nativeKVConsumer{sink: sink}
}

// ConsumeEvent implements kvEventConsumer interface.
func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Event) error {
if ev.Type() != kvevent.TypeKV {
return errors.AssertionFailedf("expected kv ev, got %v", ev.Type())
}
keyBytes := []byte(ev.KV().Key)
val := ev.KV().Value
valBytes, err := protoutil.Marshal(&val)
if err != nil {
return err
}

return c.sink.EmitRow(
ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp, val.Timestamp, ev.DetachAlloc())
}

const (
emitAllResolved = 0
emitNoResolved = -1
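Most of the deletions in this file remove an abstraction left with a single implementation: once the native-format nativeKVConsumer path is gone, the kvEventConsumer interface no longer earns its keep, and changeAggregator.eventConsumer becomes a concrete *kvEventToRowConsumer. The condensed sketch below shows the resulting shape; the names are borrowed from the PR, but the types and bodies are simplified stand-ins rather than the real changefeedccl code.

package main

import (
	"context"
	"fmt"
)

type event struct{ kv string }

// kvEventToRowConsumer is the only consumer left after the diff, so the
// aggregator refers to it directly instead of through the removed
// kvEventConsumer interface, dropping a layer of dynamic dispatch on the
// per-event hot path.
type kvEventToRowConsumer struct{}

func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev event) error {
	fmt.Println("consumed", ev.kv) // stand-in for encode-and-emit
	return nil
}

type changeAggregator struct {
	// was: eventConsumer kvEventConsumer (an interface with two implementations)
	eventConsumer *kvEventToRowConsumer
}

func main() {
	ca := changeAggregator{eventConsumer: &kvEventToRowConsumer{}}
	_ = ca.eventConsumer.ConsumeEvent(context.Background(), event{kv: "k=v"})
}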