diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 0b8018894d61..142a9f700a88 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -123,6 +123,21 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { // expect SST ingestion into spans with active changefeeds. return errors.Errorf("unexpected SST ingestion: %v", t) + case *roachpb.RangeFeedDeleteRange: + // For now, we just error on MVCC range tombstones. These are currently + // only expected to be used by schema GC and IMPORT INTO, and such spans + // should not have active changefeeds across them. + // + // TODO(erikgrinaker): Write an end-to-end test which verifies that an + // IMPORT INTO which gets rolled back using MVCC range tombstones will + // not be visible to a changefeed, neither when it was started before + // the import nor when resuming from a timestamp before the import. The + // table descriptor should be marked as offline during the import, and + // catchup scans should detect that this happened and prevent reading + // anything in that timespan. See: + // https://github.com/cockroachdb/cockroach/issues/70433 + return errors.Errorf("unexpected MVCC range deletion: %v", t) + default: return errors.Errorf("unexpected RangeFeedEvent variant %v", t) } diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 5c8929b848cf..fda74cf22210 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -472,6 +472,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e return err } default: + // TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones). 
return errors.AssertionFailedf("unexpected event") } } diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 1e4cffb02cda..888d3cd3f275 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -73,6 +73,7 @@ go_test( "//pkg/spanconfig", "//pkg/spanconfig/spanconfigptsreader", "//pkg/sql/catalog/desctestutils", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 56a0e235730a..d7f0101a8ec1 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -41,6 +41,7 @@ type config struct { onCheckpoint OnCheckpoint onFrontierAdvance OnFrontierAdvance onSSTable OnSSTable + onDeleteRange OnDeleteRange extraPProfLabels []string } @@ -178,6 +179,24 @@ func WithOnSSTable(f OnSSTable) Option { }) } +// OnDeleteRange is called when an MVCC range tombstone is written (e.g. when +// DeleteRange is called with UseExperimentalRangeTombstone, but not when the +// range is deleted using point tombstones). If this callback is not provided, +// an error is emitted when these are encountered. +// +// MVCC range tombstones are currently experimental, and require the +// MVCCRangeTombstones version gate. They are only expected during certain +// operations like schema GC and IMPORT INTO (i.e. not across live tables). +type OnDeleteRange func(ctx context.Context, value *roachpb.RangeFeedDeleteRange) + +// WithOnDeleteRange sets up a callback that's invoked whenever an MVCC range +// deletion tombstone is written. +func WithOnDeleteRange(f OnDeleteRange) Option { + return optionFunc(func(c *config) { + c.onDeleteRange = f + }) +} + // OnFrontierAdvance is called when the rangefeed frontier is advanced with the // new frontier timestamp. 
type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 950bc5610389..f039e0569f47 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -357,6 +357,12 @@ func (f *RangeFeed) processEvents( "received unexpected rangefeed SST event with no OnSSTable handler") } f.onSSTable(ctx, ev.SST) + case ev.DeleteRange != nil: + if f.onDeleteRange == nil { + return errors.AssertionFailedf( + "received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s", ev) + } + f.onDeleteRange(ctx, ev.DeleteRange) case ev.Error != nil: // Intentionally do nothing, we'll get an error returned from the // call to RangeFeed. diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e1fc934bf71f..e9c9673b6082 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sstutil" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -615,6 +616,162 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) { require.Equal(t, expectKVs, seenKVs) } +// TestWithOnDeleteRange tests that the rangefeed emits MVCC range tombstones. +// +// TODO(erikgrinaker): These kinds of tests should really use a data-driven test +// harness, for more exhaustive testing. But it'll do for now. 
+func TestWithOnDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + srv := tc.Server(0) + db := srv.DB() + + _, _, err := tc.SplitRange(roachpb.Key("a")) + require.NoError(t, err) + require.NoError(t, tc.WaitForFullReplication()) + + _, err = tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil) + require.NoError(t, err) + + // We lay down a few MVCC range tombstones and points. The first range + // tombstone should not be visible, because initial scans do not emit + // tombstones, nor should the points covered by it. The second range tombstone + // should be visible, because catchup scans do emit tombstones. The range + // tombstone should be ordered after the initial point, but before the foo + // catchup point, and the previous values should respect the range tombstones. + require.NoError(t, db.Put(ctx, "covered", "covered")) + require.NoError(t, db.Put(ctx, "foo", "covered")) + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z")) + require.NoError(t, db.Put(ctx, "foo", "initial")) + rangeFeedTS := db.Clock().Now() + require.NoError(t, db.Put(ctx, "covered", "catchup")) + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z")) + require.NoError(t, db.Put(ctx, "foo", "catchup")) + + // We start the rangefeed over a narrower span than the DeleteRanges (c-g), + // to ensure the DeleteRange event is truncated to the registration span. 
+ var checkpointOnce sync.Once + checkpointC := make(chan struct{}) + deleteRangeC := make(chan *roachpb.RangeFeedDeleteRange) + rowC := make(chan *roachpb.RangeFeedValue) + + spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}} + r, err := f.RangeFeed(ctx, "test", spans, rangeFeedTS, + func(ctx context.Context, e *roachpb.RangeFeedValue) { + select { + case rowC <- e: + case <-ctx.Done(): + } + }, + rangefeed.WithDiff(true), + rangefeed.WithInitialScan(nil), + rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) { + checkpointOnce.Do(func() { + close(checkpointC) + }) + }), + rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) { + select { + case deleteRangeC <- e: + case <-ctx.Done(): + } + }), + ) + require.NoError(t, err) + defer r.Close() + + // Wait for initial scan. We should see the foo=initial point, but not the + // range tombstone nor the covered points. + select { + case e := <-rowC: + require.Equal(t, roachpb.Key("foo"), e.Key) + value, err := e.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "initial", string(value)) + prevValue, err := e.PrevValue.GetBytes() + require.NoError(t, err) + require.Equal(t, "initial", string(prevValue)) // initial scans supply current as prev + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for initial scan event") + } + + // Wait for catchup scan. We should see the second range tombstone, truncated + // to the rangefeed bounds (c-g), and it should be ordered before the points + // covered=catchup and foo=catchup. Both points should have a tombstone as the + // previous value. 
+ select { + case e := <-deleteRangeC: + require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span) + require.NotEmpty(t, e.Timestamp) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for DeleteRange event") + } + + select { + case e := <-rowC: + require.Equal(t, roachpb.Key("covered"), e.Key) + value, err := e.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "catchup", string(value)) + prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes) + require.NoError(t, err) + require.True(t, prevValue.IsTombstone()) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for foo=catchup event") + } + + select { + case e := <-rowC: + require.Equal(t, roachpb.Key("foo"), e.Key) + value, err := e.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "catchup", string(value)) + prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes) + require.NoError(t, err) + require.True(t, prevValue.IsTombstone()) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for foo=catchup event") + } + + // Wait for checkpoint after catchup scan. + select { + case <-checkpointC: + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for checkpoint") + } + + // Send another DeleteRange, and wait for the rangefeed event. This should + // be truncated to the rangefeed bounds (c-g). + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z")) + select { + case e := <-deleteRangeC: + require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span) + require.NotEmpty(t, e.Timestamp) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for DeleteRange event") + } + + // A final point write should be emitted with a tombstone as the previous value. 
+ require.NoError(t, db.Put(ctx, "foo", "final")) + select { + case e := <-rowC: + require.Equal(t, roachpb.Key("foo"), e.Key) + value, err := e.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "final", string(value)) + prevValue, err := storage.DecodeMVCCValue(e.PrevValue.RawBytes) + require.NoError(t, err) + require.True(t, prevValue.IsTombstone()) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for foo=final event") + } +} + // TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced // to callers. func TestUnrecoverableErrors(t *testing.T) { diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index 622e3361bb95..f64258f941ab 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -65,6 +65,8 @@ func NewCatchUpIterator( return &CatchUpIterator{ simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + StartKey: span.Key, EndKey: span.EndKey, StartTime: startTime, EndTime: hlc.MaxTimestamp, @@ -96,6 +98,15 @@ type outputEventFn func(e *roachpb.RangeFeedEvent) error // CatchUpScan iterates over all changes in the configured key/time span, and // emits them as RangeFeedEvents via outputFn in chronological order. +// +// MVCC range tombstones are emitted at their start key, in chronological order. +// Because the start key itself is not timestamped, these will be ordered before +// all of the timestamped point keys that they overlap. For more details, see +// MVCC range key info on storage.SimpleMVCCIterator. +// +// For example, with MVCC range tombstones [a-f)@5 and [a-f)@3 overlapping point +// keys a@6, a@4, and b@2, the emitted order is [a-f)@3,[a-f)@5,a@4,a@6,b@2 because +// the start key "a" is ordered before all of the timestamped point keys. 
func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error { var a bufalloc.ByteAllocator // MVCCIterator will encounter historical values for each key in @@ -103,18 +114,7 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err // events for the same key until a different key is encountered, then output // the encountered values in reverse. This also allows us to buffer events // as we fill in previous values. - var lastKey roachpb.Key reorderBuf := make([]roachpb.RangeFeedEvent, 0, 5) - addPrevToLastEvent := func(val []byte) { - if l := len(reorderBuf); l > 0 { - if reorderBuf[l-1].Val.PrevValue.IsPresent() { - panic("RangeFeedValue.PrevVal unexpectedly set") - } - // TODO(sumeer): find out if it is deliberate that we are not populating - // PrevValue.Timestamp. - reorderBuf[l-1].Val.PrevValue.RawBytes = val - } - } outputEvents := func() error { for i := len(reorderBuf) - 1; i >= 0; i-- { @@ -130,7 +130,9 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err // Iterate though all keys using Next. We want to publish all committed // versions of each key that are after the registration's startTS, so we // can't use NextKey. + var lastKey roachpb.Key var meta enginepb.MVCCMetadata + var rangeKeysStart roachpb.Key i.SeekGE(storage.MVCCKey{Key: i.span.Key}) for { if ok, err := i.Valid(); err != nil { @@ -139,6 +141,45 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err break } + hasPoint, hasRange := i.HasPointAndRange() + + // Emit any new MVCC range tombstones when their start key is encountered. + // Range keys can currently only be MVCC range tombstones. + // + // TODO(erikgrinaker): Find a faster/better way to detect range key changes + // that doesn't involve constant comparisons. Pebble probably already knows, + // we just need a way to ask it. 
+ if hasRange { + if rangeBounds := i.RangeBounds(); !rangeBounds.Key.Equal(rangeKeysStart) { + rangeKeysStart = append(rangeKeysStart[:0], rangeBounds.Key...) + + // Emit events for these MVCC range tombstones, in chronological order. + rangeKeys := i.RangeKeys() + for i := len(rangeKeys) - 1; i >= 0; i-- { + var span roachpb.Span + a, span.Key = a.Copy(rangeBounds.Key, 0) + a, span.EndKey = a.Copy(rangeBounds.EndKey, 0) + err := outputFn(&roachpb.RangeFeedEvent{ + DeleteRange: &roachpb.RangeFeedDeleteRange{ + Span: span, + Timestamp: rangeKeys[i].RangeKey.Timestamp, + }, + }) + if err != nil { + return err + } + } + } + } + + // If there's no point key here (i.e. we found a bare range key above), then + // step onto the next key. This may be a point key version at the same key + // as the range key's start bound, or a later point/range key. + if !hasPoint { + i.Next() + continue + } + unsafeKey := i.UnsafeKey() unsafeValRaw := i.UnsafeValue() if !unsafeKey.IsValue() { @@ -217,9 +258,26 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err var val []byte a, val = a.Copy(unsafeVal, 0) if withDiff { - // Update the last version with its - // previous value (this version). - addPrevToLastEvent(val) + // Update the last version with its previous value (this version). + if l := len(reorderBuf) - 1; l >= 0 { + if reorderBuf[l].Val.PrevValue.IsPresent() { + return errors.AssertionFailedf("unexpected previous value %s for key %s", + reorderBuf[l].Val.PrevValue, key) + } + // If an MVCC range tombstone exists between this value and the next + // one, we don't emit the value after all -- it should be a tombstone. + // + // TODO(erikgrinaker): We can't save range keys when we detect changes + // to rangeKeysStart above, because NextIgnoringTime() could reveal + // additional MVCC range tombstones below StartTime that cover this + // point. We need to find a more performant way to handle this. 
+ if !hasRange || !storage.HasRangeKeyBetween( + i.RangeKeys(), reorderBuf[l].Val.Value.Timestamp, ts) { + // TODO(sumeer): find out if it is deliberate that we are not populating + // PrevValue.Timestamp. + reorderBuf[l].Val.PrevValue.RawBytes = val + } + } } if !ignore { diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index 568fcb2edf8a..15eddbe29298 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -175,7 +175,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo opts.FS = vfs.Default opts.LBaseMaxBytes = lBaseMaxBytes opts.ReadOnly = readOnly - opts.FormatMajorVersion = pebble.FormatBlockPropertyCollector + opts.FormatMajorVersion = pebble.FormatRangeKeys peb, err := storage.NewPebble( context.Background(), storage.PebbleConfig{ diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go index 9d9c78e0ac13..bf0b7ff6689a 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go @@ -26,6 +26,11 @@ import ( "github.com/stretchr/testify/require" ) +// TODO(erikgrinaker): This should be migrated to a data-driven test harness for +// end-to-end rangefeed testing, with more exhaustive test cases. See: +// https://github.com/cockroachdb/cockroach/issues/82715 +// +// For now, see rangefeed_external_test.go for rudimentary range key tests. func TestCatchupScan(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 1f2e8d55a1e0..3bd0b58bf7c7 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -700,6 +700,10 @@ func (p *Processor) consumeLogicalOps( // Publish the new value directly. 
p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, allocation) + case *enginepb.MVCCDeleteRangeOp: + // Publish the range deletion directly. + p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, allocation) + case *enginepb.MVCCWriteIntentOp: // No updates to publish. @@ -777,6 +781,25 @@ func (p *Processor) publishValue( p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, allocation) } +func (p *Processor) publishDeleteRange( + ctx context.Context, + startKey, endKey roachpb.Key, + timestamp hlc.Timestamp, + allocation *SharedBudgetAllocation, +) { + span := roachpb.Span{Key: startKey, EndKey: endKey} + if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) { + log.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span) + } + + var event roachpb.RangeFeedEvent + event.MustSetValue(&roachpb.RangeFeedDeleteRange{ + Span: span, + Timestamp: timestamp, + }) + p.reg.PublishToOverlapping(ctx, span, &event, allocation) +} + func (p *Processor) publishSSTable( ctx context.Context, sst []byte, diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 09a7c2005b85..a945160e95f6 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -198,6 +198,13 @@ func (r *registration) validateEvent(event *roachpb.RangeFeedEvent) { if t.WriteTS.IsEmpty() { panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t)) } + case *roachpb.RangeFeedDeleteRange: + if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 { + panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t)) + } + if t.Timestamp.IsEmpty() { + panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)) + } default: panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t)) } @@ -244,6 +251,12 @@ func (r *registration) maybeStripEvent(event *roachpb.RangeFeedEvent) *roachpb.R t = copyOnWrite().(*roachpb.RangeFeedCheckpoint) t.Span = 
r.span } + case *roachpb.RangeFeedDeleteRange: + // Truncate the range tombstone to the registration bounds. + if i := t.Span.Intersect(r.span); !i.Equal(t.Span) { + t = copyOnWrite().(*roachpb.RangeFeedDeleteRange) + t.Span = i.Clone() + } case *roachpb.RangeFeedSSTable: // SSTs are always sent in their entirety, it is up to the caller to // filter out irrelevant entries. @@ -440,6 +453,8 @@ func (reg *registry) PublishToOverlapping( minTS = t.Value.Timestamp case *roachpb.RangeFeedSSTable: minTS = t.WriteTS + case *roachpb.RangeFeedDeleteRange: + minTS = t.Timestamp case *roachpb.RangeFeedCheckpoint: // Always publish checkpoint notifications, regardless of a registration's // starting timestamp. diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 0119b735bc20..4b7e6dd6f743 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -142,6 +142,10 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { rts.assertOpAboveRTS(op, t.Timestamp) return false + case *enginepb.MVCCDeleteRangeOp: + rts.assertOpAboveRTS(op, t.Timestamp) + return false + case *enginepb.MVCCWriteIntentOp: rts.assertOpAboveRTS(op, t.Timestamp) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index f3d2b7c1d773..e236c10ebec4 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -196,17 +196,18 @@ func (s *testIterator) curKV() storage.MVCCKeyValue { // HasPointAndRange implements SimpleMVCCIterator. func (s *testIterator) HasPointAndRange() (bool, bool) { - panic("not implemented") + ok, err := s.Valid() + return ok && err == nil, false } // RangeBounds implements SimpleMVCCIterator. 
func (s *testIterator) RangeBounds() roachpb.Span { - panic("not implemented") + return roachpb.Span{} } // RangeTombstones implements SimpleMVCCIterator. func (s *testIterator) RangeKeys() []storage.MVCCRangeKeyValue { - panic("not implemented") + return []storage.MVCCRangeKeyValue{} } func TestInitResolvedTSScan(t *testing.T) { diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 88e4013878d5..c3b62db37ef2 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -513,7 +513,8 @@ func (r *Replica) populatePrevValsInLogicalOpLogRaftMuLocked( case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, - *enginepb.MVCCAbortTxnOp: + *enginepb.MVCCAbortTxnOp, + *enginepb.MVCCDeleteRangeOp: // Nothing to do. continue default: @@ -587,7 +588,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, - *enginepb.MVCCAbortTxnOp: + *enginepb.MVCCAbortTxnOp, + *enginepb.MVCCDeleteRangeOp: // Nothing to do. 
continue default: diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 5eba5ae37a20..d9cb171fe036 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1519,6 +1519,9 @@ func (e *RangeFeedEvent) ShallowCopy() *RangeFeedEvent { case *RangeFeedSSTable: cpySST := *t cpy.MustSetValue(&cpySST) + case *RangeFeedDeleteRange: + cpyDelRange := *t + cpy.MustSetValue(&cpyDelRange) case *RangeFeedError: cpyErr := *t cpy.MustSetValue(&cpyErr) diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index b7f558761ada..2bf1f1c66c5a 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2767,15 +2767,24 @@ message RangeFeedSSTable { util.hlc.Timestamp write_ts = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "WriteTS"]; } +// RangeFeedDeleteRange is a variant of RangeFeedEvent that represents a +// deletion of the specified key range at the given timestamp using an MVCC +// range tombstone. +message RangeFeedDeleteRange { + Span span = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; +} + // RangeFeedEvent is a union of all event types that may be returned on a // RangeFeed response stream. message RangeFeedEvent { option (gogoproto.onlyone) = true; - RangeFeedValue val = 1; - RangeFeedCheckpoint checkpoint = 2; - RangeFeedError error = 3; - RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"]; + RangeFeedValue val = 1; + RangeFeedCheckpoint checkpoint = 2; + RangeFeedError error = 3; + RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"]; + RangeFeedDeleteRange delete_range = 5; } diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 39c964412ced..8d1b30bd21ff 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -2067,6 +2067,11 @@ func (ls LockStateInfo) String() string { return redact.StringWithoutMarkers(ls) } +// Clone returns a copy of the span. 
+func (s Span) Clone() Span { + return Span{Key: s.Key.Clone(), EndKey: s.EndKey.Clone()} +} + // EqualValue is Equal. // // TODO(tbg): remove this passthrough. diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index c10364435bc4..470f64197088 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -335,6 +335,15 @@ message MVCCAbortTxnOp { (gogoproto.nullable) = false]; } +// MVCCDeleteRangeOp corresponds to a range deletion using an MVCC range +// tombstone. +message MVCCDeleteRangeOp { + bytes start_key = 1; + bytes end_key = 2; + util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false]; +} + + // MVCCLogicalOp is a union of all logical MVCC operation types. message MVCCLogicalOp { option (gogoproto.onlyone) = true; @@ -345,4 +354,5 @@ message MVCCLogicalOp { MVCCCommitIntentOp commit_intent = 4; MVCCAbortIntentOp abort_intent = 5; MVCCAbortTxnOp abort_txn = 6; + MVCCDeleteRangeOp delete_range = 7; } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 0d4808ce5aae..7ae915bf4374 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2471,7 +2471,19 @@ func ExperimentalMVCCDeleteRangeUsingTombstone( value.LocalTimestamp = hlc.ClockTimestamp{} } - return rw.ExperimentalPutMVCCRangeKey(rangeKey, value) + if err := rw.ExperimentalPutMVCCRangeKey(rangeKey, value); err != nil { + return err + } + + // Record the logical operation, for rangefeed emission. 
+ rw.LogLogicalOp(MVCCDeleteRangeOpType, MVCCLogicalOpDetails{ + Safe: true, + Key: rangeKey.StartKey, + EndKey: rangeKey.EndKey, + Timestamp: rangeKey.Timestamp, + }) + + return nil } func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) { diff --git a/pkg/storage/mvcc_key.go b/pkg/storage/mvcc_key.go index f128b69c77cd..7d80bd2b1bfc 100644 --- a/pkg/storage/mvcc_key.go +++ b/pkg/storage/mvcc_key.go @@ -13,6 +13,7 @@ package storage import ( "encoding/binary" "fmt" + "sort" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -423,3 +424,39 @@ func (k MVCCRangeKey) Validate() (err error) { return nil } } + +// FirstRangeKeyAbove does a binary search for the first range key at or above +// the given timestamp. It assumes the range keys are ordered in descending +// timestamp order, as returned by SimpleMVCCIterator.RangeKeys(). Returns false +// if no matching range key was found. +// +// TODO(erikgrinaker): Consider using a new type for []MVCCRangeKeyValue as +// returned by SimpleMVCCIterator.RangeKeys(), and add this as a method. +func FirstRangeKeyAbove(rangeKeys []MVCCRangeKeyValue, ts hlc.Timestamp) (MVCCRangeKeyValue, bool) { + // This is kind of odd due to sort.Search() semantics: we do a binary search + // for the first range tombstone that's below the timestamp, then return the + // previous range tombstone if any. + if i := sort.Search(len(rangeKeys), func(i int) bool { + return rangeKeys[i].RangeKey.Timestamp.Less(ts) + }); i > 0 { + return rangeKeys[i-1], true + } + return MVCCRangeKeyValue{}, false +} + +// HasRangeKeyBetween checks whether an MVCC range key exists between the two +// given timestamps. 
+func HasRangeKeyBetween(rangeKeys []MVCCRangeKeyValue, upper, lower hlc.Timestamp) bool { + if len(rangeKeys) == 0 { + return false + } + if upper.Less(lower) { + lower, upper = upper, lower + } + if rkv, ok := FirstRangeKeyAbove(rangeKeys, lower); ok { + // Consider equal timestamps to be "between". This shouldn't really happen, + // since MVCC enforces point and range keys can't have the same timestamp. + return rkv.RangeKey.Timestamp.LessEq(upper) + } + return false +} diff --git a/pkg/storage/mvcc_key_test.go b/pkg/storage/mvcc_key_test.go index 51509ed3ba92..ed17b2c31a03 100644 --- a/pkg/storage/mvcc_key_test.go +++ b/pkg/storage/mvcc_key_test.go @@ -473,6 +473,76 @@ func TestMVCCRangeKeyValidate(t *testing.T) { } } +func TestFirstRangeKeyAbove(t *testing.T) { + defer leaktest.AfterTest(t)() + + rangeKVs := []MVCCRangeKeyValue{ + rangeKV("a", "f", 6, MVCCValue{}), + rangeKV("a", "f", 4, MVCCValue{}), + rangeKV("a", "f", 3, MVCCValue{}), + rangeKV("a", "f", 1, MVCCValue{}), + } + + testcases := []struct { + ts int64 + expect int64 + }{ + {0, 1}, + {1, 1}, + {2, 3}, + {3, 3}, + {4, 4}, + {5, 6}, + {6, 6}, + {7, 0}, + } + for _, tc := range testcases { + t.Run(fmt.Sprintf("%d", tc.ts), func(t *testing.T) { + rkv, ok := FirstRangeKeyAbove(rangeKVs, hlc.Timestamp{WallTime: tc.ts}) + if tc.expect == 0 { + require.False(t, ok) + require.Empty(t, rkv) + } else { + require.True(t, ok) + require.Equal(t, rangeKV("a", "f", int(tc.expect), MVCCValue{}), rkv) + } + }) + } +} + +func TestHasRangeKeyBetween(t *testing.T) { + defer leaktest.AfterTest(t)() + + rangeKVs := []MVCCRangeKeyValue{ + rangeKV("a", "f", 5, MVCCValue{}), + rangeKV("a", "f", 1, MVCCValue{}), + } + + testcases := []struct { + lower, upper int + expect bool + }{ + {0, 0, false}, + {0, 1, true}, + {1, 0, true}, + {1, 1, true}, + {0, 2, true}, + {4, 6, true}, + {6, 4, true}, + {5, 5, true}, + {4, 4, false}, + {6, 6, false}, + {2, 4, false}, + {0, 9, true}, + {9, 0, true}, + } + for _, tc := range testcases 
{ + t.Run(fmt.Sprintf("%d,%d", tc.lower, tc.upper), func(t *testing.T) { + require.Equal(t, tc.expect, HasRangeKeyBetween(rangeKVs, wallTS(tc.lower), wallTS(tc.upper))) + }) + } +} + func pointKey(key string, ts int) MVCCKey { return MVCCKey{Key: roachpb.Key(key), Timestamp: wallTS(ts)} } diff --git a/pkg/storage/mvcc_logical_ops.go b/pkg/storage/mvcc_logical_ops.go index 7872fb8d0610..6ce5d27480f2 100644 --- a/pkg/storage/mvcc_logical_ops.go +++ b/pkg/storage/mvcc_logical_ops.go @@ -45,6 +45,8 @@ const ( MVCCCommitIntentOpType // MVCCAbortIntentOpType corresponds to the MVCCAbortIntentOp variant. MVCCAbortIntentOpType + // MVCCDeleteRangeOpType corresponds to the MVCCDeleteRangeOp variant. + MVCCDeleteRangeOpType ) // MVCCLogicalOpDetails contains details about the occurrence of an MVCC logical @@ -52,6 +54,7 @@ const ( type MVCCLogicalOpDetails struct { Txn enginepb.TxnMeta Key roachpb.Key + EndKey roachpb.Key // only set for MVCCDeleteRangeOpType Timestamp hlc.Timestamp // Safe indicates that the values in this struct will never be invalidated @@ -142,6 +145,16 @@ func (ol *OpLoggerBatch) logLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO ol.recordOp(&enginepb.MVCCAbortIntentOp{ TxnID: details.Txn.ID, }) + case MVCCDeleteRangeOpType: + if !details.Safe { + ol.opsAlloc, details.Key = ol.opsAlloc.Copy(details.Key, 0) + ol.opsAlloc, details.EndKey = ol.opsAlloc.Copy(details.EndKey, 0) + } + ol.recordOp(&enginepb.MVCCDeleteRangeOp{ + StartKey: details.Key, + EndKey: details.EndKey, + Timestamp: details.Timestamp, + }) default: panic(fmt.Sprintf("unexpected op type %v", op)) }