Skip to content

Commit

Permalink
kv: create RangeFeedMessage type to encapsulate RangeFeedEvent
Browse files Browse the repository at this point in the history
This PR creates kvcoor.RangeFeedMessage encapsulating
RangeFeedEvent and additional RegisteredSpan field which
is the span of the rangefeed registration that emits this
event.

This field is to help rangefeed clients to properly trim
the received SST with only single registered span. In the
case of creating rangefeed with multiple spans, rangefeed
consumers may end up receiving duplicate SST without
proper trimming using this field.

Release note: None
  • Loading branch information
gh-casper committed Aug 2, 2022
1 parent f461294 commit b000151
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 69 deletions.
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ type rangefeedFactory func(
ctx context.Context,
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
eventC chan<- kvcoord.RangeFeedMessage,
) error

type rangefeed struct {
memBuf kvevent.Writer
cfg rangeFeedConfig
eventC chan *roachpb.RangeFeedEvent
eventC chan kvcoord.RangeFeedMessage
knobs TestingKnobs
}

Expand All @@ -68,7 +68,7 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
feed := rangefeed{
memBuf: sink,
cfg: cfg,
eventC: make(chan *roachpb.RangeFeedEvent, 128),
eventC: make(chan kvcoord.RangeFeedMessage, 128),
knobs: cfg.Knobs,
}
g := ctxgroup.WithContext(ctx)
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,14 @@ func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) erro
}
}

