kvserver: emit MVCC range tombstones over rangefeeds #76443

Closed
14 changes: 14 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -118,6 +118,20 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
// expect SST ingestion into spans with active changefeeds.
return errors.Errorf("unexpected SST ingestion: %v", t)

case *roachpb.RangeFeedDeleteRange:
// For now, we just error on MVCC range tombstones. These are currently
// only expected to be used by schema GC and IMPORT INTO, and such spans
// should not have active changefeeds across them.
//
// TODO(erikgrinaker): Write an end-to-end test which verifies that an
// IMPORT INTO which gets rolled back using MVCC range tombstones will
// not be visible to a changefeed, neither when started before the
// import nor when resuming from a timestamp before the import. The table
// descriptor should be marked as offline during the import, and catchup
// scans should detect this and prevent reading anything in that
// timespan.
return errors.Errorf("unexpected MVCC range deletion: %v", t)

default:
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -386,6 +386,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
}
default:
// TODO(yevgeniy): Handle SSTs.
// TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones).
Comment on lines 388 to +389

erikgrinaker (Contributor, Author) commented on Feb 12, 2022:

@miretskiy @dt Do we need to address these TODOs before stability in order to test streaming replication with 22.1? That is, add handling for AddSSTable and MVCC range tombstones in streaming replication.

Reply (Contributor):

We decided that we will test what we can -- even if some of those ops are not ready yet. So, these TODOs are fine for now.

erikgrinaker (Contributor, Author):

Ok. We have pending PRs/issues for the necessary APIs to ingest these on the target cluster, and plan to get these in for 22.1 in experimental form. We'll be maturing these APIs during stability and the 22.1 cycle. So if we get the streaming ingestion code in place before stability as well (i.e. simply make the appropriate API calls) then we should be able to do full end-to-end testing of streaming replication sometime during the 22.1 cycle.

return errors.AssertionFailedf("unexpected event")
}
}
20 changes: 20 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
@@ -41,6 +41,7 @@ type config struct {
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
onSSTable OnSSTable
onDeleteRange OnDeleteRange
extraPProfLabels []string
}

@@ -178,6 +179,25 @@ func WithOnSSTable(f OnSSTable) Option {
})
}

// OnDeleteRange is called when an MVCC range tombstone is written (e.g. when
// DeleteRange is called with UseExperimentalRangeTombstone, but not when the
// range is deleted using point tombstones). If this callback is not provided,
// such range tombstones will be ignored.
//
// MVCC range tombstones are currently experimental, and disabled by default.
// They are only written when storage.CanUseExperimentalMVCCRangeTombstones() is
// true, and are only expected during certain operations like schema GC and
// IMPORT INTO (i.e. not across live tables).
type OnDeleteRange func(ctx context.Context, value *roachpb.RangeFeedDeleteRange)

// WithOnDeleteRange sets up a callback that's invoked whenever an MVCC range
// deletion tombstone is written.
func WithOnDeleteRange(f OnDeleteRange) Option {
return optionFunc(func(c *config) {
c.onDeleteRange = f
})
}

// OnFrontierAdvance is called when the rangefeed frontier is advanced with the
// new frontier timestamp.
type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp)
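For illustration, here is a minimal sketch (not part of this PR) of how a caller might wire up the new option. The helper name `watchTombstones` is hypothetical; the `Factory`/`RangeFeed` call shape follows the external test further down in this diff.

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// watchTombstones is a hypothetical helper: it assumes the caller already has
// a *rangefeed.Factory (built via rangefeed.NewFactory, as in the test below)
// and only shows where WithOnDeleteRange hooks in.
func watchTombstones(
	ctx context.Context, f *rangefeed.Factory, span roachpb.Span, startTS hlc.Timestamp,
) (*rangefeed.RangeFeed, error) {
	return f.RangeFeed(ctx, "tombstone-example", []roachpb.Span{span}, startTS,
		func(ctx context.Context, v *roachpb.RangeFeedValue) {
			// Point keys arrive here, exactly as before this change.
		},
		// New in this change: without this option, RangeFeedDeleteRange events
		// are silently dropped by the client-side event loop.
		rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
			log.Infof(ctx, "MVCC range tombstone %s at %s", e.Span, e.Timestamp)
		}),
	)
}
```

The caller remains responsible for closing the returned feed, as the test does with `defer r.Close()`.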
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -349,6 +349,10 @@ func (f *RangeFeed) processEvents(
"received unexpected rangefeed SST event with no OnSSTable handler")
}
f.onSSTable(ctx, ev.SST)
case ev.DelRange != nil:
if f.onDeleteRange != nil {
f.onDeleteRange(ctx, ev.DelRange)
}
case ev.Error != nil:
// Intentionally do nothing, we'll get an error returned from the
// call to RangeFeed.
112 changes: 112 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -613,6 +613,118 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) {
require.Equal(t, expectKVs, seenKVs)
}

// TestWithOnDeleteRange tests that the rangefeed emits MVCC range tombstones.
func TestWithOnDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)
db := srv.DB()

_, _, err := tc.SplitRange(roachpb.Key("a"))
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())

_, err = tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)
f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil)
require.NoError(t, err)

// We lay down a couple of range tombstones and points. The first range
// tombstone should not be visible, because initial scans do not emit
// tombstones. The second one should be visible, because catchup scans do emit
// tombstones. The range tombstone should be ordered after the initial point,
// but before the catchup point.
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "c", "d"))
require.NoError(t, db.Put(ctx, "foo", "initial"))
rangeFeedTS := db.Clock().Now()
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "d", "z"))
require.NoError(t, db.Put(ctx, "foo", "catchup"))

// We start the rangefeed over a narrower span than the DeleteRanges (c-g),
// to ensure the DeleteRange event is truncated to the registration span.
var once sync.Once
checkpointC := make(chan struct{})
deleteRangeC := make(chan *roachpb.RangeFeedDeleteRange)
rowC := make(chan *roachpb.RangeFeedValue)
spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}}
r, err := f.RangeFeed(ctx, "test", spans, rangeFeedTS,
func(ctx context.Context, e *roachpb.RangeFeedValue) {
select {
case rowC <- e:
case <-ctx.Done():
}
},
rangefeed.WithInitialScan(nil),
rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) {
once.Do(func() {
close(checkpointC)
})
}),
rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
select {
case deleteRangeC <- e:
case <-ctx.Done():
}
}),
)
require.NoError(t, err)
defer r.Close()

// Wait for initial scan. We should see the foo=initial point, but not the
// range tombstone.
select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "initial", string(value))
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}

// Wait for catchup scan. We should see the second range tombstone, truncated
// to the rangefeed bounds (c-g), and it should be ordered before the point
// foo=catchup.
select {
case e := <-deleteRangeC:
require.Equal(t, roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("g")}, e.Span)
require.NotEmpty(t, e.Timestamp)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}

select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "catchup", string(value))
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}

// Wait for checkpoint after catchup scan.
select {
case <-checkpointC:
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for checkpoint")
}

// Send another DeleteRange, and wait for the rangefeed event. This should
// be truncated to the rangefeed bounds (c-g).
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z"))
select {
case e := <-deleteRangeC:
require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span)
require.NotEmpty(t, e.Timestamp)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}
}

// TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced
// to callers.
func TestUnrecoverableErrors(t *testing.T) {
63 changes: 62 additions & 1 deletion pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -25,7 +25,8 @@ import (
// A CatchUpIterator is an iterator for catch-up scans.
type CatchUpIterator struct {
storage.SimpleMVCCIterator
close func()
rangeKeyIter *storage.MVCCRangeKeyIterator
close func()
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
@@ -68,13 +69,22 @@ func NewCatchUpIterator(
})
}

ret.rangeKeyIter = storage.NewMVCCRangeKeyIterator(reader, storage.MVCCRangeKeyIterOptions{
LowerBound: args.Span.Key,
UpperBound: args.Span.EndKey,
MinTimestamp: args.Timestamp,
})

return ret
}

// Close closes the iterator and calls the instantiator-supplied close
// callback.
func (i *CatchUpIterator) Close() {
i.SimpleMVCCIterator.Close()
if i.rangeKeyIter != nil { // can be nil in tests
i.rangeKeyIter.Close()
}
if i.close != nil {
i.close()
}
@@ -94,6 +104,23 @@ func (i *CatchUpIterator) CatchUpScan(
withDiff bool,
outputFn outputEventFn,
) error {
// We emit any MVCC range tombstones first, so that the consumer can use them
// to potentially discard point keys below them if they wish.
//
// Note that MVCC range tombstones are currently experimental, and will only
// be used when storage.CanUseExperimentalMVCCRangeTombstones() is enabled.
//
// TODO(erikgrinaker): We don't pass in startKey, endKey, and catchUpTimestamp
// here because we've already set the appropriate bounds when the iterator was
// constructed. This assumes that the caller always passes in the same bounds
// to CatchUpScan() as it does to NewCatchUpIterator(), otherwise this will
// emit incorrect events. Verify that this is always the case, and remove the
// redundant parameters to this method as it is unsafe to pass other values.
err := i.catchUpScanRangeTombstones(outputFn)
if err != nil {
return err
}

var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
Expand Down Expand Up @@ -242,3 +269,37 @@ func (i *CatchUpIterator) CatchUpScan(
// Output events for the last key encountered.
return outputEvents()
}

// catchUpScanRangeTombstones runs a catch-up scan for MVCC range tombstones.
func (i *CatchUpIterator) catchUpScanRangeTombstones(outputFn outputEventFn) error {
if i.rangeKeyIter == nil { // can be nil in tests
return nil
}

for {
if ok, err := i.rangeKeyIter.Valid(); err != nil {
return err
} else if !ok {
break
}

rangeKey, value := i.rangeKeyIter.Key(), i.rangeKeyIter.Value()
if len(value) > 0 {
// We currently expect all range keys to be tombstones.
return errors.AssertionFailedf("unexpected non-tombstone range key %s with value %x",
rangeKey, value)
}

err := outputFn(&roachpb.RangeFeedEvent{
DelRange: &roachpb.RangeFeedDeleteRange{
Span: roachpb.Span{Key: rangeKey.StartKey, EndKey: rangeKey.EndKey},
Timestamp: rangeKey.Timestamp,
},
})
if err != nil {
return err
}
i.rangeKeyIter.Next()
}
return nil
}
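Because the catch-up scan emits range tombstones before any point keys (see the comment at the top of CatchUpScan above), a consumer that wants to drop shadowed point keys can buffer the tombstones first. The helper below is a sketch under that assumption and is not part of this change; it only relies on the existing `Span.ContainsKey` method and the new `RangeFeedDeleteRange` fields.

```go
package example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// shadowedByRangeTombstone is an illustrative (hypothetical) helper: it
// reports whether a point key emitted later in a catch-up scan is covered by
// one of the range tombstones emitted earlier in the same scan.
func shadowedByRangeTombstone(
	v *roachpb.RangeFeedValue, tombstones []*roachpb.RangeFeedDeleteRange,
) bool {
	for _, t := range tombstones {
		// A point key is shadowed if it lies inside the tombstone's span and
		// was written below the tombstone's timestamp.
		if t.Span.ContainsKey(v.Key) && v.Value.Timestamp.Less(t.Timestamp) {
			return true
		}
	}
	return false
}
```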
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
@@ -188,7 +188,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo
opts.FS = vfs.Default
opts.LBaseMaxBytes = lBaseMaxBytes
opts.ReadOnly = readOnly
opts.FormatMajorVersion = pebble.FormatBlockPropertyCollector
opts.FormatMajorVersion = pebble.FormatRangeKeys
peb, err := storage.NewPebble(
context.Background(),
storage.PebbleConfig{
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -629,6 +629,10 @@ func (p *Processor) consumeLogicalOps(ctx context.Context, ops []enginepb.MVCCLo
// Publish the new value directly.
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue)

case *enginepb.MVCCDeleteRangeOp:
// Publish the range deletion directly.
p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp)

case *enginepb.MVCCWriteIntentOp:
// No updates to publish.

@@ -698,6 +702,22 @@ func (p *Processor) publishValue(
p.reg.PublishToOverlapping(roachpb.Span{Key: key}, &event)
}

func (p *Processor) publishDeleteRange(
ctx context.Context, startKey, endKey roachpb.Key, timestamp hlc.Timestamp,
) {
span := roachpb.Span{Key: startKey, EndKey: endKey}
if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) {
log.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span)
}

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedDeleteRange{
Span: span,
Timestamp: timestamp,
})
p.reg.PublishToOverlapping(span, &event)
}

func (p *Processor) publishSSTable(
ctx context.Context, sst []byte, sstSpan roachpb.Span, sstWTS hlc.Timestamp,
) {
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -169,6 +169,13 @@ func (r *registration) validateEvent(event *roachpb.RangeFeedEvent) {
if t.WriteTS.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t))
}
case *roachpb.RangeFeedDeleteRange:
if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 {
panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t))
}
if t.Timestamp.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t))
}
default:
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t))
}
@@ -215,6 +222,13 @@ func (r *registration) maybeStripEvent(event *roachpb.RangeFeedEvent) *roachpb.R
t = copyOnWrite().(*roachpb.RangeFeedCheckpoint)
t.Span = r.span
}
case *roachpb.RangeFeedDeleteRange:
// Truncate the range tombstone to the registration bounds.
if i := t.Span.Intersect(r.span); !i.Equal(t.Span) {
t = copyOnWrite().(*roachpb.RangeFeedDeleteRange)
t.Span = i.Clone()
}

case *roachpb.RangeFeedSSTable:
// SSTs are always sent in their entirety, it is up to the caller to
// filter out irrelevant entries.
@@ -389,6 +403,8 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang
minTS = t.Value.Timestamp
case *roachpb.RangeFeedSSTable:
minTS = t.WriteTS
case *roachpb.RangeFeedDeleteRange:
minTS = t.Timestamp
case *roachpb.RangeFeedCheckpoint:
// Always publish checkpoint notifications, regardless of a registration's
// starting timestamp.
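As a small illustration of the registration-side behavior added to maybeStripEvent above (DeleteRange spans are clipped to the registration, while SSTs are forwarded whole), the hypothetical helper below reproduces the same Intersect/Clone logic outside the registry.

```go
package example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// truncateToRegistration mirrors, for illustration only, what maybeStripEvent
// does for RangeFeedDeleteRange: clip the event's span to the registration's
// span, returning the original span when no clipping is needed.
func truncateToRegistration(del, reg roachpb.Span) roachpb.Span {
	if i := del.Intersect(reg); !i.Equal(del) {
		return i.Clone()
	}
	return del
}
```

With reg = [c, g) and del = [a, z) this yields [c, g), matching the final assertion in TestWithOnDeleteRange above.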
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
@@ -142,6 +142,10 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
rts.assertOpAboveRTS(op, t.Timestamp)
return false

case *enginepb.MVCCDeleteRangeOp:
rts.assertOpAboveRTS(op, t.Timestamp)
return false

case *enginepb.MVCCWriteIntentOp:
rts.assertOpAboveRTS(op, t.Timestamp)
return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp)
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -520,7 +520,8 @@ func (r *Replica) populatePrevValsInLogicalOpLogRaftMuLocked(
case *enginepb.MVCCWriteIntentOp,
*enginepb.MVCCUpdateIntentOp,
*enginepb.MVCCAbortIntentOp,
*enginepb.MVCCAbortTxnOp:
*enginepb.MVCCAbortTxnOp,
*enginepb.MVCCDeleteRangeOp:
// Nothing to do.
continue
default:
Expand Down Expand Up @@ -594,7 +595,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(
case *enginepb.MVCCWriteIntentOp,
*enginepb.MVCCUpdateIntentOp,
*enginepb.MVCCAbortIntentOp,
*enginepb.MVCCAbortTxnOp:
*enginepb.MVCCAbortTxnOp,
*enginepb.MVCCDeleteRangeOp:
// Nothing to do.
continue
default: