diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 60fa751f270b..727d5ebb7494 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -79,6 +79,7 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/testcluster", "//pkg/util/encoding", @@ -86,6 +87,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/retry", "//pkg/util/stop", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_golang_mock//gomock", diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 73ea4d522ff3..7a391aef076f 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -13,6 +13,7 @@ package rangefeed import ( "context" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/retry" ) @@ -27,6 +28,7 @@ type config struct { withInitialScan bool withDiff bool onInitialScanError OnInitialScanError + onCheckpoint OnCheckpoint } type optionFunc func(*config) @@ -80,6 +82,17 @@ func WithRetry(options retry.Options) Option { }) } +// OnCheckpoint is called when a rangefeed checkpoint occurs. +type OnCheckpoint func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) + +// WithOnCheckpoint sets up a callback that's invoked whenever a check point +// event is emitted. +func WithOnCheckpoint(f OnCheckpoint) Option { + return optionFunc(func(c *config) { + c.onCheckpoint = f + }) +} + func initConfig(c *config, options []Option) { *c = config{} // the default config is its zero value for _, o := range options { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index aff22fe5135e..c98fd6c17bd5 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -321,6 +321,9 @@ func (f *RangeFeed) processEvents( if _, err := frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS); err != nil { return err } + if f.onCheckpoint != nil { + f.onCheckpoint(ctx, ev.Checkpoint) + } case ev.Error != nil: // Intentionally do nothing, we'll get an error returned from the // call to RangeFeed. diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 9b130b8374d8..47ab2b2f9a2c 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -12,14 +12,19 @@ package rangefeed_test import ( "context" + "errors" + "sync" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/stretchr/testify/require" ) @@ -64,16 +69,18 @@ func TestRangeFeedIntegration(t *testing.T) { require.NoError(t, err) rows := make(chan *roachpb.RangeFeedValue) initialScanDone := make(chan struct{}) - r, err := f.RangeFeed(ctx, "test", sp, afterB, func( - ctx context.Context, value *roachpb.RangeFeedValue, - ) { - select { - case rows <- value: - case <-ctx.Done(): - } - }, rangefeed.WithDiff(), rangefeed.WithInitialScan(func(ctx context.Context) { - close(initialScanDone) - })) + r, err := f.RangeFeed(ctx, "test", sp, afterB, + func(ctx context.Context, value *roachpb.RangeFeedValue) { + select { + case rows <- value: + case <-ctx.Done(): + } + }, + rangefeed.WithDiff(), + rangefeed.WithInitialScan(func(ctx context.Context) { + close(initialScanDone) + }), + ) require.NoError(t, err) defer r.Close() { @@ -106,3 +113,100 @@ func TestRangeFeedIntegration(t *testing.T) { require.Equal(t, int64(4), updated) } } + +// TestWithOnCheckpoint verifies that we correctly emit rangefeed checkpoint +// events. +func TestWithOnCheckpoint(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + scratchKey := tc.ScratchRange(t) + scratchKey = scratchKey[:len(scratchKey):len(scratchKey)] + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + { + // Enable rangefeeds, otherwise the thing will retry until they are enabled. + _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + } + { + // Lower the closed timestamp target duration to speed up the test. + _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") + require.NoError(t, err) + } + + f, err := rangefeed.NewFactory(tc.Stopper(), db, nil) + require.NoError(t, err) + + var mu syncutil.RWMutex + var afterWriteTS hlc.Timestamp + checkpoints := make(chan *roachpb.RangeFeedCheckpoint) + + // We need to start a goroutine that reads of the checkpoints channel, so to + // not block the rangefeed itself. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + // We should expect a checkpoint event covering the key we just wrote, at a + // timestamp greater than when we wrote it. + testutils.SucceedsSoon(t, func() error { + for { + select { + case c := <-checkpoints: + mu.RLock() + writeTSUnset := afterWriteTS.IsEmpty() + mu.RUnlock() + if writeTSUnset { + return errors.New("write to key hasn't gone through yet") + } + + if afterWriteTS.LessEq(c.ResolvedTS) && c.Span.ContainsKey(mkKey("a")) { + return nil + } + default: + return errors.New("no valid checkpoints found") + } + } + }) + }() + + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "test", sp, db.Clock().Now(), + func(ctx context.Context, value *roachpb.RangeFeedValue) { + select { + case rows <- value: + case <-ctx.Done(): + } + }, + rangefeed.WithOnCheckpoint(func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) { + select { + case checkpoints <- checkpoint: + case <-ctx.Done(): + } + }), + ) + require.NoError(t, err) + defer r.Close() + + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + mu.Lock() + afterWriteTS = db.Clock().Now() + mu.Unlock() + { + v := <-rows + require.Equal(t, mkKey("a"), v.Key) + } + + wg.Wait() +}