From 72e9869e51d05c16f62a4d5e741a4935e4fad5d1 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 24 Aug 2021 07:26:16 -0400 Subject: [PATCH] kvclient/rangefeed: emit checkpoint events Grafted from #69269. This seems like a useful primitive for users of this library. We intend to use it in #69661 and #69614. Release note: None Co-authored-by: irfan sharif --- pkg/kv/kvclient/rangefeed/config.go | 13 +++++++++++++ pkg/kv/kvclient/rangefeed/rangefeed.go | 3 +++ 2 files changed, 16 insertions(+) diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 73ea4d522ff3..7a391aef076f 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -13,6 +13,7 @@ package rangefeed import ( "context" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/retry" ) @@ -27,6 +28,7 @@ type config struct { withInitialScan bool withDiff bool onInitialScanError OnInitialScanError + onCheckpoint OnCheckpoint } type optionFunc func(*config) @@ -80,6 +82,17 @@ func WithRetry(options retry.Options) Option { }) } +// OnCheckpoint is called when a rangefeed checkpoint occurs. +type OnCheckpoint func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) + +// WithOnCheckpoint sets up a callback that's invoked whenever a check point +// event is emitted. +func WithOnCheckpoint(f OnCheckpoint) Option { + return optionFunc(func(c *config) { + c.onCheckpoint = f + }) +} + func initConfig(c *config, options []Option) { *c = config{} // the default config is its zero value for _, o := range options { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index aff22fe5135e..c98fd6c17bd5 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -321,6 +321,9 @@ func (f *RangeFeed) processEvents( if _, err := frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS); err != nil { return err } + if f.onCheckpoint != nil { + f.onCheckpoint(ctx, ev.Checkpoint) + } case ev.Error != nil: // Intentionally do nothing, we'll get an error returned from the // call to RangeFeed.