81105: changefeedccl: Various changes and fixes r=miretskiy a=miretskiy

See commits for details.

Release Notes: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
craig[bot] and Yevgeniy Miretskiy committed May 16, 2022
2 parents 4fd43e7 + c9cb49c commit a30f5a3
Showing 8 changed files with 187 additions and 263 deletions.
15 changes: 5 additions & 10 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -444,7 +444,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	bucket, accessKey, secretKey := checkS3Credentials(t)
+	const unredactedSinkURI = "null://blah?AWS_ACCESS_KEY_ID=the_secret"
 
 	params, _ := tests.CreateTestServerParams()
 	s, rawSQLDB, _ := serverutils.StartServer(t, params)
@@ -475,9 +475,7 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
 		},
 	}
 
-	query = fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE foo, bar INTO
-		's3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s'`, bucket, accessKey, secretKey)
-	sqlDB.QueryRow(t, query).Scan(&changefeedID)
+	sqlDB.QueryRow(t, `CREATE CHANGEFEED FOR TABLE foo, bar INTO $1`, unredactedSinkURI).Scan(&changefeedID)
 
 	sqlDB.Exec(t, `PAUSE JOB $1`, changefeedID)
 	waitForJobStatus(sqlDB, t, changefeedID, `paused`)
@@ -492,16 +490,13 @@ func TestAlterChangefeedPersistSinkURI(t *testing.T) {
 	details, ok := job.Details().(jobspb.ChangefeedDetails)
 	require.True(t, ok)
 
-	require.Equal(t, details.SinkURI,
-		fmt.Sprintf(`s3://%s/fake/path?AWS_ACCESS_KEY_ID=%s&AWS_SECRET_ACCESS_KEY=%s`, bucket, accessKey, secretKey))
+	require.Equal(t, unredactedSinkURI, details.SinkURI)
 }
 
 func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
-	bucket, accessKey, secretKey := checkS3Credentials(t)
-
 	testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
 		sqlDB := sqlutils.MakeSQLRunner(db)
 		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -516,8 +511,8 @@ func TestAlterChangefeedChangeSinkTypeError(t *testing.T) {
 		waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)
 
 		sqlDB.ExpectErr(t,
-			`pq: New sink type "s3" does not match original sink type "kafka". Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
-			`pq: New sink type "null" does not match original sink type "kafka". Altering the sink type of a changefeed is disallowed, consider creating a new changefeed instead.`,
+			fmt.Sprintf(`ALTER CHANGEFEED %d SET sink = 'null://'`, feed.JobID()),
		)
	}
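
A quick aside on the pattern this test change relies on (a sketch under assumptions, not code from this commit): the null:// sink discards every row handed to it, so a test that only cares about job metadata can embed a credential-shaped query parameter without real S3 credentials, and passing the URI as a bound placeholder keeps the secret out of the statement text entirely. The createChangefeed helper below is hypothetical; only the CREATE CHANGEFEED ... INTO $1 form and the null:// URI come from the diff above.

package main

import (
	"database/sql"
	"fmt"
)

// createChangefeed is a hypothetical helper: the sink URI travels as a single
// bound argument, so its embedded query parameters need no quoting or
// escaping and never appear in the SQL string itself.
func createChangefeed(db *sql.DB, sinkURI string) (jobID int64, err error) {
	err = db.QueryRow(`CREATE CHANGEFEED FOR TABLE foo, bar INTO $1`, sinkURI).Scan(&jobID)
	return jobID, err
}

func main() {
	// The credential here is fake; the null sink ignores it, which is exactly
	// why the test can assert that the job record kept the unredacted URI.
	const sinkURI = "null://blah?AWS_ACCESS_KEY_ID=the_secret"
	fmt.Println("sink URI under test:", sinkURI)
}
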
178 changes: 7 additions & 171 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -28,8 +28,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -87,7 +85,7 @@ type changeAggregator struct {
 	// eventProducer produces the next event from the kv feed.
 	eventProducer kvevent.Reader
 	// eventConsumer consumes the event.
-	eventConsumer kvEventConsumer
+	eventConsumer *kvEventToRowConsumer
 
 	// lastFlush and flushFrequency keep track of the flush frequency.
 	lastFlush time.Time
@@ -294,21 +292,17 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		kvFeedHighWater = ca.spec.Feed.StatementTime
 	}
 
-	ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime, ca.sliMetrics)
+	ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime)
 	if err != nil {
 		// Early abort in the case that there is an error creating the sink.
 		ca.MoveToDraining(err)
 		ca.cancel()
 		return
 	}
 
-	if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) {
-		ca.eventConsumer = newNativeKVConsumer(ca.sink)
-	} else {
-		ca.eventConsumer = newKVEventToRowConsumer(
-			ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
-			ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
-	}
+	ca.eventConsumer = newKVEventToRowConsumer(
+		ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
+		ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
 }
 
 func (ca *changeAggregator) startKVFeed(
@@ -317,7 +311,6 @@ func (ca *changeAggregator) startKVFeed(
 	initialHighWater hlc.Timestamp,
 	needsInitialScan bool,
 	endTime hlc.Timestamp,
-	sm *sliMetrics,
 ) (kvevent.Reader, error) {
 	cfg := ca.flowCtx.Cfg
 	buf := kvevent.NewThrottlingBuffer(
@@ -326,7 +319,7 @@ func (ca *changeAggregator) startKVFeed(
 
 	// KVFeed takes ownership of the kvevent.Writer portion of the buffer, while
 	// we return the kvevent.Reader part to the caller.
-	kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime, sm)
+	kvfeedCfg := ca.makeKVFeedCfg(ctx, spans, buf, initialHighWater, needsInitialScan, endTime)
 
 	// Give errCh enough buffer both possible errors from supporting goroutines,
 	// but only the first one is ever used.
@@ -358,7 +351,6 @@ func (ca *changeAggregator) makeKVFeedCfg(
 	initialHighWater hlc.Timestamp,
 	needsInitialScan bool,
 	endTime hlc.Timestamp,
-	sm *sliMetrics,
 ) kvfeed.Config {
 	schemaChangeEvents := changefeedbase.SchemaChangeEventClass(
 		ca.spec.Feed.Opts[changefeedbase.OptSchemaChangeEvents])
@@ -657,11 +649,6 @@ func (ca *changeAggregator) ConsumerClosed() {
 	ca.close()
 }
 
-type kvEventConsumer interface {
-	// ConsumeEvent responsible for consuming kv event.
-	ConsumeEvent(ctx context.Context, event kvevent.Event) error
-}
-
 type kvEventToRowConsumer struct {
 	frontier *span.Frontier
 	encoder  Encoder
@@ -676,8 +663,6 @@ type kvEventToRowConsumer struct {
 	topicNamer *TopicNamer
 }
 
-var _ kvEventConsumer = &kvEventToRowConsumer{}
-
 func newKVEventToRowConsumer(
 	ctx context.Context,
 	cfg *execinfra.ServerConfig,
@@ -688,7 +673,7 @@ func newKVEventToRowConsumer(
 	details jobspb.ChangefeedDetails,
 	knobs TestingKnobs,
 	topicNamer *TopicNamer,
-) kvEventConsumer {
+) *kvEventToRowConsumer {
 	rfCache := newRowFetcherCache(
 		ctx,
 		cfg.Codec,
@@ -711,129 +696,6 @@ func newKVEventToRowConsumer(
 	}
 }
 
-type tableDescriptorTopic struct {
-	tableDesc           catalog.TableDescriptor
-	spec                jobspb.ChangefeedTargetSpecification
-	nameComponentsCache []string
-	identifierCache     TopicIdentifier
-}
-
-// GetNameComponents implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetNameComponents() []string {
-	if len(tdt.nameComponentsCache) == 0 {
-		tdt.nameComponentsCache = []string{tdt.spec.StatementTimeName}
-	}
-	return tdt.nameComponentsCache
-}
-
-// GetTopicIdentifier implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetTopicIdentifier() TopicIdentifier {
-	if tdt.identifierCache.TableID == 0 {
-		tdt.identifierCache = TopicIdentifier{
-			TableID: tdt.tableDesc.GetID(),
-		}
-	}
-	return tdt.identifierCache
-}
-
-// GetVersion implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetVersion() descpb.DescriptorVersion {
-	return tdt.tableDesc.GetVersion()
-}
-
-// GetTargetSpecification implements the TopicDescriptor interface
-func (tdt *tableDescriptorTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
-	return tdt.spec
-}
-
-var _ TopicDescriptor = &tableDescriptorTopic{}
-
-type columnFamilyTopic struct {
-	tableDesc           catalog.TableDescriptor
-	familyDesc          descpb.ColumnFamilyDescriptor
-	spec                jobspb.ChangefeedTargetSpecification
-	nameComponentsCache []string
-	identifierCache     TopicIdentifier
-}
-
-// GetNameComponents implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetNameComponents() []string {
-	if len(cft.nameComponentsCache) == 0 {
-		cft.nameComponentsCache = []string{
-			cft.spec.StatementTimeName,
-			cft.familyDesc.Name,
-		}
-	}
-	return cft.nameComponentsCache
-}
-
-// GetTopicIdentifier implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetTopicIdentifier() TopicIdentifier {
-	if cft.identifierCache.TableID == 0 {
-		cft.identifierCache = TopicIdentifier{
-			TableID:  cft.tableDesc.GetID(),
-			FamilyID: cft.familyDesc.ID,
-		}
-	}
-	return cft.identifierCache
-}
-
-// GetVersion implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetVersion() descpb.DescriptorVersion {
-	return cft.tableDesc.GetVersion()
-}
-
-// GetTargetSpecification implements the TopicDescriptor interface
-func (cft *columnFamilyTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
-	return cft.spec
-}
-
-var _ TopicDescriptor = &columnFamilyTopic{}
-
-type noTopic struct{}
-
-func (n noTopic) GetNameComponents() []string {
-	return []string{}
-}
-
-func (n noTopic) GetTopicIdentifier() TopicIdentifier {
-	return TopicIdentifier{}
-}
-
-func (n noTopic) GetVersion() descpb.DescriptorVersion {
-	return 0
-}
-
-func (n noTopic) GetTargetSpecification() jobspb.ChangefeedTargetSpecification {
-	return jobspb.ChangefeedTargetSpecification{}
-}
-
-var _ TopicDescriptor = &noTopic{}
-
-func makeTopicDescriptorFromSpecForRow(
-	s jobspb.ChangefeedTargetSpecification, r encodeRow,
-) (TopicDescriptor, error) {
-	switch s.Type {
-	case jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY:
-		return &tableDescriptorTopic{
-			tableDesc: r.tableDesc,
-			spec:      s,
-		}, nil
-	case jobspb.ChangefeedTargetSpecification_EACH_FAMILY, jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY:
-		familyDesc, err := r.tableDesc.FindFamilyByID(r.familyID)
-		if err != nil {
-			return noTopic{}, err
-		}
-		return &columnFamilyTopic{
-			tableDesc:  r.tableDesc,
-			spec:       s,
-			familyDesc: *familyDesc,
-		}, nil
-	default:
-		return noTopic{}, errors.AssertionFailedf("Unsupported target type %s", s.Type)
-	}
-}
-
 func (c *kvEventToRowConsumer) topicForRow(r encodeRow) (TopicDescriptor, error) {
 	if topic, ok := c.topicDescriptorCache[TopicIdentifier{TableID: r.tableDesc.GetID(), FamilyID: r.familyID}]; ok {
 		if topic.GetVersion() == r.tableDesc.GetVersion() {
@@ -1043,32 +905,6 @@ func (c *kvEventToRowConsumer) eventToRow(
 	return r, nil
 }
 
-type nativeKVConsumer struct {
-	sink Sink
-}
-
-var _ kvEventConsumer = &nativeKVConsumer{}
-
-func newNativeKVConsumer(sink Sink) kvEventConsumer {
-	return &nativeKVConsumer{sink: sink}
-}
-
-// ConsumeEvent implements kvEventConsumer interface.
-func (c *nativeKVConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Event) error {
-	if ev.Type() != kvevent.TypeKV {
-		return errors.AssertionFailedf("expected kv ev, got %v", ev.Type())
-	}
-	keyBytes := []byte(ev.KV().Key)
-	val := ev.KV().Value
-	valBytes, err := protoutil.Marshal(&val)
-	if err != nil {
-		return err
-	}
-
-	return c.sink.EmitRow(
-		ctx, &noTopic{}, keyBytes, valBytes, val.Timestamp, val.Timestamp, ev.DetachAlloc())
-}
-
 const (
 	emitAllResolved = 0
 	emitNoResolved  = -1
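
The shape of this second change is easiest to see in miniature. With the branch that selected newNativeKVConsumer for the native format deleted, kvEventToRowConsumer is the only implementation left, so the kvEventConsumer interface goes away and both the struct field and the constructor use the concrete *kvEventToRowConsumer directly. The sketch below is illustrative only; event and rowConsumer are stand-ins, not the real changefeed types.

package main

import "fmt"

// event and rowConsumer are illustrative stand-ins for kvevent.Event and
// kvEventToRowConsumer; the shape of the refactor is what matters here.
type event struct{ key, value string }

type rowConsumer struct{ emitted int }

// Before the change this constructor returned a single-method interface;
// returning the concrete pointer exposes the type's full method set and
// fields to its one caller without losing anything.
func newRowConsumer() *rowConsumer {
	return &rowConsumer{}
}

// ConsumeEvent mirrors the one method the deleted interface declared.
func (c *rowConsumer) ConsumeEvent(ev event) error {
	c.emitted++
	fmt.Printf("consumed %s=%s\n", ev.key, ev.value)
	return nil
}

func main() {
	c := newRowConsumer() // a concrete *rowConsumer, no type assertion needed
	_ = c.ConsumeEvent(event{key: "a", value: "1"})
	fmt.Println("events consumed:", c.emitted)
}
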
