Skip to content

Commit

Permalink
streamingccl: add setting to disable mux rangefeeds
Browse files Browse the repository at this point in the history
Since mux rangefeeds are still default off, we want the ability to opt
out of them if we find an issue with them.

Epic: none

Release note: None
  • Loading branch information
stevendanna committed Oct 30, 2023
1 parent 1c75a04 commit e3b26a7
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
8 changes: 8 additions & 0 deletions pkg/ccl/streamingccl/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ var StreamReplicationMinCheckpointFrequency = settings.RegisterDurationSetting(
settings.WithName("physical_replication.producer.min_checkpoint_frequency"),
)

// StreamProducerMuxRangefeeds controls whether we start event streams using the mux rangefeeds.
var StreamProducerMuxRangefeeds = settings.RegisterBoolSetting(
settings.SystemOnly,
"physical_replication.producer.mux_rangefeeds.enabled",
"controls whether rangefeeds used for physical replication use mux rangefeeds",
true,
)

// StreamReplicationConsumerHeartbeatFrequency controls frequency the stream replication
// destination cluster sends heartbeat to the source cluster to keep the stream alive.
var StreamReplicationConsumerHeartbeatFrequency = settings.RegisterDurationSetting(
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime/pprof"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down Expand Up @@ -108,6 +109,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error {

s.doneChan = make(chan struct{})

useMux := streamingccl.StreamProducerMuxRangefeeds.Get(&s.execCfg.Settings.SV)

// Common rangefeed options.
opts := []rangefeed.Option{
rangefeed.WithPProfLabel("job", fmt.Sprintf("id=%d", s.streamID)),
Expand All @@ -120,7 +123,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error {
rangefeed.WithMemoryMonitor(s.mon),

rangefeed.WithOnSSTable(s.onSSTable),
rangefeed.WithMuxRangefeed(true),
rangefeed.WithMuxRangefeed(useMux),
rangefeed.WithOnDeleteRange(s.onDeleteRange),
}

Expand Down

0 comments on commit e3b26a7

Please sign in to comment.