diff --git a/pkg/ccl/streamingccl/settings.go b/pkg/ccl/streamingccl/settings.go index 343883641c79..a7ec05343a75 100644 --- a/pkg/ccl/streamingccl/settings.go +++ b/pkg/ccl/streamingccl/settings.go @@ -57,6 +57,14 @@ var StreamReplicationMinCheckpointFrequency = settings.RegisterDurationSetting( settings.WithName("physical_replication.producer.min_checkpoint_frequency"), ) +// StreamProducerMuxRangefeeds controls whether we start event streams using the mux rangefeeds. +var StreamProducerMuxRangefeeds = settings.RegisterBoolSetting( + settings.SystemOnly, + "physical_replication.producer.mux_rangefeeds.enabled", + "controls whether rangefeeds used for physical replication use mux rangefeeds", + true, +) + // StreamReplicationConsumerHeartbeatFrequency controls frequency the stream replication // destination cluster sends heartbeat to the source cluster to keep the stream alive. var StreamReplicationConsumerHeartbeatFrequency = settings.RegisterDurationSetting( diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index f890afc37ead..d8ad4d93596f 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -14,6 +14,7 @@ import ( "runtime/pprof" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -108,6 +109,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { s.doneChan = make(chan struct{}) + useMux := streamingccl.StreamProducerMuxRangefeeds.Get(&s.execCfg.Settings.SV) + // Common rangefeed options. opts := []rangefeed.Option{ rangefeed.WithPProfLabel("job", fmt.Sprintf("id=%d", s.streamID)), @@ -120,7 +123,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { rangefeed.WithMemoryMonitor(s.mon), rangefeed.WithOnSSTable(s.onSSTable), - rangefeed.WithMuxRangefeed(true), + rangefeed.WithMuxRangefeed(useMux), rangefeed.WithOnDeleteRange(s.onDeleteRange), }