diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index 73ea4d522ff3..7a391aef076f 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -13,6 +13,7 @@ package rangefeed import ( "context" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/retry" ) @@ -27,6 +28,7 @@ type config struct { withInitialScan bool withDiff bool onInitialScanError OnInitialScanError + onCheckpoint OnCheckpoint } type optionFunc func(*config) @@ -80,6 +82,17 @@ func WithRetry(options retry.Options) Option { }) } +// OnCheckpoint is called when a rangefeed checkpoint occurs. +type OnCheckpoint func(ctx context.Context, checkpoint *roachpb.RangeFeedCheckpoint) + +// WithOnCheckpoint sets up a callback that's invoked whenever a check point +// event is emitted. +func WithOnCheckpoint(f OnCheckpoint) Option { + return optionFunc(func(c *config) { + c.onCheckpoint = f + }) +} + func initConfig(c *config, options []Option) { *c = config{} // the default config is its zero value for _, o := range options { diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index aff22fe5135e..c98fd6c17bd5 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -321,6 +321,9 @@ func (f *RangeFeed) processEvents( if _, err := frontier.Forward(ev.Checkpoint.Span, ev.Checkpoint.ResolvedTS); err != nil { return err } + if f.onCheckpoint != nil { + f.onCheckpoint(ctx, ev.Checkpoint) + } case ev.Error != nil: // Intentionally do nothing, we'll get an error returned from the // call to RangeFeed.