diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 4072ba00bb99..ee589f5381c7 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -118,6 +118,20 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { // expect SST ingestion into spans with active changefeeds. return errors.Errorf("unexpected SST ingestion: %v", t) + case *roachpb.RangeFeedDeleteRange: + // For now, we just error on MVCC range tombstones. These are currently + // only expected to be used by schema GC and IMPORT INTO, and such spans + // should not have active changefeeds across them. + // + // TODO(erikgrinaker): Write an end-to-end test which verifies that an + // IMPORT INTO which gets rolled back using MVCC range tombstones will + // not be visible to a changefeed, neither when started before the + // import nor when resuming from a timestamp before the import. The table + // descriptor should be marked as offline during the import, and catchup + // scans should detect this and prevent reading anything in that + // timespan. + return errors.Errorf("unexpected MVCC range deletion: %v", t) + default: return errors.Errorf("unexpected RangeFeedEvent variant %v", t) } diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 9ea6282aa4fe..fd9b0591110b 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -386,6 +386,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e } default: // TODO(yevgeniy): Handle SSTs. + // TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones).
return errors.AssertionFailedf("unexpected event") } } diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 56a0e235730a..2dbd650abf6f 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -41,6 +41,7 @@ type config struct { onCheckpoint OnCheckpoint onFrontierAdvance OnFrontierAdvance onSSTable OnSSTable + onDeleteRange OnDeleteRange extraPProfLabels []string } @@ -178,6 +179,25 @@ func WithOnSSTable(f OnSSTable) Option { }) } +// OnDeleteRange is called when an MVCC range tombstone is written (e.g. when +// DeleteRange is called with UseExperimentalRangeTombstone, but not when the +// range is deleted using point tombstones). If this callback is not provided, +// such range tombstones will be ignored. +// +// MVCC range tombstones are currently experimental, and disabled by default. +// They are only written when storage.CanUseExperimentalMVCCRangeTombstones() is +// true, and are only expected during certain operations like schema GC and +// IMPORT INTO (i.e. not across live tables). +type OnDeleteRange func(ctx context.Context, value *roachpb.RangeFeedDeleteRange) + +// WithOnDeleteRange sets up a callback that's invoked whenever an MVCC range +// deletion tombstone is written. +func WithOnDeleteRange(f OnDeleteRange) Option { + return optionFunc(func(c *config) { + c.onDeleteRange = f + }) +} + // OnFrontierAdvance is called when the rangefeed frontier is advanced with the // new frontier timestamp.
type OnFrontierAdvance func(ctx context.Context, timestamp hlc.Timestamp) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index fb377e484edc..25e7b21a625e 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -349,6 +349,10 @@ func (f *RangeFeed) processEvents( "received unexpected rangefeed SST event with no OnSSTable handler") } f.onSSTable(ctx, ev.SST) + case ev.DelRange != nil: + if f.onDeleteRange != nil { + f.onDeleteRange(ctx, ev.DelRange) + } case ev.Error != nil: // Intentionally do nothing, we'll get an error returned from the // call to RangeFeed. diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 4682506b5013..e0122092cb14 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -613,6 +613,118 @@ func TestWithOnSSTableCatchesUpIfNotSet(t *testing.T) { require.Equal(t, expectKVs, seenKVs) } +// TestWithOnDeleteRange tests that the rangefeed emits MVCC range tombstones. +func TestWithOnDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + srv := tc.Server(0) + db := srv.DB() + + _, _, err := tc.SplitRange(roachpb.Key("a")) + require.NoError(t, err) + require.NoError(t, tc.WaitForFullReplication()) + + _, err = tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + f, err := rangefeed.NewFactory(srv.Stopper(), db, srv.ClusterSettings(), nil) + require.NoError(t, err) + + // We lay down a couple of range tombstones and points. The first range + // tombstone should not be visible, because initial scans do not emit + // tombstones. The second one should be visible, because catchup scans do emit + // tombstones.
The range tombstone should be ordered after the initial point, + // but before the catchup point. + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "c", "d")) + require.NoError(t, db.Put(ctx, "foo", "initial")) + rangeFeedTS := db.Clock().Now() + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "d", "z")) + require.NoError(t, db.Put(ctx, "foo", "catchup")) + + // We start the rangefeed over a narrower span than the DeleteRanges (c-g), + // to ensure the DeleteRange event is truncated to the registration span. + var once sync.Once + checkpointC := make(chan struct{}) + deleteRangeC := make(chan *roachpb.RangeFeedDeleteRange) + rowC := make(chan *roachpb.RangeFeedValue) + spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}} + r, err := f.RangeFeed(ctx, "test", spans, rangeFeedTS, + func(ctx context.Context, e *roachpb.RangeFeedValue) { + select { + case rowC <- e: + case <-ctx.Done(): + } + }, + rangefeed.WithInitialScan(nil), + rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) { + once.Do(func() { + close(checkpointC) + }) + }), + rangefeed.WithOnDeleteRange(func(ctx context.Context, e *roachpb.RangeFeedDeleteRange) { + select { + case deleteRangeC <- e: + case <-ctx.Done(): + } + }), + ) + require.NoError(t, err) + defer r.Close() + + // Wait for initial scan. We should see the foo=initial point, but not the + // range tombstone. + select { + case e := <-rowC: + require.Equal(t, roachpb.Key("foo"), e.Key) + value, err := e.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "initial", string(value)) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for DeleteRange event") + } + + // Wait for catchup scan. We should see the second range tombstone, truncated + // to the rangefeed bounds (c-g), and it should be ordered before the point + // foo=catchup. 
+ select { + case e := <-deleteRangeC: + require.Equal(t, roachpb.Span{Key: roachpb.Key("d"), EndKey: roachpb.Key("g")}, e.Span) + require.NotEmpty(t, e.Timestamp) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for DeleteRange event") + } + + select { + case e := <-rowC: + require.Equal(t, roachpb.Key("foo"), e.Key) + value, err := e.Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "catchup", string(value)) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for DeleteRange event") + } + + // Wait for checkpoint after catchup scan. + select { + case <-checkpointC: + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for checkpoint") + } + + // Send another DeleteRange, and wait for the rangefeed event. This should + // be truncated to the rangefeed bounds (c-g). + require.NoError(t, db.ExperimentalDelRangeUsingTombstone(ctx, "a", "z")) + select { + case e := <-deleteRangeC: + require.Equal(t, roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("g")}, e.Span) + require.NotEmpty(t, e.Timestamp) + case <-time.After(3 * time.Second): + require.Fail(t, "timed out waiting for DeleteRange event") + } +} + // TestUnrecoverableErrors verifies that unrecoverable internal errors are surfaced // to callers. func TestUnrecoverableErrors(t *testing.T) { diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index ab349d8ad6cc..c8de0c146120 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -25,7 +25,8 @@ import ( // A CatchUpIterator is an iterator for catchUp-scans. type CatchUpIterator struct { storage.SimpleMVCCIterator - close func() + rangeKeyIter *storage.MVCCRangeKeyIterator + close func() } // NewCatchUpIterator returns a CatchUpIterator for the given Reader. 
@@ -68,6 +69,12 @@ func NewCatchUpIterator( }) } + ret.rangeKeyIter = storage.NewMVCCRangeKeyIterator(reader, storage.MVCCRangeKeyIterOptions{ + LowerBound: args.Span.Key, + UpperBound: args.Span.EndKey, + MinTimestamp: args.Timestamp, + }) + return ret } @@ -75,6 +82,9 @@ func NewCatchUpIterator( // callback. func (i *CatchUpIterator) Close() { i.SimpleMVCCIterator.Close() + if i.rangeKeyIter != nil { // can be nil in tests + i.rangeKeyIter.Close() + } if i.close != nil { i.close() } @@ -94,6 +104,23 @@ func (i *CatchUpIterator) CatchUpScan( withDiff bool, outputFn outputEventFn, ) error { + // We emit any MVCC range tombstones first, so that the consumer can use them + // to potentially discard point keys below them if they wish. + // + // Note that MVCC range tombstones are currently experimental, and will only + // be used when storage.CanUseExperimentalMVCCRangeTombstones() is enabled. + // + // TODO(erikgrinaker): We don't pass in startKey, endKey, and catchUpTimestamp + // here because we've already set the appropriate bounds when the iterator was + // constructed. This assumes that the caller always passes in the same bounds + // to CatchUpScan() as it does to NewCatchUpIterator(), otherwise this will + // emit incorrect events. Verify that this is always the case, and remove the + // redundant parameters to this method as it is unsafe to pass other values. + err := i.catchUpScanRangeTombstones(outputFn) + if err != nil { + return err + } + var a bufalloc.ByteAllocator // MVCCIterator will encounter historical values for each key in // reverse-chronological order. To output in chronological order, store @@ -242,3 +269,37 @@ func (i *CatchUpIterator) CatchUpScan( // Output events for the last key encountered. return outputEvents() } + +// catchUpScanRangeTombstones runs a catchup scan for range tombstones.
+func (i *CatchUpIterator) catchUpScanRangeTombstones(outputFn outputEventFn) error { + if i.rangeKeyIter == nil { // can be nil in tests + return nil + } + + for { + if ok, err := i.rangeKeyIter.Valid(); err != nil { + return err + } else if !ok { + break + } + + rangeKey, value := i.rangeKeyIter.Key(), i.rangeKeyIter.Value() + if len(value) > 0 { + // We currently expect all range keys to be tombstones. + return errors.AssertionFailedf("unexpected non-tombstone range key %s with value %x", + rangeKey, value) + } + + err := outputFn(&roachpb.RangeFeedEvent{ + DelRange: &roachpb.RangeFeedDeleteRange{ + Span: roachpb.Span{Key: rangeKey.StartKey, EndKey: rangeKey.EndKey}, + Timestamp: rangeKey.Timestamp, + }, + }) + if err != nil { + return err + } + i.rangeKeyIter.Next() + } + return nil +} diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index eb9a34a7d0a9..accc28a8698f 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -188,7 +188,7 @@ func setupMVCCPebble(b testing.TB, dir string, lBaseMaxBytes int64, readOnly boo opts.FS = vfs.Default opts.LBaseMaxBytes = lBaseMaxBytes opts.ReadOnly = readOnly - opts.FormatMajorVersion = pebble.FormatBlockPropertyCollector + opts.FormatMajorVersion = pebble.FormatRangeKeys peb, err := storage.NewPebble( context.Background(), storage.PebbleConfig{ diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 7f309d60b3ab..75627e7f3e76 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -629,6 +629,10 @@ func (p *Processor) consumeLogicalOps(ctx context.Context, ops []enginepb.MVCCLo // Publish the new value directly. p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue) + case *enginepb.MVCCDeleteRangeOp: + // Publish the range deletion directly. 
+ p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp) + case *enginepb.MVCCWriteIntentOp: // No updates to publish. @@ -698,6 +702,22 @@ func (p *Processor) publishValue( p.reg.PublishToOverlapping(roachpb.Span{Key: key}, &event) } +func (p *Processor) publishDeleteRange( + ctx context.Context, startKey, endKey roachpb.Key, timestamp hlc.Timestamp, +) { + span := roachpb.Span{Key: startKey, EndKey: endKey} + if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) { + log.Fatalf(ctx, "span %s not in Processor's key range %v", span, p.Span) + } + + var event roachpb.RangeFeedEvent + event.MustSetValue(&roachpb.RangeFeedDeleteRange{ + Span: span, + Timestamp: timestamp, + }) + p.reg.PublishToOverlapping(span, &event) +} + func (p *Processor) publishSSTable( ctx context.Context, sst []byte, sstSpan roachpb.Span, sstWTS hlc.Timestamp, ) { diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index 919cf3505105..b1fa1cbc24ca 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -169,6 +169,13 @@ func (r *registration) validateEvent(event *roachpb.RangeFeedEvent) { if t.WriteTS.IsEmpty() { panic(fmt.Sprintf("unexpected empty RangeFeedSSTable.Timestamp: %v", t)) } + case *roachpb.RangeFeedDeleteRange: + if len(t.Span.Key) == 0 || len(t.Span.EndKey) == 0 { + panic(fmt.Sprintf("unexpected empty key in RangeFeedDeleteRange.Span: %v", t)) + } + if t.Timestamp.IsEmpty() { + panic(fmt.Sprintf("unexpected empty RangeFeedDeleteRange.Timestamp: %v", t)) + } default: panic(fmt.Sprintf("unexpected RangeFeedEvent variant: %v", t)) } @@ -215,6 +222,13 @@ func (r *registration) maybeStripEvent(event *roachpb.RangeFeedEvent) *roachpb.R t = copyOnWrite().(*roachpb.RangeFeedCheckpoint) t.Span = r.span } + case *roachpb.RangeFeedDeleteRange: + // Truncate the range tombstone to the registration bounds. 
+ if i := t.Span.Intersect(r.span); !i.Equal(t.Span) { + t = copyOnWrite().(*roachpb.RangeFeedDeleteRange) + t.Span = i.Clone() + } + case *roachpb.RangeFeedSSTable: // SSTs are always sent in their entirety, it is up to the caller to // filter out irrelevant entries. @@ -389,6 +403,8 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang minTS = t.Value.Timestamp case *roachpb.RangeFeedSSTable: minTS = t.WriteTS + case *roachpb.RangeFeedDeleteRange: + minTS = t.Timestamp case *roachpb.RangeFeedCheckpoint: // Always publish checkpoint notifications, regardless of a registration's // starting timestamp. diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 0119b735bc20..4b7e6dd6f743 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -142,6 +142,10 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool { rts.assertOpAboveRTS(op, t.Timestamp) return false + case *enginepb.MVCCDeleteRangeOp: + rts.assertOpAboveRTS(op, t.Timestamp) + return false + case *enginepb.MVCCWriteIntentOp: rts.assertOpAboveRTS(op, t.Timestamp) return rts.intentQ.IncRef(t.TxnID, t.TxnKey, t.TxnMinTimestamp, t.Timestamp) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 25f18ed294c3..a55b0d3080cc 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -520,7 +520,8 @@ func (r *Replica) populatePrevValsInLogicalOpLogRaftMuLocked( case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, - *enginepb.MVCCAbortTxnOp: + *enginepb.MVCCAbortTxnOp, + *enginepb.MVCCDeleteRangeOp: // Nothing to do. 
continue default: @@ -594,7 +595,8 @@ func (r *Replica) handleLogicalOpLogRaftMuLocked( case *enginepb.MVCCWriteIntentOp, *enginepb.MVCCUpdateIntentOp, *enginepb.MVCCAbortIntentOp, - *enginepb.MVCCAbortTxnOp: + *enginepb.MVCCAbortTxnOp, + *enginepb.MVCCDeleteRangeOp: // Nothing to do. continue default: diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 2514b5ef09e3..0863b39ecc83 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -1480,6 +1480,9 @@ func (e *RangeFeedEvent) ShallowCopy() *RangeFeedEvent { case *RangeFeedSSTable: cpySST := *t cpy.MustSetValue(&cpySST) + case *RangeFeedDeleteRange: + cpyDelRange := *t + cpy.MustSetValue(&cpyDelRange) case *RangeFeedError: cpyErr := *t cpy.MustSetValue(&cpyErr) diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 23ffd29e018a..f65f5ab1058c 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2660,15 +2660,24 @@ message RangeFeedSSTable { util.hlc.Timestamp write_ts = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "WriteTS"]; } +// RangeFeedDeleteRange is a variant of RangeFeedEvent that represents a +// deletion of the specified key range at the given timestamp using an MVCC +// range tombstone. +message RangeFeedDeleteRange { + Span span = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp timestamp = 2 [(gogoproto.nullable) = false]; +} + // RangeFeedEvent is a union of all event types that may be returned on a // RangeFeed response stream. 
message RangeFeedEvent { option (gogoproto.onlyone) = true; - RangeFeedValue val = 1; - RangeFeedCheckpoint checkpoint = 2; - RangeFeedError error = 3; - RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"]; + RangeFeedValue val = 1; + RangeFeedCheckpoint checkpoint = 2; + RangeFeedError error = 3; + RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"]; + RangeFeedDeleteRange del_range = 5; } diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 1d8fdacd8317..9ab17227c8c2 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -2073,6 +2073,11 @@ func (u *LockUpdate) SetTxn(txn *Transaction) { u.IgnoredSeqNums = txn.IgnoredSeqNums } +// Clone returns a copy of the span. +func (s Span) Clone() Span { + return Span{Key: s.Key.Clone(), EndKey: s.EndKey.Clone()} +} + // EqualValue is Equal. // // TODO(tbg): remove this passthrough. diff --git a/pkg/storage/enginepb/mvcc3.proto b/pkg/storage/enginepb/mvcc3.proto index a528febf764f..66d08ce50200 100644 --- a/pkg/storage/enginepb/mvcc3.proto +++ b/pkg/storage/enginepb/mvcc3.proto @@ -303,6 +303,15 @@ message MVCCAbortTxnOp { (gogoproto.nullable) = false]; } +// MVCCDeleteRangeOp corresponds to a range deletion using an MVCC range +// tombstone. +message MVCCDeleteRangeOp { + bytes start_key = 1; + bytes end_key = 2; + util.hlc.Timestamp timestamp = 3 [(gogoproto.nullable) = false]; +} + + // MVCCLogicalOp is a union of all logical MVCC operation types. 
message MVCCLogicalOp { option (gogoproto.onlyone) = true; @@ -313,4 +322,5 @@ message MVCCLogicalOp { MVCCCommitIntentOp commit_intent = 4; MVCCAbortIntentOp abort_intent = 5; MVCCAbortTxnOp abort_txn = 6; + MVCCDeleteRangeOp delete_range = 7; } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 3bd3548e1e3e..8a56c9c2fc0b 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2255,7 +2255,18 @@ func ExperimentalMVCCDeleteRangeUsingTombstone( func experimentalMVCCDeleteRangeUsingTombstoneInternal( ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, rangeKey MVCCRangeKey, ) error { - return rw.ExperimentalPutMVCCRangeKey(rangeKey, nil) + if err := rw.ExperimentalPutMVCCRangeKey(rangeKey, nil); err != nil { + return err + } + + rw.LogLogicalOp(MVCCDeleteRangeOpType, MVCCLogicalOpDetails{ + Safe: true, + Key: rangeKey.StartKey, + EndKey: rangeKey.EndKey, + Timestamp: rangeKey.Timestamp, + }) + + return nil } func recordIteratorStats(traceSpan *tracing.Span, iteratorStats IteratorStats) { diff --git a/pkg/storage/mvcc_logical_ops.go b/pkg/storage/mvcc_logical_ops.go index cc198509c243..8cce1bb0241f 100644 --- a/pkg/storage/mvcc_logical_ops.go +++ b/pkg/storage/mvcc_logical_ops.go @@ -44,6 +44,8 @@ const ( MVCCCommitIntentOpType // MVCCAbortIntentOpType corresponds to the MVCCAbortIntentOp variant. MVCCAbortIntentOpType + // MVCCDeleteRangeOpType corresponds to the MVCCDeleteRangeOp variant. 
+ MVCCDeleteRangeOpType ) // MVCCLogicalOpDetails contains details about the occurrence of an MVCC logical @@ -51,6 +53,7 @@ const ( type MVCCLogicalOpDetails struct { Txn enginepb.TxnMeta Key roachpb.Key + EndKey roachpb.Key // only set for MVCCDeleteRangeOpType Timestamp hlc.Timestamp // Safe indicates that the values in this struct will never be invalidated @@ -133,6 +136,16 @@ func (ol *OpLoggerBatch) logLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO ol.recordOp(&enginepb.MVCCAbortIntentOp{ TxnID: details.Txn.ID, }) + case MVCCDeleteRangeOpType: + if !details.Safe { + ol.opsAlloc, details.Key = ol.opsAlloc.Copy(details.Key, 0) + ol.opsAlloc, details.EndKey = ol.opsAlloc.Copy(details.EndKey, 0) + } + ol.recordOp(&enginepb.MVCCDeleteRangeOp{ + StartKey: details.Key, + EndKey: details.EndKey, + Timestamp: details.Timestamp, + }) default: panic(fmt.Sprintf("unexpected op type %v", op)) } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 533adb4af8b4..3fd10ba9900a 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1000,6 +1000,12 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt } return iter } + // If this Pebble database does not support range keys yet, fall back to only + // iterating over point keys to avoid errors. This is effectively the same, + // since a database without range key support contains no range keys. + if opts.KeyTypes != IterKeyTypePointsOnly && p.db.FormatMajorVersion() < pebble.FormatRangeKeys { + opts.KeyTypes = IterKeyTypePointsOnly + } iter := newPebbleIterator(p.db, nil, opts) if iter == nil { @@ -1013,6 +1019,10 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt // NewEngineIterator implements the Engine interface. func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator { + // TODO(erikgrinaker): Reconsider this if necessary. 
+ if opts.KeyTypes != IterKeyTypePointsOnly { + panic("range keys are not supported for engine iterators, only MVCC") + } iter := newPebbleIterator(p.db, nil, opts) if iter == nil { panic("couldn't create a new iterator") @@ -1457,8 +1467,9 @@ func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch { // NewSnapshot implements the Engine interface. func (p *Pebble) NewSnapshot() Reader { return &pebbleSnapshot{ - snapshot: p.db.NewSnapshot(), - settings: p.settings, + snapshot: p.db.NewSnapshot(), + settings: p.settings, + formatMajorVersion: p.db.FormatMajorVersion(), } } @@ -1799,6 +1810,11 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions return iter } + if opts.KeyTypes != IterKeyTypePointsOnly && + p.parent.db.FormatMajorVersion() < pebble.FormatRangeKeys { + opts.KeyTypes = IterKeyTypePointsOnly + } + if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. iter := MVCCIterator(newPebbleIterator(p.parent.db, nil, opts)) @@ -1842,6 +1858,9 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { if p.closed { panic("using a closed pebbleReadOnly") } + if opts.KeyTypes != IterKeyTypePointsOnly { + panic("range keys are not supported for engine iterators, only MVCC") + } iter := &p.normalEngineIter if opts.Prefix { @@ -1979,9 +1998,10 @@ func (p *pebbleReadOnly) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO // pebbleSnapshot represents a snapshot created using Pebble.NewSnapshot(). 
type pebbleSnapshot struct { - snapshot *pebble.Snapshot - settings *cluster.Settings - closed bool + snapshot *pebble.Snapshot + settings *cluster.Settings + formatMajorVersion pebble.FormatMajorVersion + closed bool } var _ Reader = &pebbleSnapshot{} @@ -2067,6 +2087,11 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions } return iter } + + if opts.KeyTypes != IterKeyTypePointsOnly && p.formatMajorVersion < pebble.FormatRangeKeys { + opts.KeyTypes = IterKeyTypePointsOnly + } + iter := MVCCIterator(newPebbleIterator(p.snapshot, nil, opts)) if util.RaceEnabled { iter = wrapInUnsafeIter(iter) @@ -2076,6 +2101,9 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions // NewEngineIterator implements the Reader interface. func (p pebbleSnapshot) NewEngineIterator(opts IterOptions) EngineIterator { + if opts.KeyTypes != IterKeyTypePointsOnly { + panic("range keys are not supported for engine iterators, only MVCC") + } return newPebbleIterator(p.snapshot, nil, opts) } diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index b7cff9d7661b..c6db2633e532 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -207,6 +207,10 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M return iter } + if opts.KeyTypes != IterKeyTypePointsOnly && p.db.FormatMajorVersion() < pebble.FormatRangeKeys { + opts.KeyTypes = IterKeyTypePointsOnly + } + if !opts.MinTimestampHint.IsEmpty() { // MVCCIterators that specify timestamp bounds cannot be cached. iter := MVCCIterator(newPebbleIterator(p.batch, nil, opts)) @@ -258,6 +262,10 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { panic("write-only batch") } + if opts.KeyTypes != IterKeyTypePointsOnly { + panic("range keys are not supported for engine iterators, only MVCC") + } + iter := &p.normalEngineIter if opts.Prefix { iter = &p.prefixEngineIter