kv: Introduce MuxRangeFeed bi-directional RPC.
As demonstrated in #74219, running a rangefeed over a large enough table,
or not consuming rangefeed events quickly enough,
may cause undesirable memory blow-up, up to and including OOMing the node.
Since rangefeeds tend to run on (almost) every node, it is possible
that an entire cluster might get destabilized.

One of the reasons for this is that the memory used by the lower-level http2
RPC transport is not currently tracked -- nor do we have a mechanism
to add such tracking.  #74456 provided partial mitigation for this issue,
where a single RangeFeed stream could be restricted to use less memory.
Nonetheless, the possibility of excessive memory usage remains since the number
of rangefeed streams in the system could be very large.

This PR introduces a new bi-directional streaming RPC called `MuxRangeFeed`.
In the existing uni-directional streaming RPC, the client establishes a connection to the KV
server and requests events for a single span.  A separate RPC stream is created
for each span.

In contrast, `MuxRangeFeed` is a bi-directional RPC: the client
is expected to connect to the KV server and request as many spans as it wishes to
receive from that server. The server multiplexes all events onto a single
stream, and the client de-multiplexes those events appropriately on the receiving
end.
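
The client-side de-multiplexing described above can be sketched roughly as follows. This is a minimal illustration, not the code added by this commit: the `muxEvent` type, its `StreamID` field, and the `demuxer` helper are hypothetical names, and the sketch assumes each event on the shared stream carries an identifier for the span request it belongs to.

```go
package main

import (
	"fmt"
	"sync"
)

// muxEvent is a hypothetical stand-in for an event received on the shared
// stream. StreamID (an assumption) identifies the span request it belongs to.
type muxEvent struct {
	StreamID int64
	Payload  string
}

// demuxer routes events from one shared stream to per-span consumers.
type demuxer struct {
	mu        sync.Mutex
	consumers map[int64]chan<- muxEvent
}

func newDemuxer() *demuxer {
	return &demuxer{consumers: make(map[int64]chan<- muxEvent)}
}

// register associates a stream ID with the channel that consumes its events.
func (d *demuxer) register(id int64, ch chan<- muxEvent) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.consumers[id] = ch
}

// dispatch delivers ev to its consumer and reports whether one was registered.
func (d *demuxer) dispatch(ev muxEvent) bool {
	d.mu.Lock()
	ch, ok := d.consumers[ev.StreamID]
	d.mu.Unlock()
	if !ok {
		return false
	}
	ch <- ev
	return true
}

func main() {
	d := newDemuxer()
	a, b := make(chan muxEvent, 1), make(chan muxEvent, 1)
	d.register(1, a)
	d.register(2, b)

	// Events for two spans arrive interleaved on the single stream.
	d.dispatch(muxEvent{StreamID: 2, Payload: "span-b"})
	d.dispatch(muxEvent{StreamID: 1, Payload: "span-a"})

	fmt.Println((<-a).Payload, (<-b).Payload)
}
```

Note that `dispatch` blocks when a consumer's channel is full, so a slow consumer would stall the whole shared stream in this sketch; that is the kind of scheduling and fairness question a real implementation has to answer.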

Note: just like in the uni-directional implementation, each call to the
`MuxRangeFeed` method (i.e. a logical rangefeed established by the client)
is treated independently.  That is, even though it is possible to
multiplex all logical rangefeed streams onto a single bi-directional stream to a
single KV node, this optimization is not currently implemented as it raises
questions about scheduling and fairness.  If we need to further reduce the number
of bi-directional streams in the future, from `number_logical_feeds*number_of_nodes`
down to `number_of_nodes`, such an optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional stream
reduces the number of such streams from `number_of_ranges` down to `number_of_nodes`
(per logical rangefeed).  This, in turn, enables the client to reserve the worst-case
amount of memory before starting the rangefeed.  This amount of memory is bounded by
the number of nodes in the cluster times the per-stream maximum memory -- the default is `2MB`,
but this can be changed via a dedicated rangefeed connection class.
The server side of the equation may also benefit from knowing how many rangefeeds
are running right now.  Even though the server is somewhat protected from memory
blow-up via http2 pushback, we may want to also manage server-side memory better.
Memory accounting for the client (and possibly the server) will be added in follow-on PRs.
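
To make the bound above concrete, here is a small arithmetic sketch. The function and constant names are hypothetical, the cluster sizes are made-up illustration values, and the per-range comparison assumes the same `2MB` cap would apply to every per-range stream.

```go
package main

import "fmt"

// defaultPerStreamBytes is the default per-stream maximum quoted above (2MB).
const defaultPerStreamBytes int64 = 2 << 20

// worstCaseBytes returns the upper bound on client-side transport memory for
// one logical rangefeed: the number of streams times the per-stream maximum.
func worstCaseBytes(numStreams int, perStreamBytes int64) int64 {
	return int64(numStreams) * perStreamBytes
}

func main() {
	// Hypothetical cluster: 9 nodes hosting 50,000 ranges.
	const nodes, ranges = 9, 50000

	// With MuxRangeFeed there is one stream per node.
	fmt.Println("mux bound (bytes):", worstCaseBytes(nodes, defaultPerStreamBytes))

	// With one stream per range, the same cap yields a far larger bound.
	fmt.Println("per-range bound (bytes):", worstCaseBytes(ranges, defaultPerStreamBytes))
}
```

Because the node count is known before the rangefeed starts, the first figure is something a client could reserve up front, which is the point made above.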

The new RPC is turned on by default since the intention is to retire the uni-directional
rangefeed in a later release.  However, this default may be disabled (and the rangefeed
reverted to the old implementation) by setting the `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED`
environment variable to `false` and restarting the cluster.
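
A default-on environment toggle of this kind can be sketched as below. This is an illustration using only the Go standard library, not the code in this commit; the `muxEnabled` helper and its fall-back-to-default handling of unparsable values are assumptions. It also ignores the `RangefeedUseOneStreamPerNode` cluster version key that this commit adds.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// muxEnvVar is the environment variable named in the commit message.
const muxEnvVar = "COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED"

// muxEnabled reports whether multiplexing should be used. It defaults to true
// when the variable is unset or (an assumption of this sketch) unparsable.
// The lookup function is injected so the logic can be exercised without
// touching the process environment.
func muxEnabled(lookup func(string) (string, bool)) bool {
	v, ok := lookup(muxEnvVar)
	if !ok {
		return true
	}
	enabled, err := strconv.ParseBool(v)
	if err != nil {
		return true
	}
	return enabled
}

func main() {
	// The value is read once at startup, which is why a restart is needed
	// for a change to take effect.
	fmt.Println("multiplexing enabled:", muxEnabled(os.LookupEnv))
}
```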

Release note (enterprise change): Rangefeeds (which are used internally, e.g. by changefeeds)
now use a common HTTP/2 stream per client for all range replicas on a node,
instead of one per replica. This significantly reduces network buffer
memory usage, which could cause nodes to run out of memory if a client was slow to consume events.
The rangefeed implementation can be reverted to its old implementation
by restarting the cluster with the `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED`
environment variable set to `false`.
Yevgeniy Miretskiy committed Jan 28, 2022
1 parent bc63116 commit c435159
Showing 23 changed files with 1,624 additions and 259 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Expand Up @@ -178,4 +178,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 21.2-50 set the active cluster version in the format '<major>.<minor>'
+version version 21.2-52 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Expand Up @@ -186,6 +186,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>21.2-50</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>21.2-52</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Expand Up @@ -251,6 +251,8 @@ const (
// EnableProtectedTimestampsForTenant enables the use of protected timestamps
// in secondary tenants.
EnableProtectedTimestampsForTenant
// Rangefeed use 1 stream per node
RangefeedUseOneStreamPerNode

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -387,6 +389,10 @@ var versionsSingleton = keyedVersions{
Key: EnableProtectedTimestampsForTenant,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50},
},
{
Key: RangefeedUseOneStreamPerNode,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Expand Up @@ -8,6 +8,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"doc.go",
"local_test_cluster_util.go",
Expand Down Expand Up @@ -36,6 +37,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
Expand Down Expand Up @@ -112,6 +114,7 @@ go_test(
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
Expand Down Expand Up @@ -143,6 +146,7 @@ go_test(
deps = [
"//build/bazelutil:noop",
"//pkg/base",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/gossip",
Expand All @@ -164,6 +168,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage",
Expand All @@ -174,9 +179,11 @@ go_test(
"//pkg/testutils/localtestcluster",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/caller",
"//pkg/util/ctxgroup",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down