Skip to content

Commit

Permalink
Merge #74222
Browse files Browse the repository at this point in the history
74222: kv,rpc: Introduce dedicated rangefeed connection class.  r=miretskiy a=miretskiy

Rangefeeds executing against very large tables may experience
OOMs under certain conditions. The underlying reason for this is that
cockroach uses 2MB per gRPC stream buffer size to improve system throughput
(particularly under high latency scenarios).  However, because the rangefeed
against a large table establishes a dedicated stream for each range,
the possiblity of an OOM exist if there are many streams from which events
are not consumed quickly enough.

This PR does two things.

First, it introduces a dedicated RPC connection class for use with rangefeeds.

The rangefeed connnection class configures rangefeed streams to use less memory
per stream than the default connection class.  The initial window size for rangefeed
connection class can be adjusted by updating `COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE`
environment variable whose default is 128KB.

For rangefeeds to use this new connection class,
a `kv.rangefeed.use_dedicated_connection_class.enabled` setting must be turned on.

Another change in this PR is that the default RPC window size can be adjusted
via `COCKROACH_RPC_INITIAL_WINDOW_SIZE` environment variable.  The default for this
variable is kept at 2MB.

Changing the values of either of those variables is an advanced operation and should
not be taken lightly since changes to those variables impact all aspects of cockroach
performance characteristics.  Values larger than 2MB will be trimmed to 2MB.
Setting either of those to 64KB or below will turn on "dynamic window" gRPC window sizes.

It should be noted that this change is a partial mitigation, and not a complete fix.
A full fix will require rework of rangefeed behavior, and will be done at a later time.

An alternative to introduction of a dedicated connection class was to simply
lower the default connection window size.  However, such change would impact
all RPCs in a system, and it was deemed too risky at this point.
Substantial benchmarking is needed before such change.

Informs #74219

