diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 1f76466de106..d83c872cda07 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -99,7 +99,7 @@ func getSink( switch { case u.Scheme == changefeedbase.SinkSchemeNull: - return makeNullSink(sinkURL{URL: u}) + return makeNullSink(sinkURL{URL: u}, m) case u.Scheme == changefeedbase.SinkSchemeKafka: return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) { return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, m) @@ -359,12 +359,13 @@ func (s *bufferSink) Dial() error { } type nullSink struct { - ticker *time.Ticker + ticker *time.Ticker + metrics *sliMetrics } var _ Sink = (*nullSink)(nil) -func makeNullSink(u sinkURL) (Sink, error) { +func makeNullSink(u sinkURL, m *sliMetrics) (Sink, error) { var pacer *time.Ticker if delay := u.consumeParam(`delay`); delay != "" { pace, err := time.ParseDuration(delay) @@ -373,7 +374,7 @@ func makeNullSink(u sinkURL) (Sink, error) { } pacer = time.NewTicker(pace) } - return &nullSink{ticker: pacer}, nil + return &nullSink{ticker: pacer, metrics: m}, nil } func (n *nullSink) pace(ctx context.Context) error { @@ -397,7 +398,7 @@ func (n *nullSink) EmitRow( r kvevent.Alloc, ) error { defer r.Release(ctx) - + defer n.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress) if err := n.pace(ctx); err != nil { return err } @@ -411,6 +412,7 @@ func (n *nullSink) EmitRow( func (n *nullSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { + defer n.metrics.recordResolvedCallback()() if err := n.pace(ctx); err != nil { return err } @@ -423,6 +425,7 @@ func (n *nullSink) EmitResolvedTimestamp( // Flush implements Sink interface. func (n *nullSink) Flush(ctx context.Context) error { + defer n.metrics.recordFlushRequestCallback()() if log.V(2) { log.Info(ctx, "flushing") } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ab986b7beb6b..26688a01a9a8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -22,6 +22,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,6 +43,13 @@ type singleRangeInfo struct { token rangecache.EvictionToken } +var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting( + "kv.rangefeed.use_dedicated_connection_class.enabled", + "uses dedicated connection when running rangefeeds", + util.ConstantWithMetamorphicTestBool( + "kv.rangefeed.use_dedicated_connection_class.enabled", false), +) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -368,7 +377,7 @@ func (ds *DistSender) singleRangeFeed( replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) // The RangeFeed is not used for system critical traffic so use a DefaultClass // connection regardless of the range. - opts := SendOptions{class: rpc.DefaultClass} + opts := SendOptions{class: connectionClass(&ds.st.SV)} transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) if err != nil { return args.Timestamp, err @@ -424,3 +433,10 @@ func (ds *DistSender) singleRangeFeed( } } } + +func connectionClass(sv *settings.Values) rpc.ConnectionClass { + if useDedicatedRangefeedConnectionClass.Get(sv) { + return rpc.RangefeedClass + } + return rpc.DefaultClass +} diff --git a/pkg/rpc/connection_class.go b/pkg/rpc/connection_class.go index 1e80edb1fa9d..2193708fc03f 100644 --- a/pkg/rpc/connection_class.go +++ b/pkg/rpc/connection_class.go @@ -36,6 +36,8 @@ const ( DefaultClass ConnectionClass = iota // SystemClass is the ConnectionClass used for system traffic. SystemClass + // RangefeedClass is the ConnectionClass used for rangefeeds. + RangefeedClass // NumConnectionClasses is the number of valid ConnectionClass values. NumConnectionClasses int = iota @@ -43,8 +45,9 @@ const ( // connectionClassName maps classes to their name. var connectionClassName = map[ConnectionClass]string{ - DefaultClass: "default", - SystemClass: "system", + DefaultClass: "default", + SystemClass: "system", + RangefeedClass: "rangefeed", } // String implements the fmt.Stringer interface. diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index e7bbe99b3f31..cb4c93405b88 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -65,9 +65,32 @@ const ( ) const ( - defaultWindowSize = 65535 - initialWindowSize = defaultWindowSize * 32 // for an RPC + defaultWindowSize = 65535 +) + +func getWindowSize(name string, c ConnectionClass, defaultSize int) int32 { + const maxWindowSize = defaultWindowSize * 32 + s := envutil.EnvOrDefaultInt(name, defaultSize) + if s > maxWindowSize { + log.Warningf(context.Background(), "%s value too large; trimmed to %d", name, maxWindowSize) + s = maxWindowSize + } + if s <= defaultWindowSize { + log.Warningf(context.Background(), + "%s RPC will use dynamic window sizes due to %s value lower than %d", c, name, defaultSize) + } + return int32(s) +} + +var ( + // for an RPC + initialWindowSize = getWindowSize( + "COCKROACH_RPC_INITIAL_WINDOW_SIZE", DefaultClass, defaultWindowSize*32) initialConnWindowSize = initialWindowSize * 16 // for a connection + + // for RangeFeed RPC + rangefeedInitialWindowSize = getWindowSize( + "COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */) ) // GRPC Dialer connection timeout. 20s matches default value that is @@ -1016,9 +1039,12 @@ func (ctx *Context) grpcDialRaw( Backoff: backoffConfig, MinConnectTimeout: minConnectionTimeout})) dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive)) - dialOpts = append(dialOpts, - grpc.WithInitialWindowSize(initialWindowSize), - grpc.WithInitialConnWindowSize(initialConnWindowSize)) + dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(initialConnWindowSize)) + if class == RangefeedClass { + dialOpts = append(dialOpts, grpc.WithInitialWindowSize(rangefeedInitialWindowSize)) + } else { + dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize)) + } dialer := onlyOnceDialer{ redialChan: make(chan struct{}),