func (s *eventStream) onSSTable(ctx context.Context, sst *roachpb.RangeFeedSSTable) {
func (s *eventStream) onSSTable(
ctx context.Context, sst *roachpb.RangeFeedSSTable, registeredSpan roachpb.Span,
) {
select {
case <-ctx.Done():
case s.eventsCh <- roachpb.RangeFeedEvent{SST: sst}:
log.VInfof(ctx, 1, "onSSTable: %s@%s", sst.Span, sst.WriteTS)
log.VInfof(ctx, 1, "onSSTable: %s@%s with registered span %s",
sst.Span, sst.WriteTS, registeredSpan)
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"lock_spans_over_budget_error.go",
"node_store.go",
"range_iter.go",
"rangefeed_message.go",
"replica_slice.go",
"testing_knobs.go",
"transport.go",
Expand Down
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (ds *DistSender) RangeFeed(
spans []roachpb.Span,
startAfter hlc.Timestamp, // exclusive
withDiff bool,
eventCh chan<- *roachpb.RangeFeedEvent,
eventCh chan<- RangeFeedMessage,
) error {
timedSpans := make([]SpanTimePair, 0, len(spans))
for _, sp := range spans {
Expand All @@ -110,7 +110,7 @@ type SpanTimePair struct {
// RangeFeedSpans is similar to RangeFeed but allows specification of different
// starting time for each span.
func (ds *DistSender) RangeFeedSpans(
ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- *roachpb.RangeFeedEvent,
ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- RangeFeedMessage,
) error {
if len(spans) == 0 {
return errors.AssertionFailedf("expected at least 1 span, got none")
Expand Down Expand Up @@ -299,7 +299,7 @@ func (ds *DistSender) partialRangeFeed(
withDiff bool,
catchupSem *limit.ConcurrentRequestLimiter,
rangeCh chan<- singleRangeInfo,
eventCh chan<- *roachpb.RangeFeedEvent,
eventCh chan<- RangeFeedMessage,
) error {
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()
Expand Down Expand Up @@ -407,7 +407,7 @@ func (ds *DistSender) singleRangeFeed(
withDiff bool,
desc *roachpb.RangeDescriptor,
catchupSem *limit.ConcurrentRequestLimiter,
eventCh chan<- *roachpb.RangeFeedEvent,
eventCh chan<- RangeFeedMessage,
onRangeEvent onRangeEventCb,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
Expand Down Expand Up @@ -491,6 +491,7 @@ func (ds *DistSender) singleRangeFeed(
if err != nil {
return args.Timestamp, err
}
msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
case *roachpb.RangeFeedCheckpoint:
if t.Span.Contains(args.Span) {
Expand All @@ -500,6 +501,7 @@ func (ds *DistSender) singleRangeFeed(
}
args.Timestamp.Forward(t.ResolvedTS.Next())
}
case *roachpb.RangeFeedSSTable:
case *roachpb.RangeFeedError:
log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
if catchupRes != nil {
Expand All @@ -510,7 +512,7 @@ func (ds *DistSender) singleRangeFeed(
onRangeEvent(args.Replica.NodeID, desc.RangeID, event)

select {
case eventCh <- event:
case eventCh <- msg:
case <-ctx.Done():
return args.Timestamp, ctx.Err()
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvclient/kvcoord/rangefeed_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvcoord

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// RangeFeedMessage is a type that encapsulates the roachpb.RangeFeedEvent.
type RangeFeedMessage struct {

// RangeFeed event this message holds.
*roachpb.RangeFeedEvent

// The span of the rangefeed registration that overlaps with SST span if
// event has an underlying roachpb.RangeFeedSSTable event.
RegisteredSpan roachpb.Span
}
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ go_test(
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ func WithOnCheckpoint(f OnCheckpoint) Option {
// provided, a catchup scan will be run instead that will include the contents
// of these SSTs.
//
// 'registeredSpan' is a span of rangefeed registration that emits the SSTable.
//
// Note that the SST is emitted as it was ingested, so it may contain keys
// outside of the rangefeed span, and the caller should prune the SST contents
// as appropriate. Futhermore, these events do not contain previous values as
Expand All @@ -169,7 +171,11 @@ func WithOnCheckpoint(f OnCheckpoint) Option {
// MVCCHistoryMutationError and thus will not be emitted here -- there should be
// no such requests into spans with rangefeeds across them, but it is up to
// callers to ensure this.
type OnSSTable func(ctx context.Context, sst *roachpb.RangeFeedSSTable)
type OnSSTable func(
ctx context.Context,
sst *roachpb.RangeFeedSSTable,
registeredSpan roachpb.Span,
)

// WithOnSSTable sets up a callback that's invoked whenever an SSTable is
// ingested.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (dbc *dbAdapter) RangeFeed(
spans []roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
eventC chan<- kvcoord.RangeFeedMessage,
) error {
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -54,7 +55,7 @@ type DB interface {
spans []roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
eventC chan<- kvcoord.RangeFeedMessage,
) error

// Scan encapsulates scanning a key span at a given point in time. The method
Expand Down Expand Up @@ -279,7 +280,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {

// TODO(ajwerner): Consider adding event buffering. Doing so would require
// draining when the rangefeed fails.
eventCh := make(chan *roachpb.RangeFeedEvent)
eventCh := make(chan kvcoord.RangeFeedMessage)

for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
Expand Down Expand Up @@ -333,7 +334,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {

// processEvents processes events sent by the rangefeed on the eventCh.
func (f *RangeFeed) processEvents(
ctx context.Context, frontier *span.Frontier, eventCh <-chan *roachpb.RangeFeedEvent,
ctx context.Context, frontier *span.Frontier, eventCh <-chan kvcoord.RangeFeedMessage,
) error {
for {
select {
Expand All @@ -357,7 +358,7 @@ func (f *RangeFeed) processEvents(
return errors.AssertionFailedf(
"received unexpected rangefeed SST event with no OnSSTable handler")
}
f.onSSTable(ctx, ev.SST)
f.onSSTable(ctx, ev.SST, ev.RegisteredSpan)
case ev.DeleteRange != nil:
if f.onDeleteRange == nil {
if kvserverbase.GlobalMVCCRangeTombstoneForTesting {
Expand Down
23 changes: 15 additions & 8 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -490,7 +491,7 @@ func TestWithOnSSTable(t *testing.T) {
// narrower.
var once sync.Once
checkpointC := make(chan struct{})
sstC := make(chan *roachpb.RangeFeedSSTable)
sstC := make(chan kvcoord.RangeFeedMessage)
spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")}}
r, err := f.RangeFeed(ctx, "test", spans, db.Clock().Now(),
func(ctx context.Context, value *roachpb.RangeFeedValue) {},
Expand All @@ -499,9 +500,14 @@ func TestWithOnSSTable(t *testing.T) {
close(checkpointC)
})
}),
rangefeed.WithOnSSTable(func(ctx context.Context, sst *roachpb.RangeFeedSSTable) {
rangefeed.WithOnSSTable(func(ctx context.Context, sst *roachpb.RangeFeedSSTable, registeredSpan roachpb.Span) {
select {
case sstC <- sst:
case sstC <- kvcoord.RangeFeedMessage{
RangeFeedEvent: &roachpb.RangeFeedEvent{
SST: sst,
},
RegisteredSpan: registeredSpan,
}:
case <-ctx.Done():
}
}),
Expand Down Expand Up @@ -533,16 +539,17 @@ func TestWithOnSSTable(t *testing.T) {
require.Nil(t, pErr)

// Wait for the SST event and check its contents.
var sstEvent *roachpb.RangeFeedSSTable
var sstMessage kvcoord.RangeFeedMessage
select {
case sstEvent = <-sstC:
case sstMessage = <-sstC:
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for SST event")
}

require.Equal(t, roachpb.Span{Key: sstStart, EndKey: sstEnd}, sstEvent.Span)
require.Equal(t, now, sstEvent.WriteTS)
require.Equal(t, sstKVs, storageutils.ScanSST(t, sstEvent.Data))
require.Equal(t, roachpb.Span{Key: sstStart, EndKey: sstEnd}, sstMessage.SST.Span)
require.Equal(t, now, sstMessage.SST.WriteTS)
require.Equal(t, sstKVs, storageutils.ScanSST(t, sstMessage.SST.Data))
require.Equal(t, spans[0], sstMessage.RegisteredSpan)
}

// TestWithOnSSTableCatchesUpIfNotSet tests that the rangefeed runs a catchup
Expand Down
Loading

0 comments on commit b000151

Please sign in to comment.