From b00015135a1a1c56aab403ee75185d3b4edb617c Mon Sep 17 00:00:00 2001 From: Casper Date: Wed, 20 Jul 2022 16:51:40 -0400 Subject: [PATCH 1/2] kv: create RangeFeedMessage type to encapsulate RangeFeedEvent This PR creates kvcoor.RangeFeedMessage encapsulating RangeFeedEvent and additional RegisteredSpan field which is the span of the rangefeed registration that emits this event. This field is to help rangefeed clients to properly trim the received SST with only single registered span. In the case of creating rangefeed with multiple spans, rangefeed consumers may end up receiving duplicate SST without proper trimming using this field. Release note: None --- .../changefeedccl/kvfeed/physical_kv_feed.go | 6 +- .../streamproducer/event_stream.go | 7 +- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + .../kvclient/kvcoord/dist_sender_rangefeed.go | 12 ++-- pkg/kv/kvclient/kvcoord/rangefeed_message.go | 24 +++++++ pkg/kv/kvclient/rangefeed/BUILD.bazel | 1 + pkg/kv/kvclient/rangefeed/config.go | 8 ++- pkg/kv/kvclient/rangefeed/db_adapter.go | 2 +- .../rangefeed/mocks_generated_test.go | 3 +- pkg/kv/kvclient/rangefeed/rangefeed.go | 9 +-- .../rangefeed/rangefeed_external_test.go | 23 ++++--- .../kvclient/rangefeed/rangefeed_mock_test.go | 67 ++++++++++--------- pkg/kv/kvnemesis/watcher.go | 4 +- pkg/kv/kvserver/client_rangefeed_test.go | 13 ++-- pkg/kv/kvserver/replica_rangefeed_test.go | 22 ++++-- 15 files changed, 133 insertions(+), 69 deletions(-) create mode 100644 pkg/kv/kvclient/kvcoord/rangefeed_message.go diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 142a9f700a88..ff52d56be302 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -37,13 +37,13 @@ type rangefeedFactory func( ctx context.Context, spans []kvcoord.SpanTimePair, withDiff bool, - eventC chan<- *roachpb.RangeFeedEvent, + eventC chan<- kvcoord.RangeFeedMessage, ) error type rangefeed struct { memBuf kvevent.Writer cfg rangeFeedConfig - eventC chan *roachpb.RangeFeedEvent + eventC chan kvcoord.RangeFeedMessage knobs TestingKnobs } @@ -68,7 +68,7 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang feed := rangefeed{ memBuf: sink, cfg: cfg, - eventC: make(chan *roachpb.RangeFeedEvent, 128), + eventC: make(chan kvcoord.RangeFeedMessage, 128), knobs: cfg.Knobs, } g := ctxgroup.WithContext(ctx) diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 67a6389cc464..dcfa762c2fbb 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -250,11 +250,14 @@ func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) erro } } -func (s *eventStream) onSSTable(ctx context.Context, sst *roachpb.RangeFeedSSTable) { +func (s *eventStream) onSSTable( + ctx context.Context, sst *roachpb.RangeFeedSSTable, registeredSpan roachpb.Span, +) { select { case <-ctx.Done(): case s.eventsCh <- roachpb.RangeFeedEvent{SST: sst}: - log.VInfof(ctx, 1, "onSSTable: %s@%s", sst.Span, sst.WriteTS) + log.VInfof(ctx, 1, "onSSTable: %s@%s with registered span %s", + sst.Span, sst.WriteTS, registeredSpan) } } diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 31656644fc01..8db3465220c2 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -16,6 +16,7 @@ go_library( 
"lock_spans_over_budget_error.go", "node_store.go", "range_iter.go", + "rangefeed_message.go", "replica_slice.go", "testing_knobs.go", "transport.go", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ac0e856fc64a..5106f5e209aa 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -87,7 +87,7 @@ func (ds *DistSender) RangeFeed( spans []roachpb.Span, startAfter hlc.Timestamp, // exclusive withDiff bool, - eventCh chan<- *roachpb.RangeFeedEvent, + eventCh chan<- RangeFeedMessage, ) error { timedSpans := make([]SpanTimePair, 0, len(spans)) for _, sp := range spans { @@ -110,7 +110,7 @@ type SpanTimePair struct { // RangeFeedSpans is similar to RangeFeed but allows specification of different // starting time for each span. func (ds *DistSender) RangeFeedSpans( - ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- *roachpb.RangeFeedEvent, + ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- RangeFeedMessage, ) error { if len(spans) == 0 { return errors.AssertionFailedf("expected at least 1 span, got none") @@ -299,7 +299,7 @@ func (ds *DistSender) partialRangeFeed( withDiff bool, catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, - eventCh chan<- *roachpb.RangeFeedEvent, + eventCh chan<- RangeFeedMessage, ) error { // Bound the partial rangefeed to the partial span. span := rs.AsRawSpanWithNoLocals() @@ -407,7 +407,7 @@ func (ds *DistSender) singleRangeFeed( withDiff bool, desc *roachpb.RangeDescriptor, catchupSem *limit.ConcurrentRequestLimiter, - eventCh chan<- *roachpb.RangeFeedEvent, + eventCh chan<- RangeFeedMessage, onRangeEvent onRangeEventCb, ) (hlc.Timestamp, error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. @@ -491,6 +491,7 @@ func (ds *DistSender) singleRangeFeed( if err != nil { return args.Timestamp, err } + msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span} switch t := event.GetValue().(type) { case *roachpb.RangeFeedCheckpoint: if t.Span.Contains(args.Span) { @@ -500,6 +501,7 @@ func (ds *DistSender) singleRangeFeed( } args.Timestamp.Forward(t.ResolvedTS.Next()) } + case *roachpb.RangeFeedSSTable: case *roachpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) if catchupRes != nil { @@ -510,7 +512,7 @@ func (ds *DistSender) singleRangeFeed( onRangeEvent(args.Replica.NodeID, desc.RangeID, event) select { - case eventCh <- event: + case eventCh <- msg: case <-ctx.Done(): return args.Timestamp, ctx.Err() } diff --git a/pkg/kv/kvclient/kvcoord/rangefeed_message.go b/pkg/kv/kvclient/kvcoord/rangefeed_message.go new file mode 100644 index 000000000000..0244a3c88797 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/rangefeed_message.go @@ -0,0 +1,24 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import "github.com/cockroachdb/cockroach/pkg/roachpb" + +// RangeFeedMessage is a type that encapsulates the roachpb.RangeFeedEvent. +type RangeFeedMessage struct { + + // RangeFeed event this message holds. 
+ *roachpb.RangeFeedEvent + + // The span of the rangefeed registration that overlaps with SST span if + // event has an underlying roachpb.RangeFeedSSTable event. + RegisteredSpan roachpb.Span +} diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 88ccae035884..c2206f4d9f69 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -66,6 +66,7 @@ go_test( "//pkg/base", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security/securityassets", diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index bb16fda15ff3..c3d5621f421a 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -158,6 +158,8 @@ func WithOnCheckpoint(f OnCheckpoint) Option { // provided, a catchup scan will be run instead that will include the contents // of these SSTs. // +// 'registeredSpan' is a span of rangefeed registration that emits the SSTable. +// // Note that the SST is emitted as it was ingested, so it may contain keys // outside of the rangefeed span, and the caller should prune the SST contents // as appropriate. Futhermore, these events do not contain previous values as @@ -169,7 +171,11 @@ func WithOnCheckpoint(f OnCheckpoint) Option { // MVCCHistoryMutationError and thus will not be emitted here -- there should be // no such requests into spans with rangefeeds across them, but it is up to // callers to ensure this. -type OnSSTable func(ctx context.Context, sst *roachpb.RangeFeedSSTable) +type OnSSTable func( + ctx context.Context, + sst *roachpb.RangeFeedSSTable, + registeredSpan roachpb.Span, +) // WithOnSSTable sets up a callback that's invoked whenever an SSTable is // ingested. diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go index 237993e4ed8c..759091e9909b 100644 --- a/pkg/kv/kvclient/rangefeed/db_adapter.go +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -73,7 +73,7 @@ func (dbc *dbAdapter) RangeFeed( spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, - eventC chan<- *roachpb.RangeFeedEvent, + eventC chan<- kvcoord.RangeFeedMessage, ) error { return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC) } diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go index b126499dae55..b0472217ad47 100644 --- a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go +++ b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go @@ -8,6 +8,7 @@ import ( context "context" reflect "reflect" + kvcoord "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" roachpb "github.com/cockroachdb/cockroach/pkg/roachpb" hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" gomock "github.com/golang/mock/gomock" @@ -37,7 +38,7 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder { } // RangeFeed mocks base method. 
-func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- *roachpb.RangeFeedEvent) error { +func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RangeFeed", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index e9ee7a2cfd03..eda29adfdca1 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -54,7 +55,7 @@ type DB interface { spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, - eventC chan<- *roachpb.RangeFeedEvent, + eventC chan<- kvcoord.RangeFeedMessage, ) error // Scan encapsulates scanning a key span at a given point in time. The method @@ -279,7 +280,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { // TODO(ajwerner): Consider adding event buffering. Doing so would require // draining when the rangefeed fails. - eventCh := make(chan *roachpb.RangeFeedEvent) + eventCh := make(chan kvcoord.RangeFeedMessage) for i := 0; r.Next(); i++ { ts := frontier.Frontier() @@ -333,7 +334,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { // processEvents processes events sent by the rangefeed on the eventCh. func (f *RangeFeed) processEvents( - ctx context.Context, frontier *span.Frontier, eventCh <-chan *roachpb.RangeFeedEvent, + ctx context.Context, frontier *span.Frontier, eventCh <-chan kvcoord.RangeFeedMessage, ) error { for { select { @@ -357,7 +358,7 @@ func (f *RangeFeed) processEvents( return errors.AssertionFailedf( "received unexpected rangefeed SST event with no OnSSTable handler") } - f.onSSTable(ctx, ev.SST) + f.onSSTable(ctx, ev.SST, ev.RegisteredSpan) case ev.DeleteRange != nil: if f.onDeleteRange == nil { if kvserverbase.GlobalMVCCRangeTombstoneForTesting { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 2f3c355823cf..b9002ec1c24a 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -490,7 +491,7 @@ func TestWithOnSSTable(t *testing.T) { // narrower. 
var once sync.Once checkpointC := make(chan struct{}) - sstC := make(chan *roachpb.RangeFeedSSTable) + sstC := make(chan kvcoord.RangeFeedMessage) spans := []roachpb.Span{{Key: roachpb.Key("c"), EndKey: roachpb.Key("e")}} r, err := f.RangeFeed(ctx, "test", spans, db.Clock().Now(), func(ctx context.Context, value *roachpb.RangeFeedValue) {}, @@ -499,9 +500,14 @@ func TestWithOnSSTable(t *testing.T) { close(checkpointC) }) }), - rangefeed.WithOnSSTable(func(ctx context.Context, sst *roachpb.RangeFeedSSTable) { + rangefeed.WithOnSSTable(func(ctx context.Context, sst *roachpb.RangeFeedSSTable, registeredSpan roachpb.Span) { select { - case sstC <- sst: + case sstC <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{ + SST: sst, + }, + RegisteredSpan: registeredSpan, + }: case <-ctx.Done(): } }), @@ -533,16 +539,17 @@ func TestWithOnSSTable(t *testing.T) { require.Nil(t, pErr) // Wait for the SST event and check its contents. - var sstEvent *roachpb.RangeFeedSSTable + var sstMessage kvcoord.RangeFeedMessage select { - case sstEvent = <-sstC: + case sstMessage = <-sstC: case <-time.After(3 * time.Second): require.Fail(t, "timed out waiting for SST event") } - require.Equal(t, roachpb.Span{Key: sstStart, EndKey: sstEnd}, sstEvent.Span) - require.Equal(t, now, sstEvent.WriteTS) - require.Equal(t, sstKVs, storageutils.ScanSST(t, sstEvent.Data)) + require.Equal(t, roachpb.Span{Key: sstStart, EndKey: sstEnd}, sstMessage.SST.Span) + require.Equal(t, now, sstMessage.SST.WriteTS) + require.Equal(t, sstKVs, storageutils.ScanSST(t, sstMessage.SST.Data)) + require.Equal(t, spans[0], sstMessage.RegisteredSpan) } // TestWithOnSSTableCatchesUpIfNotSet tests that the rangefeed runs a catchup diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index d104c05834e8..8565b10b4baa 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -35,7 +36,7 @@ type mockClient struct { spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, - eventC chan<- *roachpb.RangeFeedEvent, + eventC chan<- kvcoord.RangeFeedMessage, ) error scan func( @@ -52,7 +53,7 @@ func (m *mockClient) RangeFeed( spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, - eventC chan<- *roachpb.RangeFeedEvent, + eventC chan<- kvcoord.RangeFeedMessage, ) error { return m.rangefeed(ctx, spans, startFrom, withDiff, eventC) } @@ -162,15 +163,16 @@ func TestRangeFeedMock(t *testing.T) { return nil }, rangefeed: func( - ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, ) error { assert.False(t, withDiff) // it was not set sendEvent := func(ts hlc.Timestamp) { - eventC <- &roachpb.RangeFeedEvent{ - Val: &roachpb.RangeFeedValue{ - Key: sp.Key, - }, - } + eventC <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + }} } iteration++ switch { @@ -180,7 +182,7 @@ func TestRangeFeedMock(t *testing.T) { return 
errors.New("boom") case iteration == firstPartialCheckpoint: assert.Equal(t, startFrom, initialTS) - eventC <- &roachpb.RangeFeedEvent{ + eventC <- kvcoord.RangeFeedMessage{RangeFeedEvent: &roachpb.RangeFeedEvent{ Checkpoint: &roachpb.RangeFeedCheckpoint{ Span: roachpb.Span{ Key: sp.Key, @@ -188,32 +190,34 @@ func TestRangeFeedMock(t *testing.T) { }, ResolvedTS: nextTS, }, - } + }} sendEvent(initialTS) return errors.New("boom") case iteration == secondPartialCheckpoint: assert.Equal(t, startFrom, initialTS) - eventC <- &roachpb.RangeFeedEvent{ - Checkpoint: &roachpb.RangeFeedCheckpoint{ - Span: roachpb.Span{ - Key: sp.Key.PrefixEnd(), - EndKey: sp.EndKey, + eventC <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: roachpb.Span{ + Key: sp.Key.PrefixEnd(), + EndKey: sp.EndKey, + }, + ResolvedTS: nextTS, }, - ResolvedTS: nextTS, - }, - } + }} sendEvent(nextTS) return errors.New("boom") case iteration == fullCheckpoint: // At this point the frontier should have a complete checkpoint at // nextTS. assert.Equal(t, startFrom, nextTS) - eventC <- &roachpb.RangeFeedEvent{ - Checkpoint: &roachpb.RangeFeedCheckpoint{ - Span: sp, - ResolvedTS: lastTS, - }, - } + eventC <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{ + Checkpoint: &roachpb.RangeFeedCheckpoint{ + Span: sp, + ResolvedTS: lastTS, + }, + }} sendEvent(nextTS) return errors.New("boom") case iteration == lastEvent: @@ -263,14 +267,15 @@ func TestRangeFeedMock(t *testing.T) { return nil }, rangefeed: func( - ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, ) error { assert.True(t, withDiff) - eventC <- &roachpb.RangeFeedEvent{ - Val: &roachpb.RangeFeedValue{ - Key: sp.Key, - }, - } + eventC <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{ + Key: sp.Key, + }, + }} <-ctx.Done() return ctx.Err() }, @@ -359,7 +364,7 @@ func TestBackoffOnRangefeedFailure(t *testing.T) { Times(3). Return(errors.New("rangefeed failed")) db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- *roachpb.RangeFeedEvent) { + Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage) { cancel() }). 
Return(nil) diff --git a/pkg/kv/kvnemesis/watcher.go b/pkg/kv/kvnemesis/watcher.go index ae3b5c79e681..908bfa1394b6 100644 --- a/pkg/kv/kvnemesis/watcher.go +++ b/pkg/kv/kvnemesis/watcher.go @@ -70,7 +70,7 @@ func Watch(ctx context.Context, env *Env, dbs []*kv.DB, dataSpan roachpb.Span) ( } startTs := firstDB.Clock().Now() - eventC := make(chan *roachpb.RangeFeedEvent, 128) + eventC := make(chan kvcoord.RangeFeedMessage, 128) w.g.GoCtx(func(ctx context.Context) error { ts := startTs for i := 0; ; i = (i + 1) % len(dbs) { @@ -147,7 +147,7 @@ func (w *Watcher) WaitForFrontier(ctx context.Context, ts hlc.Timestamp) (retErr } } -func (w *Watcher) processEvents(ctx context.Context, eventC chan *roachpb.RangeFeedEvent) error { +func (w *Watcher) processEvents(ctx context.Context, eventC chan kvcoord.RangeFeedMessage) error { for { select { case <-ctx.Done(): diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go index 909f022a0dd0..233bad25ca5b 100644 --- a/pkg/kv/kvserver/client_rangefeed_test.go +++ b/pkg/kv/kvserver/client_rangefeed_test.go @@ -72,7 +72,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { EndKey: descTableKey.PrefixEnd(), } - evChan := make(chan *roachpb.RangeFeedEvent) + evChan := make(chan kvcoord.RangeFeedMessage) rangefeedErrChan := make(chan error, 1) ctxToCancel, cancel := context.WithCancel(ctx) go func() { @@ -100,6 +100,11 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { require.EqualValues(t, junkDescriptor.DescriptorProto(), &gotProto) break } + + if !ev.RegisteredSpan.Equal(descTableSpan) { + t.Fatal("registered span in the message should be equal to " + + "the span used to create the rangefeed") + } } cancel() // There are several cases that seems like they can happen due @@ -129,7 +134,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { } return nil }) - evChan := make(chan *roachpb.RangeFeedEvent) + evChan := make(chan kvcoord.RangeFeedMessage) require.Regexp(t, `rangefeeds require the kv\.rangefeed.enabled setting`, ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, false /* withDiff */, evChan)) }) @@ -179,7 +184,7 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) { defer cancel() rangefeedErrChan := make(chan error, 1) // Make the buffer large so we don't risk blocking. - eventCh := make(chan *roachpb.RangeFeedEvent, 1000) + eventCh := make(chan kvcoord.RangeFeedMessage, 1000) start := db.Clock().Now() go func() { rangefeedErrChan <- ds.RangeFeed(rangefeedCtx, @@ -244,7 +249,7 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) { require.NoError(t, err) rangefeedErrChan := make(chan error, 1) - eventCh := make(chan *roachpb.RangeFeedEvent, 1000) + eventCh := make(chan kvcoord.RangeFeedMessage, 1000) go func() { rangefeedErrChan <- ds.RangeFeed( rangefeedCtx, diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 06d1f0fa3ea1..09a02d38a775 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -376,6 +376,7 @@ func TestReplicaRangefeed(t *testing.T) { storage.MVCCValue{Value: expVal7q})) require.NoError(t, sstWriter.Finish()) + // Does this mean all ts in sst should be equal to the batch ts? 
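+	// (AddSSTableAtBatchTimestamp sends the request with SSTTimestampToRequestTimestamp
+	// set to the batch timestamp, so the SST is expected to be written entirely at that
+	// timestamp; the server may then rewrite those keys to the request's evaluation timestamp.)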
_, _, _, pErr = store1.DB().AddSSTableAtBatchTimestamp(ctx, roachpb.Key("b"), roachpb.Key("r"), sstFile.Data(), false /* disallowConflicts */, false /* disallowShadowing */, hlc.Timestamp{}, nil, /* stats */ true /* ingestAsWrites */, ts7) @@ -419,6 +420,7 @@ func TestReplicaRangefeed(t *testing.T) { Key: roachpb.Key("q"), Value: expVal7q, PrevValue: expVal6q, }}, }...) + // here checkForExpEvents(expEvents) // Cancel each of the rangefeed streams. @@ -1085,12 +1087,12 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { ts1 := tc.Server(0).Clock().Now() rangeFeedCtx, rangeFeedCancel := context.WithCancel(ctx) defer rangeFeedCancel() - rangeFeedChs := make([]chan *roachpb.RangeFeedEvent, len(repls)) + rangeFeedChs := make([]chan kvcoord.RangeFeedMessage, len(repls)) rangeFeedErrC := make(chan error, len(repls)) for i := range repls { desc := repls[i].Desc() ds := tc.Server(i).DistSenderI().(*kvcoord.DistSender) - rangeFeedCh := make(chan *roachpb.RangeFeedEvent) + rangeFeedCh := make(chan kvcoord.RangeFeedMessage) rangeFeedChs[i] = rangeFeedCh go func() { span := roachpb.Span{ @@ -1239,7 +1241,7 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) { rangeFeedCtx, rangeFeedCancel := context.WithCancel(ctx) defer rangeFeedCancel() ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) - rangeFeedCh := make(chan *roachpb.RangeFeedEvent) + rangeFeedCh := make(chan kvcoord.RangeFeedMessage) rangeFeedErrC := make(chan error, 1) go func() { span := roachpb.Span{ @@ -1259,6 +1261,7 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) { if c := event.Checkpoint; c != nil && ts.Less(c.ResolvedTS) { checkpointed = true } + case err := <-rangeFeedErrC: t.Fatal(err) case <-timeout: @@ -1402,12 +1405,13 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) { rangeFeedCtx, rangeFeedCancel := context.WithCancel(ctx) defer rangeFeedCancel() ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) - rangeFeedCh := make(chan *roachpb.RangeFeedEvent) + rangeFeedCh := make(chan kvcoord.RangeFeedMessage) rangeFeedErrC := make(chan error, 1) + rangefeedSpan := roachpb.Span{ + Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), + } startRangefeed := func() { - span := roachpb.Span{ - Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(), - } + span := rangefeedSpan rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh) } @@ -1422,6 +1426,10 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) { if c := event.Checkpoint; c != nil && ts.Less(c.ResolvedTS) { checkpointed = true } + if !event.RegisteredSpan.Equal(rangefeedSpan) { + t.Fatal("registered span in the message should be equal to " + + "the span used to create the rangefeed") + } case err := <-rangeFeedErrC: t.Fatal(err) case <-timeout: From dae630410dee5c06a4d38f7790cb754686c5f1e2 Mon Sep 17 00:00:00 2001 From: Casper Date: Wed, 27 Jul 2022 17:30:50 -0400 Subject: [PATCH 2/2] streamingccl: support DeleteRange in tenant stream replication This PR supports DeleteRange operation both in producer that process DelRange from rangefeed and in stream ingestion processor that ingest DelRange as SST into destination. For version >= 22.2, SSTs can also contain range tombstones, i.e., DelRange, this PR also supports ingesting them. Ingestion processor uses a separate SST writer to ingest range tombstones as SSTBatcher does not support adding MVCCRangeKeys. yet. 
This PR also cleans up the GenerationEvent that is no longer a valid concept in the current consumer-tracked design. Release note: None --- pkg/BUILD.bazel | 2 + pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go | 4 +- pkg/ccl/streamingccl/BUILD.bazel | 24 +- pkg/ccl/streamingccl/event.go | 87 ++++-- .../streamingccl/streamclient/client_test.go | 5 +- .../streamclient/partitioned_stream_client.go | 7 +- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 2 + .../stream_ingestion_processor.go | 264 ++++++++++++++---- .../stream_ingestion_processor_test.go | 56 ---- .../stream_replication_e2e_test.go | 90 ++++++ .../streamingtest/replication_helpers.go | 14 - pkg/ccl/streamingccl/streampb/stream.proto | 1 + .../streamingccl/streamproducer/BUILD.bazel | 3 + .../streamproducer/event_stream.go | 131 +++++---- .../streamproducer/replication_stream_test.go | 136 +++++++++ pkg/ccl/streamingccl/utils.go | 111 ++++++++ pkg/ccl/streamingccl/utils_test.go | 132 +++++++++ pkg/kv/kvclient/kvcoord/rangefeed_message.go | 4 +- pkg/kv/kvserver/replica_rangefeed_test.go | 1 - 19 files changed, 856 insertions(+), 218 deletions(-) create mode 100644 pkg/ccl/streamingccl/utils.go create mode 100644 pkg/ccl/streamingccl/utils_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 5143aff8f364..26ed3f48eb94 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -79,6 +79,7 @@ ALL_TESTS = [ "//pkg/ccl/streamingccl/streamclient:streamclient_test", "//pkg/ccl/streamingccl/streamingest:streamingest_test", "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", + "//pkg/ccl/streamingccl:streamingccl_test", "//pkg/ccl/telemetryccl:telemetryccl_test", "//pkg/ccl/testccl/sqlccl:sqlccl_test", "//pkg/ccl/testccl/workload/schemachange:schemachange_test", @@ -745,6 +746,7 @@ GO_TARGETS = [ "//pkg/ccl/streamingccl/streamproducer:streamproducer", "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", "//pkg/ccl/streamingccl:streamingccl", + "//pkg/ccl/streamingccl:streamingccl_test", "//pkg/ccl/telemetryccl:telemetryccl_test", "//pkg/ccl/testccl/sqlccl:sqlccl_test", "//pkg/ccl/testccl/workload/schemachange:schemachange_test", diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index f06741afb227..f99318e85075 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -391,7 +391,7 @@ func (f rawEventFeed) run( ctx context.Context, spans []kvcoord.SpanTimePair, withDiff bool, - eventC chan<- *roachpb.RangeFeedEvent, + eventC chan<- kvcoord.RangeFeedMessage, ) error { var startAfter hlc.Timestamp for _, s := range spans { @@ -414,7 +414,7 @@ func (f rawEventFeed) run( f = f[i:] for i := range f { select { - case eventC <- &f[i]: + case eventC <- kvcoord.RangeFeedMessage{RangeFeedEvent: &f[i]}: case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/ccl/streamingccl/BUILD.bazel b/pkg/ccl/streamingccl/BUILD.bazel index 6033606fbe7a..561c6e56398d 100644 --- a/pkg/ccl/streamingccl/BUILD.bazel +++ b/pkg/ccl/streamingccl/BUILD.bazel @@ -1,5 +1,5 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "streamingccl", @@ -8,6 +8,7 @@ go_library( "errors.go", "event.go", "settings.go", + "utils.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl", visibility = ["//visibility:public"], @@ -16,7 +17,28 @@ go_library( "//pkg/jobs/jobspb", 
"//pkg/roachpb", "//pkg/settings", + "//pkg/storage", "//pkg/streaming", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "streamingccl_test", + srcs = ["utils_test.go"], + embed = [":streamingccl"], + deps = [ + "//pkg/clusterversion", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/storage", + "//pkg/testutils/storageutils", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index 90cfe9713efb..d04af2a9b1c6 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -23,16 +23,18 @@ const ( // SSTableEvent indicates that the SSTable field of an event holds an updated // SSTable which needs to be ingested. SSTableEvent + // DeleteRangeEvent indicates that the DeleteRange field of an event holds a + // DeleteRange which needs to be ingested. + DeleteRangeEvent // CheckpointEvent indicates that GetResolvedSpans will be meaningful. The resolved // timestamp indicates that all KVs have been emitted up to this timestamp. CheckpointEvent - // GenerationEvent indicates that the stream should start ingesting with the - // updated topology. - GenerationEvent ) // Event describes an event emitted by a cluster to cluster stream. Its Type // field indicates which other fields are meaningful. +// TODO(casper): refactor this to use a protobuf message type that has one of +// union of event types below. type Event interface { // Type specifies which accessor will be meaningful. Type() EventType @@ -40,9 +42,12 @@ type Event interface { // GetKV returns a KV event if the EventType is KVEvent. GetKV() *roachpb.KeyValue - // GetSSTable returns a SSTable event if the EventType is SSTable. + // GetSSTable returns a AddSSTable event if the EventType is SSTableEvent. GetSSTable() *roachpb.RangeFeedSSTable + // GetDeleteRange returns a DeleteRange event if the EventType is DeleteRangeEvent. + GetDeleteRange() *roachpb.RangeFeedDeleteRange + // GetResolvedSpans returns a list of span-time pairs indicating the time for // which all KV events within that span has been emitted. GetResolvedSpans() *[]jobspb.ResolvedSpan @@ -70,6 +75,11 @@ func (kve kvEvent) GetSSTable() *roachpb.RangeFeedSSTable { return nil } +// GetDeleteRange implements the Event interface. +func (kve kvEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange { + return nil +} + // GetResolvedSpans implements the Event interface. func (kve kvEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { return nil @@ -95,6 +105,11 @@ func (sste sstableEvent) GetSSTable() *roachpb.RangeFeedSSTable { return &sste.sst } +// GetDeleteRange implements the Event interface. +func (sste sstableEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange { + return nil +} + // GetResolvedSpans implements the Event interface. func (sste sstableEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { return nil @@ -102,59 +117,71 @@ func (sste sstableEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { var _ Event = sstableEvent{} -// checkpointEvent indicates that the stream has emitted every change for all -// keys in the span it is responsible for up until this timestamp. -type checkpointEvent struct { - resolvedSpans []jobspb.ResolvedSpan +// delRangeEvent is a DeleteRange event that needs to be ingested. +type delRangeEvent struct { + delRange roachpb.RangeFeedDeleteRange } -var _ Event = checkpointEvent{} - // Type implements the Event interface. 
-func (ce checkpointEvent) Type() EventType { - return CheckpointEvent +func (dre delRangeEvent) Type() EventType { + return DeleteRangeEvent } // GetKV implements the Event interface. -func (ce checkpointEvent) GetKV() *roachpb.KeyValue { +func (dre delRangeEvent) GetKV() *roachpb.KeyValue { return nil } // GetSSTable implements the Event interface. -func (ce checkpointEvent) GetSSTable() *roachpb.RangeFeedSSTable { +func (dre delRangeEvent) GetSSTable() *roachpb.RangeFeedSSTable { return nil } +// GetDeleteRange implements the Event interface. +func (dre delRangeEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange { + return &dre.delRange +} + // GetResolvedSpans implements the Event interface. -func (ce checkpointEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { - return &ce.resolvedSpans +func (dre delRangeEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { + return nil } -// generationEvent indicates that the topology of the stream has changed. -type generationEvent struct{} +var _ Event = delRangeEvent{} -var _ Event = generationEvent{} +// checkpointEvent indicates that the stream has emitted every change for all +// keys in the span it is responsible for up until this timestamp. +type checkpointEvent struct { + resolvedSpans []jobspb.ResolvedSpan +} + +var _ Event = checkpointEvent{} // Type implements the Event interface. -func (ge generationEvent) Type() EventType { - return GenerationEvent +func (ce checkpointEvent) Type() EventType { + return CheckpointEvent } // GetKV implements the Event interface. -func (ge generationEvent) GetKV() *roachpb.KeyValue { +func (ce checkpointEvent) GetKV() *roachpb.KeyValue { return nil } // GetSSTable implements the Event interface. -func (ge generationEvent) GetSSTable() *roachpb.RangeFeedSSTable { +func (ce checkpointEvent) GetSSTable() *roachpb.RangeFeedSSTable { return nil } -// GetResolvedSpans implements the Event interface. -func (ge generationEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { +// GetDeleteRange implements the Event interface. +func (ce checkpointEvent) GetDeleteRange() *roachpb.RangeFeedDeleteRange { return nil } +// GetResolvedSpans implements the Event interface. +func (ce checkpointEvent) GetResolvedSpans() *[]jobspb.ResolvedSpan { + return &ce.resolvedSpans +} + // MakeKVEvent creates an Event from a KV. func MakeKVEvent(kv roachpb.KeyValue) Event { return kvEvent{kv: kv} @@ -165,12 +192,12 @@ func MakeSSTableEvent(sst roachpb.RangeFeedSSTable) Event { return sstableEvent{sst: sst} } +// MakeDeleteRangeEvent creates an Event from a DeleteRange. +func MakeDeleteRangeEvent(delRange roachpb.RangeFeedDeleteRange) Event { + return delRangeEvent{delRange: delRange} +} + // MakeCheckpointEvent creates an Event from a resolved timestamp. func MakeCheckpointEvent(resolvedSpans []jobspb.ResolvedSpan) Event { return checkpointEvent{resolvedSpans: resolvedSpans} } - -// MakeGenerationEvent creates an GenerationEvent. 
-func MakeGenerationEvent() Event { - return generationEvent{} -} diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 5b98977bd338..1a4351e1c55d 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -237,6 +237,9 @@ func ExampleClient() { case streamingccl.SSTableEvent: sst := event.GetSSTable() fmt.Printf("sst: %s->%s@%d\n", sst.Span.String(), string(sst.Data), sst.WriteTS.WallTime) + case streamingccl.DeleteRangeEvent: + delRange := event.GetDeleteRange() + fmt.Printf("delRange: %s@%d\n", delRange.Span.String(), delRange.Timestamp.WallTime) case streamingccl.CheckpointEvent: ingested.Lock() minTS := hlc.MaxTimestamp @@ -248,8 +251,6 @@ func ExampleClient() { ingested.ts.Forward(minTS) ingested.Unlock() fmt.Printf("resolved %d\n", minTS.WallTime) - case streamingccl.GenerationEvent: - fmt.Printf("received generation event") default: panic(fmt.Sprintf("unexpected event type %v", event.Type())) } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index be23beaedc6e..7337b8943c4e 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -264,8 +264,13 @@ func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event { } else if len(streamEvent.Batch.KeyValues) > 0 { event = streamingccl.MakeKVEvent(streamEvent.Batch.KeyValues[0]) streamEvent.Batch.KeyValues = streamEvent.Batch.KeyValues[1:] + } else if len(streamEvent.Batch.DelRanges) > 0 { + event = streamingccl.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0]) + streamEvent.Batch.DelRanges = streamEvent.Batch.DelRanges[1:] } - if len(streamEvent.Batch.KeyValues) == 0 && len(streamEvent.Batch.Ssts) == 0 { + if len(streamEvent.Batch.KeyValues) == 0 && + len(streamEvent.Batch.Ssts) == 0 && + len(streamEvent.Batch.DelRanges) == 0 { streamEvent.Batch = nil } } diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 921744b25e5c..3363ac8b46dd 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -45,6 +45,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/storage", + "//pkg/storage/enginepb", "//pkg/streaming", "//pkg/util/ctxgroup", "//pkg/util/hlc", @@ -114,6 +115,7 @@ go_test( "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", + "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/json", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 8293fffaf712..64658d04df68 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -19,9 +19,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + 
"github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -30,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -67,11 +71,48 @@ var streamIngestionResultTypes = []*types.T{ } type mvccKeyValues []storage.MVCCKeyValue +type mvccRangeKeyValues []storage.MVCCRangeKeyValue func (s mvccKeyValues) Len() int { return len(s) } func (s mvccKeyValues) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s mvccKeyValues) Less(i, j int) bool { return s[i].Key.Less(s[j].Key) } +// Specialized SST batcher that is responsible for ingesting range tombstones. +type rangeKeyBatcher struct { + db *kv.DB + + // Functor that creates a new range key SST writer in case + // we need to operate on a new batch. The created SST writer + // operates on the rangeKeySSTFile below. + // TODO(casper): replace this if SSTBatcher someday has support for + // adding MVCCRangeKeyValue + rangeKeySSTWriterMaker func() *storage.SSTWriter + // In-memory SST file for flushing MVCC range keys + rangeKeySSTFile *storage.MemFile + // curRangeKVBatch is the current batch of range KVs which will + // be ingested through 'flush' later. + curRangeKVBatch mvccRangeKeyValues + + // Minimum timestamp in the current batch. Used for metrics purpose. + minTimestamp hlc.Timestamp + // Data size of the current batch. + dataSize int +} + +func newRangeKeyBatcher(ctx context.Context, cs *cluster.Settings, db *kv.DB) *rangeKeyBatcher { + batcher := &rangeKeyBatcher{ + db: db, + minTimestamp: hlc.MaxTimestamp, + dataSize: 0, + rangeKeySSTFile: &storage.MemFile{}, + } + batcher.rangeKeySSTWriterMaker = func() *storage.SSTWriter { + w := storage.MakeIngestionSSTWriter(ctx, cs, batcher.rangeKeySSTFile) + return &w + } + return batcher +} + type streamIngestionProcessor struct { execinfra.ProcessorBase @@ -82,14 +123,16 @@ type streamIngestionProcessor struct { // rewriteToDiffKey Indicates whether we are rekeying a key into a different key. rewriteToDiffKey bool - // curBatch temporarily batches MVCC Keys so they can be + // curKVBatch temporarily batches MVCC Keys so they can be // sorted before ingestion. // TODO: This doesn't yet use a buffering adder since the current // implementation is specific to ingesting KV pairs without timestamps rather // than MVCCKeys. - curBatch mvccKeyValues - // batcher is used to flush SSTs to the storage layer. - batcher *bulk.SSTBatcher + curKVBatch mvccKeyValues + // batcher is used to flush KVs into SST to the storage layer. + batcher *bulk.SSTBatcher + // rangeBatcher is used to flush range KVs into SST to the storage layer. 
+ rangeBatcher *rangeKeyBatcher maxFlushRateTimer *timeutil.Timer // client is a streaming client which provides a stream of events from a given @@ -198,7 +241,7 @@ func newStreamIngestionDataProcessor( flowCtx: flowCtx, spec: spec, output: output, - curBatch: make([]storage.MVCCKeyValue, 0), + curKVBatch: make([]storage.MVCCKeyValue, 0), frontier: frontier, maxFlushRateTimer: timeutil.NewTimer(), cutoverProvider: &cutoverFromJobProgress{ @@ -243,6 +286,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { return } + sip.rangeBatcher = newRangeKeyBatcher(ctx, evalCtx.Settings, db) + // Start a poller that checks if the stream ingestion job has been signaled to // cutover. sip.pollingWaitGroup.Add(1) @@ -375,7 +420,6 @@ func (sip *streamIngestionProcessor) close() { if sip.cancelMergeAndWait != nil { sip.cancelMergeAndWait() } - sip.InternalClose() } @@ -509,6 +553,10 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err if err := sip.bufferSST(event.GetSSTable()); err != nil { return nil, err } + case streamingccl.DeleteRangeEvent: + if err := sip.bufferDelRange(event.GetDeleteRange()); err != nil { + return nil, err + } case streamingccl.CheckpointEvent: if err := sip.bufferCheckpoint(event); err != nil { return nil, err @@ -525,15 +573,6 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err } return sip.flush() - case streamingccl.GenerationEvent: - log.Info(sip.Ctx, "GenerationEvent received") - select { - case <-sip.cutoverCh: - sip.internalDrained = true - return nil, nil - case <-sip.Ctx.Done(): - return nil, sip.Ctx.Err() - } default: return nil, errors.Newf("unknown streaming event type %v", event.Type()) } @@ -558,30 +597,74 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err } func (sip *streamIngestionProcessor) bufferSST(sst *roachpb.RangeFeedSSTable) error { + // TODO(casper): we currently buffer all keys in an SST at once even for large SSTs. + // If in the future we decide buffer them in separate batches, we need to be + // careful with checkpoints: we can only send checkpoint whose TS >= SST batch TS + // after the full SST gets ingested. 
+ _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-sst") defer sp.Finish() + return streamingccl.ScanSST(sst, sst.Span, + func(keyVal storage.MVCCKeyValue) error { + return sip.bufferKV(&roachpb.KeyValue{ + Key: keyVal.Key.Key, + Value: roachpb.Value{ + RawBytes: keyVal.Value, + Timestamp: keyVal.Key.Timestamp, + }, + }) + }, func(rangeKeyVal storage.MVCCRangeKeyValue) error { + return sip.bufferRangeKeyVal(rangeKeyVal) + }) +} - iter, err := storage.NewMemSSTIterator(sst.Data, true) +func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, error) { + rekey, ok, err := sip.rekeyer.RewriteKey(key) + if !ok { + return nil, errors.New("every key is expected to match tenant prefix") + } + if err != nil { + return nil, err + } + return rekey, nil +} + +func (sip *streamIngestionProcessor) bufferDelRange(delRange *roachpb.RangeFeedDeleteRange) error { + tombstoneVal, err := storage.EncodeMVCCValue(storage.MVCCValue{ + MVCCValueHeader: enginepb.MVCCValueHeader{ + LocalTimestamp: hlc.ClockTimestamp{ + WallTime: 0, + }}, + }) if err != nil { return err } - defer iter.Close() - for ; ; iter.Next() { - if ok, err := iter.Valid(); err != nil { - return err - } else if !ok { // cursor passed the span end key - break - } - if err = sip.bufferKV(&roachpb.KeyValue{ - Key: iter.UnsafeKey().Key, - Value: roachpb.Value{ - RawBytes: iter.UnsafeValue(), - Timestamp: iter.UnsafeKey().Timestamp, - }, - }); err != nil { - return err - } + return sip.bufferRangeKeyVal(storage.MVCCRangeKeyValue{ + RangeKey: storage.MVCCRangeKey{ + StartKey: delRange.Span.Key, + EndKey: delRange.Span.EndKey, + Timestamp: delRange.Timestamp, + }, + Value: tombstoneVal, + }) +} + +func (sip *streamIngestionProcessor) bufferRangeKeyVal( + rangeKeyVal storage.MVCCRangeKeyValue, +) error { + _, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-buffer-range-key") + defer sp.Finish() + + var err error + rangeKeyVal.RangeKey.StartKey, err = sip.rekey(rangeKeyVal.RangeKey.StartKey) + if err != nil { + return err } + rangeKeyVal.RangeKey.EndKey, err = sip.rekey(rangeKeyVal.RangeKey.EndKey) + if err != nil { + return err + } + sip.rangeBatcher.buffer(rangeKeyVal) return nil } @@ -593,14 +676,11 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error { return errors.New("kv event expected to have kv") } - rekey, ok, err := sip.rekeyer.RewriteKey(kv.Key) - if !ok { - return errors.New("every key is expected to match tenant prefix") - } + var err error + kv.Key, err = sip.rekey(kv.Key) if err != nil { return err } - kv.Key = rekey if sip.rewriteToDiffKey { kv.Value.ClearChecksum() @@ -611,7 +691,8 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error { Key: kv.Key, Timestamp: kv.Value.Timestamp, } - sip.curBatch = append(sip.curBatch, storage.MVCCKeyValue{Key: mvccKey, Value: kv.Value.RawBytes}) + sip.curKVBatch = append(sip.curKVBatch, + storage.MVCCKeyValue{Key: mvccKey, Value: kv.Value.RawBytes}) return nil } @@ -643,37 +724,117 @@ func (sip *streamIngestionProcessor) bufferCheckpoint(event partitionEvent) erro return nil } +// Write a batch of MVCC range keys into the SST batcher, and returns +// the current size of all buffered range keys. +func (r *rangeKeyBatcher) buffer(rangeKV storage.MVCCRangeKeyValue) { + r.curRangeKVBatch = append(r.curRangeKVBatch, rangeKV) + r.dataSize += rangeKV.RangeKey.EncodedSize() + len(rangeKV.Value) +} + +func (r *rangeKeyBatcher) size() int { + return r.dataSize +} + +// Flush all the range keys buffered so far into storage as an SST. 
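+// The buffered range keys are sorted (the SST writer requires ascending order),
+// written into a single in-memory SST, and ingested with one AddSSTable request
+// covering the combined span of the batch.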
+func (r *rangeKeyBatcher) flush(ctx context.Context) error { + if len(r.curRangeKVBatch) == 0 { + return nil + } + + sstWriter := r.rangeKeySSTWriterMaker() + defer sstWriter.Close() + // Sort current batch as the SST writer requires a sorted order. + sort.Slice(r.curRangeKVBatch, func(i, j int) bool { + return r.curRangeKVBatch[i].RangeKey.Compare(r.curRangeKVBatch[j].RangeKey) < 0 + }) + + start, end := keys.MaxKey, keys.MinKey + for _, rangeKeyVal := range r.curRangeKVBatch { + if err := sstWriter.PutRawMVCCRangeKey(rangeKeyVal.RangeKey, rangeKeyVal.Value); err != nil { + return err + } + + if rangeKeyVal.RangeKey.StartKey.Compare(start) < 0 { + start = rangeKeyVal.RangeKey.StartKey + } + if rangeKeyVal.RangeKey.EndKey.Compare(end) > 0 { + end = rangeKeyVal.RangeKey.EndKey + } + if rangeKeyVal.RangeKey.Timestamp.Less(r.minTimestamp) { + r.minTimestamp = rangeKeyVal.RangeKey.Timestamp + } + } + + // Finish the current batch. + if err := sstWriter.Finish(); err != nil { + return err + } + + _, _, err := r.db.AddSSTable(ctx, start, end, r.rangeKeySSTFile.Data(), + false /* disallowConflicts */, false, /* disallowShadowing */ + hlc.Timestamp{}, nil /* stats */, false, /* ingestAsWrites */ + r.db.Clock().Now()) + return err +} + +// Reset all the states inside the batcher and needs to called after flush +// for further uses. +func (r *rangeKeyBatcher) reset() { + if len(r.curRangeKVBatch) == 0 { + return + } + r.rangeKeySSTFile.Reset() + r.minTimestamp = hlc.MaxTimestamp + r.dataSize = 0 + r.curRangeKVBatch = r.curRangeKVBatch[:0] +} + func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { ctx, sp := tracing.ChildSpan(sip.Ctx, "stream-ingestion-flush") defer sp.Finish() flushedCheckpoints := jobspb.ResolvedSpans{ResolvedSpans: make([]jobspb.ResolvedSpan, 0)} // Ensure that the current batch is sorted. 
- sort.Sort(sip.curBatch) - + sort.Sort(sip.curKVBatch) totalSize := 0 minBatchMVCCTimestamp := hlc.MaxTimestamp - for _, kv := range sip.curBatch { - if err := sip.batcher.AddMVCCKey(ctx, kv.Key, kv.Value); err != nil { - return nil, errors.Wrapf(err, "adding key %+v", kv) + for _, keyVal := range sip.curKVBatch { + if err := sip.batcher.AddMVCCKey(ctx, keyVal.Key, keyVal.Value); err != nil { + return nil, errors.Wrapf(err, "adding key %+v", keyVal) + } + if keyVal.Key.Timestamp.Less(minBatchMVCCTimestamp) { + minBatchMVCCTimestamp = keyVal.Key.Timestamp } - if kv.Key.Timestamp.Less(minBatchMVCCTimestamp) { - minBatchMVCCTimestamp = kv.Key.Timestamp + totalSize += len(keyVal.Key.Key) + len(keyVal.Value) + } + + if sip.rangeBatcher.size() > 0 { + totalSize += sip.rangeBatcher.size() + if sip.rangeBatcher.minTimestamp.Less(minBatchMVCCTimestamp) { + minBatchMVCCTimestamp = sip.rangeBatcher.minTimestamp } - totalSize += len(kv.Key.Key) + len(kv.Value) } - if len(sip.curBatch) > 0 { + if len(sip.curKVBatch) > 0 || sip.rangeBatcher.size() > 0 { preFlushTime := timeutil.Now() defer func() { sip.metrics.FlushHistNanos.RecordValue(timeutil.Since(preFlushTime).Nanoseconds()) sip.metrics.CommitLatency.RecordValue(timeutil.Since(minBatchMVCCTimestamp.GoTime()).Nanoseconds()) sip.metrics.Flushes.Inc(1) sip.metrics.IngestedBytes.Inc(int64(totalSize)) - sip.metrics.IngestedEvents.Inc(int64(len(sip.curBatch))) + sip.metrics.IngestedEvents.Inc(int64(len(sip.curKVBatch))) + sip.metrics.IngestedEvents.Inc(int64(sip.rangeBatcher.size())) }() - if err := sip.batcher.Flush(ctx); err != nil { - return nil, errors.Wrap(err, "flushing") + if len(sip.curKVBatch) > 0 { + if err := sip.batcher.Flush(ctx); err != nil { + return nil, errors.Wrap(err, "flushing sst batcher") + } + } + + if sip.rangeBatcher.size() > 0 { + if err := sip.rangeBatcher.flush(ctx); err != nil { + return nil, errors.Wrap(err, "flushing range key sst") + } } } @@ -685,8 +846,9 @@ func (sip *streamIngestionProcessor) flush() (*jobspb.ResolvedSpans, error) { }) // Reset the current batch. 
- sip.curBatch = nil sip.lastFlushTime = timeutil.Now() + sip.curKVBatch = nil + sip.rangeBatcher.reset() return &flushedCheckpoints, sip.batcher.Reset(ctx) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index d3ff10993c3f..f3ec19e117a8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -14,7 +14,6 @@ import ( "math" "net/url" "strconv" - "sync" "testing" "time" @@ -322,61 +321,6 @@ func TestStreamIngestionProcessor(t *testing.T) { require.Nil(t, row) testutils.IsError(meta.Err, "this client always returns an error") }) - - t.Run("stream ingestion processor shuts down gracefully on losing client connection", func(t *testing.T) { - events := []streamingccl.Event{streamingccl.MakeGenerationEvent()} - mockClient := &mockStreamClient{ - partitionEvents: map[string][]streamingccl.Event{"foo": events}, - } - - startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - partitions := []streamclient.PartitionInfo{{SubscriptionToken: streamclient.SubscriptionToken("foo")}} - - processEventCh := make(chan struct{}) - defer close(processEventCh) - streamingTestingKnob := &sql.StreamingTestingKnobs{RunAfterReceivingEvent: func(ctx context.Context) error { - processEventCh <- struct{}{} - return nil - }} - sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, - partitions, startTime, []jobspb.ResolvedSpan{} /* checkpoint */, nil /* interceptEvents */, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnob) - defer func() { - require.NoError(t, sip.forceClientForTests.Close(ctx)) - }() - require.NoError(t, err) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - sip.Run(ctx) - }() - - // The channel will block on read if the event has not been intercepted yet. - // Once it unblocks, we are guaranteed that the mockClient has sent the - // GenerationEvent and the processor has read it. - <-processEventCh - - // The sip processor has received a GenerationEvent and is thus - // waiting for a cutover signal, so let's send one! - sip.cutoverCh <- struct{}{} - - wg.Wait() - // Ensure that all the outputs are properly closed. - if !out.ProducerClosed() { - t.Fatalf("output RowReceiver not closed") - } - - for { - // No metadata should have been produced since the processor - // should have been moved to draining state with a nil error. 
- row := out.NextNoMeta(t) - if row == nil { - break - } - t.Fatalf("more output rows than expected") - } - }) } func getPartitionSpanToTableID( diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index 7d9a7c94c979..ca97ebbfe0da 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -324,6 +325,10 @@ func TestTenantStreamingSuccessfulIngestion(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(casper): disabled due to error when setting a cluster setting + // "setting updated but timed out waiting to read new value" + skip.UnderStressRace(t, "disabled under stress race") + dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { if _, err := w.Write([]byte("42,42\n43,43\n")); err != nil { @@ -379,6 +384,10 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(casper): disabled due to error when setting a cluster setting + // "setting updated but timed out waiting to read new value" + skip.UnderStressRace(t, "disabled under stress race") + ctx := context.Background() args := defaultTenantStreamingClustersArgs args.srcClusterSettings[`stream_replication.job_liveness_timeout`] = `'1m'` @@ -434,6 +443,10 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(casper): disabled due to error when setting a cluster setting + // "setting updated but timed out waiting to read new value" + skip.UnderStressRace(t, "disabled under stress race") + ctx := context.Background() args := defaultTenantStreamingClustersArgs c, cleanup := createTenantStreamingClusters(ctx, t, args) @@ -488,6 +501,10 @@ func TestTenantStreamingPauseOnError(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(casper): disabled due to error when setting a cluster setting + // "setting updated but timed out waiting to read new value" + skip.UnderStressRace(t, "disabled under stress race") + ctx := context.Background() ingestErrCh := make(chan error, 1) args := defaultTenantStreamingClustersArgs @@ -533,6 +550,10 @@ func TestTenantStreamingCheckpoint(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + // TODO(casper): disabled due to error when setting a cluster setting + // "setting updated but timed out waiting to read new value" + skip.UnderStressRace(t, "disabled under stress race") + ctx := context.Background() lastClientStart := make(map[string]hlc.Timestamp) @@ -822,3 +843,72 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) { // Ingestion job should succeed despite source failure due to the successful cutover jobutils.WaitForJobToSucceed(t, c.destSysSQL, jobspb.JobID(ingestionJobID)) } + +func TestTenantStreamingDeleteRange(t *testing.T) { + defer 
leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // TODO(casper): disabled due to error when setting a cluster setting + // "setting updated but timed out waiting to read new value" + skip.UnderStressRace(t, "disabled under stress race") + + ctx := context.Background() + c, cleanup := createTenantStreamingClusters(ctx, t, defaultTenantStreamingClustersArgs) + defer cleanup() + + producerJobID, ingestionJobID := c.startStreamReplication() + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(ingestionJobID)) + + srcTime := c.srcSysServer.Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + + cleanUpTenant := c.createDestTenantSQL(ctx) + defer func() { + require.NoError(t, cleanUpTenant()) + }() + + c.compareResult("SELECT * FROM d.t1") + c.compareResult("SELECT * FROM d.t2") + + // Introduce a DeleteRange on t1 and t2. + checkDelRangeOnTable := func(table string, embeddedInSST bool) { + srcCodec := keys.MakeSQLCodec(c.args.srcTenantID) + desc := desctestutils.TestingGetPublicTableDescriptor( + c.srcSysServer.DB(), srcCodec, "d", table) + tableSpan := desc.PrimaryIndexSpan(srcCodec) + + // Introduce a DelRange on the table span. + srcTimeBeforeDelRange := c.srcSysServer.Clock().Now() + // Put the DelRange in the SST. + if embeddedInSST { + batchHLCTime := c.srcSysServer.Clock().Now() + batchHLCTime.Logical = 0 + data, start, end := storageutils.MakeSST(t, c.srcSysServer.ClusterSettings(), []interface{}{ + storageutils.RangeKV(string(tableSpan.Key), string(tableSpan.EndKey), int(batchHLCTime.WallTime), ""), + }) + _, _, _, err := c.srcSysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false, + false, hlc.Timestamp{}, nil, false, batchHLCTime) + require.NoError(t, err) + } else { + // Use DelRange directly. + // Insert two out-of-order overlapping DelRanges to check that it works + // on multiple range keys in the same batch. + require.NoError(t, c.srcSysServer.DB().DelRangeUsingTombstone(ctx, + tableSpan.Key.Next(), tableSpan.EndKey)) + require.NoError(t, c.srcSysServer.DB().DelRangeUsingTombstone(ctx, + tableSpan.Key, tableSpan.Key.Next().Next())) + } + c.waitUntilHighWatermark(c.srcSysServer.Clock().Now(), jobspb.JobID(ingestionJobID)) + c.compareResult(fmt.Sprintf("SELECT * FROM d.%s", table)) + + // Point-in-time query to check that the DeleteRange is MVCC-compatible. + c.compareResult(fmt.Sprintf("SELECT * FROM d.%s AS OF SYSTEM TIME %d", + table, srcTimeBeforeDelRange.WallTime)) + } + + // Test on two tables to check that the range-key SST batcher + // can work across multiple flushes. + checkDelRangeOnTable("t1", true /* embeddedInSST */) + checkDelRangeOnTable("t2", false /* embeddedInSST */) +} diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go index a5bdc90b0a42..6d13de1968d1 100644 --- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go +++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go @@ -65,14 +65,6 @@ func ResolvedAtLeast(lo hlc.Timestamp) FeedPredicate { } } -// ReceivedNewGeneration makes a FeedPredicate that matches when a GenerationEvent has -// been received. -func ReceivedNewGeneration() FeedPredicate { - return func(msg streamingccl.Event) bool { - return msg.Type() == streamingccl.GenerationEvent - } -} - // FeedSource is a source of events for a ReplicationFeed.
type FeedSource interface { // Next returns the next event, and a flag indicating if there are more events @@ -112,12 +104,6 @@ func (rf *ReplicationFeed) ObserveResolved(ctx context.Context, lo hlc.Timestamp return minResolvedTimestamp(*rf.msg.GetResolvedSpans()) } -// ObserveGeneration consumes the feed until we received a GenerationEvent. Returns true. -func (rf *ReplicationFeed) ObserveGeneration(ctx context.Context) bool { - require.NoError(rf.t, rf.consumeUntil(ctx, ReceivedNewGeneration())) - return true -} - // Close cleans up any resources. func (rf *ReplicationFeed) Close(ctx context.Context) { rf.f.Close(ctx) diff --git a/pkg/ccl/streamingccl/streampb/stream.proto b/pkg/ccl/streamingccl/streampb/stream.proto index 9fb27c803783..dfd6e4faf0a4 100644 --- a/pkg/ccl/streamingccl/streampb/stream.proto +++ b/pkg/ccl/streamingccl/streampb/stream.proto @@ -71,6 +71,7 @@ message StreamEvent { message Batch { repeated roachpb.KeyValue key_values = 1 [(gogoproto.nullable) = false]; repeated roachpb.RangeFeedSSTable ssts = 2 [(gogoproto.nullable) = false]; + repeated roachpb.RangeFeedDeleteRange del_ranges = 3 [(gogoproto.nullable) = false]; } // Checkpoint represents stream checkpoint. diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel index 45a5f7988abf..e49eb81068d5 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", @@ -89,7 +90,9 @@ go_test( "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", + "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index dcfa762c2fbb..c308e6982bc4 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -13,9 +13,11 @@ import ( "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" @@ -46,12 +48,12 @@ type eventStream struct { data tree.Datums // Data to send to the consumer // Fields below initialized when Start called. - rf *rangefeed.RangeFeed // Currently running rangefeed. - streamGroup ctxgroup.Group // Context group controlling stream execution. - eventsCh chan roachpb.RangeFeedEvent // Channel receiving rangefeed events. - errCh chan error // Signaled when error occurs in rangefeed. - streamCh chan tree.Datums // Channel signaled to forward datums to consumer. - sp *tracing.Span // Span representing the lifetime of the eventStream. + rf *rangefeed.RangeFeed // Currently running rangefeed. + streamGroup ctxgroup.Group // Context group controlling stream execution. + eventsCh chan kvcoord.RangeFeedMessage // Channel receiving rangefeed events. 
+ errCh chan error // Signaled when error occurs in rangefeed. + streamCh chan tree.Datums // Channel signaled to forward datums to consumer. + sp *tracing.Span // Span representing the lifetime of the eventStream. } var _ eval.ValueGenerator = (*eventStream)(nil) @@ -82,7 +84,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { s.errCh = make(chan error) // Events channel gets RangeFeedEvents and is consumed by ValueGenerator. - s.eventsCh = make(chan roachpb.RangeFeedEvent) + s.eventsCh = make(chan kvcoord.RangeFeedMessage) // Stream channel receives datums to be sent to the consumer. s.streamCh = make(chan tree.Datums) @@ -98,6 +100,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { rangefeed.WithMemoryMonitor(s.mon), rangefeed.WithOnSSTable(s.onSSTable), + + rangefeed.WithOnDeleteRange(s.onDeleteRange), } frontier, err := span.MakeFrontier(s.spec.Spans...) @@ -223,7 +227,7 @@ func (s *eventStream) Close(ctx context.Context) { func (s *eventStream) onValue(ctx context.Context, value *roachpb.RangeFeedValue) { select { case <-ctx.Done(): - case s.eventsCh <- roachpb.RangeFeedEvent{Val: value}: + case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &roachpb.RangeFeedEvent{Val: value}}: log.VInfof(ctx, 1, "onValue: %s@%s", value.Key, value.Value.Timestamp) } } @@ -231,7 +235,7 @@ func (s *eventStream) onValue(ctx context.Context, value *roachpb.RangeFeedValue func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) { select { case <-ctx.Done(): - case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: checkpoint}: + case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &roachpb.RangeFeedEvent{Checkpoint: checkpoint}}: log.VInfof(ctx, 1, "onCheckpoint: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) } } @@ -244,7 +248,9 @@ func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) erro select { case <-ctx.Done(): return ctx.Err() - case s.eventsCh <- roachpb.RangeFeedEvent{Checkpoint: &checkpoint}: + case s.eventsCh <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{Checkpoint: &checkpoint}, + }: log.VInfof(ctx, 1, "onSpanCompleted: %s@%s", checkpoint.Span, checkpoint.ResolvedTS) return nil } @@ -255,12 +261,23 @@ func (s *eventStream) onSSTable( ) { select { case <-ctx.Done(): - case s.eventsCh <- roachpb.RangeFeedEvent{SST: sst}: + case s.eventsCh <- kvcoord.RangeFeedMessage{ + RangeFeedEvent: &roachpb.RangeFeedEvent{SST: sst}, + RegisteredSpan: registeredSpan, + }: log.VInfof(ctx, 1, "onSSTable: %s@%s with registered span %s", sst.Span, sst.WriteTS, registeredSpan) } } +func (s *eventStream) onDeleteRange(ctx context.Context, delRange *roachpb.RangeFeedDeleteRange) { + select { + case <-ctx.Done(): + case s.eventsCh <- kvcoord.RangeFeedMessage{RangeFeedEvent: &roachpb.RangeFeedEvent{DeleteRange: delRange}}: + log.VInfof(ctx, 1, "onDeleteRange: %s@%s", delRange.Span, delRange.Timestamp) + } +} + // makeCheckpoint generates checkpoint based on the frontier. func makeCheckpoint(f *span.Frontier) (checkpoint streampb.StreamEvent_StreamCheckpoint) { f.Entries(func(sp roachpb.Span, ts hlc.Timestamp) (done span.OpResult) { @@ -335,51 +352,13 @@ func (p *checkpointPacer) shouldCheckpoint( return false } -// Trim the received SST to only contain data within the boundaries of spans in the partition -// spec, and execute the specified operation on each roachpb.KeyValue in the trimmed SSTable. 
-func (s *eventStream) trimSST( - sst *roachpb.RangeFeedSSTable, op func(keyVal roachpb.KeyValue), -) error { - iter, err := storage.NewMemSSTIterator(sst.Data, true) - if err != nil { - return err - } - defer iter.Close() - - copyKV := func(mvccKey storage.MVCCKey, value []byte) roachpb.KeyValue { - keyCopy := make([]byte, len(mvccKey.Key)) - copy(keyCopy, mvccKey.Key) - valueCopy := make([]byte, len(value)) - copy(valueCopy, value) - return roachpb.KeyValue{ - Key: keyCopy, - Value: roachpb.Value{RawBytes: valueCopy, Timestamp: mvccKey.Timestamp}, - } - } - - return s.subscribedSpans.ForEach(func(span roachpb.Span) error { - iter.SeekGE(storage.MVCCKey{Key: span.Key}) - endKey := storage.MVCCKey{Key: span.EndKey} - for ; ; iter.Next() { - if ok, err := iter.Valid(); err != nil || !ok { - return err - } - if !iter.UnsafeKey().Less(endKey) { // passed the span boundary - break - } - op(copyKV(iter.UnsafeKey(), iter.UnsafeValue())) - } - return nil - }) -} - // Add a RangeFeedSSTable into current batch and return number of bytes added. func (s *eventStream) addSST( - sst *roachpb.RangeFeedSSTable, batch *streampb.StreamEvent_Batch, + sst *roachpb.RangeFeedSSTable, registeredSpan roachpb.Span, batch *streampb.StreamEvent_Batch, ) (int, error) { // We send over the whole SSTable if the sst span is within - // the subscribed spans boundary. - if s.subscribedSpans.Encloses(sst.Span) { + // the registered span boundaries. + if registeredSpan.Contains(sst.Span) { batch.Ssts = append(batch.Ssts, *sst) return sst.Size(), nil } @@ -388,10 +367,31 @@ func (s *eventStream) addSST( // TODO(casper): add metrics to track number of SSTs, and number of ssts // that are not inside the boundaries (and possible count+size of kvs in such ssts). size := 0 - if err := s.trimSST(sst, func(keyVal roachpb.KeyValue) { - batch.KeyValues = append(batch.KeyValues, keyVal) - size += keyVal.Size() - }); err != nil { + // Trim the received SST so it only contains data within the boundaries of + // the matching registered span, and execute the specified operations on each + // MVCC key value and each MVCC range key value in the trimmed SSTable. + if err := streamingccl.ScanSST(sst, registeredSpan, + func(mvccKV storage.MVCCKeyValue) error { + batch.KeyValues = append(batch.KeyValues, roachpb.KeyValue{ + Key: mvccKV.Key.Key, + Value: roachpb.Value{ + RawBytes: mvccKV.Value, + Timestamp: mvccKV.Key.Timestamp, + }, + }) + size += batch.KeyValues[len(batch.KeyValues)-1].Size() + return nil + }, func(rangeKeyVal storage.MVCCRangeKeyValue) error { + batch.DelRanges = append(batch.DelRanges, roachpb.RangeFeedDeleteRange{ + Span: roachpb.Span{ + Key: rangeKeyVal.RangeKey.StartKey, + EndKey: rangeKeyVal.RangeKey.EndKey, + }, + Timestamp: rangeKeyVal.RangeKey.Timestamp, + }) + size += batch.DelRanges[len(batch.DelRanges)-1].Size() + return nil + }); err != nil { return 0, err } return size, nil @@ -413,12 +413,21 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e batchSize += keyValue.Size() } + addDelRange := func(delRange *roachpb.RangeFeedDeleteRange) error { + // The DelRange's span is already trimmed to be enclosed within + // the subscribed span, so just emit it.
+ batch.DelRanges = append(batch.DelRanges, *delRange) + batchSize += delRange.Size() + return nil + } + maybeFlushBatch := func(force bool) error { if (force && batchSize > 0) || batchSize > int(s.spec.Config.BatchByteSize) { defer func() { batchSize = 0 batch.KeyValues = batch.KeyValues[:0] batch.Ssts = batch.Ssts[:0] + batch.DelRanges = batch.DelRanges[:0] }() return s.flushEvent(ctx, &streampb.StreamEvent{Batch: &batch}) } @@ -459,7 +468,7 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e } } case ev.SST != nil: - size, err := s.addSST(ev.SST, &batch) + size, err := s.addSST(ev.SST, ev.RegisteredSpan, &batch) if err != nil { return err } @@ -467,8 +476,14 @@ func (s *eventStream) streamLoop(ctx context.Context, frontier *span.Frontier) e if err := maybeFlushBatch(flushIfNeeded); err != nil { return err } + case ev.DeleteRange != nil: + if err := addDelRange(ev.DeleteRange); err != nil { + return err + } + if err := maybeFlushBatch(flushIfNeeded); err != nil { + return err + } default: - // TODO(erikgrinaker): Handle DeleteRange events (MVCC range tombstones). return errors.AssertionFailedf("unexpected event") } } diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index a755f2f5396c..2e2028005d85 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -13,6 +13,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sort" "testing" "time" @@ -33,7 +34,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -563,3 +566,136 @@ func TestCompleteStreamReplication(t *testing.T) { }) } } + +func sortDelRanges(receivedDelRanges []roachpb.RangeFeedDeleteRange) { + sort.Slice(receivedDelRanges, func(i, j int) bool { + if !receivedDelRanges[i].Timestamp.Equal(receivedDelRanges[j].Timestamp) { + return receivedDelRanges[i].Timestamp.Compare(receivedDelRanges[j].Timestamp) < 0 + } + if !receivedDelRanges[i].Span.Key.Equal(receivedDelRanges[j].Span.Key) { + return receivedDelRanges[i].Span.Key.Compare(receivedDelRanges[j].Span.Key) < 0 + } + return receivedDelRanges[i].Span.EndKey.Compare(receivedDelRanges[j].Span.EndKey) < 0 + }) +} + +func TestStreamDeleteRange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStressRace(t, "disabled under stress and race") + + h, cleanup := streamingtest.NewReplicationHelper(t, base.TestServerArgs{ + // Test hangs when run within the default test tenant. Tracked with + // #76378. 
+ DisableDefaultTestTenant: true, + }) + defer cleanup() + srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID()) + defer cleanupTenant() + + srcTenant.SQL.Exec(t, ` +CREATE DATABASE d; +CREATE TABLE d.t1(i int primary key, a string, b string); +CREATE TABLE d.t2(i int primary key, a string, b string); +CREATE TABLE d.t3(i int primary key, a string, b string); +INSERT INTO d.t1 (i) VALUES (1); +INSERT INTO d.t2 (i) VALUES (1); +INSERT INTO d.t3 (i) VALUES (1); +USE d; +`) + + ctx := context.Background() + rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64()) + streamID := rows[0][0] + + const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` + // Only subscribe to tables t1 and t2, not t3. + source, feed := startReplication(t, h, makePartitionStreamDecoder, + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, h.SysServer.Clock().Now(), "t1", "t2")) + defer feed.Close(ctx) + + // TODO(casper): Replace with DROP TABLE once drop table uses the MVCC-compatible DelRange. + tableSpan := func(table string) roachpb.Span { + desc := desctestutils.TestingGetPublicTableDescriptor( + h.SysServer.DB(), srcTenant.Codec, "d", table) + return desc.PrimaryIndexSpan(srcTenant.Codec) + } + + t1Span, t2Span, t3Span := tableSpan("t1"), tableSpan("t2"), tableSpan("t3") + // The deleted range is outside the subscribed spans. + require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t2Span.EndKey, t3Span.Key)) + // Range is t1s - t2e, emitting 2 events, t1s - t1e and t2s - t2e. + require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t1Span.Key, t2Span.EndKey)) + // Range is t1e - t2sn, emitting t2s - t2sn. + require.NoError(t, h.SysServer.DB().DelRangeUsingTombstone(ctx, t1Span.EndKey, t2Span.Key.Next())) + + // Expected DelRange spans after sorting. + expectedDelRangeSpan1 := roachpb.Span{Key: t1Span.Key, EndKey: t1Span.EndKey} + expectedDelRangeSpan2 := roachpb.Span{Key: t2Span.Key, EndKey: t2Span.EndKey} + expectedDelRangeSpan3 := roachpb.Span{Key: t2Span.Key, EndKey: t2Span.Key.Next()} + + codec := source.codec.(*partitionStreamDecoder) + receivedDelRanges := make([]roachpb.RangeFeedDeleteRange, 0, 3) + for { + require.True(t, source.rows.Next()) + source.codec.decode() + if codec.e.Batch != nil { + receivedDelRanges = append(receivedDelRanges, codec.e.Batch.DelRanges...) + } + if len(receivedDelRanges) == 3 { + break + } + } + + sortDelRanges(receivedDelRanges) + require.Equal(t, expectedDelRangeSpan1, receivedDelRanges[0].Span) + require.Equal(t, expectedDelRangeSpan2, receivedDelRanges[1].Span) + require.Equal(t, expectedDelRangeSpan3, receivedDelRanges[2].Span) + + // Add an SSTable that contains DeleteRanges. + batchHLCTime := h.SysServer.Clock().Now() + batchHLCTime.Logical = 0 + ts := int(batchHLCTime.WallTime) + data, start, end := storageutils.MakeSST(t, h.SysServer.ClusterSettings(), []interface{}{ + storageutils.PointKV(string(t2Span.Key), ts, "5"), + // Delete range from t1s - t2s, emitting t1s - t1e. + storageutils.RangeKV(string(t1Span.Key), string(t2Span.Key), ts, ""), + // Delete range from t1e - t2enn, emitting t2s - t2e. + storageutils.RangeKV(string(t1Span.EndKey), string(t2Span.EndKey.Next().Next()), ts, ""), + // Delete range for t2sn - t2en, which overlaps the range above on t2s - t2e, emitting nothing. + storageutils.RangeKV(string(t2Span.Key.Next()), string(t2Span.EndKey.Next()), ts, ""), + // Delete range for t3s - t3e, emitting nothing.
+ storageutils.RangeKV(string(t3Span.Key), string(t3Span.EndKey), ts, ""), + }) + expectedDelRange1 := roachpb.RangeFeedDeleteRange{Span: t1Span, Timestamp: batchHLCTime} + expectedDelRange2 := roachpb.RangeFeedDeleteRange{Span: t2Span, Timestamp: batchHLCTime} + require.Equal(t, t1Span.Key, start) + require.Equal(t, t3Span.EndKey, end) + + // Use the same batch timestamp so that this SST can be emitted through the rangefeed. + _, _, _, err := h.SysServer.DB().AddSSTableAtBatchTimestamp(ctx, start, end, data, false, + false, hlc.Timestamp{}, nil, false, batchHLCTime) + require.NoError(t, err) + + receivedDelRanges = receivedDelRanges[:0] + receivedKVs := make([]roachpb.KeyValue, 0) + for { + require.True(t, source.rows.Next()) + source.codec.decode() + if codec.e.Batch != nil { + require.Empty(t, codec.e.Batch.Ssts) + receivedKVs = append(receivedKVs, codec.e.Batch.KeyValues...) + receivedDelRanges = append(receivedDelRanges, codec.e.Batch.DelRanges...) + } + + if len(receivedDelRanges) == 2 && len(receivedKVs) == 1 { + break + } + } + + sortDelRanges(receivedDelRanges) + require.Equal(t, t2Span.Key, receivedKVs[0].Key) + require.Equal(t, batchHLCTime, receivedKVs[0].Value.Timestamp) + require.Equal(t, expectedDelRange1, receivedDelRanges[0]) + require.Equal(t, expectedDelRange2, receivedDelRanges[1]) +} diff --git a/pkg/ccl/streamingccl/utils.go b/pkg/ccl/streamingccl/utils.go new file mode 100644 index 000000000000..8bd4f9a0e27f --- /dev/null +++ b/pkg/ccl/streamingccl/utils.go @@ -0,0 +1,111 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingccl + +import ( + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" +) + +// ScanSST scans the SSTable in the given RangeFeedSSTable within the +// 'scanWithin' boundaries and executes the given operations on each +// emitted MVCCKeyValue and MVCCRangeKeyValue. +func ScanSST( + sst *roachpb.RangeFeedSSTable, + scanWithin roachpb.Span, + mvccKeyValOp func(key storage.MVCCKeyValue) error, + mvccRangeKeyValOp func(rangeKeyVal storage.MVCCRangeKeyValue) error, +) error { + rangeKVs := make([]*storage.MVCCRangeKeyValue, 0) + timestampToRangeKey := make(map[hlc.Timestamp]*storage.MVCCRangeKeyValue) + // The iterator may emit fragmented range keys, so we de-fragment them + // before emitting roachpb.RangeFeedDeleteRange events. + mergeRangeKV := func(rangeKV storage.MVCCRangeKeyValue) { + // Range keys are emitted in increasing order of start key, so we only + // need to check whether the current range key can be concatenated onto + // the previous one at the same timestamp. + lastKV, ok := timestampToRangeKey[rangeKV.RangeKey.Timestamp] + if ok && lastKV.RangeKey.EndKey.Equal(rangeKV.RangeKey.StartKey) { + lastKV.RangeKey.EndKey = rangeKV.RangeKey.EndKey + return + } + rangeKVs = append(rangeKVs, &rangeKV) + timestampToRangeKey[rangeKV.RangeKey.Timestamp] = rangeKVs[len(rangeKVs)-1] + } + + // We iterate point keys and range keys separately over the SST for clarity + // and simplicity.
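+ // First pass: point keys only, seeking to scanWithin.Key and bounded above by scanWithin.EndKey; a second iterator below handles the range keys.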
+ pointIter, err := storage.NewPebbleMemSSTIterator(sst.Data, true, + storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + // Only care about upper bound as we are iterating forward. + UpperBound: scanWithin.EndKey, + }) + if err != nil { + return err + } + defer pointIter.Close() + + for pointIter.SeekGE(storage.MVCCKey{Key: scanWithin.Key}); ; pointIter.Next() { + if valid, err := pointIter.Valid(); err != nil { + return err + } else if !valid { + break + } + if err = mvccKeyValOp(storage.MVCCKeyValue{ + Key: pointIter.Key(), + Value: pointIter.Value(), + }); err != nil { + return err + } + } + + rangeIter, err := storage.NewPebbleMemSSTIterator(sst.Data, true, + storage.IterOptions{ + KeyTypes: storage.IterKeyTypeRangesOnly, + UpperBound: scanWithin.EndKey, + }) + if err != nil { + return err + } + defer rangeIter.Close() + + for rangeIter.SeekGE(storage.MVCCKey{Key: scanWithin.Key}); ; rangeIter.Next() { + if valid, err := rangeIter.Valid(); err != nil { + return err + } else if !valid { + break + } + for _, rangeKeyVersion := range rangeIter.RangeKeys().Versions { + mvccVal, err := storage.DecodeMVCCValue(rangeKeyVersion.Value) + if err != nil { + return err + } + if !mvccVal.IsTombstone() { + return errors.Errorf("only expect range tombstone from MVCC range key: %s", rangeIter.RangeBounds()) + } + intersectedSpan := scanWithin.Intersect(rangeIter.RangeBounds()) + mergeRangeKV(storage.MVCCRangeKeyValue{ + RangeKey: storage.MVCCRangeKey{ + StartKey: intersectedSpan.Key.Clone(), + EndKey: intersectedSpan.EndKey.Clone(), + Timestamp: rangeKeyVersion.Timestamp}, + Value: rangeKeyVersion.Value, + }) + } + } + for _, rangeKey := range rangeKVs { + if err = mvccRangeKeyValOp(*rangeKey); err != nil { + return err + } + } + return nil +} diff --git a/pkg/ccl/streamingccl/utils_test.go b/pkg/ccl/streamingccl/utils_test.go new file mode 100644 index 000000000000..9443a87148af --- /dev/null +++ b/pkg/ccl/streamingccl/utils_test.go @@ -0,0 +1,132 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingccl + +import ( + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func sortMVCCKVs(kvs []storage.MVCCKeyValue) { + sort.Slice(kvs, func(i, j int) bool { + if !kvs[i].Key.Timestamp.Equal(kvs[j].Key.Timestamp) { + return kvs[i].Key.Timestamp.Compare(kvs[j].Key.Timestamp) < 0 + } + return kvs[i].Key.Key.Compare(kvs[j].Key.Key) < 0 + }) +} + +func sortMVCCRangeKeys(rangeKey []storage.MVCCRangeKey) { + sort.Slice(rangeKey, func(i, j int) bool { + if !rangeKey[i].Timestamp.Equal(rangeKey[j].Timestamp) { + return rangeKey[i].Timestamp.Compare(rangeKey[j].Timestamp) < 0 + } + if !rangeKey[i].StartKey.Equal(rangeKey[j].StartKey) { + return rangeKey[i].StartKey.Compare(rangeKey[j].StartKey) < 0 + } + return rangeKey[i].EndKey.Compare(rangeKey[j].EndKey) < 0 + }) +} + +func TestScanSST(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Sanity check: "ca" sorts after "c". + require.True(t, roachpb.Key("ca").Compare(roachpb.Key("c")) > 0) + + cs := cluster.MakeTestingClusterSettingsWithVersions( + clusterversion.TestingBinaryVersion, + clusterversion.TestingBinaryMinSupportedVersion, + true, /* initializeVersion */ + ) + data, start, end := storageutils.MakeSST(t, cs, []interface{}{ + storageutils.PointKV("ba", 30, "30"), + storageutils.PointKV("c", 5, "5"), + storageutils.PointKV("ca", 30, "30"), + // Range tombstone over [a, c) at timestamp 10. + storageutils.RangeKV("a", "c", 10, ""), + // Range tombstone over [b, cb) at timestamp 15, overlapping the one above. + storageutils.RangeKV("b", "cb", 15, ""), + // Range tombstone over [ca, da) at timestamp 20. + storageutils.RangeKV("ca", "da", 20, ""), + // Range tombstone over [e, f) at timestamp 25, outside every span scanned below.
+ storageutils.RangeKV("e", "f", 25, ""), + }) + + checkScan := func(scanWithin roachpb.Span, + expectedPointKVs []storage.MVCCKeyValue, + expectedRangeKeys []storage.MVCCRangeKey, + ) { + actualPointKVs := make([]storage.MVCCKeyValue, 0, len(expectedPointKVs)) + actualRangeKVs := make([]storage.MVCCRangeKey, 0, len(expectedRangeKeys)) + require.NoError(t, ScanSST(&roachpb.RangeFeedSSTable{ + Data: data, + Span: roachpb.Span{Key: start, EndKey: end}, + WriteTS: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, + }, scanWithin, func(mvccKV storage.MVCCKeyValue) error { + actualPointKVs = append(actualPointKVs, mvccKV) + return nil + }, func(mvccRangeKV storage.MVCCRangeKeyValue) error { + actualRangeKVs = append(actualRangeKVs, mvccRangeKV.RangeKey) + return nil + })) + sortMVCCKVs(actualPointKVs) + sortMVCCRangeKeys(actualRangeKVs) + require.Equal(t, expectedPointKVs, actualPointKVs) + require.Equal(t, expectedRangeKeys, actualRangeKVs) + } + + checkScan(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}, + []storage.MVCCKeyValue{}, + []storage.MVCCRangeKey{ + storageutils.RangeKey("a", "b", 10), + }) + + checkScan(roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}, + []storage.MVCCKeyValue{ + storageutils.PointKV("c", 5, "5"), + storageutils.PointKV("ca", 30, "30"), + }, []storage.MVCCRangeKey{ + storageutils.RangeKey("c", "cb", 15), + storageutils.RangeKey("ca", "d", 20), + }) + + checkScan(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}, + []storage.MVCCKeyValue{ + storageutils.PointKV("c", 5, "5"), + storageutils.PointKV("ba", 30, "30"), + storageutils.PointKV("ca", 30, "30"), + }, []storage.MVCCRangeKey{ + storageutils.RangeKey("a", "c", 10), + storageutils.RangeKey("b", "cb", 15), + storageutils.RangeKey("ca", "d", 20), + }) + + checkScan(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, + []storage.MVCCKeyValue{ + storageutils.PointKV("ba", 30, "30"), + }, []storage.MVCCRangeKey{ + storageutils.RangeKey("a", "c", 10), + storageutils.RangeKey("b", "c", 15), + }) + + checkScan(roachpb.Span{Key: roachpb.Key("da"), EndKey: roachpb.Key("e")}, + []storage.MVCCKeyValue{}, []storage.MVCCRangeKey{}) +} diff --git a/pkg/kv/kvclient/kvcoord/rangefeed_message.go b/pkg/kv/kvclient/kvcoord/rangefeed_message.go index 0244a3c88797..5d4f222194d8 100644 --- a/pkg/kv/kvclient/kvcoord/rangefeed_message.go +++ b/pkg/kv/kvclient/kvcoord/rangefeed_message.go @@ -18,7 +18,7 @@ type RangeFeedMessage struct { // RangeFeed event this message holds. *roachpb.RangeFeedEvent - // The span of the rangefeed registration that overlaps with SST span if - // event has an underlying roachpb.RangeFeedSSTable event. + // The span of the rangefeed registration that overlaps with the key or span + // in the RangeFeed event. RegisteredSpan roachpb.Span } diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 09a02d38a775..5c9832cbc36c 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -376,7 +376,6 @@ func TestReplicaRangefeed(t *testing.T) { storage.MVCCValue{Value: expVal7q})) require.NoError(t, sstWriter.Finish()) - // Does this mean all ts in sst should be equal to the batch ts? _, _, _, pErr = store1.DB().AddSSTableAtBatchTimestamp(ctx, roachpb.Key("b"), roachpb.Key("r"), sstFile.Data(), false /* disallowConflicts */, false /* disallowShadowing */, hlc.Timestamp{}, nil, /* stats */ true /* ingestAsWrites */, ts7)