Skip to content

Commit

Permalink
Merge pull request cockroachdb#82312 from HonoreDB/fix_changefeed_upg…
Browse files Browse the repository at this point in the history
…rade_failure_backport

changefeedccl: handle old-style protobufs in rowfetcher_cache
  • Loading branch information
miretskiy authored Jun 2, 2022
2 parents b18717a + e4899fe commit 6f1582d
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,11 @@ func createBenchmarkChangefeed(
return nil, nil, err
}
serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
eventConsumer := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater,
eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater,
sink, encoder, details, TestingKnobs{}, nil)
if err != nil {
return nil, nil, err
}
tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) {
event, err := buf.Get(ctx)
if err != nil {
Expand Down
21 changes: 15 additions & 6 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func newChangeAggregatorProcessor(
}

if _, needTopics := ca.spec.Feed.Opts[changefeedbase.OptTopicInValue]; needTopics {
ca.topicNamer, err = MakeTopicNamer(ca.spec.Feed.TargetSpecifications)
ca.topicNamer, err = MakeTopicNamer(AllTargets(ca.spec.Feed))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,10 +302,16 @@ func (ca *changeAggregator) Start(ctx context.Context) {
if ca.spec.Feed.Opts[changefeedbase.OptFormat] == string(changefeedbase.OptFormatNative) {
ca.eventConsumer = newNativeKVConsumer(ca.sink)
} else {
ca.eventConsumer = newKVEventToRowConsumer(
ca.eventConsumer, err = newKVEventToRowConsumer(
ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), initialHighWater,
ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
}
if err != nil {
// Early abort in the case that there is an error creating the consumer.
ca.MoveToDraining(err)
ca.cancel()
return
}
}

func (ca *changeAggregator) startKVFeed(
Expand Down Expand Up @@ -704,15 +710,18 @@ func newKVEventToRowConsumer(
details jobspb.ChangefeedDetails,
knobs TestingKnobs,
topicNamer *TopicNamer,
) kvEventConsumer {
rfCache := newRowFetcherCache(
) (kvEventConsumer, error) {
rfCache, err := newRowFetcherCache(
ctx,
cfg.Codec,
cfg.LeaseManager.(*lease.Manager),
cfg.CollectionFactory,
cfg.DB,
details,
)
if err != nil {
return nil, err
}

return &kvEventToRowConsumer{
frontier: frontier,
Expand All @@ -724,7 +733,7 @@ func newKVEventToRowConsumer(
knobs: knobs,
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
topicNamer: topicNamer,
}
}, nil
}

type tableDescriptorTopic struct {
Expand Down Expand Up @@ -860,7 +869,7 @@ func (c *kvEventToRowConsumer) topicForRow(r encodeRow) (TopicDescriptor, error)
if err != nil {
return noTopic{}, err
}
for _, s := range c.details.TargetSpecifications {
for _, s := range AllTargets(c.details) {
if s.TableID == r.tableDesc.GetID() && (s.FamilyName == "" || s.FamilyName == family.Name) {
topic, err := makeTopicDescriptorFromSpecForRow(s, r)
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ func newRowFetcherCache(
cf *descs.CollectionFactory,
db *kv.DB,
details jobspb.ChangefeedDetails,
) *rowFetcherCache {
specs := details.TargetSpecifications
) (*rowFetcherCache, error) {
specs := AllTargets(details)
if len(specs) == 0 {
return nil, errors.Newf("Could not derive any target specifications from %v", details)
}
watchedFamilies := make(map[watchedFamily]struct{}, len(specs))
for _, s := range specs {
watchedFamilies[watchedFamily{tableID: s.TableID, familyName: s.FamilyName}] = struct{}{}
Expand All @@ -92,7 +95,7 @@ func newRowFetcherCache(
db: db,
fetchers: cache.NewUnorderedCache(rfCacheConfig),
watchedFamilies: watchedFamilies,
}
}, nil
}

func (c *rowFetcherCache) TableDescForKey(
Expand Down

0 comments on commit 6f1582d

Please sign in to comment.