From 566dff233d94aa18e2f9e53cdf888adf9ee22d53 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Fri, 11 Feb 2022 16:10:52 +0000
Subject: [PATCH] kvserver: emit MVCC range tombstones over rangefeeds

This patch adds MVCC range tombstone support in rangefeeds. Whenever an MVCC range tombstone is written, a new `MVCCDeleteRangeOp` logical op is recorded and emitted across the rangefeed as a `RangeFeedDeleteRange` event. MVCC range tombstones will only be written when the `MVCCRangeTombstones` version gate has been enabled.

Changefeeds will emit an error for these events. We do not expect to see these in online spans with changefeeds, since they are initially only planned for use with schema GC and import rollbacks.

The rangefeed client library has been extended with support for these events, but no existing callers handle them, for the same reason as changefeeds. Initial scans do not emit regular tombstones, and thus do not emit range tombstones either, but catchup scans will emit them if encountered.

This patch has rudimentary testing of MVCC range tombstones in rangefeeds. A later patch will add a data-driven test harness for rangefeeds with more exhaustive tests.

Release note: None
---
 .../changefeedccl/kvfeed/physical_kv_feed.go | 15 ++
 .../streamproducer/event_stream.go | 1 +
 pkg/kv/kvclient/rangefeed/BUILD.bazel | 1 +
 pkg/kv/kvclient/rangefeed/config.go | 19 +++
 pkg/kv/kvclient/rangefeed/rangefeed.go | 6 +
 .../rangefeed/rangefeed_external_test.go | 157 ++++++++++++++++++
 pkg/kv/kvserver/rangefeed/catchup_scan.go | 86 ++++++++--
 .../rangefeed/catchup_scan_bench_test.go | 2 +-
 .../kvserver/rangefeed/catchup_scan_test.go | 5 +
 pkg/kv/kvserver/rangefeed/processor.go | 23 +++
 pkg/kv/kvserver/rangefeed/registry.go | 15 ++
 .../kvserver/rangefeed/resolved_timestamp.go | 4 +
 pkg/kv/kvserver/rangefeed/task_test.go | 7 +-
 pkg/kv/kvserver/replica_rangefeed.go | 6 +-
 pkg/roachpb/api.go | 3 +
 pkg/roachpb/api.proto | 17 +-
 pkg/roachpb/data.go | 5 +
 pkg/storage/enginepb/mvcc3.proto | 10 ++
 pkg/storage/mvcc.go | 21 ++-
 pkg/storage/mvcc_key.go | 26 ++-
 pkg/storage/mvcc_key_test.go | 40 ++++-
 pkg/storage/mvcc_logical_ops.go | 13 ++
 22 files changed, 450 insertions(+), 32 deletions(-)

diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
index 0b8018894d61..142a9f700a88 100644
--- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
@@ -123,6 +123,21 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
 // expect SST ingestion into spans with active changefeeds.
 return errors.Errorf("unexpected SST ingestion: %v", t)

+ case *roachpb.RangeFeedDeleteRange:
+ // For now, we just error on MVCC range tombstones. These are currently
+ // only expected to be used by schema GC and IMPORT INTO, and such spans
+ // should not have active changefeeds across them.
+ //
+ // TODO(erikgrinaker): Write an end-to-end test which verifies that an
+ // IMPORT INTO which gets rolled back using MVCC range tombstones will
+ // not be visible to a changefeed, neither when it was started before
+ // the import nor when resuming from a timestamp before the import. The
+ // table descriptor should be marked as offline during the import, and
+ // catchup scans should detect that this happened and prevent reading
+ // anything in that timespan. See:
+ // https://github.com/cockroachdb/cockroach/issues/70433
+ return errors.Errorf("unexpected MVCC range deletion: %v", t)
+
 default:
 return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
 }

diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go
index 5c8929b848cf..fda74cf22210 100644
--- a/pkg/ccl/streamingccl/streamproducer/event_stream.go
+++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go
@@ -472,6 +472,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e
 return err
 }
 default:
+ // TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones).
 return errors.AssertionFailedf("unexpected event")
 }
 }

diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel
index 1e4cffb02cda..888d3cd3f275 100644
--- a/pkg/kv/kvclient/rangefeed/BUILD.bazel
+++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel
@@ -73,6 +73,7 @@ go_test(
 "//pkg/spanconfig",
 "//pkg/spanconfig/spanconfigptsreader",
 "//pkg/sql/catalog/desctestutils",
+ "//pkg/storage",
 "//pkg/testutils",
 "//pkg/testutils/serverutils",
 "//pkg/testutils/sqlutils",

diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go
index 56a0e235730a..d7f0101a8ec1 100644
--- a/pkg/kv/kvclient/rangefeed/config.go
+++ b/pkg/kv/kvclient/rangefeed/config.go
@@ -41,6 +41,7 @@ type config struct {
 onCheckpoint OnCheckpoint
 onFrontierAdvance OnFrontierAdvance
 onSSTable OnSSTable
+ onDeleteRange OnDeleteRange
 extraPProfLabels []string
 }

@@ -178,6 +179,24 @@ func WithOnSSTable(f OnSSTable) Option {
 })
 }

+// OnDeleteRange is called when an MVCC range tombstone is written (e.g. when
+// DeleteRange is called with UseExperimentalRangeTombstone, but not when the
+// range is deleted using point tombstones). If this callback is not provided,
+// an error is emitted when these are encountered.
+//
+// MVCC range tombstones are currently experimental and require the
+// MVCCRangeTombstones version gate. They are only expected during certain
+// operations like schema GC and IMPORT INTO (i.e. not across live tables).
+type OnDeleteRange func(ctx context.Context, value *roachpb.RangeFeedDeleteRange)
+
+// WithOnDeleteRange sets up a callback that's invoked whenever an MVCC range
+// deletion tombstone is written.
+func WithOnDeleteRange(f OnDeleteRange) Option {
+ return optionFunc(func(c *config) {
+ c.onDeleteRange = f
+ })
+}
+
 // OnFrontierAdvance is called when the rangefeed frontier is advanced with the
 // new frontier timestamp.
 type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp)

diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go
index 950bc5610389..f039e0569f47 100644
--- a/pkg/kv/kvclient/rangefeed/rangefeed.go
+++ b/pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -357,6 +357,12 @@ func (f *RangeFeed) processEvents(
 "received unexpected rangefeed SST event with no OnSSTable handler")
 }
 f.onSSTable(ctx, ev.SST)
+ case ev.DeleteRange != nil:
+ if f.onDeleteRange == nil {
+ return errors.AssertionFailedf(
+ "received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s", ev)
+ }
+ f.onDeleteRange(ctx, ev.DeleteRange)
 case ev.Error != nil:
 // Intentionally do nothing, we'll get an error returned from the
 // call to RangeFeed.
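To show how a client opts in to these events, here is a minimal consumer sketch. It is not part of this patch: the function name watchDeleteRanges is hypothetical, the handler bodies are placeholders, and it assumes an already-constructed rangefeed.Factory with kv.rangefeed.enabled set:

func watchDeleteRanges(
	ctx context.Context, f *rangefeed.Factory, spans []roachpb.Span, startTS hlc.Timestamp,
) (*rangefeed.RangeFeed, error) {
	return f.RangeFeed(ctx, "delete-range-demo", spans, startTS,
		func(ctx context.Context, v *roachpb.RangeFeedValue) {
			// Point-key events arrive here, exactly as before this patch.
		},
		rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
			// e.Span is the deleted key range, truncated to the registration
			// bounds; e.Timestamp is the MVCC range tombstone's write timestamp.
			log.Infof(ctx, "range deletion %s at %s", e.Span, e.Timestamp)
		}),
	)
}

Without WithOnDeleteRange, processEvents above intentionally fails with an assertion error on the first DeleteRange event, so existing callers keep their current behavior.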
diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e1fc934bf71f..e9c9673b6082 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sstutil" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -615,6 +616,162 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) { require.Equal(t, expectKVs, seenKVs) } +// TestWithOnDeleteRange tests that the rangefeed emits MVCC range tombstones. +// +// TODO(erikgrinaker): These kinds of tests should really use a data-driven test +// harness, for more exhaustive testing. But it'll do for now. +func TestWithOnDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + srv := tc.Server(0) + db := srv.DB() + + _, _, err := tc.SplitRange(roachpb.Key("a")) + require.NoError(t, err) + require.NoError(t, tc.WaitForFullReplication()) + + _, err = tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil) + require.NoError(t, err) + + // We lay down a few MVCC range tombstones and points. The first range + // tombstone should not be visible, because initial scans do not emit + // tombstones, nor should the points covered by it. The second range tombstone + // should be visible, because catchup scans do emit tombstones. The range + // tombstone should be ordered after the initial point, but before the foo + // catchup point, and the previous values should respect the range tombstones. + require.NoError(t, db.Put(ctx, "covered", "covered")) + require.NoError(t, db.Put(ctx, "foo", "covered")) + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z")) + require.NoError(t, db.Put(ctx, "foo", "initial")) + rangeFeedTS := db.Clock().Now() + require.NoError(t, db.Put(ctx, "covered", "catchup")) + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z")) + require.NoError(t, db.Put(ctx, "foo", "catchup")) + + // We start the rangefeed over a narrower span than the DeleteRanges (c-g), + // to ensure the DeleteRange event is truncated to the registration span. 
+ var checkpointOnce sync.Once
+ checkpointC := make(chan struct{})
+ deleteRangeC := make(chan *roachpb.RangeFeedDeleteRange)
+ rowC := make(chan *roachpb.RangeFeedValue)
+
+ spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}}
+ r, err := f.RangeFeed(ctx, "test", spans, rangeFeedTS,
+ func(ctx context.Context, e *roachpb.RangeFeedValue) {
+ select {
+ case rowC <- e:
+ case <-ctx.Done():
+ }
+ },
+ rangefeed.WithDiff(true),
+ rangefeed.WithInitialScan(nil),
+ rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) {
+ checkpointOnce.Do(func() {
+ close(checkpointC)
+ })
+ }),
+ rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) {
+ select {
+ case deleteRangeC <- e:
+ case <-ctx.Done():
+ }
+ }),
+ )
+ require.NoError(t, err)
+ defer r.Close()
+
+ // Wait for initial scan. We should see the foo=initial point, but neither the
+ // range tombstone nor the covered points.
+ select {
+ case e := <-rowC:
+ require.Equal(t, roachpb.Key("foo"), e.Key)
+ value, err := e.Value.GetBytes()
+ require.NoError(t, err)
+ require.Equal(t, "initial", string(value))
+ prevValue, err := e.PrevValue.GetBytes()
+ require.NoError(t, err)
+ require.Equal(t, "initial", string(prevValue)) // initial scans supply current as prev
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for initial scan event")
+ }
+
+ // Wait for catchup scan. We should see the second range tombstone, truncated
+ // to the rangefeed bounds (c-g), and it should be ordered before the points
+ // covered=catchup and foo=catchup. Both points should have a tombstone as the
+ // previous value.
+ select {
+ case e := <-deleteRangeC:
+ require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span)
+ require.NotEmpty(t, e.Timestamp)
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for DeleteRange event")
+ }
+
+ select {
+ case e := <-rowC:
+ require.Equal(t, roachpb.Key("covered"), e.Key)
+ value, err := e.Value.GetBytes()
+ require.NoError(t, err)
+ require.Equal(t, "catchup", string(value))
+ prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes)
+ require.NoError(t, err)
+ require.True(t, prevValue.IsTombstone())
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for covered=catchup event")
+ }
+
+ select {
+ case e := <-rowC:
+ require.Equal(t, roachpb.Key("foo"), e.Key)
+ value, err := e.Value.GetBytes()
+ require.NoError(t, err)
+ require.Equal(t, "catchup", string(value))
+ prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes)
+ require.NoError(t, err)
+ require.True(t, prevValue.IsTombstone())
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for foo=catchup event")
+ }
+
+ // Wait for checkpoint after catchup scan.
+ select {
+ case <-checkpointC:
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for checkpoint")
+ }
+
+ // Send another DeleteRange, and wait for the rangefeed event. This should
+ // be truncated to the rangefeed bounds (c-g).
+ require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z"))
+ select {
+ case e := <-deleteRangeC:
+ require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span)
+ require.NotEmpty(t, e.Timestamp)
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for DeleteRange event")
+ }
+
+ // A final point write should be emitted with a tombstone as the previous value.
+ require.NoError(t, db.Put(ctx, "foo", "final"))
+ select {
+ case e := <-rowC:
+ require.Equal(t, roachpb.Key("foo"), e.Key)
+ value, err := e.Value.GetBytes()
+ require.NoError(t, err)
+ require.Equal(t, "final", string(value))
+ prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes)
+ require.NoError(t, err)
+ require.True(t, prevValue.IsTombstone())
+ case <-time.After(3 * time.Second):
+ require.Fail(t, "timed out waiting for foo=final event")
+ }
+}
+
 // TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced
 // to callers.
 func TestUnrecoverableErrors(t *testing.T) {

diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go
index 622e3361bb95..f64258f941ab 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -65,6 +65,8 @@ func NewCatchUpIterator(
 return &CatchUpIterator{
 simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
+ KeyTypes: storage.IterKeyTypePointsAndRanges,
+ StartKey: span.Key,
 EndKey: span.EndKey,
 StartTime: startTime,
 EndTime: hlc.MaxTimestamp,
@@ -96,6 +98,15 @@ type outputEventFn func(e *roachpb.RangeFeedEvent) error

 // CatchUpScan iterates over all changes in the configured key/time span, and
 // emits them as RangeFeedEvents via outputFn in chronological order.
+//
+// MVCC range tombstones are emitted at their start key, in chronological order.
+// Because the start key itself is not timestamped, these will be ordered before
+// all of the timestamped point keys that they overlap. For more details, see
+// MVCC range key info on storage.SimpleMVCCIterator.
+//
+// For example, with MVCC range tombstones [a-f)@5 and [a-f)@3 overlapping point
+// keys a@6, a@4, and b@2, the emitted order is [a-f)@3, [a-f)@5, a@4, a@6, b@2,
+// because the start key "a" is ordered before all of the timestamped point keys.
 func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error {
 var a bufalloc.ByteAllocator
 // MVCCIterator will encounter historical values for each key in
@@ -103,18 +114,7 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
 // events for the same key until a different key is encountered, then output
 // the encountered values in reverse. This also allows us to buffer events
 // as we fill in previous values.
- var lastKey roachpb.Key
 reorderBuf := make([]roachpb.RangeFeedEvent, 0, 5)
- addPrevToLastEvent := func(val []byte) {
- if l := len(reorderBuf); l > 0 {
- if reorderBuf[l-1].Val.PrevValue.IsPresent() {
- panic("RangeFeedValue.PrevVal unexpectedly set")
- }
- // TODO(sumeer): find out if it is deliberate that we are not populating
- // PrevValue.Timestamp.
- reorderBuf[l-1].Val.PrevValue.RawBytes = val
- }
- }

 outputEvents := func() error {
 for i := len(reorderBuf) - 1; i >= 0; i-- {
@@ -130,7 +130,9 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err
 // Iterate through all keys using Next. We want to publish all committed
 // versions of each key that are after the registration's startTS, so we
 // can't use NextKey.
+ var lastKey roachpb.Key var meta enginepb.MVCCMetadata + var rangeKeysStart roachpb.Key i.SeekGE(storage.MVCCKey{Key: i.span.Key}) for { if ok, err := i.Valid(); err != nil { @@ -139,6 +141,45 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err break } + hasPoint, hasRange := i.HasPointAndRange() + + // Emit any new MVCC range tombstones when their start key is encountered. + // Range keys can currently only be MVCC range tombstones. + // + // TODO(erikgrinaker): Find a faster/better way to detect range key changes + // that doesn't involve constant comparisons. Pebble probably already knows, + // we just need a way to ask it. + if hasRange { + if rangeBounds := i.RangeBounds(); !rangeBounds.Key.Equal(rangeKeysStart) { + rangeKeysStart = append(rangeKeysStart[:0], rangeBounds.Key...) + + // Emit events for these MVCC range tombstones, in chronological order. + rangeKeys := i.RangeKeys() + for i := len(rangeKeys) - 1; i >= 0; i-- { + var span roachpb.Span + a, span.Key = a.Copy(rangeBounds.Key, 0) + a, span.EndKey = a.Copy(rangeBounds.EndKey, 0) + err := outputFn(&roachpb.RangeFeedEvent{ + DeleteRange: &roachpb.RangeFeedDeleteRange{ + Span: span, + Timestamp: rangeKeys[i].RangeKey.Timestamp, + }, + }) + if err != nil { + return err + } + } + } + } + + // If there's no point key here (i.e. we found a bare range key above), then + // step onto the next key. This may be a point key version at the same key + // as the range key's start bound, or a later point/range key. + if !hasPoint { + i.Next() + continue + } + unsafeKey := i.UnsafeKey() unsafeValRaw := i.UnsafeValue() if !unsafeKey.IsValue() { @@ -217,9 +258,26 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err var val []byte a, val = a.Copy(unsafeVal, 0) if withDiff { - // Update the last version with its - // previous value (this version). - addPrevToLastEvent(val) + // Update the last version with its previous value (this version). + if l := len(reorderBuf) - 1; l >= 0 { + if reorderBuf[l].Val.PrevValue.IsPresent() { + return errors.AssertionFailedf("unexpected previous value %s for key %s", + reorderBuf[l].Val.PrevValue, key) + } + // If an MVCC range tombstone exists between this value and the next + // one, we don't emit the value after all -- it should be a tombstone. + // + // TODO(erikgrinaker): We can't save range keys when we detect changes + // to rangeKeysStart above, because NextIgnoringTime() could reveal + // additional MVCC range tombstones below StartTime that cover this + // point. We need to find a more performant way to handle this. + if !hasRange || !storage.HasRangeKeyBetween( + i.RangeKeys(), reorderBuf[l].Val.Value.Timestamp, ts) { + // TODO(sumeer): find out if it is deliberate that we are not populating + // PrevValue.Timestamp. 
+ reorderBuf[l].Val.PrevValue.RawBytes = val + } + } } if !ignore { diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index 568fcb2edf8a..15eddbe29298 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -175,7 +175,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo opts.FS = vfs.Default opts.LBaseMaxBytes = lBaseMaxBytes opts.ReadOnly = readOnly - opts.FormatMajorVersion = pebble.FormatBlockPropertyCollector + opts.FormatMajorVersion = pebble.FormatRangeKeys peb, err := storage.NewPebble( context.Background(), storage.PebbleConfig{ diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go index 9d9c78e0ac13..bf0b7ff6689a 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go @@ -26,6 +26,11 @@ import ( "github.com/stretchr/testify/require" ) +// TODO(erikgrinaker): This should be migrated to a data-driven test harness for +// end-to-end rangefeed testing, with more exhaustive test cases. See: +// https://github.com/cockroachdb/cockroach/issues/82715 +// +// For now, see rangefeed_external_test.go for rudimentary range key tests. func TestCatchupScan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 1f2e8d55a1e0..3bd0b58bf7c7 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -700,6 +700,10 @@ func (p *Processor) consumeLogicalOps( // Publish the new value directly. p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, allocation) + case *enginepb.MVCCDeleteRangeOp: + // Publish the range deletion directly. + p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, allocation) + case *enginepb.MVCCWriteIntentOp: // No updates to publish. 
@@ -777,6 +781,25 @@ func (p *Processor) publishValue( p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, allocation) } +func (p *Processor) publishDeleteRange( + ctx context.Context, + startKey, endKey roachpb.Key, + timestamp hlc.Timestamp, + allocation *SharedBudgetAllocation, +) { + span := roachpb.Span{Key: startKey, EndKey: endKey} + if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) { + log.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span) + } + + var event roachpb.RangeFeedEvent + event.MustSetValue(&roachpb.RangeFeedDeleteRange{ + Span: span, + Timestamp: timestamp, + }) + p.reg.PublishToOverlapping(ctx, span, &event, allocation) +} + func (p *Processor) publishSSTable( ctx context.Context, sst []byte, diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 09a7c2005b85..a945160e95f6 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -198,6 +198,13 @@ func (r *registration) validateEvent(event *roachpb.RangeFeedEvent) { if t.WriteTS.IsEmpty() { panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t)) } + case *roachpb.RangeFeedDeleteRange: + if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 { + panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t)) + } + if t.Timestamp.IsEmpty() { + panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)) + } default: panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t)) } @@ -244,6 +251,12 @@ func (r *registration) maybeStripEvent(event *roachpb.RangeFeedEvent) *roachpb.R t = copyOnWrite().(*roachpb.RangeFeedCheckpoint) t.Span = r.span } + case *roachpb.RangeFeedDeleteRange: + // Truncate the range tombstone to the registration bounds. + if i := t.Span.Intersect(r.span); !i.Equal(t.Span) { + t = copyOnWrite().(*roachpb.RangeFeedDeleteRange) + t.Span = i.Clone() + } case *roachpb.RangeFeedSSTable: // SSTs are always sent in their entirety, it is up to the caller to // filter out irrelevant entries. @@ -440,6 +453,8 @@ func (reg *registry) PublishToOverlapping( minTS = t.Value.Timestamp case *roachpb.RangeFeedSSTable: minTS = t.WriteTS + case *roachpb.RangeFeedDeleteRange: + minTS = t.Timestamp case *roachpb.RangeFeedCheckpoint: // Always publish checkpoint notifications, regardless of a registration's // starting timestamp. diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 0119b735bc20..4b7e6dd6f743 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -142,6 +142,10 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { rts.assertOpAboveRTS(op, t.Timestamp) return false + case *enginepb.MVCCDeleteRangeOp: + rts.assertOpAboveRTS(op, t.Timestamp) + return false + case *enginepb.MVCCWriteIntentOp: rts.assertOpAboveRTS(op, t.Timestamp) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index f3d2b7c1d773..e236c10ebec4 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -196,17 +196,18 @@ func (s *testIterator) curKV() storage.MVCCKeyValue { // HasPointAndRange implements SimpleMVCCIterator. 
func (s *testIterator) HasPointAndRange() (bool, bool) {
- panic("not implemented")
+ ok, err := s.Valid()
+ return ok && err == nil, false
 }

 // RangeBounds implements SimpleMVCCIterator.
 func (s *testIterator) RangeBounds() roachpb.Span {
- panic("not implemented")
+ return roachpb.Span{}
 }

 // RangeKeys implements SimpleMVCCIterator.
 func (s *testIterator) RangeKeys() []storage.MVCCRangeKeyValue {
- panic("not implemented")
+ return []storage.MVCCRangeKeyValue{}
 }

 func TestInitResolvedTSScan(t *testing.T) {

diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index 88e4013878d5..c3b62db37ef2 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -513,7 +513,8 @@ func (r *Replica) populatePrevValsInLogicalOpLogRaftMuLocked(
 case *enginepb.MVCCWriteIntentOp,
 *enginepb.MVCCUpdateIntentOp,
 *enginepb.MVCCAbortIntentOp,
- *enginepb.MVCCAbortTxnOp:
+ *enginepb.MVCCAbortTxnOp,
+ *enginepb.MVCCDeleteRangeOp:
 // Nothing to do.
 continue
 default:
@@ -587,7 +588,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked(
 case *enginepb.MVCCWriteIntentOp,
 *enginepb.MVCCUpdateIntentOp,
 *enginepb.MVCCAbortIntentOp,
- *enginepb.MVCCAbortTxnOp:
+ *enginepb.MVCCAbortTxnOp,
+ *enginepb.MVCCDeleteRangeOp:
 // Nothing to do.
 continue
 default:

diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 5eba5ae37a20..d9cb171fe036 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -1519,6 +1519,9 @@ func (e *RangeFeedEvent) ShallowCopy() *RangeFeedEvent {
 case *RangeFeedSSTable:
 cpySST := *t
 cpy.MustSetValue(&cpySST)
+ case *RangeFeedDeleteRange:
+ cpyDelRange := *t
+ cpy.MustSetValue(&cpyDelRange)
 case *RangeFeedError:
 cpyErr := *t
 cpy.MustSetValue(&cpyErr)

diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto
index 10e2768da420..9620f4c9833e 100644
--- a/pkg/roachpb/api.proto
+++ b/pkg/roachpb/api.proto
@@ -2783,15 +2783,24 @@ message RangeFeedSSTable {
 util.hlc.Timestamp write_ts = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "WriteTS"];
 }

+// RangeFeedDeleteRange is a variant of RangeFeedEvent that represents a
+// deletion of the specified key range at the given timestamp using an MVCC
+// range tombstone.
+message RangeFeedDeleteRange {
+ Span span = 1 [(gogoproto.nullable) = false];
+ util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false];
+}
+
 // RangeFeedEvent is a union of all event types that may be returned on a
 // RangeFeed response stream.
 message RangeFeedEvent {
 option (gogoproto.onlyone) = true;

- RangeFeedValue val = 1;
- RangeFeedCheckpoint checkpoint = 2;
- RangeFeedError error = 3;
- RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"];
+ RangeFeedValue val = 1;
+ RangeFeedCheckpoint checkpoint = 2;
+ RangeFeedError error = 3;
+ RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"];
+ RangeFeedDeleteRange delete_range = 5;
 }

diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go
index 1814d1ee63c0..f0dc9643ff0a 100644
--- a/pkg/roachpb/data.go
+++ b/pkg/roachpb/data.go
@@ -2088,6 +2088,11 @@ func (ls LockStateInfo) String() string {
 return redact.StringWithoutMarkers(ls)
 }

+// Clone returns a copy of the span.
+func (s Span) Clone() Span {
+ return Span{Key: s.Key.Clone(), EndKey: s.EndKey.Clone()}
+}
+
 // EqualValue is Equal.
 //
 // TODO(tbg): remove this passthrough.
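With the union extended, a low-level consumer of the raw RangeFeed stream branches the same way the changefeed and stream-producer switches above do. A sketch with placeholder handling only (handleEvent is hypothetical; GetValue is the union accessor generated by the onlyone option):

func handleEvent(ev *roachpb.RangeFeedEvent) error {
	switch t := ev.GetValue().(type) {
	case *roachpb.RangeFeedValue:
		// Committed point write (or point tombstone) on t.Key.
	case *roachpb.RangeFeedCheckpoint:
		// Resolved timestamp advanced for t.Span.
	case *roachpb.RangeFeedSSTable:
		// Ingested SST; sent whole, the consumer filters to its span.
	case *roachpb.RangeFeedDeleteRange:
		// MVCC range tombstone covering t.Span at t.Timestamp.
	case *roachpb.RangeFeedError:
		// Terminal error; the rangefeed call will return it.
	default:
		return errors.Errorf("unexpected RangeFeedEvent variant %v", t)
	}
	return nil
}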
diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index ec05dad30d65..f34a47a9c85d 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -343,6 +343,15 @@ message MVCCAbortTxnOp { (gogoproto.nullable) = false]; } +// MVCCDeleteRangeOp corresponds to a range deletion using an MVCC range +// tombstone. +message MVCCDeleteRangeOp { + bytes start_key = 1; + bytes end_key = 2; + util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false]; +} + + // MVCCLogicalOp is a union of all logical MVCC operation types. message MVCCLogicalOp { option (gogoproto.onlyone) = true; @@ -353,4 +362,5 @@ message MVCCLogicalOp { MVCCCommitIntentOp commit_intent = 4; MVCCAbortIntentOp abort_intent = 5; MVCCAbortTxnOp abort_txn = 6; + MVCCDeleteRangeOp delete_range = 7; } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index deeeac6facec..0ef3dca7c5e3 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -973,7 +973,7 @@ func mvccGetMetadata( // metadata), or the point version's timestamp if it was a tombstone. if hasRange { rangeKeys := iter.RangeKeys() - if rkv, ok := firstRangeKeyAbove(rangeKeys, unsafeKey.Timestamp); ok { + if rkv, ok := FirstRangeKeyAbove(rangeKeys, unsafeKey.Timestamp); ok { meta.Deleted = true meta.Timestamp = rangeKeys[0].RangeKey.Timestamp.ToLegacyTimestamp() keyLastSeen := rkv.RangeKey.Timestamp @@ -2735,8 +2735,19 @@ func ExperimentalMVCCDeleteRangeUsingTombstone( } } - // Write the tombstone. - return rw.ExperimentalPutMVCCRangeKey(rangeKey, value) + if err := rw.ExperimentalPutMVCCRangeKey(rangeKey, value); err != nil { + return err + } + + // Record the logical operation, for rangefeed emission. + rw.LogLogicalOp(MVCCDeleteRangeOpType, MVCCLogicalOpDetails{ + Safe: true, + Key: rangeKey.StartKey, + EndKey: rangeKey.EndKey, + Timestamp: rangeKey.Timestamp, + }) + + return nil } func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) { @@ -3798,7 +3809,7 @@ func mvccResolveWriteIntent( // synthesize a point tombstone at the lowest range tombstone covering it. // This is where the point key ceases to exist, contributing to GCBytesAge. if len(unsafeNextValueRaw) > 0 { - if rk, found := firstRangeKeyAbove(iter.RangeKeys(), unsafeNextKey.Timestamp); found { + if rk, found := FirstRangeKeyAbove(iter.RangeKeys(), unsafeNextKey.Timestamp); found { unsafeNextKey.Timestamp = rk.RangeKey.Timestamp unsafeNextValueRaw = []byte{} } @@ -4485,7 +4496,7 @@ func ComputeStatsForRangeWithVisitors( // only take them into account for versioned values. var nextRangeTombstone hlc.Timestamp if isValue { - if rkv, ok := firstRangeKeyAbove(rangeKeys, unsafeKey.Timestamp); ok { + if rkv, ok := FirstRangeKeyAbove(rangeKeys, unsafeKey.Timestamp); ok { nextRangeTombstone = rkv.RangeKey.Timestamp } } diff --git a/pkg/storage/mvcc_key.go b/pkg/storage/mvcc_key.go index 8bf9b39abb9f..dbbbbfa868c3 100644 --- a/pkg/storage/mvcc_key.go +++ b/pkg/storage/mvcc_key.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -438,11 +439,14 @@ func (k MVCCRangeKey) Validate() (err error) { } } -// firstRangeKeyAbove does a binary search for the first range key at or above +// FirstRangeKeyAbove does a binary search for the first range key at or above // the given timestamp. 
It assumes the range keys are ordered in descending
// timestamp order, as returned by SimpleMVCCIterator.RangeKeys(). Returns false
// if no matching range key was found.
-func firstRangeKeyAbove(rangeKeys []MVCCRangeKeyValue, ts hlc.Timestamp) (MVCCRangeKeyValue, bool) {
+//
+// TODO(erikgrinaker): Consider using a new type for []MVCCRangeKeyValue as
+// returned by SimpleMVCCIterator.RangeKeys(), and add this as a method.
+func FirstRangeKeyAbove(rangeKeys []MVCCRangeKeyValue, ts hlc.Timestamp) (MVCCRangeKeyValue, bool) {
 // This is kind of odd due to sort.Search() semantics: we do a binary search
 // for the first range tombstone that's below the timestamp, then return the
 // previous range tombstone if any.
@@ -453,3 +457,21 @@ func firstRangeKeyAbove(rangeKeys []MVCCRangeKeyValue, ts hlc.Timestamp) (MVCCRa
 }
 return MVCCRangeKeyValue{}, false
 }
+
+// HasRangeKeyBetween checks whether an MVCC range key exists between the two
+// given timestamps (upper and lower, in that order). It assumes the range keys
+// are ordered in descending timestamp order, as returned by
+// SimpleMVCCIterator.RangeKeys().
+func HasRangeKeyBetween(rangeKeys []MVCCRangeKeyValue, upper, lower hlc.Timestamp) bool {
+ if len(rangeKeys) == 0 {
+ return false
+ }
+ if util.RaceEnabled && upper.Less(lower) {
+ panic(errors.AssertionFailedf("HasRangeKeyBetween given upper %s < lower %s", upper, lower))
+ }
+ if rkv, ok := FirstRangeKeyAbove(rangeKeys, lower); ok {
+ // Consider equal timestamps to be "between". This shouldn't really happen,
+ // since MVCC enforces point and range keys can't have the same timestamp.
+ return rkv.RangeKey.Timestamp.LessEq(upper)
+ }
+ return false
+}

diff --git a/pkg/storage/mvcc_key_test.go b/pkg/storage/mvcc_key_test.go
index f99b1b5f1fa2..8bac4ddbf4b8 100644
--- a/pkg/storage/mvcc_key_test.go
+++ b/pkg/storage/mvcc_key_test.go
@@ -22,6 +22,7 @@ import (
 "testing/quick"

 "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/util"
 "github.com/cockroachdb/cockroach/pkg/util/hlc"
 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
 "github.com/cockroachdb/cockroach/pkg/util/log"
@@ -523,7 +524,7 @@ func TestFirstRangeKeyAbove(t *testing.T) {
 }
 for _, tc := range testcases {
 t.Run(fmt.Sprintf("%d", tc.ts), func(t *testing.T) {
- rkv, ok := firstRangeKeyAbove(rangeKVs, hlc.Timestamp{WallTime: tc.ts})
+ rkv, ok := FirstRangeKeyAbove(rangeKVs, hlc.Timestamp{WallTime: tc.ts})
 if tc.expect == 0 {
 require.False(t, ok)
 require.Empty(t, rkv)
@@ -535,6 +536,43 @@ func TestFirstRangeKeyAbove(t *testing.T) {
 }
 }

+func TestHasRangeKeyBetween(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+
+ rangeKVs := []MVCCRangeKeyValue{
+ rangeKV("a", "f", 5, MVCCValue{}),
+ rangeKV("a", "f", 1, MVCCValue{}),
+ }
+
+ testcases := []struct {
+ upper, lower int
+ expect bool
+ }{
+ {0, 0, false},
+ {0, 1, false}, // wrong order
+ {1, 0, true},
+ {1, 1, true},
+ {0, 2, false}, // wrong order
+ {4, 6, false}, // wrong order
+ {6, 4, true},
+ {5, 5, true},
+ {4, 4, false},
+ {6, 6, false},
+ {4, 2, false},
+ {0, 9, false}, // wrong order
+ {9, 0, true},
+ }
+ for _, tc := range testcases {
+ t.Run(fmt.Sprintf("%d,%d", tc.upper, tc.lower), func(t *testing.T) {
+ if util.RaceEnabled && tc.upper < tc.lower {
+ require.Panics(t, func() { HasRangeKeyBetween(rangeKVs, wallTS(tc.upper), wallTS(tc.lower)) })
+ } else {
+ require.Equal(t, tc.expect, HasRangeKeyBetween(rangeKVs, wallTS(tc.upper), wallTS(tc.lower)))
+ }
+ })
+ }
+}
+
 func pointKey(key string, ts int) MVCCKey {
return MVCCKey{Key: roachpb.Key(key), Timestamp: wallTS(ts)} } diff --git a/pkg/storage/mvcc_logical_ops.go b/pkg/storage/mvcc_logical_ops.go index 7872fb8d0610..6ce5d27480f2 100644 --- a/pkg/storage/mvcc_logical_ops.go +++ b/pkg/storage/mvcc_logical_ops.go @@ -45,6 +45,8 @@ const ( MVCCCommitIntentOpType // MVCCAbortIntentOpType corresponds to the MVCCAbortIntentOp variant. MVCCAbortIntentOpType + // MVCCDeleteRangeOpType corresponds to the MVCCDeleteRangeOp variant. + MVCCDeleteRangeOpType ) // MVCCLogicalOpDetails contains details about the occurrence of an MVCC logical @@ -52,6 +54,7 @@ const ( type MVCCLogicalOpDetails struct { Txn enginepb.TxnMeta Key roachpb.Key + EndKey roachpb.Key // only set for MVCCDeleteRangeOpType Timestamp hlc.Timestamp // Safe indicates that the values in this struct will never be invalidated @@ -142,6 +145,16 @@ func (ol *OpLoggerBatch) logLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO ol.recordOp(&enginepb.MVCCAbortIntentOp{ TxnID: details.Txn.ID, }) + case MVCCDeleteRangeOpType: + if !details.Safe { + ol.opsAlloc, details.Key = ol.opsAlloc.Copy(details.Key, 0) + ol.opsAlloc, details.EndKey = ol.opsAlloc.Copy(details.EndKey, 0) + } + ol.recordOp(&enginepb.MVCCDeleteRangeOp{ + StartKey: details.Key, + EndKey: details.EndKey, + Timestamp: details.Timestamp, + }) default: panic(fmt.Sprintf("unexpected op type %v", op)) }
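To make the HasRangeKeyBetween bounds semantics concrete, a short sketch against the fixtures from mvcc_key_test.go above (a hypothetical example function inside package storage; rangeKV and wallTS are the existing test helpers, and the results follow from FirstRangeKeyAbove plus the inclusive LessEq check):

func ExampleHasRangeKeyBetween() {
	rangeKVs := []MVCCRangeKeyValue{
		rangeKV("a", "f", 5, MVCCValue{}), // [a-f) deleted at timestamp 5
		rangeKV("a", "f", 1, MVCCValue{}), // [a-f) deleted at timestamp 1
	}
	fmt.Println(HasRangeKeyBetween(rangeKVs, wallTS(6), wallTS(4))) // true: the @5 tombstone lies in [4, 6]
	fmt.Println(HasRangeKeyBetween(rangeKVs, wallTS(5), wallTS(5))) // true: both bounds are inclusive
	fmt.Println(HasRangeKeyBetween(rangeKVs, wallTS(4), wallTS(2))) // false: no range key in [2, 4]
}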