From 0f9194924fb8804aa4c7d277836dafa83c78c4bc Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 25 Jan 2022 07:52:45 -0500
Subject: [PATCH] kv: Introduce MuxRangeFeed bi-directional RPC.

As demonstrated in #74219, running a rangefeed over a large enough table, or not consuming rangefeed events quickly enough, may cause undesirable memory blowup, up to and including OOMing the node. Since rangefeeds tend to run on (almost) every node, the entire cluster might get destabilized.

One of the reasons for this is that the memory used by the low(er) level http2 RPC transport is not currently tracked -- nor do we have a mechanism to add such tracking. #74456 provided partial mitigation for this issue by restricting how much memory a single RangeFeed stream may use. Nonetheless, the possibility of excessive memory usage remains since the number of rangefeed streams in the system can be very large.

This PR introduces a new bi-directional streaming RPC called `MuxRangeFeed`. With the uni-directional streaming RPC, the client establishes a connection to the KV server and requests events for a single span; a separate RPC stream is created for each span. In contrast, `MuxRangeFeed` is a bi-directional RPC: the client connects to the KV server and requests as many spans as it wishes to receive from that server. The server multiplexes all events onto a single stream, and the client de-multiplexes those events appropriately on the receiving end.

Note: just as in the uni-directional implementation, each call to the `MuxRangeFeed` method (i.e. a logical rangefeed established by the client) is treated independently. That is, even though it is possible to multiplex all logical rangefeed streams onto a single bi-directional stream to a single KV node, this optimization is not currently implemented, as it raises questions about scheduling and fairness. If we need to further reduce the number of bi-directional streams in the future, from `number_logical_feeds*number_of_nodes` down to `number_of_nodes`, such an optimization can be added later.

Multiplexing all of the spans hosted on a node onto a single bi-directional stream reduces the number of such streams from `number_of_ranges` down to `number_of_nodes` (per logical rangefeed). This, in turn, enables the client to reserve the worst-case amount of memory before starting the rangefeed. That amount is bounded by the number of nodes in the cluster times the per-stream maximum memory -- `2MB` by default, adjustable via the dedicated rangefeed connection class. (A back-of-envelope sketch of this bound appears below, after the generated settings diffs.)

The server side of the equation may also benefit from knowing how many rangefeeds are currently running. Even though the server is somewhat protected from memory blowup via http2 pushback, we may want to manage server-side memory better as well. Memory accounting for the client (and possibly the server) will be added in follow-on PRs.

The use of the new RPC is contingent on the caller providing the `WithMuxRangeFeed` option when starting the rangefeed. However, an environment variable "kill switch" `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` exists to disable mux rangefeed in case serious issues are discovered.

Release note (enterprise change): Introduce a new Rangefeed RPC called `MuxRangeFeed`. Rangefeeds now use a common HTTP/2 stream per client for all range replicas on a node, instead of one per replica.
This significantly reduces the amount of network buffer memory usage, which could cause nodes to run out of memory if a client was slow to consume events. The caller may opt in to the new mechanism by specifying the `WithMuxRangeFeed` option when starting the rangefeed. However, a cluster-wide `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable may be set to `false` to inhibit the use of this new RPC.

Release justification: Rangefeed scalability and stability improvement. Safe to merge since the functionality is disabled by default.
---
 build/bazelutil/check.sh | 2 +-
 .../settings/settings-for-tenants.txt | 2 +-
 docs/generated/settings/settings.html | 2 +-
 pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go | 1 +
 .../changefeedccl/kvfeed/physical_kv_feed.go | 1 +
 pkg/ccl/kvccl/kvtenantccl/connector_test.go | 4 +
 pkg/clusterversion/cockroach_versions.go | 7 +
 pkg/clusterversion/key_string.go | 5 +-
 pkg/kv/kvclient/kvcoord/BUILD.bazel | 7 +
 .../kvcoord/dist_sender_mux_rangefeed.go | 288 ++++++++++++
 .../kvclient/kvcoord/dist_sender_rangefeed.go | 91 +++-
 .../dist_sender_rangefeed_mock_test.go | 182 ++++++++
 .../kvcoord/dist_sender_rangefeed_test.go | 439 +++++++++++++-----
 pkg/kv/kvclient/kvcoord/send_test.go | 4 +
 pkg/kv/kvclient/kvcoord/transport_test.go | 6 +
 pkg/kv/kvserver/kvserverbase/base.go | 2 +-
 pkg/kv/kvserver/replica_rangefeed.go | 6 +-
 pkg/kv/kvserver/store.go | 2 +-
 pkg/kv/kvserver/stores.go | 2 +-
 pkg/roachpb/api.go | 17 +-
 pkg/roachpb/api.proto | 14 +
 pkg/roachpb/roachpbmock/BUILD.bazel | 1 +
 pkg/roachpb/roachpbmock/mocks_generated.go | 159 ++++++-
 pkg/rpc/auth_tenant.go | 3 +-
 pkg/rpc/context.go | 280 +++++++++--
 pkg/rpc/context_test.go | 206 +++++---
 pkg/rpc/nodedialer/nodedialer_test.go | 4 +
 pkg/rpc/restricted_internal_client.go | 1 +
 pkg/server/BUILD.bazel | 1 +
 pkg/server/node.go | 79 ++++
 30 files changed, 1557 insertions(+), 261 deletions(-)
 create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
 create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
diff --git a/build/bazelutil/check.sh b/build/bazelutil/check.sh index 49cd804695dd..886986eeb7fc 100755 --- a/build/bazelutil/check.sh +++ b/build/bazelutil/check.sh @@ -22,7 +22,7 @@ pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangeca pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB pkg/kv/kvserver/concurrency/lock_table.go://go:generate ../../../util/interval/generic/gen.sh *lockState concurrency pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generic/gen.sh *latch spanlatch -pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient +pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient pkg/roachpb/batch.go://go:generate go run gen/main.go --filename batch_generated.go *.pb.go pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated_test.go .
Cert pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 69fe43009f8c..e407aec68a8a 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -290,4 +290,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-58 set the active cluster version in the format '.' +version version 22.1-60 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 9aefdc75ff19..0e6f4205ea66 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -221,6 +221,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-58set the active cluster version in the format '.' +versionversion22.1-60set the active cluster version in the format '.' 
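As noted in the commit message, multiplexing bounds the worst-case client-side buffer memory at the number of nodes times the per-stream window, rather than at the number of ranges. The following is a back-of-envelope sketch of that bound using assumed cluster sizes and the default `2MB` window; it is an illustration only, not code from this patch.

```go
package main

import "fmt"

// perStreamWindow is the default per-stream buffer cited in the commit message
// (2MB); the actual value comes from the dedicated rangefeed connection class
// and may be configured differently.
const perStreamWindow = 2 << 20 // 2 MiB

func main() {
	// With MuxRangeFeed there is one stream per node (per logical rangefeed),
	// so the worst-case reservation no longer grows with the number of ranges.
	for _, numNodes := range []int{3, 30, 300} {
		fmt.Printf("%d nodes -> at most %d MiB buffered per logical rangefeed\n",
			numNodes, numNodes*perStreamWindow>>20)
	}
}
```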
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index f99318e85075..1cbb341a5805 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -392,6 +392,7 @@ func (f rawEventFeed) run( spans []kvcoord.SpanTimePair, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error { var startAfter hlc.Timestamp for _, s := range spans { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 0ba41d5b7a5c..9b5daab3eef7 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -38,6 +38,7 @@ type rangefeedFactory func( spans []kvcoord.SpanTimePair, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error type rangefeed struct { diff --git a/pkg/ccl/kvccl/kvtenantccl/connector_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go index 672bd70e920b..4212f1d44a19 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -90,6 +90,10 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe panic("unimplemented") } +func (m *mockServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("implement me") +} + func (*mockServer) Join( context.Context, *roachpb.JoinNodeRequest, ) (*roachpb.JoinNodeResponse, error) { diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 7bafd825f66d..a977e5800d59 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -316,6 +316,8 @@ const ( // WaitedForDelRangeInGCJob corresponds to the migration which waits for // the GC jobs to adopt the use of DelRange with tombstones. WaitedForDelRangeInGCJob + // RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node. + RangefeedUseOneStreamPerNode // ************************************************* // Step (1): Add new versions here. @@ -530,6 +532,11 @@ var versionsSingleton = keyedVersions{ Key: WaitedForDelRangeInGCJob, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 58}, }, + { + Key: RangefeedUseOneStreamPerNode, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 60}, + }, + // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. 
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 0ca285cfc113..01cd9fb62302 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -52,11 +52,12 @@ func _() { _ = x[SetRoleOptionsUserIDColumnNotNull-41] _ = x[UseDelRangeInGCJob-42] _ = x[WaitedForDelRangeInGCJob-43] + _ = x[RangefeedUseOneStreamPerNode-44] } -const _Key_name = "V21_2Start22_1ProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreEnablePebbleFormatVersionBlockPropertiesEnableLeaseHolderRemovalChangefeedIdlenessEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsEnableNewChangefeedOptionsV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJob" +const _Key_name = "V21_2Start22_1ProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreEnablePebbleFormatVersionBlockPropertiesEnableLeaseHolderRemovalChangefeedIdlenessEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsEnableNewChangefeedOptionsV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNode" -var _Key_index = [...]uint16{0, 5, 14, 26, 54, 84, 112, 133, 173, 197, 215, 239, 263, 285, 311, 316, 325, 340, 380, 414, 448, 470, 490, 509, 542, 561, 581, 602, 637, 671, 701, 754, 768, 789, 820, 853, 884, 918, 940, 969, 996, 1027, 1060, 1078, 1102} +var _Key_index = [...]uint16{0, 5, 14, 26, 54, 84, 112, 133, 173, 197, 215, 239, 263, 285, 311, 316, 325, 340, 380, 414, 448, 470, 490, 509, 542, 561, 581, 602, 637, 671, 701, 754, 768, 789, 820, 853, 884, 918, 940, 969, 996, 1027, 1060, 1078, 1102, 1130} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 8db3465220c2..84f8c486de26 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "batch.go", "condensable_span_set.go", "dist_sender.go", + 
"dist_sender_mux_rangefeed.go", "dist_sender_rangefeed.go", "doc.go", "local_test_cluster_util.go", @@ -39,6 +40,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/gossip", "//pkg/keys", "//pkg/kv", @@ -111,6 +113,7 @@ go_test( srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_mock_test.go", "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", @@ -143,6 +146,7 @@ go_test( deps = [ "//build/bazelutil:noop", "//pkg/base", + "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", "//pkg/gossip", @@ -166,6 +170,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/storage", @@ -175,9 +180,11 @@ go_test( "//pkg/testutils/localtestcluster", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/caller", + "//pkg/util/ctxgroup", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go new file mode 100644 index 000000000000..e74f78d9b38e --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -0,0 +1,288 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// rangefeedMuxer is responsible for coordination and management of mux +// rangefeeds. rangefeedMuxer caches MuxRangeFeed stream per node, and executes +// each range feed request on an appropriate node. +type rangefeedMuxer struct { + // eventCh receives events from all active muxStreams. + eventCh chan *roachpb.MuxRangeFeedEvent + + // Context group controlling execution of MuxRangeFeed calls. + g ctxgroup.Group + + // State pertaining to actively executing MuxRangeFeeds. + mu struct { + syncutil.Mutex + + // terminalErr set when a terminal error occurs. + // Subsequent calls to startMuxRangeFeed will return this error. + terminalErr error + + // Each call to start new range feed gets a unique ID which is echoed back + // by MuxRangeFeed rpc. This is done as a safety mechanism to make sure + // that we always send the event to the correct consumer -- even if the + // range feed is terminated and re-established rapidly. + nextStreamID int64 + + // muxClient contains a nodeID->MuxRangeFeedClient. + muxClients map[roachpb.NodeID]*muxClientState + // producers maps streamID to the event producer -- data sent back to the + // consumer of range feed events. 
+ producers map[int64]*channelRangeFeedEventProducer + } +} + +type muxClientState struct { + client roachpb.Internal_MuxRangeFeedClient + streams util.FastIntSet + cancel context.CancelFunc +} + +func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { + m := &rangefeedMuxer{ + eventCh: make(chan *roachpb.MuxRangeFeedEvent), + g: g, + } + m.mu.muxClients = make(map[roachpb.NodeID]*muxClientState) + m.mu.producers = make(map[int64]*channelRangeFeedEventProducer) + + m.g.GoCtx(m.demuxLoop) + + return m +} + +// channelRangeFeedEventProducer is a rangeFeedEventProducer which receives +// events on input channel, and returns events when Recv is called. +type channelRangeFeedEventProducer struct { + ctx context.Context + termErrCh chan struct{} // Signalled to propagate terminal error the consumer. + termErr error // Set when terminal error occurs. + eventCh chan *roachpb.RangeFeedEvent +} + +// Recv implements rangeFeedEventProducer interface. +func (c *channelRangeFeedEventProducer) Recv() (*roachpb.RangeFeedEvent, error) { + select { + case <-c.ctx.Done(): + return nil, c.ctx.Err() + case <-c.termErrCh: + return nil, c.termErr + case e := <-c.eventCh: + return e, nil + } +} + +var _ roachpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) + +// startMuxRangeFeed begins the execution of rangefeed for the specified +// RangeFeedRequest. +// The passed in client is only needed to establish MuxRangeFeed RPC. +func (m *rangefeedMuxer) startMuxRangeFeed( + ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, +) (_ roachpb.RangeFeedEventProducer, cleanup func(), _ error) { + producer, rpcClient, streamID, cleanup, err := m.connect(ctx, client, req.Replica.NodeID) + if err != nil { + return nil, cleanup, err + } + req.StreamID = streamID + if err := rpcClient.Send(req); err != nil { + return nil, cleanup, err + } + return producer, cleanup, nil +} + +// connect establishes MuxRangeFeed connection for the specified node, re-using +// the existing one if one exists. Returns event producer, RPC client to send +// requests on, the streamID which should be used when sending request, and a +// cleanup function. Cleanup function never nil, and must always be invoked, +// even if error is returned. +func (m *rangefeedMuxer) connect( + ctx context.Context, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, +) ( + _ roachpb.RangeFeedEventProducer, + _ roachpb.Internal_MuxRangeFeedClient, + streamID int64, + cleanup func(), + _ error, +) { + m.mu.Lock() + defer m.mu.Unlock() + + streamID = m.mu.nextStreamID + m.mu.nextStreamID++ + + cleanup = func() { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.mu.producers, streamID) + // Cleanup mux state if it exists; it may be nil if this function exits + // early (for example if MuxRangeFeed call fails, before muxState + // initialized). + muxState := m.mu.muxClients[nodeID] + if muxState != nil { + muxState.streams.Remove(int(streamID)) + if muxState.streams.Len() == 0 { + // This node no longer has any active streams. + // Delete node from the muxClient list, and gracefully + // shutdown consumer go routine. 
+ delete(m.mu.muxClients, nodeID) + muxState.cancel() + } + } + } + + if m.mu.terminalErr != nil { + return nil, nil, streamID, cleanup, m.mu.terminalErr + } + + var found bool + ms, found := m.mu.muxClients[nodeID] + + if !found { + ctx, cancel := context.WithCancel(ctx) + stream, err := client.MuxRangeFeed(ctx) + if err != nil { + cancel() + return nil, nil, streamID, cleanup, err + } + + ms = &muxClientState{client: stream, cancel: cancel} + m.mu.muxClients[nodeID] = ms + m.g.GoCtx(func(ctx context.Context) error { + defer cancel() + return m.receiveEventsFromNode(ctx, nodeID, stream) + }) + } + + // Start RangeFeed for this request. + ms.streams.Add(int(streamID)) + + producer := &channelRangeFeedEventProducer{ + ctx: ctx, + termErrCh: make(chan struct{}), + eventCh: make(chan *roachpb.RangeFeedEvent), + } + m.mu.producers[streamID] = producer + return producer, ms.client, streamID, cleanup, nil +} + +// demuxLoop de-multiplexes events and sends them to appropriate rangefeed event +// consumer. +func (m *rangefeedMuxer) demuxLoop(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e := <-m.eventCh: + m.mu.Lock() + producer, found := m.mu.producers[e.StreamID] + m.mu.Unlock() + + if !found { + return m.shutdownWithError(errors.AssertionFailedf( + "expected to find consumer for streamID=%d", e.StreamID), + ) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case producer.eventCh <- &e.RangeFeedEvent: + } + } + } +} + +// receiveEventsFromNode receives mux rangefeed events, and forwards them to the +// consumer channel. +func (m *rangefeedMuxer) receiveEventsFromNode( + ctx context.Context, nodeID roachpb.NodeID, stream roachpb.Internal_MuxRangeFeedClient, +) error { + for { + event, streamErr := stream.Recv() + + if streamErr != nil { + m.propagateStreamTerminationErrorToConsumers(nodeID, streamErr) + // Since the stream error is handled above, we return nil to gracefully shut down + // this go routine. + return nil //nolint:returnerrcheck + } + + select { + case <-ctx.Done(): + return ctx.Err() + case m.eventCh <- event: + } + } +} + +// propagateStreamTerminationErrorToConsumers called when mux stream running on +// a node encountered an error. All consumers will receive the stream +// termination error and will handle it appropriately. +func (m *rangefeedMuxer) propagateStreamTerminationErrorToConsumers( + nodeID roachpb.NodeID, streamErr error, +) { + // Grab muxStream associated with the node, and clear it out. + m.mu.Lock() + defer m.mu.Unlock() + + ms, streamFound := m.mu.muxClients[nodeID] + delete(m.mu.muxClients, nodeID) + // Note: it's okay if the stream is not found; this can happen if the + // nodeID was already removed from muxClients because we're trying to gracefully + // shutdown receiveEventsFromNode go routine. + if streamFound { + ms.streams.ForEach(func(streamID int) { + p := m.mu.producers[int64(streamID)] + delete(m.mu.producers, int64(streamID)) + if p != nil { + p.termErr = streamErr + close(p.termErrCh) + } + }) + } + +} + +// shutdownWithError terminates this rangefeedMuxer with a terminal error +// (usually an assertion failure). It's a bit unwieldy way to propagate this +// error to the caller, but as soon as one of consumers notices this terminal +// error, the context should be cancelled. +// Returns the terminal error passed in. 
+func (m *rangefeedMuxer) shutdownWithError(terminalErr error) error { + // Okay to run this under the lock since as soon as a consumer sees this terminal + // error, the whole rangefeed should terminate. + m.mu.Lock() + defer m.mu.Unlock() + + m.mu.terminalErr = terminalErr + for id, p := range m.mu.producers { + p.termErr = terminalErr + close(p.termErrCh) + delete(m.mu.producers, id) + } + + return terminalErr +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 3f09a3334424..8ea19348e216 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -19,6 +19,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -69,6 +71,29 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { return int(l) } +type rangeFeedConfig struct { + useMuxRangeFeed bool +} + +// RangeFeedOption configures a RangeFeed. +type RangeFeedOption interface { + set(*rangeFeedConfig) +} + +type optionFunc func(*rangeFeedConfig) + +func (o optionFunc) set(c *rangeFeedConfig) { o(c) } + +// WithMuxRangeFeed configures range feed to use MuxRangeFeed RPC. +func WithMuxRangeFeed() RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.useMuxRangeFeed = true + }) +} + +// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. +var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -88,6 +113,7 @@ func (ds *DistSender) RangeFeed( startAfter hlc.Timestamp, // exclusive withDiff bool, eventCh chan<- RangeFeedMessage, + opts ...RangeFeedOption, ) error { timedSpans := make([]SpanTimePair, 0, len(spans)) for _, sp := range spans { @@ -96,7 +122,7 @@ func (ds *DistSender) RangeFeed( StartAfter: startAfter, }) } - return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh) + return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh, opts...) } // SpanTimePair is a pair of span along with its starting time. The starting @@ -110,12 +136,21 @@ type SpanTimePair struct { // RangeFeedSpans is similar to RangeFeed but allows specification of different // starting time for each span. 
func (ds *DistSender) RangeFeedSpans( - ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- RangeFeedMessage, + ctx context.Context, + spans []SpanTimePair, + withDiff bool, + eventCh chan<- RangeFeedMessage, + opts ...RangeFeedOption, ) error { if len(spans) == 0 { return errors.AssertionFailedf("expected at least 1 span, got none") } + var cfg rangeFeedConfig + for _, opt := range opts { + opt.set(&cfg) + } + ctx = ds.AnnotateCtx(ctx) ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender") defer sp.Finish() @@ -128,6 +163,16 @@ func (ds *DistSender) RangeFeedSpans( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) g := ctxgroup.WithContext(ctx) + + var eventProducer rangeFeedEventProducerFactory + if ds.st.Version.IsActive(ctx, clusterversion.RangefeedUseOneStreamPerNode) && + enableMuxRangeFeed && cfg.useMuxRangeFeed { + m := newRangefeedMuxer(g) + eventProducer = m.startMuxRangeFeed + } else { + eventProducer = legacyRangeFeedEventProducer + } + // Goroutine that processes subdivided ranges and creates a rangefeed for // each. rangeCh := make(chan singleRangeInfo, 16) @@ -137,7 +182,8 @@ func (ds *DistSender) RangeFeedSpans( case sri := <-rangeCh: // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, sri.token, withDiff, &catchupSem, rangeCh, eventCh) + return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter, + sri.token, withDiff, &catchupSem, rangeCh, eventCh) }) case <-ctx.Done(): return ctx.Err() @@ -293,6 +339,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( func (ds *DistSender) partialRangeFeed( ctx context.Context, rr *rangeFeedRegistry, + streamProducerFactory rangeFeedEventProducerFactory, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, @@ -334,8 +381,9 @@ func (ds *DistSender) partialRangeFeed( } // Establish a RangeFeed for a single Range. - maxTS, err := ds.singleRangeFeed(ctx, span, startAfter, withDiff, token.Desc(), - catchupSem, eventCh, active.onRangeEvent) + maxTS, err := ds.singleRangeFeed( + ctx, span, startAfter, withDiff, token.Desc(), + catchupSem, eventCh, streamProducerFactory, active.onRangeEvent) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) @@ -408,6 +456,7 @@ func (ds *DistSender) singleRangeFeed( desc *roachpb.RangeDescriptor, catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, + streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, ) (hlc.Timestamp, error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. @@ -432,8 +481,6 @@ func (ds *DistSender) singleRangeFeed( return args.Timestamp, err } replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality) - // The RangeFeed is not used for system critical traffic so use a DefaultClass - // connection regardless of the range. opts := SendOptions{class: connectionClass(&ds.st.SV)} transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) if err != nil { @@ -459,11 +506,21 @@ func (ds *DistSender) singleRangeFeed( // cleanup catchup reservation in case of early termination. 
defer finishCatchupScan() + var streamCleanup func() + maybeCleanupStream := func() { + if streamCleanup != nil { + streamCleanup() + streamCleanup = nil + } + } + defer maybeCleanupStream() + for { if transport.IsExhausted() { return args.Timestamp, newSendError( fmt.Sprintf("sending to all %d replicas failed", len(replicas))) } + maybeCleanupStream() args.Replica = transport.NextReplica() client, err := transport.NextInternalClient(ctx) @@ -473,7 +530,9 @@ func (ds *DistSender) singleRangeFeed( } log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica) - stream, err := client.RangeFeed(ctx, &args) + + var stream roachpb.RangeFeedEventProducer + stream, streamCleanup, err = streamProducerFactory(ctx, client, &args) if err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) if grpcutil.IsAuthError(err) { @@ -526,3 +585,19 @@ func connectionClass(sv *settings.Values) rpc.ConnectionClass { } return rpc.DefaultClass } + +type rangeFeedEventProducerFactory func( + ctx context.Context, + client rpc.RestrictedInternalClient, + req *roachpb.RangeFeedRequest, +) (roachpb.RangeFeedEventProducer, func(), error) + +// legacyRangeFeedEventProducer is a rangeFeedEventProducerFactory using +// legacy RangeFeed RPC. +func legacyRangeFeedEventProducer( + ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, +) (producer roachpb.RangeFeedEventProducer, cleanup func(), err error) { + cleanup = func() {} + producer, err = client.RangeFeed(ctx, req) + return producer, cleanup, err +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go new file mode 100644 index 000000000000..9d311da46b95 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -0,0 +1,182 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachpb/roachpbmock" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +// TestingSetEnableMuxRangeFeed adjusts enable rangefeed env variable +// for testing. 
+func TestingSetEnableMuxRangeFeed(enabled bool) func() { + old := enableMuxRangeFeed + enableMuxRangeFeed = enabled + return func() { + enableMuxRangeFeed = old + } +} + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, useMuxRangeFeed := range []bool{false, true} { + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(fmt.Sprintf("mux=%t/%s", useMuxRangeFeed, spec.errorCode), + func(t *testing.T) { + clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. + for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. 
+ if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + client := roachpbmock.NewMockInternalClient(ctrl) + + if useMuxRangeFeed { + stream := roachpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) + stream.EXPECT().Send(gomock.Any()).Return(nil) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) + } else { + stream := roachpbmock.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + } + + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) + transport.EXPECT().Release() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + var opts []RangeFeedOption + if useMuxRangeFeed { + opts = append(opts, WithMuxRangeFeed()) + } + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, + false, nil, opts...) + require.Error(t, err) + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index f3fefa0c403c..1cfe6a93e2eb 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -8,147 +8,356 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package kvcoord +package kvcoord_test import ( "context" - "io" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/roachpb/roachpbmock" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/stop" - gomock "github.com/golang/mock/gomock" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - grpcstatus "google.golang.org/grpc/status" + "google.golang.org/grpc" ) -// Tests that the range feed handles transport errors appropriately. In -// particular, that when encountering other decommissioned nodes it will refresh -// its range descriptor and retry, but if this node is decommissioned it will -// bail out. 
Regression test for: -// https://github.com/cockroachdb/cockroach/issues/66636 -func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) +type testRangefeedClient struct { + rpc.RestrictedInternalClient + muxRangeFeedEnabled bool + count func() +} - for _, spec := range []struct { - errorCode codes.Code - expectRetry bool - }{ - {codes.FailedPrecondition, true}, // target node is decommissioned; retry - {codes.PermissionDenied, false}, // this node is decommissioned; abort - {codes.Unauthenticated, false}, // this node is not part of cluster; abort - } { - t.Run(spec.errorCode.String(), func(t *testing.T) { - clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - g := makeGossip(t, stopper, rpcContext) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - Generation: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1, ReplicaID: 1}, - {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, - } - for _, repl := range desc.InternalReplicas { - require.NoError(t, g.AddInfoProto( - gossip.MakeNodeIDKey(repl.NodeID), - newNodeDesc(repl.NodeID), - gossip.NodeDescriptorTTL, - )) - } +func (c *testRangefeedClient) RangeFeed( + ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, +) (roachpb.Internal_RangeFeedClient, error) { + defer c.count() - ctrl := gomock.NewController(t) - transport := NewMockTransport(ctrl) - rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + if c.muxRangeFeedEnabled && ctx.Value(useMuxRangeFeedCtxKey{}) != nil { + panic(errors.AssertionFailedf("unexpected call to RangeFeed")) + } + return c.RestrictedInternalClient.RangeFeed(ctx, args, opts...) +} - // We start off with a cached lease on r1. - cachedLease := roachpb.Lease{ - Replica: desc.InternalReplicas[0], - Sequence: 1, - } +func (c *testRangefeedClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + defer c.count() + + if !c.muxRangeFeedEnabled || ctx.Value(useMuxRangeFeedCtxKey{}) == nil { + panic(errors.AssertionFailedf("unexpected call to MuxRangeFeed")) + } + return c.RestrictedInternalClient.MuxRangeFeed(ctx, opts...) 
+} + +type internalClientCounts struct { + syncutil.Mutex + counts map[rpc.RestrictedInternalClient]int +} + +func (c *internalClientCounts) Inc(ic rpc.RestrictedInternalClient) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + c.counts[ic]++ +} + +type countConnectionsTransport struct { + wrapped kvcoord.Transport + counts *internalClientCounts + rfStreamEnabled bool +} + +var _ kvcoord.Transport = (*countConnectionsTransport)(nil) + +func (c *countConnectionsTransport) IsExhausted() bool { + return c.wrapped.IsExhausted() +} + +func (c *countConnectionsTransport) SendNext( + ctx context.Context, request roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return c.wrapped.SendNext(ctx, request) +} + +type testFeedCtxKey struct{} +type useMuxRangeFeedCtxKey struct{} + +func (c *countConnectionsTransport) NextInternalClient( + ctx context.Context, +) (rpc.RestrictedInternalClient, error) { + client, err := c.wrapped.NextInternalClient(ctx) + if err != nil { + return nil, err + } + + tc := &testRangefeedClient{ + RestrictedInternalClient: client, + muxRangeFeedEnabled: c.rfStreamEnabled, + } + // Count rangefeed calls but only for feeds started by this test. + if ctx.Value(testFeedCtxKey{}) != nil { + tc.count = func() { + c.counts.Inc(tc) + } + } else { + tc.count = func() {} + } + + return tc, nil +} + +func (c *countConnectionsTransport) NextReplica() roachpb.ReplicaDescriptor { + return c.wrapped.NextReplica() +} + +func (c *countConnectionsTransport) SkipReplica() { + c.wrapped.SkipReplica() +} + +func (c *countConnectionsTransport) MoveToFront(descriptor roachpb.ReplicaDescriptor) bool { + return c.wrapped.MoveToFront(descriptor) +} + +func (c *countConnectionsTransport) Release() { + c.wrapped.Release() +} + +func makeTransportFactory( + rfStreamEnabled bool, counts *internalClientCounts, +) kvcoord.TransportFactory { + return func( + options kvcoord.SendOptions, + dialer *nodedialer.Dialer, + slice kvcoord.ReplicaSlice, + ) (kvcoord.Transport, error) { + transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) + if err != nil { + return nil, err + } + countingTransport := &countConnectionsTransport{ + wrapped: transport, + rfStreamEnabled: rfStreamEnabled, + counts: counts, + } + return countingTransport, nil + } +} + +// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library +// here because of circular dependencies. +func rangeFeed( + dsI interface{}, + sp roachpb.Span, + startFrom hlc.Timestamp, + onValue func(event kvcoord.RangeFeedMessage), + useMuxRangeFeed bool, +) func() { + ds := dsI.(*kvcoord.DistSender) + events := make(chan kvcoord.RangeFeedMessage) + ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testFeedCtxKey{}, struct{}{})) - // All nodes return the specified error code. We expect the range feed to - // keep trying all replicas in sequence regardless of error. - for _, repl := range desc.InternalReplicas { - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(repl) - transport.EXPECT().NextInternalClient(gomock.Any()).Return( - nil, grpcstatus.Error(spec.errorCode, "")) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) (err error) { + var opts []kvcoord.RangeFeedOption + if useMuxRangeFeed { + opts = append(opts, kvcoord.WithMuxRangeFeed()) + ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) + } + return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events, opts...) 
+ }) + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-events: + onValue(ev) } - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // Once all replicas have failed, it should try to refresh the lease using - // the range cache. We let this succeed once. - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - - // It then tries the replicas again. This time we just report the - // transport as exhausted immediately. - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // This invalidates the cache yet again. This time we error. - rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) - - // If we expect a range lookup retry, allow the retry to succeed by - // returning a range descriptor and a client that immediately - // cancels the context and closes the range feed stream. - if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - stream := roachpbmock.NewMockInternal_RangeFeedClient(ctrl) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) - client := roachpbmock.NewMockInternalClient(ctrl) - client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) - transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) - transport.EXPECT().Release() + } + }) + + return func() { + cancel() + _ = g.Wait() + } +} + +// observeNValues returns on value handler which expects to see N rangefeed values, +// along with the channel which gets closed when requisite count of events has been seen. +func observeNValues(n int) (chan struct{}, func(ev kvcoord.RangeFeedMessage)) { + var count = struct { + syncutil.Mutex + c int + }{} + allSeen := make(chan struct{}) + return allSeen, func(ev kvcoord.RangeFeedMessage) { + if ev.Val != nil { + count.Lock() + defer count.Unlock() + count.c++ + log.Infof(context.Background(), "Waiting N values: saw %d, want %d; current=%s", count.c, n, ev.Val.Key) + if count.c == n { + close(allSeen) } + } + } +} - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: g, - RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { - return transport, nil +func channelWaitWithTimeout(t *testing.T, ch chan struct{}) { + t.Helper() + timeOut := 30 * time.Second + if util.RaceEnabled { + timeOut *= 10 + } + select { + case <-ch: + case <-time.After(timeOut): + t.Fatal("test timed out") + } +} +func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + startServerAtVer := func(ver roachpb.Version) (*testcluster.TestCluster, func()) { + st := cluster.MakeTestingClusterSettingsWithVersions(ver, ver, true) + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. 
+ ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(false, nil), + }, + + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: ver, }, }, - RangeDescriptorDB: rangeDB, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), - }) - ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: desc, - Lease: cachedLease, - }) - - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) - require.Error(t, err) + }, + }) + return tc, func() { tc.Stopper().Stop(ctx) } + } + + // Create a small table; run rangefeed. The transport factory we injected above verifies + // that we use the old rangefeed implementation. + runRangeFeed := func(tc *testcluster.TestCluster, opts ...kvcoord.RangeFeedOption) { + ts := tc.Server(0) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false) + channelWaitWithTimeout(t, allSeen) + closeFeed() + } + + t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) { + noRfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode - 1) + tc, cleanup := startServerAtVer(noRfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) + + t.Run("rangefeed-stream-disabled-via-environment", func(t *testing.T) { + defer kvcoord.TestingSetEnableMuxRangeFeed(false)() + // Even though we could use rangefeed stream, it's disabled via kill switch. + rfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode) + tc, cleanup := startServerAtVer(rfStreamVer) + defer cleanup() + runRangeFeed(tc, kvcoord.WithMuxRangeFeed()) + }) +} + +func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + connCounts := &internalClientCounts{counts: make(map[rpc.RestrictedInternalClient]int)} + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(true, connCounts), + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + // Create a table, and split it so that we have multiple ranges, distributed across + // test cluster nodes. 
+ sqlDB.ExecMultiple(t, + `SET CLUSTER SETTING kv.rangefeed.enabled = true`, + `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`, + ) + + for i := 100; i <= 900; i += 100 { + storeID := 1 + i%3 + rowID := i + testutils.SucceedsSoon(t, func() error { + _, err := sqlDB.DB.ExecContext(context.Background(), + "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", storeID, rowID) + return err }) } + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true) + channelWaitWithTimeout(t, allSeen) + closeFeed() + + // Verify we connected to each node once. + connCounts.Lock() + defer connCounts.Unlock() + for _, c := range connCounts.counts { + require.Equal(t, 1, c) + } } diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index bcc875850b08..61dcf55d812b 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -57,6 +57,10 @@ func (n Node) RangeFeed(_ *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFee panic("unimplemented") } +func (n Node) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("unimplemented") +} + func (n Node) GossipSubscription( _ *roachpb.GossipSubscriptionRequest, _ roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 7c5e07a899ba..ec3610569db2 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -187,6 +187,12 @@ func (m *mockInternalClient) RangeFeed( return nil, fmt.Errorf("unsupported RangeFeed call") } +func (m *mockInternalClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + return nil, fmt.Errorf("unsupported MuxRangeFeed call") +} + // GossipSubscription is part of the roachpb.InternalClient interface. func (m *mockInternalClient) GossipSubscription( ctx context.Context, args *roachpb.GossipSubscriptionRequest, _ ...grpc.CallOption, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 9daa9d43464e..35c55dca2cfe 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -117,7 +117,7 @@ type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb. // ReplicaRangefeedFilter is used in unit tests to modify the request, inject // responses, or return errors from rangefeeds. type ReplicaRangefeedFilter func( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index c3b62db37ef2..b1955eb52f2f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -63,7 +63,7 @@ var RangeFeedRefreshInterval = settings.RegisterDurationSetting( // support for concurrent calls to Send. 
Note that the default implementation of // grpc.Stream is not safe for concurrent calls to Send. type lockedRangefeedStream struct { - wrapped rangefeed.Stream + wrapped roachpb.RangeFeedEventSink sendMu syncutil.Mutex } @@ -134,13 +134,13 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The provided ConcurrentRequestLimiter is used to limit the number // of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream rangefeed.Stream, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream) } func (r *Replica) rangeFeedWithRangeID( - _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream rangefeed.Stream, + _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index cb5769755027..b11490877a89 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3090,7 +3090,7 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD // the provided stream and returns with an optional error when the rangefeed is // complete. func (s *Store) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 9dd1fd81d859..fde593a1c248 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -234,7 +234,7 @@ func (ls *Stores) SendWithWriteBytes( // the provided stream and returns with an optional error when the rangefeed is // complete. func (ls *Stores) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { ctx := stream.Context() if args.RangeID == 0 { diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 602e1fdf877c..4aec679f5751 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -11,6 +11,7 @@ package roachpb import ( + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -23,7 +24,7 @@ import ( "github.com/dustin/go-humanize" ) -//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient +//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient // UserPriority is a custom type for transaction's user priority. type UserPriority float64 @@ -1847,3 +1848,17 @@ const ( // with the SpecificTenantOverrides precedence.. AllTenantsOverrides ) + +// RangeFeedEventSink is an interface for sending a single rangefeed event. +type RangeFeedEventSink interface { + Context() context.Context + Send(*RangeFeedEvent) error +} + +// RangeFeedEventProducer is an adapter for receiving rangefeed events with either +// the legacy RangeFeed RPC, or the MuxRangeFeed RPC. +type RangeFeedEventProducer interface { + // Recv receives the next rangefeed event. 
an io.EOF error indicates that the + // range needs to be restarted. + Recv() (*RangeFeedEvent, error) +} diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index be7d760b89ae..d67d25980e2f 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2722,6 +2722,9 @@ message RangeFeedRequest { // AdmissionHeader is used only at the start of the range feed stream, since // the initial catch-up scan be expensive. AdmissionHeader admission_header = 4 [(gogoproto.nullable) = false]; + + // StreamID is set by the client issuing MuxRangeFeed requests. + int64 stream_id = 5 [(gogoproto.customname) = "StreamID"]; } // RangeFeedValue is a variant of RangeFeedEvent that represents an update to @@ -2795,6 +2798,14 @@ message RangeFeedEvent { RangeFeedDeleteRange delete_range = 5; } +// MuxRangeFeedEvent is a response generated by MuxRangeFeed RPC. It tags +// the underlying RangeFeedEvent with the ID of the range that produced this event. +message MuxRangeFeedEvent { + RangeFeedEvent event = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + int32 range_id = 2 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // Server echoes back stream_id set by the client. + int64 stream_id = 3 [(gogoproto.customname) = "StreamID"]; +} // ResetQuorumRequest makes a range that is unavailable due to lost quorum // available again, at the cost of losing all of the data in the range. Any @@ -3004,7 +3015,10 @@ message JoinNodeResponse { service Internal { rpc Batch (BatchRequest) returns (BatchResponse) {} rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {} + // RangeFeed RPC is deprecated. Use MuxRangeFeed instead. + // To be removed at version 23.1 rpc RangeFeed (RangeFeedRequest) returns (stream RangeFeedEvent) {} + rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {} rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {} rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {} diff --git a/pkg/roachpb/roachpbmock/BUILD.bazel b/pkg/roachpb/roachpbmock/BUILD.bazel index c50e10c59fe3..9c9d0d0f4f11 100644 --- a/pkg/roachpb/roachpbmock/BUILD.bazel +++ b/pkg/roachpb/roachpbmock/BUILD.bazel @@ -8,6 +8,7 @@ gomock( interfaces = [ "InternalClient", "Internal_RangeFeedClient", + "Internal_MuxRangeFeedClient", ], library = "//pkg/roachpb", package = "roachpbmock", diff --git a/pkg/roachpb/roachpbmock/mocks_generated.go b/pkg/roachpb/roachpbmock/mocks_generated.go index a053cca023de..377c7b7b0fcb 100644 --- a/pkg/roachpb/roachpbmock/mocks_generated.go +++ b/pkg/roachpb/roachpbmock/mocks_generated.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient) // Package roachpbmock is a generated GoMock package. package roachpbmock @@ -137,6 +137,26 @@ func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) } +// MuxRangeFeed mocks base method. 
+func (m *MockInternalClient) MuxRangeFeed(arg0 context.Context, arg1 ...grpc.CallOption) (roachpb.Internal_MuxRangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MuxRangeFeed", varargs...) + ret0, _ := ret[0].(roachpb.Internal_MuxRangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MuxRangeFeed indicates an expected call of MuxRangeFeed. +func (mr *MockInternalClientMockRecorder) MuxRangeFeed(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MuxRangeFeed", reflect.TypeOf((*MockInternalClient)(nil).MuxRangeFeed), varargs...) +} + // RangeFeed mocks base method. func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *roachpb.RangeFeedRequest, arg2 ...grpc.CallOption) (roachpb.Internal_RangeFeedClient, error) { m.ctrl.T.Helper() @@ -379,3 +399,140 @@ func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) } + +// MockInternal_MuxRangeFeedClient is a mock of Internal_MuxRangeFeedClient interface. +type MockInternal_MuxRangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedClientMockRecorder +} + +// MockInternal_MuxRangeFeedClientMockRecorder is the mock recorder for MockInternal_MuxRangeFeedClient. +type MockInternal_MuxRangeFeedClientMockRecorder struct { + mock *MockInternal_MuxRangeFeedClient +} + +// NewMockInternal_MuxRangeFeedClient creates a new mock instance. +func NewMockInternal_MuxRangeFeedClient(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedClient { + mock := &MockInternal_MuxRangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedClient) EXPECT() *MockInternal_MuxRangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_MuxRangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. 
+func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Recv() (*roachpb.MuxRangeFeedEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*roachpb.MuxRangeFeedEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Send(arg0 *roachpb.RangeFeedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. 
+func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Trailer)) +} diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 0f49cf9156e7..6d1e171f400e 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -53,9 +53,8 @@ func (a tenantAuthorizer) authorize( case "/cockroach.roachpb.Internal/RangeLookup": return a.authRangeLookup(tenID, req.(*roachpb.RangeLookupRequest)) - case "/cockroach.roachpb.Internal/RangeFeed": + case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed": return a.authRangeFeed(tenID, req.(*roachpb.RangeFeedRequest)) - case "/cockroach.roachpb.Internal/GossipSubscription": return a.authGossipSubscription(tenID, req.(*roachpb.GossipSubscriptionRequest)) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index b8ac93a8020e..f1c2257884ea 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -900,6 +900,20 @@ var rangefeedStreamInfo = &grpc.StreamServerInfo{ IsServerStream: true, } +var muxRangeFeedDesc = &grpc.StreamDesc{ + StreamName: "MuxRangeFeed", + ServerStreams: true, + ClientStreams: true, +} + +const muxRangefeedMethodName = "/cockroach.roachpb.Internal/MuxRangeFeed" + +var muxRangefeedStreamInfo = &grpc.StreamServerInfo{ + FullMethod: muxRangefeedMethodName, + IsClientStream: true, + IsServerStream: true, +} + // RangeFeed implements the RestrictedInternalClient interface. func (a internalClientAdapter) RangeFeed( ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, @@ -930,7 +944,7 @@ func (a internalClientAdapter) RangeFeed( args.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL // Spawn a goroutine running the server-side handler. This goroutine - // communicates with the client stream through rangeFeedPipe. + // communicates with the client stream through rfPipe. go func() { // Handler adapts the ServerStream to the typed interface expected by the // RPC handler (Node.RangeFeed). `stream` might be `rfPipe` which we @@ -992,87 +1006,259 @@ func (x rangeFeedClientAdapter) Recv() (*roachpb.RangeFeedEvent, error) { return m, nil } -// rangeFeedPipe is a (uni-directional) pipe of *RangeFeedEvent that implements -// the grpc.ClientStream and grpc.ServerStream interfaces. -type rangeFeedPipe struct { +// MuxRangeFeed implements the RestrictedInternalClient interface. +func (a internalClientAdapter) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + // MuxRangeFeed consists of two streams: the client stream and the server + // stream. Those two streams are stitched together by the go routine below + // which sends client messages to the server channel, and server responses to + // the client. + clientSide, serverSide := newMuxRangeFeedPipes(grpcutil.NewLocalRequestContext(ctx)) + + go func() { + // Handler adapts the ServerStream to the typed interface expected by the + // RPC handler (Node.RangeFeed). `stream` might be `eventPipe` which we + // pass to the interceptor chain below, or it might be another + // implementation of `ServerStream` that wraps it; in practice it will be + // tracing.grpcinterceptor.StreamServerInterceptor. 
+		handler := func(srv interface{}, stream grpc.ServerStream) error {
+			adapter := muxRangeFeedServerAdapter{
+				ServerStream: stream,
+			}
+			return a.server.MuxRangeFeed(adapter)
+		}
+		// Run the server interceptors, which will bottom out by running `handler`
+		// (defined just above), which runs Node.MuxRangeFeed (our RPC handler).
+		// This call is blocking.
+		err := a.serverStreamInterceptors.run(a.server, serverSide, muxRangefeedStreamInfo, handler)
+		if err == nil {
+			err = io.EOF
+		}
+		serverSide.errC <- err
+	}()
+
+	// Run the client-side interceptors, which produce a grpc.ClientStream.
+	// clientStream might end up being clientSide (the pipe created above), or it
+	// might end up being another grpc.ClientStream implementation that wraps it.
+	//
+	// NOTE: For actual RPCs, going to a remote node, there's a tracing client
+	// interceptor producing a tracing.grpcinterceptor.tracingClientStream
+	// implementation of ClientStream. That client interceptor does not run for
+	// these local requests handled by the internalClientAdapter (as opposed to
+	// the tracing server interceptor, which does run).
+	clientStream, err := a.clientStreamInterceptors.run(ctx, muxRangeFeedDesc, nil /* ClientConn */, muxRangefeedMethodName,
+		// This function runs at the bottom of the client interceptor stack,
+		// pretending to actually make an RPC call. We don't make any calls, but
+		// return the pipe on which messages from the server will come.
+		func(
+			ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption,
+		) (grpc.ClientStream, error) {
+			return clientSide, nil
+		},
+		opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return muxRangeFeedClientAdapter{
+		ClientStream: clientStream,
+	}, nil
+}
+
+type muxRangeFeedClientAdapter struct {
+	grpc.ClientStream
+}
+
+var _ roachpb.Internal_MuxRangeFeedClient = muxRangeFeedClientAdapter{}
+
+func (a muxRangeFeedClientAdapter) Send(request *roachpb.RangeFeedRequest) error {
+	// Mark this request as originating locally.
+	request.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL
+	return a.SendMsg(request)
+}
+
+func (a muxRangeFeedClientAdapter) Recv() (*roachpb.MuxRangeFeedEvent, error) {
+	m := new(roachpb.MuxRangeFeedEvent)
+	if err := a.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+type muxRangeFeedServerAdapter struct {
+	grpc.ServerStream
+}
+
+var _ roachpb.Internal_MuxRangeFeedServer = muxRangeFeedServerAdapter{}
+
+func (a muxRangeFeedServerAdapter) Send(event *roachpb.MuxRangeFeedEvent) error {
+	return a.SendMsg(event)
+}
+
+func (a muxRangeFeedServerAdapter) Recv() (*roachpb.RangeFeedRequest, error) {
+	m := new(roachpb.RangeFeedRequest)
+	if err := a.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// channelStream implements the grpc.ClientStream, grpc.ServerStream and grpc.Stream
+// interfaces for the purpose of intercepting RPC requests to the local node.
+// It simply provides access to the context (as required by grpc.Stream) while
+// all other methods panic. The type is expected to be embedded inside other
+// types; messages are exchanged via channels.
+type channelStream struct {
 	ctx   context.Context
 	respC chan interface{}
 	errC  chan error
+	doneC <-chan struct{}
 }
-var _ grpc.ClientStream = &rangeFeedPipe{}
-var _ grpc.ServerStream = &rangeFeedPipe{}
-
-// newRangeFeedPipe creates a rangeFeedPipe.
The pipe is returned as a pointer -// for convenience, because it's used as a grpc.ClientStream and -// grpc.ServerStream, and these interfaces are implemented on the pointer -// receiver. -func newRangeFeedPipe(ctx context.Context) *rangeFeedPipe { - return &rangeFeedPipe{ +func makeChannelStream(ctx context.Context) channelStream { + return channelStream{ ctx: ctx, respC: make(chan interface{}, 128), errC: make(chan error, 1), + doneC: ctx.Done(), } } // grpc.ClientStream methods. -func (*rangeFeedPipe) Header() (metadata.MD, error) { panic("unimplemented") } -func (*rangeFeedPipe) Trailer() metadata.MD { panic("unimplemented") } -func (*rangeFeedPipe) CloseSend() error { panic("unimplemented") } +func (*channelStream) Header() (metadata.MD, error) { panic("unimplemented") } +func (*channelStream) Trailer() metadata.MD { panic("unimplemented") } +func (*channelStream) CloseSend() error { panic("unimplemented") } // grpc.ServerStream methods. -func (*rangeFeedPipe) SetHeader(metadata.MD) error { panic("unimplemented") } -func (*rangeFeedPipe) SendHeader(metadata.MD) error { panic("unimplemented") } -func (*rangeFeedPipe) SetTrailer(metadata.MD) { panic("unimplemented") } +func (*channelStream) SetHeader(metadata.MD) error { panic("unimplemented") } +func (*channelStream) SendHeader(metadata.MD) error { panic("unimplemented") } +func (*channelStream) SetTrailer(metadata.MD) { panic("unimplemented") } // Common grpc.{Client,Server}Stream methods. -func (p *rangeFeedPipe) Context() context.Context { return p.ctx } +func (s *channelStream) Context() context.Context { return s.ctx } -// SendMsg is part of the grpc.ServerStream interface. It is also part of the -// grpc.ClientStream interface but, in the case of the RangeFeed RPC (which is -// only server-streaming, not bi-directional), only the server sends. -func (p *rangeFeedPipe) SendMsg(m interface{}) error { +// send is the implementation of SendMsg +func (s *channelStream) send(m interface{}) error { select { - case p.respC <- m: + case s.respC <- m: return nil - case <-p.ctx.Done(): - return p.ctx.Err() - } -} - -// RecvMsg is part of the grpc.ClientStream interface. It is also technically -// part of the grpc.ServerStream interface but, in the case of the RangeFeed RPC -// (which is only server-streaming, not bi-directional), only the client -// receives. -func (p *rangeFeedPipe) RecvMsg(m interface{}) error { - out := m.(*roachpb.RangeFeedEvent) - msg, err := p.recvInternal() - if err != nil { - return err + case <-s.doneC: + return s.ctx.Err() } - *out = *msg.(*roachpb.RangeFeedEvent) - return nil } -// recvInternal is the implementation of RecvMsg. -func (p *rangeFeedPipe) recvInternal() (interface{}, error) { +// recv is the implementation of RecvMsg. +func (s *channelStream) recv() (interface{}, error) { // Prioritize respC. Both channels are buffered and the only guarantee we // have is that once an error is sent on errC no other events will be sent // on respC again. select { - case e := <-p.respC: + case e := <-s.respC: return e, nil - case err := <-p.errC: + case err := <-s.errC: select { - case e := <-p.respC: - p.errC <- err + case e := <-s.respC: + s.errC <- err return e, nil default: return nil, err } + case <-s.doneC: + return nil, s.ctx.Err() } } +// eventPipe is a (uni-directional) pipe of events that implements +// the grpc.ClientStream and grpc.ServerStream interfaces. +// The type of events are abstracted behind receiver and sender callbacks. 
+type eventPipe struct {
+	channelStream
+	receiver func(interface{}) error
+	sender   func(interface{}) error
+}
+
+var _ grpc.ClientStream = &eventPipe{}
+var _ grpc.ServerStream = &eventPipe{}
+
+// SendMsg is part of the grpc.ServerStream interface. It is also part of the
+// grpc.ClientStream interface.
+func (p *eventPipe) SendMsg(m interface{}) error {
+	return p.sender(m)
+}
+
+// RecvMsg is part of the grpc.ClientStream interface. It is also technically
+// part of the grpc.ServerStream interface but, in the case of the RangeFeed RPC
+// (which is only server-streaming, not bi-directional), only the client
+// receives.
+func (p *eventPipe) RecvMsg(m interface{}) error {
+	return p.receiver(m)
+}
+
+// newRangeFeedPipe creates an eventPipe for receiving RangeFeedEvent.
+// The pipe is returned as a pointer for convenience, because it's used as a
+// grpc.ClientStream and grpc.ServerStream, and these interfaces are implemented
+// on the pointer receiver.
+func newRangeFeedPipe(ctx context.Context) *eventPipe {
+	cs := makeChannelStream(ctx)
+	return &eventPipe{
+		channelStream: cs,
+		receiver: func(dst interface{}) error {
+			msg, err := cs.recv()
+			if err != nil {
+				return err
+			}
+			*dst.(*roachpb.RangeFeedEvent) = *msg.(*roachpb.RangeFeedEvent)
+			return nil
+		},
+		sender: cs.send,
+	}
+}
+
+// newMuxRangeFeedPipes creates a pair of eventPipes for client and server side
+// streams of the MuxRangeFeed RPC.
+// The pipes are returned as pointers for convenience, because they're used as a
+// grpc.ClientStream and grpc.ServerStream, and these interfaces are implemented
+// on the pointer receiver.
+func newMuxRangeFeedPipes(ctx context.Context) (clientPipe, serverPipe *eventPipe) {
+	clientStream := makeChannelStream(ctx)
+	serverStream := makeChannelStream(ctx)
+	clientPipe = &eventPipe{
+		channelStream: clientStream,
+		// Client receives MuxRangeFeedEvents from the server.
+		receiver: func(dst interface{}) error {
+			msg, err := clientStream.recv()
+			if err != nil {
+				return err
+			}
+			*dst.(*roachpb.MuxRangeFeedEvent) = *msg.(*roachpb.MuxRangeFeedEvent)
+			return nil
+		},
+		// Client sends RangeFeedRequests to the server.
+		sender: func(m interface{}) error {
+			return serverStream.send(m)
+		},
+	}
+	serverPipe = &eventPipe{
+		channelStream: serverStream,
+		// Server receives RangeFeedRequests from the client.
+		receiver: func(dst interface{}) error {
+			msg, err := serverStream.recv()
+			if err != nil {
+				return err
+			}
+			*dst.(*roachpb.RangeFeedRequest) = *msg.(*roachpb.RangeFeedRequest)
+			return nil
+		},
+		// Server sends MuxRangeFeedEvents to the client.
+		sender: func(m interface{}) error {
+			return clientStream.send(m)
+		},
+	}
+
+	return clientPipe, serverPipe
+}
+
 // rangeFeedServerAdapter adapts an untyped ServerStream to the typed
 // roachpb.Internal_RangeFeedServer interface, expected by the RangeFeed RPC
 // handler.
diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index 8e99e1e2a6da..17ba70200d67 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -319,8 +319,22 @@ var _ roachpb.InternalServer = &internalServer{}
 type internalServer struct {
 	// rangeFeedEvents are returned on RangeFeed() calls.
- rangeFeedEvents []roachpb.RangeFeedEvent - serverStream roachpb.Internal_RangeFeedServer + rangeFeedEvents []roachpb.RangeFeedEvent + rfServerStream roachpb.Internal_RangeFeedServer + muxRfServerStream roachpb.Internal_MuxRangeFeedServer +} + +func (s *internalServer) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + s.muxRfServerStream = stream + for _, ev := range s.rangeFeedEvents { + muxEv := &roachpb.MuxRangeFeedEvent{ + RangeFeedEvent: ev, + } + if err := stream.Send(muxEv); err != nil { + return err + } + } + return nil } func (*internalServer) Batch( @@ -338,7 +352,7 @@ func (*internalServer) RangeLookup( func (s *internalServer) RangeFeed( _ *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) error { - s.serverStream = stream + s.rfServerStream = stream for _, ev := range s.rangeFeedEvents { evCpy := ev if err := stream.Send(&evCpy); err != nil { @@ -539,37 +553,55 @@ func TestInternalClientAdapterWithClientStreamInterceptors(t *testing.T) { serverCtx.NodeID.Set(context.Background(), 1) _ /* server */, serverInterceptors := NewServerEx(serverCtx) + testutils.RunTrueAndFalse(t, "use_mux_rangefeed", func(t *testing.T, useMux bool) { + var clientInterceptors ClientInterceptorInfo + var s *testClientStream + clientInterceptors.StreamInterceptors = append(clientInterceptors.StreamInterceptors, + func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + clientStream, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, err + } + s = &testClientStream{inner: clientStream} + return s, nil + }) - var clientInterceptors ClientInterceptorInfo - var s *testClientStream - clientInterceptors.StreamInterceptors = append(clientInterceptors.StreamInterceptors, - func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - clientStream, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - return nil, err + internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} + serverCtx.SetLocalInternalServer(internal, serverInterceptors, clientInterceptors) + ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) + lic, ok := ic.(internalClientAdapter) + require.True(t, ok) + require.Equal(t, internal, lic.server) + + var receiveEvent func() error + if useMux { + stream, err := lic.MuxRangeFeed(ctx) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + } else { + stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err } - s = &testClientStream{inner: clientStream} - return s, nil - }) - - internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} - serverCtx.SetLocalInternalServer(internal, serverInterceptors, clientInterceptors) - ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) - lic, ok := ic.(internalClientAdapter) - require.True(t, ok) - require.Equal(t, internal, lic.server) - - stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) - require.NoError(t, err) - // Consume the stream. 
- for { - _, err := stream.Recv() - if err == io.EOF { - break } - require.NoError(t, err) - } - require.Equal(t, len(internal.rangeFeedEvents)+1, s.recvCount) + // Consume the stream. + for { + err := receiveEvent() + if err == io.EOF { + break + } + require.NoError(t, err) + } + require.Equal(t, len(internal.rangeFeedEvents)+1, s.recvCount) + }) } // Test that a server stream interceptor can wrap the ServerStream when the @@ -591,56 +623,78 @@ func TestInternalClientAdapterWithServerStreamInterceptors(t *testing.T) { serverCtx.NodeID.Set(context.Background(), 1) _ /* server */, serverInterceptors := NewServerEx(serverCtx) + testutils.RunTrueAndFalse(t, "use_mux_rangefeed", func(t *testing.T, useMux bool) { + const int1Name = "interceptor 1" + serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, + func( + srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, + ) error { + serverStream := &testServerStream{name: "interceptor 1", inner: ss} + return handler(srv, serverStream) + }) + var secondInterceptorWrapped grpc.ServerStream + const int2Name = "interceptor 2" + serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, + func( + srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, + ) error { + secondInterceptorWrapped = ss + serverStream := &testServerStream{name: int2Name, inner: ss} + return handler(srv, serverStream) + }) - const int1Name = "interceptor 1" - serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, - func( - srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, - ) error { - serverStream := &testServerStream{name: "interceptor 1", inner: ss} - return handler(srv, serverStream) - }) - var secondInterceptorWrapped grpc.ServerStream - const int2Name = "interceptor 2" - serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, - func( - srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, - ) error { - secondInterceptorWrapped = ss - serverStream := &testServerStream{name: int2Name, inner: ss} - return handler(srv, serverStream) - }) + internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} + serverCtx.SetLocalInternalServer(internal, serverInterceptors, ClientInterceptorInfo{}) + ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) + lic, ok := ic.(internalClientAdapter) + require.True(t, ok) + require.Equal(t, internal, lic.server) + + var receiveEvent func() error + if useMux { + stream, err := lic.MuxRangeFeed(ctx) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + } else { + stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + } - internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} - serverCtx.SetLocalInternalServer(internal, serverInterceptors, ClientInterceptorInfo{}) - ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) - lic, ok := ic.(internalClientAdapter) - require.True(t, ok) - require.Equal(t, internal, lic.server) + // Consume the stream. This will synchronize with the server RPC handler + // goroutine, ensuring that the server-side interceptors run. 
+ for { + err := receiveEvent() + if err == io.EOF { + break + } + require.NoError(t, err) + } - stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) - require.NoError(t, err) + require.IsType(t, &testServerStream{}, secondInterceptorWrapped) - // Consume the stream. This will synchronize with the server RPC handler - // goroutine, ensuring that the server-side interceptors run. - for { - _, err := stream.Recv() - if err == io.EOF { - break + require.Equal(t, int1Name, secondInterceptorWrapped.(*testServerStream).name) + var ss grpc.ServerStream + if useMux { + require.IsType(t, muxRangeFeedServerAdapter{}, internal.muxRfServerStream) + ss = internal.muxRfServerStream.(muxRangeFeedServerAdapter).ServerStream + } else { + require.IsType(t, rangeFeedServerAdapter{}, internal.rfServerStream) + ss = internal.rfServerStream.(rangeFeedServerAdapter).ServerStream } - require.NoError(t, err) - } - - require.IsType(t, &testServerStream{}, secondInterceptorWrapped) - require.Equal(t, int1Name, secondInterceptorWrapped.(*testServerStream).name) - require.IsType(t, rangeFeedServerAdapter{}, internal.serverStream) - ss := internal.serverStream.(rangeFeedServerAdapter).ServerStream - require.IsType(t, &testServerStream{}, ss) - topStream := ss.(*testServerStream) - require.Equal(t, int2Name, topStream.name) - require.IsType(t, &testServerStream{}, topStream.inner) - bottomStream := topStream.inner.(*testServerStream) - require.Equal(t, int1Name, bottomStream.name) + require.IsType(t, &testServerStream{}, ss) + topStream := ss.(*testServerStream) + require.Equal(t, int2Name, topStream.name) + require.IsType(t, &testServerStream{}, topStream.inner) + bottomStream := topStream.inner.(*testServerStream) + require.Equal(t, int1Name, bottomStream.name) + }) } type testClientStream struct { diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index 6ab800c44b89..3d5756dd205c 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -571,6 +571,10 @@ func (*internalServer) RangeFeed( panic("unimplemented") } +func (s *internalServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("implement me") +} + func (*internalServer) GossipSubscription( *roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/rpc/restricted_internal_client.go b/pkg/rpc/restricted_internal_client.go index 1f7b480c3eae..808067578c4b 100644 --- a/pkg/rpc/restricted_internal_client.go +++ b/pkg/rpc/restricted_internal_client.go @@ -24,4 +24,5 @@ import ( type RestrictedInternalClient interface { Batch(ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption) (*roachpb.BatchResponse, error) RangeFeed(ctx context.Context, in *roachpb.RangeFeedRequest, opts ...grpc.CallOption) (roachpb.Internal_RangeFeedClient, error) + MuxRangeFeed(ctx context.Context, opts ...grpc.CallOption) (roachpb.Internal_MuxRangeFeedClient, error) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index a098516f1ae5..024f9089e0e9 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -232,6 +232,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/contextutil", + "//pkg/util/ctxgroup", "//pkg/util/envutil", "//pkg/util/goschedstats", "//pkg/util/grpcutil", diff --git a/pkg/server/node.go b/pkg/server/node.go index 05542ada7708..aaac487ad8ab 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -46,6 +46,7 @@ import ( 
"github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1237,6 +1238,12 @@ func (n *Node) RangeLookup( // RangeFeed implements the roachpb.InternalServer interface. func (n *Node) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) error { + return n.singleRangeFeed(args, stream) +} + +func (n *Node) singleRangeFeed( + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) error { pErr := n.stores.RangeFeed(args, stream) if pErr != nil { @@ -1249,6 +1256,78 @@ func (n *Node) RangeFeed( return nil } +// setRangeIDEventSink annotates each response with range and stream IDs. +// This is used by MuxRangeFeed. +// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and +// the old style RangeFeed deprecated. +type setRangeIDEventSink struct { + ctx context.Context + rangeID roachpb.RangeID + streamID int64 + wrapped *lockedMuxStream +} + +func (s *setRangeIDEventSink) Context() context.Context { + return s.ctx +} + +func (s *setRangeIDEventSink) Send(event *roachpb.RangeFeedEvent) error { + response := &roachpb.MuxRangeFeedEvent{ + RangeFeedEvent: *event, + RangeID: s.rangeID, + StreamID: s.streamID, + } + return s.wrapped.Send(response) +} + +var _ roachpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) + +// lockedMuxStream provides support for concurrent calls to Send. +// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send. +type lockedMuxStream struct { + wrapped roachpb.Internal_MuxRangeFeedServer + sendMu syncutil.Mutex +} + +func (s *lockedMuxStream) Context() context.Context { + return s.wrapped.Context() +} + +func (s *lockedMuxStream) Send(e *roachpb.MuxRangeFeedEvent) error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + return s.wrapped.Send(e) +} + +// MuxRangeFeed implements the roachpb.InternalServer interface. +func (n *Node) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + ctx, cancelFeeds := n.stopper.WithCancelOnQuiesce(stream.Context()) + defer cancelFeeds() + rfGrp := ctxgroup.WithContext(ctx) + + muxStream := &lockedMuxStream{wrapped: stream} + for { + req, err := stream.Recv() + if err != nil { + cancelFeeds() + return errors.CombineErrors(err, rfGrp.Wait()) + } + + rfGrp.GoCtx(func(ctx context.Context) error { + ctx, span := tracing.ForkSpan(logtags.AddTag(ctx, "r", req.RangeID), "mux-rf") + defer span.Finish() + + sink := setRangeIDEventSink{ + ctx: ctx, + rangeID: req.RangeID, + streamID: req.StreamID, + wrapped: muxStream, + } + return n.singleRangeFeed(req, &sink) + }) + } +} + // ResetQuorum implements the roachpb.InternalServer interface. func (n *Node) ResetQuorum( ctx context.Context, req *roachpb.ResetQuorumRequest,