Release Notes (performance improvement): Allow rangefeed streams to use separate
http connection when `kv.rangefeed.use_dedicated_connection_class.enabled` setting
is turned on.  Using separate connection class reduces the possiblity of OOMs when
running rangefeeds against very large tables.  The connection window size
for rangefeed can be adjusted via `COCKROACH_RANGEFEED_INITIAL_WINDOW_SIZE` environment
variable, whose default is 128KB.


Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Jan 5, 2022
2 parents e7fcfb9 + 89a1139 commit fa91c71
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 13 deletions.
13 changes: 8 additions & 5 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func getSink(

switch {
case u.Scheme == changefeedbase.SinkSchemeNull:
return makeNullSink(sinkURL{URL: u})
return makeNullSink(sinkURL{URL: u}, m)
case u.Scheme == changefeedbase.SinkSchemeKafka:
return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, m)
Expand Down Expand Up @@ -371,12 +371,13 @@ func (s *bufferSink) Dial() error {
}

type nullSink struct {
ticker *time.Ticker
ticker *time.Ticker
metrics *sliMetrics
}

var _ Sink = (*nullSink)(nil)

func makeNullSink(u sinkURL) (Sink, error) {
func makeNullSink(u sinkURL, m *sliMetrics) (Sink, error) {
var pacer *time.Ticker
if delay := u.consumeParam(`delay`); delay != "" {
pace, err := time.ParseDuration(delay)
Expand All @@ -385,7 +386,7 @@ func makeNullSink(u sinkURL) (Sink, error) {
}
pacer = time.NewTicker(pace)
}
return &nullSink{ticker: pacer}, nil
return &nullSink{ticker: pacer, metrics: m}, nil
}

func (n *nullSink) pace(ctx context.Context) error {
Expand All @@ -409,7 +410,7 @@ func (n *nullSink) EmitRow(
r kvevent.Alloc,
) error {
defer r.Release(ctx)

defer n.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress)
if err := n.pace(ctx); err != nil {
return err
}
Expand All @@ -423,6 +424,7 @@ func (n *nullSink) EmitRow(
func (n *nullSink) EmitResolvedTimestamp(
ctx context.Context, encoder Encoder, resolved hlc.Timestamp,
) error {
defer n.metrics.recordResolvedCallback()()
if err := n.pace(ctx); err != nil {
return err
}
Expand All @@ -435,6 +437,7 @@ func (n *nullSink) EmitResolvedTimestamp(

// Flush implements Sink interface.
func (n *nullSink) Flush(ctx context.Context) error {
defer n.metrics.recordFlushRequestCallback()()
if log.V(2) {
log.Info(ctx, "flushing")
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -41,6 +43,14 @@ type singleRangeInfo struct {
token rangecache.EvictionToken
}

var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.rangefeed.use_dedicated_connection_class.enabled",
"uses dedicated connection when running rangefeeds",
util.ConstantWithMetamorphicTestBool(
"kv.rangefeed.use_dedicated_connection_class.enabled", false),
)

// RangeFeed divides a RangeFeed request on range boundaries and establishes a
// RangeFeed to each of the individual ranges. It streams back results on the
// provided channel.
Expand Down Expand Up @@ -370,7 +380,7 @@ func (ds *DistSender) singleRangeFeed(
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)
// The RangeFeed is not used for system critical traffic so use a DefaultClass
// connection regardless of the range.
opts := SendOptions{class: rpc.DefaultClass}
opts := SendOptions{class: connectionClass(&ds.st.SV)}
transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas)
if err != nil {
return args.Timestamp, err
Expand Down Expand Up @@ -426,3 +436,10 @@ func (ds *DistSender) singleRangeFeed(
}
}
}

func connectionClass(sv *settings.Values) rpc.ConnectionClass {
if useDedicatedRangefeedConnectionClass.Get(sv) {
return rpc.RangefeedClass
}
return rpc.DefaultClass
}
7 changes: 5 additions & 2 deletions pkg/rpc/connection_class.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ const (
DefaultClass ConnectionClass = iota
// SystemClass is the ConnectionClass used for system traffic.
SystemClass
// RangefeedClass is the ConnectionClass used for rangefeeds.
RangefeedClass

// NumConnectionClasses is the number of valid ConnectionClass values.
NumConnectionClasses int = iota
)

// connectionClassName maps classes to their name.
var connectionClassName = map[ConnectionClass]string{
DefaultClass: "default",
SystemClass: "system",
DefaultClass: "default",
SystemClass: "system",
RangefeedClass: "rangefeed",
}

// String implements the fmt.Stringer interface.
Expand Down
36 changes: 31 additions & 5 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,32 @@ const (
)

const (
defaultWindowSize = 65535
initialWindowSize = defaultWindowSize * 32 // for an RPC
defaultWindowSize = 65535
)

func getWindowSize(name string, c ConnectionClass, defaultSize int) int32 {
const maxWindowSize = defaultWindowSize * 32
s := envutil.EnvOrDefaultInt(name, defaultSize)
if s > maxWindowSize {
log.Warningf(context.Background(), "%s value too large; trimmed to %d", name, maxWindowSize)
s = maxWindowSize
}
if s <= defaultWindowSize {
log.Warningf(context.Background(),
"%s RPC will use dynamic window sizes due to %s value lower than %d", c, name, defaultSize)
}
return int32(s)
}

var (
// for an RPC
initialWindowSize = getWindowSize(
"COCKROACH_RPC_INITIAL_WINDOW_SIZE", DefaultClass, defaultWindowSize*32)
initialConnWindowSize = initialWindowSize * 16 // for a connection

// for RangeFeed RPC
rangefeedInitialWindowSize = getWindowSize(
"COCKROACH_RANGEFEED_RPC_INITIAL_WINDOW_SIZE", RangefeedClass, 2*defaultWindowSize /* 128K */)
)

// GRPC Dialer connection timeout. 20s matches default value that is
Expand Down Expand Up @@ -1079,9 +1102,12 @@ func (rpcCtx *Context) grpcDialRaw(
Backoff: backoffConfig,
MinConnectTimeout: minConnectionTimeout}))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(clientKeepalive))
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, grpc.WithInitialConnWindowSize(initialConnWindowSize))
if class == RangefeedClass {
dialOpts = append(dialOpts, grpc.WithInitialWindowSize(rangefeedInitialWindowSize))
} else {
dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize))
}

dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
Expand Down

0 comments on commit fa91c71

Please sign in to comment.