kvserver: emit MVCC range tombstones over rangefeeds
This patch adds MVCC range tombstone support to rangefeeds. Whenever an
MVCC range tombstone is written, e.g. by a `DeleteRange` request with
`UseExperimentalRangeTombstone`, a new `MVCCDeleteRangeOp` logical op
type is recorded and emitted across the rangefeed as a
`RangeFeedDeleteRange` event.
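
For illustration, a minimal sketch of the write path that produces these
events, based on the experimental client helper exercised by the new test
below (the wrapper and its exact signature are assumptions, inferred from
that test usage):

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
)

// writeRangeTombstone lays an MVCC range tombstone over [start, end). Sketch
// only: the write is recorded as an MVCCDeleteRangeOp logical op, which
// rangefeeds then emit as a RangeFeedDeleteRange event, and it is only
// permitted when the experimental gates described below are enabled.
func writeRangeTombstone(ctx context.Context, db *kv.DB, start, end interface{}) error {
	return db.ExperimentalDelRangeUsingTombstone(ctx, start, end)
}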

MVCC range tombstones are under active development and highly
experimental. They will only be used (and emitted) after checking
`storage.CanUseExperimentalMVCCRangeTombstones`, which requires both a
version gate and an environment variable to be enabled.

Changefeeds will emit an error for these events. We do not expect
changefeeds to encounter them in online spans, since MVCC range tombstones
are initially only planned for use with schema GC and import rollbacks.

The rangefeed client library has been extended with support for these
events, but no existing callers handle them, for the same reason as
changefeeds. Initial scans do not emit regular tombstones, and thus do not
emit range tombstones either, but catchup scans will emit them if
encountered.
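
As a sketch of what an opted-in caller could look like (mirroring the new
external test below; the hypothetical wrapper watchWithRangeTombstones and
its setup are assumptions, not part of this patch):

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// watchWithRangeTombstones starts a rangefeed that handles both point values
// and MVCC range tombstones. Without WithOnDeleteRange, DeleteRange events
// are silently ignored by the client library.
func watchWithRangeTombstones(
	ctx context.Context, f *rangefeed.Factory, spans []roachpb.Span, startTS hlc.Timestamp,
) (*rangefeed.RangeFeed, error) {
	return f.RangeFeed(ctx, "example", spans, startTS,
		func(ctx context.Context, v *roachpb.RangeFeedValue) {
			// Point values are delivered as before.
		},
		rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
			// e.Span is truncated to the registration bounds.
			log.Infof(ctx, "MVCC range tombstone over %s at %s", e.Span, e.Timestamp)
		}),
	)
}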

This only has rudimentary integration testing, as MVCC range tombstones
are still experimental and lack e.g. persistence and replication -- more
comprehensive tests will be implemented later.

Release note: None
erikgrinaker committed Feb 22, 2022
1 parent e2ff5e3 commit 68f91c1
Showing 17 changed files with 314 additions and 9 deletions.
14 changes: 14 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -118,6 +118,20 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
// expect SST ingestion into spans with active changefeeds.
return errors.Errorf("unexpected SST ingestion: %v", t)

case *roachpb.RangeFeedDeleteRange:
// For now, we just error on MVCC range tombstones. These are currently
// only expected to be used by schema GC and IMPORT INTO, and such spans
// should not have active changefeeds across them.
//
// TODO(erikgrinaker): Write an end-to-end test which verifies that an
// IMPORT INTO which gets rolled back using MVCC range tombstones will
// not be visible to a changefeed, neither when started before the
// import nor when resuming from a timestamp before the import. The table
// descriptor should be marked as offline during the import, and catchup
// scans should detect this and prevent reading anything in that
// timespan.
return errors.Errorf("unexpected MVCC range deletion: %v", t)

default:
return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -386,6 +386,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
}
default:
// TODO(yevgeniy): Handle SSTs.
// TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones).
return errors.AssertionFailedf("unexpected event")
}
}
20 changes: 20 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
@@ -41,6 +41,7 @@ type config struct {
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
onSSTable OnSSTable
onDeleteRange OnDeleteRange
extraPProfLabels []string
}

@@ -178,6 +179,25 @@ func WithOnSSTable(f OnSSTable) Option {
})
}

// OnDeleteRange is called when an MVCC range tombstone is written (e.g. when
// DeleteRange is called with UseExperimentalRangeTombstone, but not when the
// range is deleted using point tombstones). If this callback is not provided,
// such range tombstones will be ignored.
//
// MVCC range tombstones are currently experimental, and disabled by default.
// They are only written when storage.CanUseExperimentalMVCCRangeTombstones() is
// true, and are only expected during certain operations like schema GC and
// IMPORT INTO (i.e. not across live tables).
type OnDeleteRange func(ctx context.Context, value *roachpb.RangeFeedDeleteRange)

// WithOnDeleteRange sets up a callback that's invoked whenever an MVCC range
// deletion tombstone is written.
func WithOnDeleteRange(f OnDeleteRange) Option {
return optionFunc(func(c *config) {
c.onDeleteRange = f
})
}

// OnFrontierAdvance is called when the rangefeed frontier is advanced with the
// new frontier timestamp.
type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp)
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -349,6 +349,10 @@ func (f *RangeFeed) processEvents(
"received unexpected rangefeed SST event with no OnSSTable handler")
}
f.onSSTable(ctx, ev.SST)
case ev.DelRange != nil:
if f.onDeleteRange != nil {
f.onDeleteRange(ctx, ev.DelRange)
}
case ev.Error != nil:
// Intentionally do nothing, we'll get an error returned from the
// call to RangeFeed.
112 changes: 112 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -613,6 +613,118 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) {
require.Equal(t, expectKVs, seenKVs)
}

// TestWithOnDeleteRange tests that the rangefeed emits MVCC range tombstones.
func TestWithOnDeleteRange(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)
db := srv.DB()

_, _, err := tc.SplitRange(roachpb.Key("a"))
require.NoError(t, err)
require.NoError(t, tc.WaitForFullReplication())

_, err = tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
require.NoError(t, err)
f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil)
require.NoError(t, err)

// We lay down a couple of range tombstones and points. The first range
// tombstone should not be visible, because initial scans do not emit
// tombstones. The second one should be visible, because catchup scans do emit
// tombstones. The range tombstone should be ordered after the initial point,
// but before the catchup point.
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "c", "d"))
require.NoError(t, db.Put(ctx, "foo", "initial"))
rangeFeedTS := db.Clock().Now()
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "d", "z"))
require.NoError(t, db.Put(ctx, "foo", "catchup"))

// We start the rangefeed over a narrower span (c-g) than the DeleteRanges,
// to ensure the DeleteRange events are truncated to the registration span.
var once sync.Once
checkpointC := make(chan struct{})
deleteRangeC := make(chan *roachpb.RangeFeedDeleteRange)
rowC := make(chan *roachpb.RangeFeedValue)
spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}}
r, err := f.RangeFeed(ctx, "test", spans, rangeFeedTS,
func(ctx context.Context, e *roachpb.RangeFeedValue) {
select {
case rowC <- e:
case <-ctx.Done():
}
},
rangefeed.WithInitialScan(nil),
rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) {
once.Do(func() {
close(checkpointC)
})
}),
rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
select {
case deleteRangeC <- e:
case <-ctx.Done():
}
}),
)
require.NoError(t, err)
defer r.Close()

// Wait for initial scan. We should see the foo=initial point, but not the
// range tombstone.
select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "initial", string(value))
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for initial scan event")
}

// Wait for catchup scan. We should see the second range tombstone, truncated
// to the rangefeed bounds (c-g), and it should be ordered before the point
// foo=catchup.
select {
case e := <-deleteRangeC:
require.Equal(t, roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("g")}, e.Span)
require.NotEmpty(t, e.Timestamp)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}

select {
case e := <-rowC:
require.Equal(t, roachpb.Key("foo"), e.Key)
value, err := e.Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "catchup", string(value))
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for catchup scan event")
}

// Wait for checkpoint after catchup scan.
select {
case <-checkpointC:
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for checkpoint")
}

// Send another DeleteRange, and wait for the rangefeed event. This should
// be truncated to the rangefeed bounds (c-g).
require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z"))
select {
case e := <-deleteRangeC:
require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span)
require.NotEmpty(t, e.Timestamp)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for DeleteRange event")
}
}

// TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced
// to callers.
func TestUnrecoverableErrors(t *testing.T) {
63 changes: 62 additions & 1 deletion pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -25,7 +25,8 @@ import (
// A CatchUpIterator is an iterator for catchUp-scans.
type CatchUpIterator struct {
storage.SimpleMVCCIterator
close func()
rangeKeyIter *storage.MVCCRangeKeyIterator
close func()
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
@@ -68,13 +69,22 @@ func NewCatchUpIterator(
})
}

ret.rangeKeyIter = storage.NewMVCCRangeKeyIterator(reader, storage.MVCCRangeKeyIterOptions{
LowerBound: args.Span.Key,
UpperBound: args.Span.EndKey,
MinTimestamp: args.Timestamp,
})

return ret
}

// Close closes the iterator and calls the instantiator-supplied close
// callback.
func (i *CatchUpIterator) Close() {
i.SimpleMVCCIterator.Close()
if i.rangeKeyIter != nil { // can be nil in tests
i.rangeKeyIter.Close()
}
if i.close != nil {
i.close()
}
@@ -94,6 +104,23 @@ func (i *CatchUpIterator) CatchUpScan(
withDiff bool,
outputFn outputEventFn,
) error {
// We emit any MVCC range tombstones first, so that the consumer can use them
// to potentially discard point keys below them if they wish.
//
// Note that MVCC range tombstones are currently experimental, and will only
// be used when storage.CanUseExperimentalMVCCRangeTombstones() is enabled.
//
// TODO(erikgrinaker): We don't pass in startKey, endKey, and catchUpTimestamp
// here because we've already set the appropriate bounds when the iterator was
// constructed. This assumes that the caller always passes in the same bounds
// to CatchUpScan() as it does to NewCatchUpIterator(), otherwise this will
// emit incorrect events. Verify that this is always the case, and remove the
// redundant parameters to this method as it is unsafe to pass other values.
err := i.catchUpScanRangeTombstones(outputFn)
if err != nil {
return err
}

var a bufalloc.ByteAllocator
// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
@@ -242,3 +269,37 @@
// Output events for the last key encountered.
return outputEvents()
}

// catchUpScanRangeTombstones runs a catchup scan for MVCC range tombstones.
func (i *CatchUpIterator) catchUpScanRangeTombstones(outputFn outputEventFn) error {
if i.rangeKeyIter == nil { // can be nil in tests
return nil
}

for {
if ok, err := i.rangeKeyIter.Valid(); err != nil {
return err
} else if !ok {
break
}

rangeKey, value := i.rangeKeyIter.Key(), i.rangeKeyIter.Value()
if len(value) > 0 {
// We currently expect all range keys to be tombstones.
return errors.AssertionFailedf("unexpected non-tombstone range key %s with value %x",
rangeKey, value)
}

err := outputFn(&roachpb.RangeFeedEvent{
DelRange: &roachpb.RangeFeedDeleteRange{
Span: roachpb.Span{Key: rangeKey.StartKey, EndKey: rangeKey.EndKey},
Timestamp: rangeKey.Timestamp,
},
})
if err != nil {
return err
}
i.rangeKeyIter.Next()
}
return nil
}
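
Because tombstones are emitted before point keys, a consumer could filter out
shadowed points during the catchup scan. A hypothetical sketch (the helper and
the approach are assumptions, not part of this patch):

package example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// shadowedByRangeTombstone reports whether a catchup-scan point value was
// deleted by one of the MVCC range tombstones emitted earlier in the scan.
func shadowedByRangeTombstone(
	v *roachpb.RangeFeedValue, tombstones []*roachpb.RangeFeedDeleteRange,
) bool {
	for _, t := range tombstones {
		if t.Span.ContainsKey(v.Key) && v.Value.Timestamp.Less(t.Timestamp) {
			return true
		}
	}
	return false
}
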
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
@@ -188,7 +188,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo
opts.FS = vfs.Default
opts.LBaseMaxBytes = lBaseMaxBytes
opts.ReadOnly = readOnly
opts.FormatMajorVersion = pebble.FormatBlockPropertyCollector
opts.FormatMajorVersion = pebble.FormatRangeKeys
peb, err := storage.NewPebble(
context.Background(),
storage.PebbleConfig{
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -629,6 +629,10 @@ func (p *Processor) consumeLogicalOps(ctx context.Context, ops []enginepb.MVCCLo
// Publish the new value directly.
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue)

case *enginepb.MVCCDeleteRangeOp:
// Publish the range deletion directly.
p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp)

case *enginepb.MVCCWriteIntentOp:
// No updates to publish.

@@ -698,6 +702,22 @@ func (p *Processor) publishValue(
p.reg.PublishToOverlapping(roachpb.Span{Key: key}, &event)
}

func (p *Processor) publishDeleteRange(
ctx context.Context, startKey, endKey roachpb.Key, timestamp hlc.Timestamp,
) {
span := roachpb.Span{Key: startKey, EndKey: endKey}
if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) {
log.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span)
}

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedDeleteRange{
Span: span,
Timestamp: timestamp,
})
p.reg.PublishToOverlapping(span, &event)
}

func (p *Processor) publishSSTable(
ctx context.Context, sst []byte, sstSpan roachpb.Span, sstWTS hlc.Timestamp,
) {
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -169,6 +169,13 @@ func (r *registration) validateEvent(event *roachpb.RangeFeedEvent) {
if t.WriteTS.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t))
}
case *roachpb.RangeFeedDeleteRange:
if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 {
panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t))
}
if t.Timestamp.IsEmpty() {
panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t))
}
default:
panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t))
}
@@ -215,6 +222,13 @@ func (r *registration) maybeStripEvent(event *roachpb.RangeFeedEvent) *roachpb.R
t = copyOnWrite().(*roachpb.RangeFeedCheckpoint)
t.Span = r.span
}
case *roachpb.RangeFeedDeleteRange:
// Truncate the range tombstone to the registration bounds.
if i := t.Span.Intersect(r.span); !i.Equal(t.Span) {
t = copyOnWrite().(*roachpb.RangeFeedDeleteRange)
t.Span = i.Clone()
}

case *roachpb.RangeFeedSSTable:
// SSTs are always sent in their entirety, it is up to the caller to
// filter out irrelevant entries.
@@ -389,6 +403,8 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang
minTS = t.Value.Timestamp
case *roachpb.RangeFeedSSTable:
minTS = t.WriteTS
case *roachpb.RangeFeedDeleteRange:
minTS = t.Timestamp
case *roachpb.RangeFeedCheckpoint:
// Always publish checkpoint notifications, regardless of a registration's
// starting timestamp.
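
The truncation in maybeStripEvent above is what the external test asserts: a
tombstone over [a, z) delivered to a registration over [c, g) arrives as
[c, g). In isolation (a hypothetical helper for illustration):

package example

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// truncateToRegistration clips a DeleteRange span to a registration's bounds,
// e.g. [a, z) intersected with [c, g) yields [c, g).
func truncateToRegistration(event, registration roachpb.Span) roachpb.Span {
	return event.Intersect(registration).Clone()
}
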
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
@@ -142,6 +142,10 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
rts.assertOpAboveRTS(op, t.Timestamp)
return false

case *enginepb.MVCCDeleteRangeOp:
rts.assertOpAboveRTS(op, t.Timestamp)
return false

case *enginepb.MVCCWriteIntentOp:
rts.assertOpAboveRTS(op, t.Timestamp)
return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -520,7 +520,8 @@ func (r *Replica) populatePrevValsInLogicalOpLogRaftMuLocked(
case *enginepb.MVCCWriteIntentOp,
*enginepb.MVCCUpdateIntentOp,
*enginepb.MVCCAbortIntentOp,
*enginepb.MVCCAbortTxnOp:
*enginepb.MVCCAbortTxnOp,
*enginepb.MVCCDeleteRangeOp:
// Nothing to do.
continue
default:
@@ -594,7 +595,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(
case *enginepb.MVCCWriteIntentOp,
*enginepb.MVCCUpdateIntentOp,
*enginepb.MVCCAbortIntentOp,
*enginepb.MVCCAbortTxnOp:
*enginepb.MVCCAbortTxnOp,
*enginepb.MVCCDeleteRangeOp:
// Nothing to do.
continue
default: