Skip to content

Commit

Permalink
Merge #75581 #84683
Browse files Browse the repository at this point in the history
75581: kv: Introduce MuxRangeFeed bi-directional RPC.  r=miretskiy a=miretskiy

As demonstrated in #74219, running 
rangefeed over large enough table or perhaps not consuming rangefeed events quickly enough, 
may cause undesirable memory blow up, including up to OOMing the node.  Since rangefeeds t
end to run on (almost) every node, a possibility exists that a entire cluster might
get destabilized.

One of the reasons for this is that the memory used by low(er) level http2
RPC transport is not currently being tracked -- nor do we have a mechanism
to add such tracking.  #74456 
provided partial mitigation for this issue, where a single RangeFeed stream could 
be restricted to use less memory. Nonetheless, the possibility of excessive memory 
usage remains since the number of rangefeed streams in the system could be very large.

This PR introduce a new bi-directional streaming RPC called `MuxRangeFeed`.
In a uni-directional streaming RPC, the client establishes connection to the
KV server and requests to receive events for 1 span.  A separate RPC stream
is created for each span.

In contrast, `MuxRangeFeed` is bi-directional RPC: the client is expected to
connect to the kv server and request as many spans as it wishes to receive
from that server. The server multiplexes all events onto a single stream,
and the client de-multiplexes those events appropriately on the receiving
end.

Note: just like in a uni-directional implementation, each call to
`MuxRangeFeed` method (i.e. a logical rangefeed established by the client)
is treated independently.  That is, even though it is possible to multiplex
all logical rangefeed streams onto a single bi-directional stream to a
single KV node, this optimization is not currently implementation as it
raises questions about scheduling and fairness.  If we need to further
reduce the number of bi-directional streams in the future, from
`number_logical_feeds*number_of_nodes` down to the `number_of_nodes`, such
optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional
stream reduces the number of such stream from the `number_of_ranges` down to
`number_of_nodes` (per logical range feed).  This, in turn, enables the
client to reserve the worst case amount of memory before starting the
rangefeed.  This amount of memory is bound by the number of nodes in the
cluster times per-stream maximum memory -- default is `2MB`, but this can be
changed via dedicated rangefeed connection class.  The server side of the
equation may also benefit from knowing how many rangefeeds are running right
now.  Even though the server is somewhat protected from memory blow up via
http2 pushback, we may want to also manage server side memory better.
Memory accounting for client (and possible server) will be added in the
follow on PRs.

The use of new RPC is contingent on the callers providing `WithMuxRangefeed`
option when starting the rangefeed.  However, an envornmnet variable "kill
switch" `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` exists to disable the use
of mux rangefeed in case of serious issues being discovered.

Release note (enterprise change): Introduce a new Rangefeed RPC called
`MuxRangeFeed`.  Rangefeeds now use a common HTTP/2 stream per client
for all range replicas on a node, instead of one per replica. This
significantly reduces the amount of network buffer memory usage, which could
cause nodes to run out of memory if a client was slow to consume events.
The caller may opt in to use new mechanism by specifying `WithMuxRangefeed`
option when starting the rangefeed.  However, a cluster wide
`COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable may be set to
`false` to inhibit the use of this new RPC.

Release Justification: Rangefeed scalability and stability improvement.  Safe to merge
since the functionality disabled by default.

84683: obsservice: ingest events r=andreimatei a=andreimatei

This patch adds a couple of things:
1. an RPC endpoint to CRDB for streaming out events. This RPC service is
   called by the Obs Service.
2. a library in CRDB for exporting events.
3. code in the Obs Service for ingesting the events and writing them
   into a table in the sink cluster.

The first use of the event exporting is for the system.eventlog events.
All events written to that table are now also exported. Once the Obs
Service takes hold in the future, I hope we can remove system.eventlog.

The events are represented using OpenTelemetry protos. Unfortunately,
I've had to copy over the otel protos to our tree because I couldn't
figure out a vendoring solution. Problems encountered for vendoring are:
1. The repo where these protos live
(https://github.com/open-telemetry/opentelemetry-proto) is not
go-get-able. This means hacks are needed for our vendoring tools.
2. Some of the protos in that repo do not build with gogoproto (they
only build with protoc), because they use the new-ish "optional"
option on some fields. The logs protos that we use in this patch do not
have this problem, but others do (so we'll need to figure something out
in the future when dealing with the metrics proto).  FWIW, the
OpenTelemetry Collector ironically has the same problem (it uses
gogoproto), and it solved it through a sed that changes all the optional
fields to one-of's.
3. Even if you solve the first two problems, the next one is that we
already have a dependency on these compiled protos in our tree
(go.opentelemetry.io/proto/otlp). This repo contains generated code,
using protoc. We need it because of our use of the otlp trace exporter.
Bringing in the protos again, and building them with gogo, results in go
files that want to live in the same package/have the same import path.
So changing the import paths is needed.

Between all of these, I've given up - at least for the moment - and
decided to copy over to our tree the few protos that we actually need.
I'm also changing their import paths. You'll notice that there is a
script that codifies the process of transforming the needed protos from
their otel upstream.

Release note: None
Release justification: Non-production.

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
3 people committed Aug 18, 2022
3 parents 0e340c7 + 34de5fb + bf3e9e4 commit 2183af9
Show file tree
Hide file tree
Showing 79 changed files with 3,939 additions and 492 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@
/pkg/util/admission/ @cockroachdb/kv-prs
/pkg/util/tracing @cockroachdb/obs-inf-prs
/pkg/workload/ @cockroachdb/sql-experience-noreview
/pkg/obs/ @cockroachdb/obs-inf-prs
/pkg/obsservice/ @cockroachdb/obs-inf-prs

# Own all bazel files to dev-inf, but don't request reviews for them
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangeca
pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB
pkg/kv/kvserver/concurrency/lock_table.go://go:generate ../../../util/interval/generic/gen.sh *lockState concurrency
pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generic/gen.sh *latch spanlatch
pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient
pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient
pkg/roachpb/batch.go://go:generate go run gen/main.go --filename batch_generated.go *.pb.go
pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated_test.go . Cert
pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-58 set the active cluster version in the format '<major>.<minor>'
version version 22.1-60 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-58</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-60</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ require (
github.com/google/pprof v0.0.0-20210827144239-02619b876842
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/skylark v0.0.0-20181101142754-a5f7082aabed
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/goware/modvendor v0.5.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
Expand Down Expand Up @@ -243,7 +244,6 @@ require (
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/gorilla/handlers v1.5.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
Expand Down
14 changes: 14 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ ALL_TESTS = [
"//pkg/kv/kvserver/uncertainty:uncertainty_test",
"//pkg/kv/kvserver:kvserver_test",
"//pkg/kv:kv_test",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/roachpb:roachpb_disallowed_imports_test",
"//pkg/roachpb:roachpb_test",
"//pkg/roachprod/cloud:cloud_test",
Expand Down Expand Up @@ -1160,10 +1161,17 @@ GO_TARGETS = [
"//pkg/kv:kv_test",
"//pkg/multitenant/tenantcostmodel:tenantcostmodel",
"//pkg/multitenant:multitenant",
"//pkg/obs:obs",
"//pkg/obsservice/cmd/obsservice:obsservice",
"//pkg/obsservice/cmd/obsservice:obsservice_lib",
"//pkg/obsservice/obslib/httpproxy:httpproxy",
"//pkg/obsservice/obslib/ingest:ingest",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/obsservice/obslib/migrations:migrations",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:logs",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/obsservice/obspb:obspb",
"//pkg/release:release",
"//pkg/roachpb/gen:gen",
"//pkg/roachpb/gen:gen_lib",
Expand Down Expand Up @@ -2388,9 +2396,15 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/uncertainty:get_x_data",
"//pkg/multitenant:get_x_data",
"//pkg/multitenant/tenantcostmodel:get_x_data",
"//pkg/obs:get_x_data",
"//pkg/obsservice/cmd/obsservice:get_x_data",
"//pkg/obsservice/obslib/httpproxy:get_x_data",
"//pkg/obsservice/obslib/ingest:get_x_data",
"//pkg/obsservice/obslib/migrations:get_x_data",
"//pkg/obsservice/obspb:get_x_data",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:get_x_data",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:get_x_data",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:get_x_data",
"//pkg/release:get_x_data",
"//pkg/roachpb:get_x_data",
"//pkg/roachpb/gen:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ type TestingKnobs struct {
AdmissionControl ModuleTestingKnobs
UnusedIndexRecommendKnobs ModuleTestingKnobs
ExternalConnection ModuleTestingKnobs
EventExporter ModuleTestingKnobs
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func (f rawEventFeed) run(
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
var startAfter hlc.Timestamp
for _, s := range spans {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type rangefeedFactory func(
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error

type rangefeed struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe
panic("unimplemented")
}

func (m *mockServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error {
panic("implement me")
}

func (*mockServer) Join(
context.Context, *roachpb.JoinNodeRequest,
) (*roachpb.JoinNodeResponse, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ const (
// WaitedForDelRangeInGCJob corresponds to the migration which waits for
// the GC jobs to adopt the use of DelRange with tombstones.
WaitedForDelRangeInGCJob
// RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node.
RangefeedUseOneStreamPerNode

// *************************************************
// Step (1): Add new versions here.
Expand Down Expand Up @@ -530,6 +532,11 @@ var versionsSingleton = keyedVersions{
Key: WaitedForDelRangeInGCJob,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 58},
},
{
Key: RangefeedUseOneStreamPerNode,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 60},
},

// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/gen/protobuf.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ PROTOBUF_SRCS = [
"//pkg/kv/kvserver/protectedts/ptstorage:ptstorage_go_proto",
"//pkg/kv/kvserver/readsummary/rspb:rspb_go_proto",
"//pkg/kv/kvserver:kvserver_go_proto",
"//pkg/obsservice/obspb/opentelemetry-proto/common/v1:v1_go_proto",
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:v1_go_proto",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:v1_go_proto",
"//pkg/obsservice/obspb:obs_go_proto",
"//pkg/roachpb:roachpb_go_proto",
"//pkg/rpc:rpc_go_proto",
"//pkg/server/diagnostics/diagnosticspb:diagnosticspb_go_proto",
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"batch.go",
"condensable_span_set.go",
"dist_sender.go",
"dist_sender_mux_rangefeed.go",
"dist_sender_rangefeed.go",
"doc.go",
"local_test_cluster_util.go",
Expand Down Expand Up @@ -39,6 +40,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/gossip",
"//pkg/keys",
"//pkg/kv",
Expand Down Expand Up @@ -111,6 +113,7 @@ go_test(
srcs = [
"batch_test.go",
"condensable_span_set_test.go",
"dist_sender_rangefeed_mock_test.go",
"dist_sender_rangefeed_test.go",
"dist_sender_server_test.go",
"dist_sender_test.go",
Expand Down Expand Up @@ -143,6 +146,7 @@ go_test(
deps = [
"//build/bazelutil:noop",
"//pkg/base",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/gossip",
Expand All @@ -166,6 +170,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/storage",
Expand All @@ -175,9 +180,11 @@ go_test(
"//pkg/testutils/localtestcluster",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/caller",
"//pkg/util/ctxgroup",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
Loading

0 comments on commit 2183af9

Please sign in to comment.