From a0841b468942cd18c359bb3286ad86ddd1aa9821 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 20 Dec 2021 14:25:39 -0500 Subject: [PATCH 1/2] changefeedccl: Have null sink record stats. Record stats while discarding data. Release Notes: None --- pkg/ccl/changefeedccl/sink.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 1f76466de106..d83c872cda07 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -99,7 +99,7 @@ func getSink( switch { case u.Scheme == changefeedbase.SinkSchemeNull: - return makeNullSink(sinkURL{URL: u}) + return makeNullSink(sinkURL{URL: u}, m) case u.Scheme == changefeedbase.SinkSchemeKafka: return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) { return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, m) @@ -359,12 +359,13 @@ func (s *bufferSink) Dial() error { } type nullSink struct { - ticker *time.Ticker + ticker *time.Ticker + metrics *sliMetrics } var _ Sink = (*nullSink)(nil) -func makeNullSink(u sinkURL) (Sink, error) { +func makeNullSink(u sinkURL, m *sliMetrics) (Sink, error) { var pacer *time.Ticker if delay := u.consumeParam(`delay`); delay != "" { pace, err := time.ParseDuration(delay) @@ -373,7 +374,7 @@ func makeNullSink(u sinkURL) (Sink, error) { } pacer = time.NewTicker(pace) } - return &nullSink{ticker: pacer}, nil + return &nullSink{ticker: pacer, metrics: m}, nil } func (n *nullSink) pace(ctx context.Context) error { @@ -397,7 +398,7 @@ func (n *nullSink) EmitRow( r kvevent.Alloc, ) error { defer r.Release(ctx) - + defer n.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress) if err := n.pace(ctx); err != nil { return err } @@ -411,6 +412,7 @@ func (n *nullSink) EmitRow( func (n *nullSink) EmitResolvedTimestamp( ctx context.Context, encoder Encoder, resolved hlc.Timestamp, ) error { + defer n.metrics.recordResolvedCallback()() if err := n.pace(ctx); err != nil { return err } @@ -423,6 +425,7 @@ func (n *nullSink) EmitResolvedTimestamp( // Flush implements Sink interface. func (n *nullSink) Flush(ctx context.Context) error { + defer n.metrics.recordFlushRequestCallback()() if log.V(2) { log.Info(ctx, "flushing") } From 5cf23b8fa1359e58020b49c8e10c295867c492b5 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 22 Dec 2021 15:52:29 -0500 Subject: [PATCH 2/2] kv,rpc: Introduce dedicated rangefeed connection class. Rangefeeds executing against very large tables may experience OOMs under certain conditions. The underlying reason for this is that cockroach uses 2MB per gRPC stream buffer size to improve system throughput (particularly under high latency scenarios). However, because the rangefeed against a large table establishes a dedicated stream for each range, the possiblity of an OOM exist if there are many streams from which events are not consumed quickly enough. This PR does two things. First, it introduces a dedicated RPC connection class for use with rangefeeds. The rangefeed connnection class configures rangefeed streams to use less memory per stream than the default connection class. The initial window size for rangefeed connection class can be adjusted by updating `COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE` environment variable whose default is 128KB. For rangefeeds to use this new connection class, a `kv.rangefeed.use_dedicated_connection_class.enabled` setting must be turned on. Another change in this PR is that the default RPC window size can be adjusted via `COCKROACH_RPC_INITIAL_WINDOW_SIZE` environment variable. The default for this variable is kept at 2MB. Changing the values of either of those variables is an advanced operation and should not be taken lightly since changes to those variables impact all aspects of cockroach performance characteristics. Values larger than 2MB will be trimmed to 2MB. Setting either of those to 64KB or below will turn on "dynamic window" gRPC window sizes. It should be noted that this change is a partial mitigation, and not a complete fix. A full fix will require rework of rangefeed behavior, and will be done at a later time. An alternative to introduction of a dedicated connection class was to simply lower the default connection window size. However, such change would impact all RPCs in a system, and it was deemed too risky at this point. Substantial benchmarking is needed before such change. Release Notes (performance improvement): Allow rangefeed streams to use separate http connection when `kv.rangefeed.use_dedicated_connection_class.enabled` setting is turned on. Using separate connection class reduces the possiblity of OOMs when running rangefeeds against very large tables. The connection window size for rangefeed can be adjusted via `COCKROACH_RANGEFEED_INITIAL_WINDOW_SIZE` environment variable, whose default is 128KB. --- .../kvclient/kvcoord/dist_sender_rangefeed.go | 18 +++++++++- pkg/rpc/connection_class.go | 7 ++-- pkg/rpc/context.go | 36 ++++++++++++++++--- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ab986b7beb6b..26688a01a9a8 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -22,6 +22,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -41,6 +43,13 @@ type singleRangeInfo struct { token rangecache.EvictionToken } +var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting( + "kv.rangefeed.use_dedicated_connection_class.enabled", + "uses dedicated connection when running rangefeeds", + util.ConstantWithMetamorphicTestBool( + "kv.rangefeed.use_dedicated_connection_class.enabled", false), +) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -368,7 +377,7 @@ func (ds *DistSender) singleRangeFeed( replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) // The RangeFeed is not used for system critical traffic so use a DefaultClass // connection regardless of the range. - opts := SendOptions{class: rpc.DefaultClass} + opts := SendOptions{class: connectionClass(&ds.st.SV)} transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) if err != nil { return args.Timestamp, err @@ -424,3 +433,10 @@ func (ds *DistSender) singleRangeFeed( } } } + +func connectionClass(sv *settings.Values) rpc.ConnectionClass { + if useDedicatedRangefeedConnectionClass.Get(sv) { + return rpc.RangefeedClass + } + return rpc.DefaultClass +} diff --git a/pkg/rpc/connection_class.go b/pkg/rpc/connection_class.go index 1e80edb1fa9d..2193708fc03f 100644 --- a/pkg/rpc/connection_class.go +++ b/pkg/rpc/connection_class.go @@ -36,6 +36,8 @@ const ( DefaultClass ConnectionClass = iota // SystemClass is the ConnectionClass used for system traffic. SystemClass + // RangefeedClass is the ConnectionClass used for rangefeeds. + RangefeedClass // NumConnectionClasses is the number of valid ConnectionClass values. NumConnectionClasses int = iota @@ -43,8 +45,9 @@ const ( // connectionClassName maps classes to their name. var connectionClassName = map[ConnectionClass]string{ - DefaultClass: "default", - SystemClass: "system", + DefaultClass: "default", + SystemClass: "system", + RangefeedClass: "rangefeed", } // String implements the fmt.Stringer interface. diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index e7bbe99b3f31..cb4c93405b88 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -65,9 +65,32 @@ const ( ) const ( - defaultWindowSize = 65535 - initialWindowSize = defaultWindowSize * 32 // for an RPC + defaultWindowSize = 65535 +) + +func getWindowSize(name string, c ConnectionClass, defaultSize int) int32 { + const maxWindowSize = defaultWindowSize * 32 + s := envutil.EnvOrDefaultInt(name, defaultSize) + if s > maxWindowSize { + log.Warningf(context.Background(), "%s value too large; trimmed to %d", name, maxWindowSize) + s = maxWindowSize + } + if s <= defaultWindowSize { + log.Warningf(context.Background(), + "%s RPC will use dynamic window sizes due to %s value lower than %d", c, name, defaultSize) + } + return int32(s) +} + +var ( + // for an RPC + initialWindowSize = getWindowSize( + "COCKROACH_RPC_INITIAL_WINDOW_SIZE", DefaultClass, defaultWindowSize*32) initialConnWindowSize = initialWindowSize * 16 // for a connection + + // for RangeFeed RPC + rangefeedInitialWindowSize = getWindowSize( + "COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */) ) // GRPC Dialer connection timeout. 20s matches default value that is @@ -1016,9 +1039,12 @@ func (ctx *Context) grpcDialRaw( Backoff: backoffConfig, MinConnectTimeout: minConnectionTimeout})) dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive)) - dialOpts = append(dialOpts, - grpc.WithInitialWindowSize(initialWindowSize), - grpc.WithInitialConnWindowSize(initialConnWindowSize)) + dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(initialConnWindowSize)) + if class == RangefeedClass { + dialOpts = append(dialOpts, grpc.WithInitialWindowSize(rangefeedInitialWindowSize)) + } else { + dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize)) + } dialer := onlyOnceDialer{ redialChan: make(chan struct{}),