diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index af6736fde112..1c4a1ec57ff6 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -14,6 +14,7 @@ import ( "sort" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -263,6 +265,14 @@ func changefeedPlanHook( details.Opts[optKeyInValue] = `` } + settings := p.ExecCfg().Settings + // Changefeeds are based on the Rangefeed abstraction, which requires the + // `kv.rangefeed.enabled` setting to be true. + if !storage.RangefeedEnabled.Get(&settings.SV) { + return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See " + + base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) + } + // Feature telemetry telemetrySink := parsedSink.Scheme if telemetrySink == `` { @@ -277,7 +287,6 @@ func changefeedPlanHook( return MaybeStripRetryableErrorMarker(err) } - settings := p.ExecCfg().Settings if err := utilccl.CheckEnterpriseEnabled( settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED", ); err != nil { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index b0fa47086c0b..4338280e5b15 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1288,7 +1288,7 @@ func TestChangefeedErrors(t *testing.T) { t, `rangefeeds require the kv.rangefeed.enabled setting`, `EXPERIMENTAL CHANGEFEED FOR rangefeed_off`, ) - sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled TO DEFAULT`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) sqlDB.ExpectErr( t, `unknown format: nope`, diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index 029bfffad266..2db59e1d11a1 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -15,7 +15,6 @@ import ( "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -128,10 +127,6 @@ func (i iteratorWithCloser) Close() { func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { - if !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { - return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See " + - base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) - } ctx := r.AnnotateCtx(stream.Context()) var rspan roachpb.RSpan