Skip to content

Commit

Permalink
kv: Introduce MuxRangeFeed bi-directional RPC.
Browse files Browse the repository at this point in the history
As demonstrated in cockroachdb#74219, running rangefeed over large enough table
or perhaps not consuming rangefeed events quickly enough,
may cause undesirable memory blow up, including up to OOMing the node.
Since rangefeeds tend to run on (almost) every node, a possiblity exists
that a entire cluster might get distabilized.

One of the reasons for this is that the memory used by low(er) level http2
RPC transport is not currently being tracked -- nor do we have a mechanism
to add such tracking.  cockroachdb#74456 provided partial mitigation for this issue,
where a single RangeFeed stream could be restricted to use less memory.
Nonetheless, the possibility of excessive memory usage remains since the number
of rangefeed streams in the system could be very large.

This PR introduce a new bi-directional streaming RPC called `MuxRangeFeed`.
In a uni-directonal streaming RPC, the client estabilishes connection to the KV
server and requests to receive events for 1 span.  A separate RPC stream is created
for each span.

In contrast, `MuxRangeFeed` is bi-directional RPC: the client
is expected to connect to the kv server and request as many spans as it wishes to
receive from that server. The server multiplexes all events onto a single
stream, and the client de-multiplexes those events appropriately on the receiving
end.

Note: just like in a uni-directional implementation, each call to
`MuxRangeFeed` method (i.e. a logical rangefeed established by the client)
is treated independently.  That is, even though it is possible to
multiplex all logical rangefeed streams onto a single bi-directional stream to a
single KV node, this optimization is not currently implementation as it raises
questions about scheduling and fairness.  If we need to further reduce the number
of bi-directional streams in the future, from `number_logical_feeds*number_of_nodes`
down to the `number_of_nodes`, such optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional stream
reduces the number of such stream from the `number_of_ranges` down to `number_of_nodes`
(per logical range feed).  This, in turn, enables the client to reserve the worst case
amount of memory before starting the rangefeed.  This amount of memory is bound by
the number of nodes in the cluster times per-stream maximum memory -- default is `2MB`,
but this can be changed via dedicated rangefeed connection class.
The server side of the equation may also benefit from knowing how many rangefeeds
are running right now.  Even though the server is somewhat protected from memory
blow up via http2 pushback, we may want to also manage server side memory better.
Memory accounting for client (and possible server) will be added in the follow on PRs.

The use of new RPC is contingent on the callers providing
`WithMuxRangefeed` option when starting the rangefeed.
However, an envornmnet variable "kill switch" `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED`
exists to disable the use of mux rangefeed in case of serious issues
being discovered.

Release Notes (enterprise change) Rangefeeds (which are used internally e.g. by changefeeds)
now use a common HTTP/2 stream per client for all range replicas on a node,
instead of one per replica. This significantly reduces the amount of network buffer
memory usage, which could cause nodes to run out of memory if a client was slow to consume events.
The rangefeed implementation can be revered back to its old implementation
via restarting the cluster with the `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED`
environment variable set to false.

Release note (enterprise change): Introduce a new Rangefeed RPC called
`MuxRangeFeed`.  Rangefeeds may now use a common HTTP/2 stream per client
for all range replicas on a node, instead of one per replica. This significantly
reduces the amount of network buffer memory usage, which could cause nodes
 to run out of memory if a client was slow to consume events.
The caller may opt in to use new mechanism by specifying
`WithMuxRangefeed` option when starting the rangefeed.  However, a cluster wide
`COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable may be set to
`false` to inhibit the use of this new RPC.
  • Loading branch information
Yevgeniy Miretskiy committed Aug 3, 2022
1 parent dda6460 commit 5c53a46
Show file tree
Hide file tree
Showing 30 changed files with 1,357 additions and 158 deletions.
2 changes: 1 addition & 1 deletion build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangeca
pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB
pkg/kv/kvserver/concurrency/lock_table.go://go:generate ../../../util/interval/generic/gen.sh *lockState concurrency
pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generic/gen.sh *latch spanlatch
pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient
pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient
pkg/roachpb/batch.go://go:generate go run gen/main.go --filename batch_generated.go *.pb.go
pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated_test.go . Cert
pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-32 set the active cluster version in the format '<major>.<minor>'
version version 22.1-34 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-32</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-34</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (f rawEventFeed) run(
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
opts ...kvcoord.RangeFeedOption,
) error {
var startAfter hlc.Timestamp
for _, s := range spans {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type rangefeedFactory func(
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- *roachpb.RangeFeedEvent,
opts ...kvcoord.RangeFeedOption,
) error

type rangefeed struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe
panic("unimplemented")
}

func (m *mockServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error {
panic("implement me")
}

func (*mockServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ const (
// AlterSystemStatementStatisticsAddIndexRecommendations adds an
// index_recommendations column to the system.statement_statistics table.
AlterSystemStatementStatisticsAddIndexRecommendations
// RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node.
RangefeedUseOneStreamPerNode

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -564,6 +566,10 @@ var versionsSingleton = keyedVersions{
Key: AlterSystemStatementStatisticsAddIndexRecommendations,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 32},
},
{
Key: RangefeedUseOneStreamPerNode,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 34},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"doc.go",
"local_test_cluster_util.go",
Expand Down Expand Up @@ -38,6 +39,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
Expand Down Expand Up @@ -110,6 +112,7 @@ go_test(
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
Expand Down Expand Up @@ -142,6 +145,7 @@ go_test(
deps = [
"//build/bazelutil:noop",
"//pkg/base",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/gossip",
Expand All @@ -165,6 +169,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage",
Expand All @@ -174,9 +179,11 @@ go_test(
"//pkg/testutils/localtestcluster",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/caller",
"//pkg/util/ctxgroup",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
Loading

0 comments on commit 5c53a46

Please sign in to comment.