From 5c53a460e93c2d0f65ba8a78cc1f59984c02f15c Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 25 Jan 2022 07:52:45 -0500
Subject: [PATCH] kv: Introduce MuxRangeFeed bi-directional RPC.

As demonstrated in #74219, running a rangefeed over a large enough table, or not consuming rangefeed events quickly enough, may cause undesirable memory blowup, up to and including OOMing the node. Since rangefeeds tend to run on (almost) every node, the possibility exists that an entire cluster might get destabilized. One of the reasons for this is that the memory used by the low(er) level HTTP/2 RPC transport is not currently tracked -- nor do we have a mechanism to add such tracking. #74456 provided partial mitigation for this issue, where a single RangeFeed stream could be restricted to use less memory. Nonetheless, the possibility of excessive memory usage remains, since the number of rangefeed streams in the system could be very large.

This PR introduces a new bi-directional streaming RPC called `MuxRangeFeed`. In a uni-directional streaming RPC, the client establishes a connection to the KV server and requests to receive events for a single span; a separate RPC stream is created for each span. In contrast, `MuxRangeFeed` is a bi-directional RPC: the client connects to the KV server and requests as many spans as it wishes to receive from that server. The server multiplexes all events onto a single stream, and the client de-multiplexes those events appropriately on the receiving end.

Note: just like in the uni-directional implementation, each call to the `MuxRangeFeed` method (i.e. a logical rangefeed established by the client) is treated independently. That is, even though it is possible to multiplex all logical rangefeed streams onto a single bi-directional stream to a single KV node, this optimization is not currently implemented, as it raises questions about scheduling and fairness. If we need to further reduce the number of bi-directional streams in the future, from `number_logical_feeds*number_of_nodes` down to `number_of_nodes`, such an optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional stream reduces the number of such streams from `number_of_ranges` down to `number_of_nodes` (per logical rangefeed). This, in turn, enables the client to reserve the worst-case amount of memory before starting the rangefeed. This amount of memory is bounded by the number of nodes in the cluster times the per-stream maximum memory -- the default is `2MB`, but this can be changed via a dedicated rangefeed connection class. The server side of the equation may also benefit from knowing how many rangefeeds are running right now. Even though the server is somewhat protected from memory blowup via HTTP/2 pushback, we may also want to manage server-side memory better. Memory accounting for the client (and possibly the server) will be added in follow-on PRs.

The use of the new RPC is contingent on callers providing the `WithMuxRangeFeed` option when starting the rangefeed. However, an environment variable "kill switch", `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED`, exists to disable the use of mux rangefeed in case serious issues are discovered.
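To make the opt-in concrete, below is a minimal client-side sketch (illustrative only, not part of this patch). It assumes an existing `*kvcoord.DistSender`, the spans to watch, and a start timestamp; the `startFeed` helper and the `example` package are hypothetical, while `DistSender.RangeFeed` and `WithMuxRangeFeed` are the APIs touched by this change:

    package example // hypothetical package for the sketch

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
        "github.com/cockroachdb/cockroach/pkg/util/hlc"
    )

    // startFeed is a hypothetical helper showing how a caller opts in to the
    // multiplexed rangefeed. Without WithMuxRangeFeed (or with the kill switch
    // set to false, or before the cluster version is finalized), the legacy
    // per-range RangeFeed RPC is used instead.
    func startFeed(
        ctx context.Context,
        ds *kvcoord.DistSender,
        spans []roachpb.Span,
        startAfter hlc.Timestamp,
    ) error {
        eventCh := make(chan *roachpb.RangeFeedEvent)
        g := ctxgroup.WithContext(ctx)
        g.GoCtx(func(ctx context.Context) error {
            return ds.RangeFeed(ctx, spans, startAfter, false /* withDiff */, eventCh,
                kvcoord.WithMuxRangeFeed())
        })
        g.GoCtx(func(ctx context.Context) error {
            // Consume events; with MuxRangeFeed the client holds one stream per
            // node rather than one per range, so worst-case network buffer memory
            // is bounded by the node count.
            for {
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case ev := <-eventCh:
                    _ = ev // process the event
                }
            }
        })
        // Runs until ctx is cancelled or the rangefeed returns an error.
        return g.Wait()
    }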
Release Notes (enterprise change): Rangefeeds (which are used internally e.g. by changefeeds) now use a common HTTP/2 stream per client for all range replicas on a node, instead of one per replica. This significantly reduces the amount of network buffer memory usage, which could cause nodes to run out of memory if a client was slow to consume events. The rangefeed implementation can be reverted back to its old implementation by restarting the cluster with the `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable set to false.

Release note (enterprise change): Introduce a new Rangefeed RPC called `MuxRangeFeed`. Rangefeeds may now use a common HTTP/2 stream per client for all range replicas on a node, instead of one per replica. This significantly reduces the amount of network buffer memory usage, which could cause nodes to run out of memory if a client was slow to consume events. The caller may opt in to the new mechanism by specifying the `WithMuxRangeFeed` option when starting the rangefeed. However, a cluster-wide `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable may be set to `false` to inhibit the use of this new RPC.

--- build/bazelutil/check.sh | 2 +- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go | 1 + .../changefeedccl/kvfeed/physical_kv_feed.go | 1 + pkg/ccl/kvccl/kvtenantccl/connector_test.go | 4 + pkg/clusterversion/cockroach_versions.go | 6 + pkg/clusterversion/key_string.go | 5 +- pkg/kv/kvclient/kvcoord/BUILD.bazel | 7 + .../kvcoord/dist_sender_mux_rangefeed.go | 281 +++++++++++ .../kvclient/kvcoord/dist_sender_rangefeed.go | 91 +++- .../dist_sender_rangefeed_mock_test.go | 182 ++++++++ .../kvcoord/dist_sender_rangefeed_test.go | 435 +++++++++++++----- pkg/kv/kvclient/kvcoord/send_test.go | 4 + pkg/kv/kvclient/kvcoord/transport_test.go | 6 + pkg/kv/kvserver/kvserverbase/base.go | 2 +- pkg/kv/kvserver/replica_rangefeed.go | 6 +- pkg/kv/kvserver/store.go | 2 +- pkg/kv/kvserver/stores.go | 2 +- pkg/roachpb/api.go | 19 +- pkg/roachpb/api.proto | 14 + pkg/roachpb/roachpbmock/BUILD.bazel | 1 + pkg/roachpb/roachpbmock/mocks_generated.go | 159 ++++++- pkg/rpc/auth_tenant.go | 3 +- pkg/rpc/context.go | 204 +++++++- pkg/rpc/context_test.go | 5 + pkg/rpc/nodedialer/nodedialer_test.go | 4 + pkg/rpc/restricted_internal_client.go | 1 + pkg/server/BUILD.bazel | 1 + pkg/server/node.go | 63 +++ 30 files changed, 1357 insertions(+), 158 deletions(-) create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go diff --git a/build/bazelutil/check.sh b/build/bazelutil/check.sh index 49cd804695dd..886986eeb7fc 100755 --- a/build/bazelutil/check.sh +++ b/build/bazelutil/check.sh @@ -22,7 +22,7 @@ pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangeca pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB pkg/kv/kvserver/concurrency/lock_table.go://go:generate ../../../util/interval/generic/gen.sh *lockState concurrency pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generic/gen.sh *latch spanlatch -pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient +pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go .
InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient pkg/roachpb/batch.go://go:generate go run gen/main.go --filename batch_generated.go *.pb.go pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated_test.go . Cert pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 4443f195c9ee..97ff51c78f5c 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -283,4 +283,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-32 set the active cluster version in the format '.' +version version 22.1-34 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 9376a4c93c4f..7414100f0ba9 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -214,6 +214,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-32set the active cluster version in the format '.' +versionversion22.1-34set the active cluster version in the format '.' 
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index f06741afb227..2164551c6874 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -392,6 +392,7 @@ func (f rawEventFeed) run( spans []kvcoord.SpanTimePair, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + opts ...kvcoord.RangeFeedOption, ) error { var startAfter hlc.Timestamp for _, s := range spans { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 142a9f700a88..7351bcb748ae 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -38,6 +38,7 @@ type rangefeedFactory func( spans []kvcoord.SpanTimePair, withDiff bool, eventC chan<- *roachpb.RangeFeedEvent, + opts ...kvcoord.RangeFeedOption, ) error type rangefeed struct { diff --git a/pkg/ccl/kvccl/kvtenantccl/connector_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go index 672bd70e920b..4212f1d44a19 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -90,6 +90,10 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe panic("unimplemented") } +func (m *mockServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("implement me") +} + func (*mockServer) Join( context.Context, *roachpb.JoinNodeRequest, ) (*roachpb.JoinNodeResponse, error) { diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 72ec94d32ec7..9ea3d75f9e5f 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -327,6 +327,8 @@ const ( // AlterSystemStatementStatisticsAddIndexRecommendations adds an // index_recommendations column to the system.statement_statistics table. AlterSystemStatementStatisticsAddIndexRecommendations + // RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node. + RangefeedUseOneStreamPerNode // ************************************************* // Step (1): Add new versions here. @@ -564,6 +566,10 @@ var versionsSingleton = keyedVersions{ Key: AlterSystemStatementStatisticsAddIndexRecommendations, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 32}, }, + { + Key: RangefeedUseOneStreamPerNode, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 34}, + }, // ************************************************* // Step (2): Add new versions here. 
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 2087c66a50a7..ccf8c420d572 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -58,11 +58,12 @@ func _() { _ = x[AlterSystemSQLInstancesAddLocality-47] _ = x[SystemExternalConnectionsTable-48] _ = x[AlterSystemStatementStatisticsAddIndexRecommendations-49] + _ = x[RangefeedUseOneStreamPerNode-50] } -const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendations" +const _Key_name = "V21_2Start22_1PebbleFormatBlockPropertyCollectorProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersEnablePebbleFormatVersionBlockPropertiesMVCCIndexBackfillerEnableLeaseHolderRemovalLooselyCoupledRaftLogTruncationChangefeedIdlenessEnableDeclarativeSchemaChangerRowLevelTTLPebbleFormatSplitUserKeysMarkedEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsSuperRegionsEnableNewChangefeedOptionsSpanCountTablePreSeedSpanCountTableSeedSpanCountTableV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRangefeedUseOneStreamPerNode" -var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 632, 643, 674, 698, 722, 744, 756, 782, 796, 817, 835, 840, 849, 864, 904, 938, 972, 994, 1014, 1033, 1066, 1085, 1105, 1126, 1161, 1195, 1225, 1278} +var _Key_index = [...]uint16{0, 5, 14, 48, 60, 88, 118, 146, 167, 186, 220, 258, 292, 324, 360, 392, 428, 470, 510, 529, 553, 584, 602, 632, 643, 674, 698, 722, 744, 756, 782, 796, 817, 835, 840, 849, 864, 904, 938, 
972, 994, 1014, 1033, 1066, 1085, 1105, 1126, 1161, 1195, 1225, 1278, 1306} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 31656644fc01..859a12250e70 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "batch.go", "condensable_span_set.go", "dist_sender.go", + "dist_sender_mux_rangefeed.go", "dist_sender_rangefeed.go", "doc.go", "local_test_cluster_util.go", @@ -38,6 +39,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/gossip", "//pkg/keys", "//pkg/kv", @@ -110,6 +112,7 @@ go_test( srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_mock_test.go", "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", @@ -142,6 +145,7 @@ go_test( deps = [ "//build/bazelutil:noop", "//pkg/base", + "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", "//pkg/gossip", @@ -165,6 +169,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/storage", @@ -174,9 +179,11 @@ go_test( "//pkg/testutils/localtestcluster", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/caller", + "//pkg/util/ctxgroup", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go new file mode 100644 index 000000000000..ab042ecf05ff --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -0,0 +1,281 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// rangefeedMuxer is responsible for coordination and management of mux +// rangefeeds. rangefeedMuxer caches MuxRangeFeed stream per node, and executes +// each range feed request on an appropriate node. +type rangefeedMuxer struct { + // eventCh receives events from all active muxStreams. + eventCh chan *roachpb.MuxRangeFeedEvent + + // Context group controlling execution of MuxRangeFeed calls. + g ctxgroup.Group + + // State pertaining to actively executing MuxRangeFeeds. + mu struct { + syncutil.Mutex + + // terminalErr set when a terminal error occurs. + // Subsequent calls to startMuxRangeFeed will return this error. + terminalErr error + + // Each call to start new range feed gets a unique ID which is echoed back + // by MuxRangeFeed rpc. 
This is done as a safety mechanism to make sure + // that we always send the event to the correct consumer -- even if the + // range feed is terminated and re-established rapidly. + nextStreamID int64 + + // muxClient contains a nodeID->MuxRangeFeedClient. + muxClients map[roachpb.NodeID]*muxClientState + // producers maps streamID to the event producer -- data sent back to the + // consumer of range feed events. + producers map[int64]*channelRangeFeedEventProducer + } +} + +type muxClientState struct { + client roachpb.Internal_MuxRangeFeedClient + streams util.FastIntSet + cancel context.CancelFunc +} + +func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { + m := &rangefeedMuxer{ + eventCh: make(chan *roachpb.MuxRangeFeedEvent), + g: g, + } + m.mu.muxClients = make(map[roachpb.NodeID]*muxClientState) + m.mu.producers = make(map[int64]*channelRangeFeedEventProducer) + + m.g.GoCtx(m.demux) + + return m +} + +type eventOrError struct { + event *roachpb.RangeFeedEvent + err error +} + +// channelRangeFeedEventProducer is a rangeFeedEventProducer which receives +// events on input channel, and returns events when Recv is called. +type channelRangeFeedEventProducer struct { + ctx context.Context + errCh chan error // Signalled to propagate terminal error the consumer. + input chan eventOrError +} + +// Recv implements rangeFeedEventProducer interface. +func (c *channelRangeFeedEventProducer) Recv() (*roachpb.RangeFeedEvent, error) { + select { + case <-c.ctx.Done(): + return nil, c.ctx.Err() + case err := <-c.errCh: + return nil, err + case e := <-c.input: + return e.event, e.err + } +} + +var _ roachpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) + +// startMuxRangeFeed begins the execution of rangefeed for the specified +// RangeFeedRequest. +func (m *rangefeedMuxer) startMuxRangeFeed( + ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, +) (_ roachpb.RangeFeedEventProducer, cleanup func(), _ error) { + m.mu.Lock() + defer m.mu.Unlock() + + var nodeID roachpb.NodeID + streamID := m.mu.nextStreamID + m.mu.nextStreamID++ + + cleanup = func() { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.mu.producers, streamID) + muxState := m.mu.muxClients[nodeID] + if muxState != nil { + muxState.streams.Remove(int(streamID)) + if muxState.streams.Len() == 0 { + // This node no longer has any active streams. + // Delete node from the muxClient list, and gracefully + // shutdown consumer go routine. + delete(m.mu.muxClients, nodeID) + muxState.cancel() + } + } + } + + if m.mu.terminalErr != nil { + return nil, cleanup, m.mu.terminalErr + } + + // Lookup or establish MuxRangeFeed for this node. + nodeID = req.Replica.NodeID + var found bool + ms, found := m.mu.muxClients[nodeID] + + if !found { + ctx, cancel := context.WithCancel(ctx) + stream, err := client.MuxRangeFeed(ctx) + if err != nil { + cancel() + return nil, cleanup, err + } + + ms = &muxClientState{client: stream, cancel: cancel} + m.mu.muxClients[req.Replica.NodeID] = ms + m.g.GoCtx(func(ctx context.Context) error { + defer cancel() + return m.receiveEventsFromNode(ctx, nodeID, stream) + }) + } + + // Start RangeFeed for this request. 
+ req.StreamID = streamID + ms.streams.Add(int(streamID)) + + if err := ms.client.Send(req); err != nil { + return nil, cleanup, err + } + + producer := &channelRangeFeedEventProducer{ + ctx: ctx, + errCh: make(chan error, 1), + input: make(chan eventOrError), + } + m.mu.producers[streamID] = producer + return producer, cleanup, nil +} + +// demux de-multiplexes events and sends them to appropriate rangefeed event +// consumer. +func (m *rangefeedMuxer) demux(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e := <-m.eventCh: + m.mu.Lock() + producer, found := m.mu.producers[e.StreamID] + m.mu.Unlock() + + if !found { + return m.shutdownWithError(errors.AssertionFailedf( + "expected to find consumer for streamID=%d", e.StreamID), + ) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case producer.input <- eventOrError{event: &e.RangeFeedEvent}: + } + } + } +} + +// receiveEventsFromNode receives mux rangefeed events, and forwards them to the +// consumer channel. +func (m *rangefeedMuxer) receiveEventsFromNode( + ctx context.Context, nodeID roachpb.NodeID, stream roachpb.Internal_MuxRangeFeedClient, +) error { + for { + event, streamErr := stream.Recv() + + if streamErr != nil { + if err := m.propagateStreamTerminationErrorToConsumers(ctx, nodeID, streamErr); err != nil { + return err + } + // Since the stream error is handled above, we return nil to gracefully shut down + // this go routine. + return nil //nolint:returnerrcheck + } + + select { + case <-ctx.Done(): + return ctx.Err() + case m.eventCh <- event: + } + } +} + +// propagateStreamTerminationErrorToConsumers called when mux stream running on +// a node encountered an error. All consumers will receive the stream +// termination error and will handle it appropriately. +func (m *rangefeedMuxer) propagateStreamTerminationErrorToConsumers( + ctx context.Context, nodeID roachpb.NodeID, streamErr error, +) error { + // Grab muxStream associated with the node, and clear it out. + // We don't run entirety of this function under lock since + // as soon as we send an error to a consumer, a re-entrant call + // to re-establish range feed may be made. + m.mu.Lock() + ms, streamFound := m.mu.muxClients[nodeID] + // Note: it's okay if the stream is not found; this can happen if the + // nodeID was already removed from muxClients because we're trying to gracefully + // shutdown receiveEventsFromNode go routine. + + var producers []*channelRangeFeedEventProducer + if streamFound { + ms.streams.ForEach(func(streamID int) { + producers = append(producers, m.mu.producers[int64(streamID)]) + delete(m.mu.producers, int64(streamID)) + }) + } + + delete(m.mu.muxClients, nodeID) + m.mu.Unlock() + + for _, producer := range producers { + select { + case <-ctx.Done(): + return ctx.Err() + case producer.input <- eventOrError{err: streamErr}: + } + } + + return nil +} + +// shutdownWithError terminates this rangefeedMuxer with a terminal error +// (usually an assertion failure). It's a bit unwieldy way to propagate this +// error to the caller, but as soon as one of consumers notices this terminal +// error, the context should be cancelled. +// Returns the terminal error passed in. +func (m *rangefeedMuxer) shutdownWithError(terminalErr error) error { + // Okay to run this under the lock since as soon as a consumer sees this terminal + // error, the whole rangefeed should terminate. 
+ m.mu.Lock() + defer m.mu.Unlock() + + m.mu.terminalErr = terminalErr + for _, p := range m.mu.producers { + p.errCh <- terminalErr // buffered channel + } + + return terminalErr +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ac0e856fc64a..6e8511fb5c1a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -19,6 +19,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -69,6 +71,29 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { return int(l) } +type rangeFeedConfig struct { + useMuxRangeFeed bool +} + +// RangeFeedOption configures a RangeFeed. +type RangeFeedOption interface { + set(*rangeFeedConfig) +} + +type optionFunc func(*rangeFeedConfig) + +func (o optionFunc) set(c *rangeFeedConfig) { o(c) } + +// WithMuxRangeFeed configures range feed to use MuxRangeFeed RPC. +func WithMuxRangeFeed() RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.useMuxRangeFeed = true + }) +} + +// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. +var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -88,6 +113,7 @@ func (ds *DistSender) RangeFeed( startAfter hlc.Timestamp, // exclusive withDiff bool, eventCh chan<- *roachpb.RangeFeedEvent, + opts ...RangeFeedOption, ) error { timedSpans := make([]SpanTimePair, 0, len(spans)) for _, sp := range spans { @@ -96,7 +122,7 @@ func (ds *DistSender) RangeFeed( StartAfter: startAfter, }) } - return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh) + return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh, opts...) } // SpanTimePair is a pair of span along with its starting time. The starting @@ -110,12 +136,21 @@ type SpanTimePair struct { // RangeFeedSpans is similar to RangeFeed but allows specification of different // starting time for each span. 
func (ds *DistSender) RangeFeedSpans( - ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- *roachpb.RangeFeedEvent, + ctx context.Context, + spans []SpanTimePair, + withDiff bool, + eventCh chan<- *roachpb.RangeFeedEvent, + opts ...RangeFeedOption, ) error { if len(spans) == 0 { return errors.AssertionFailedf("expected at least 1 span, got none") } + var cfg rangeFeedConfig + for _, opt := range opts { + opt.set(&cfg) + } + ctx = ds.AnnotateCtx(ctx) ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender") defer sp.Finish() @@ -128,6 +163,16 @@ func (ds *DistSender) RangeFeedSpans( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) g := ctxgroup.WithContext(ctx) + + var eventProducer rangeFeedEventProducerFactory + if ds.st.Version.IsActive(ctx, clusterversion.RangefeedUseOneStreamPerNode) && + enableMuxRangeFeed && cfg.useMuxRangeFeed { + m := newRangefeedMuxer(g) + eventProducer = m.startMuxRangeFeed + } else { + eventProducer = legacyRangeFeedEventProducer + } + // Goroutine that processes subdivided ranges and creates a rangefeed for // each. rangeCh := make(chan singleRangeInfo, 16) @@ -137,7 +182,8 @@ func (ds *DistSender) RangeFeedSpans( case sri := <-rangeCh: // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, sri.token, withDiff, &catchupSem, rangeCh, eventCh) + return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter, + sri.token, withDiff, &catchupSem, rangeCh, eventCh) }) case <-ctx.Done(): return ctx.Err() @@ -293,6 +339,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( func (ds *DistSender) partialRangeFeed( ctx context.Context, rr *rangeFeedRegistry, + streamProducerFactory rangeFeedEventProducerFactory, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, @@ -334,8 +381,9 @@ func (ds *DistSender) partialRangeFeed( } // Establish a RangeFeed for a single Range. - maxTS, err := ds.singleRangeFeed(ctx, span, startAfter, withDiff, token.Desc(), - catchupSem, eventCh, active.onRangeEvent) + maxTS, err := ds.singleRangeFeed( + ctx, span, startAfter, withDiff, token.Desc(), + catchupSem, eventCh, streamProducerFactory, active.onRangeEvent) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) @@ -408,6 +456,7 @@ func (ds *DistSender) singleRangeFeed( desc *roachpb.RangeDescriptor, catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- *roachpb.RangeFeedEvent, + streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, ) (hlc.Timestamp, error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. @@ -432,8 +481,6 @@ func (ds *DistSender) singleRangeFeed( return args.Timestamp, err } replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) - // The RangeFeed is not used for system critical traffic so use a DefaultClass - // connection regardless of the range. opts := SendOptions{class: connectionClass(&ds.st.SV)} transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) if err != nil { @@ -459,11 +506,21 @@ func (ds *DistSender) singleRangeFeed( // cleanup catchup reservation in case of early termination. 
defer finishCatchupScan() + var streamCleanup func() + maybeCleanupStream := func() { + if streamCleanup != nil { + streamCleanup() + streamCleanup = nil + } + } + defer maybeCleanupStream() + for { if transport.IsExhausted() { return args.Timestamp, newSendError( fmt.Sprintf("sending to all %d replicas failed", len(replicas))) } + maybeCleanupStream() args.Replica = transport.NextReplica() client, err := transport.NextInternalClient(ctx) @@ -473,7 +530,9 @@ func (ds *DistSender) singleRangeFeed( } log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica) - stream, err := client.RangeFeed(ctx, &args) + + var stream roachpb.RangeFeedEventProducer + stream, streamCleanup, err = streamProducerFactory(ctx, client, &args) if err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) if grpcutil.IsAuthError(err) { @@ -524,3 +583,19 @@ func connectionClass(sv *settings.Values) rpc.ConnectionClass { } return rpc.DefaultClass } + +type rangeFeedEventProducerFactory func( + ctx context.Context, + client rpc.RestrictedInternalClient, + req *roachpb.RangeFeedRequest, +) (roachpb.RangeFeedEventProducer, func(), error) + +// legacyRangeFeedEventProducer is a rangeFeedEventProducerFactory using +// legacy RangeFeed RPC. +func legacyRangeFeedEventProducer( + ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, +) (producer roachpb.RangeFeedEventProducer, cleanup func(), err error) { + cleanup = func() {} + producer, err = client.RangeFeed(ctx, req) + return producer, cleanup, err +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go new file mode 100644 index 000000000000..9d311da46b95 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -0,0 +1,182 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachpb/roachpbmock" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +// TestingSetEnableMuxRangeFeed adjusts enable rangefeed env variable +// for testing. 
+func TestingSetEnableMuxRangeFeed(enabled bool) func() { + old := enableMuxRangeFeed + enableMuxRangeFeed = enabled + return func() { + enableMuxRangeFeed = old + } +} + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, useMuxRangeFeed := range []bool{false, true} { + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(fmt.Sprintf("mux=%t/%s", useMuxRangeFeed, spec.errorCode), + func(t *testing.T) { + clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. + for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. 
+ if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + client := roachpbmock.NewMockInternalClient(ctrl) + + if useMuxRangeFeed { + stream := roachpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) + stream.EXPECT().Send(gomock.Any()).Return(nil) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) + } else { + stream := roachpbmock.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + } + + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) + transport.EXPECT().Release() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + var opts []RangeFeedOption + if useMuxRangeFeed { + opts = append(opts, WithMuxRangeFeed()) + } + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, + false, nil, opts...) + require.Error(t, err) + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index f3fefa0c403c..1f796ace69c7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -8,147 +8,352 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package kvcoord +package kvcoord_test import ( "context" - "io" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/roachpb/roachpbmock" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/stop" - gomock "github.com/golang/mock/gomock" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - grpcstatus "google.golang.org/grpc/status" + "google.golang.org/grpc" ) -// Tests that the range feed handles transport errors appropriately. In -// particular, that when encountering other decommissioned nodes it will refresh -// its range descriptor and retry, but if this node is decommissioned it will -// bail out. 
Regression test for: -// https://github.com/cockroachdb/cockroach/issues/66636 -func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) +type testRangefeedClient struct { + rpc.RestrictedInternalClient + muxRangeFeedEnabled bool + count func() +} - for _, spec := range []struct { - errorCode codes.Code - expectRetry bool - }{ - {codes.FailedPrecondition, true}, // target node is decommissioned; retry - {codes.PermissionDenied, false}, // this node is decommissioned; abort - {codes.Unauthenticated, false}, // this node is not part of cluster; abort - } { - t.Run(spec.errorCode.String(), func(t *testing.T) { - clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - g := makeGossip(t, stopper, rpcContext) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - Generation: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1, ReplicaID: 1}, - {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, - } - for _, repl := range desc.InternalReplicas { - require.NoError(t, g.AddInfoProto( - gossip.MakeNodeIDKey(repl.NodeID), - newNodeDesc(repl.NodeID), - gossip.NodeDescriptorTTL, - )) - } +func (c *testRangefeedClient) RangeFeed( + ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, +) (roachpb.Internal_RangeFeedClient, error) { + defer c.count() - ctrl := gomock.NewController(t) - transport := NewMockTransport(ctrl) - rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + if c.muxRangeFeedEnabled && ctx.Value(useMuxRangeFeedCtxKey{}) != nil { + panic(errors.AssertionFailedf("unexpected call to RangeFeed")) + } + return c.RestrictedInternalClient.RangeFeed(ctx, args, opts...) +} - // We start off with a cached lease on r1. - cachedLease := roachpb.Lease{ - Replica: desc.InternalReplicas[0], - Sequence: 1, - } +func (c *testRangefeedClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + defer c.count() + + if !c.muxRangeFeedEnabled { + panic(errors.AssertionFailedf("unexpected call to MuxRangeFeed")) + } + return c.RestrictedInternalClient.MuxRangeFeed(ctx, opts...) 
+} + +type internalClientCounts struct { + syncutil.Mutex + counts map[rpc.RestrictedInternalClient]int +} + +func (c *internalClientCounts) Inc(ic rpc.RestrictedInternalClient) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + c.counts[ic]++ +} + +type countConnectionsTransport struct { + wrapped kvcoord.Transport + counts *internalClientCounts + rfStreamEnabled bool +} + +func (c *countConnectionsTransport) IsExhausted() bool { + return c.wrapped.IsExhausted() +} + +func (c *countConnectionsTransport) SendNext( + ctx context.Context, request roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return c.wrapped.SendNext(ctx, request) +} + +type testFeedCtxKey struct{} +type useMuxRangeFeedCtxKey struct{} + +func (c *countConnectionsTransport) NextInternalClient( + ctx context.Context, +) (rpc.RestrictedInternalClient, error) { + client, err := c.wrapped.NextInternalClient(ctx) + if err != nil { + return nil, err + } + + tc := &testRangefeedClient{ + RestrictedInternalClient: client, + muxRangeFeedEnabled: c.rfStreamEnabled, + } + // Count rangefeed calls but only for feeds started by this test. + if ctx.Value(testFeedCtxKey{}) != nil { + tc.count = func() { + c.counts.Inc(tc) + } + } else { + tc.count = func() {} + } + + return tc, nil +} + +func (c *countConnectionsTransport) NextReplica() roachpb.ReplicaDescriptor { + return c.wrapped.NextReplica() +} + +func (c *countConnectionsTransport) SkipReplica() { + c.wrapped.SkipReplica() +} + +func (c *countConnectionsTransport) MoveToFront(descriptor roachpb.ReplicaDescriptor) { + c.wrapped.MoveToFront(descriptor) +} + +func (c *countConnectionsTransport) Release() { + c.wrapped.Release() +} + +var _ kvcoord.Transport = (*countConnectionsTransport)(nil) + +func makeTransportFactory( + rfStreamEnabled bool, counts *internalClientCounts, +) kvcoord.TransportFactory { + return func( + options kvcoord.SendOptions, + dialer *nodedialer.Dialer, + slice kvcoord.ReplicaSlice, + ) (kvcoord.Transport, error) { + transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) + if err != nil { + return nil, err + } + countingTransport := &countConnectionsTransport{ + wrapped: transport, + rfStreamEnabled: rfStreamEnabled, + counts: counts, + } + return countingTransport, nil + } +} + +// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library +// here because of circular dependencies. +func rangeFeed( + dsI interface{}, + sp roachpb.Span, + startFrom hlc.Timestamp, + onValue func(event *roachpb.RangeFeedEvent), + useMuxRangeFeed bool, +) func() { + ds := dsI.(*kvcoord.DistSender) + events := make(chan *roachpb.RangeFeedEvent) + ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testFeedCtxKey{}, struct{}{})) - // All nodes return the specified error code. We expect the range feed to - // keep trying all replicas in sequence regardless of error. - for _, repl := range desc.InternalReplicas { - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(repl) - transport.EXPECT().NextInternalClient(gomock.Any()).Return( - nil, grpcstatus.Error(spec.errorCode, "")) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) (err error) { + var opts []kvcoord.RangeFeedOption + if useMuxRangeFeed { + opts = append(opts, kvcoord.WithMuxRangeFeed()) + ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) + } + return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events, opts...) 
+ }) + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-events: + onValue(ev) } - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // Once all replicas have failed, it should try to refresh the lease using - // the range cache. We let this succeed once. - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - - // It then tries the replicas again. This time we just report the - // transport as exhausted immediately. - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // This invalidates the cache yet again. This time we error. - rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) - - // If we expect a range lookup retry, allow the retry to succeed by - // returning a range descriptor and a client that immediately - // cancels the context and closes the range feed stream. - if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - stream := roachpbmock.NewMockInternal_RangeFeedClient(ctrl) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) - client := roachpbmock.NewMockInternalClient(ctrl) - client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) - transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) - transport.EXPECT().Release() + } + }) + + return func() { + cancel() + _ = g.Wait() + } +} + +// observeNValues returns on value handler which expects to see N rangefeed values, +// along with the channel which gets closed when requisite count of events has been seen. +func observeNValues(n int) (chan struct{}, func(ev *roachpb.RangeFeedEvent)) { + var count = struct { + syncutil.Mutex + c int + }{} + allSeen := make(chan struct{}) + return allSeen, func(ev *roachpb.RangeFeedEvent) { + if ev.Val != nil { + count.Lock() + defer count.Unlock() + count.c++ + log.Infof(context.Background(), "Waiting N values: saw %d, want %d; current=%s", count.c, n, ev.Val.Key) + if count.c == n { + close(allSeen) } + } + } +} - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: g, - RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { - return transport, nil +func channelWaitWithTimeout(t *testing.T, ch chan struct{}) { + t.Helper() + select { + case <-ch: + case <-time.After(30 * time.Second): + t.Fatal("timeout after 30 seconds") + } +} +func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + startServerAtVer := func(ver roachpb.Version) (*testcluster.TestCluster, func()) { + st := cluster.MakeTestingClusterSettingsWithVersions(ver, ver, true) + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. 
+ + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(false, nil), + }, + + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: ver, }, }, - RangeDescriptorDB: rangeDB, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), - }) - ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: desc, - Lease: cachedLease, - }) - - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) - require.Error(t, err) + }, + }) + return tc, func() { tc.Stopper().Stop(ctx) } + } + + // Create a small table; run rangefeed. The transport factory we injected above verifies + // that we use the old rangefeed implementation. + runRangeFeed := func(tc *testcluster.TestCluster, opts ...kvcoord.RangeFeedOption) { + ts := tc.Server(0) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false) + channelWaitWithTimeout(t, allSeen) + closeFeed() + } + + t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) { + noRfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode - 1) + tc, cleanup := startServerAtVer(noRfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) + + t.Run("rangefeed-stream-disabled-via-environment", func(t *testing.T) { + defer kvcoord.TestingSetEnableMuxRangeFeed(false)() + // Even though we could use rangefeed stream, it's disabled via kill switch. + rfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode) + tc, cleanup := startServerAtVer(rfStreamVer) + defer cleanup() + runRangeFeed(tc, kvcoord.WithMuxRangeFeed()) + }) +} + +func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + connCounts := &internalClientCounts{counts: make(map[rpc.RestrictedInternalClient]int)} + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(true, connCounts), + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + // Create a table, and split it so that we have multiple ranges, distributed across + // test cluster nodes. 
+ sqlDB.ExecMultiple(t, + `SET CLUSTER SETTING kv.rangefeed.enabled = true`, + `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`, + ) + + for i := 100; i <= 900; i += 100 { + storeID := 1 + i%3 + rowID := i + testutils.SucceedsSoon(t, func() error { + _, err := sqlDB.DB.ExecContext(context.Background(), + "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", storeID, rowID) + return err }) } + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true) + channelWaitWithTimeout(t, allSeen) + closeFeed() + + // Verify we connected to each node once. + connCounts.Lock() + defer connCounts.Unlock() + for _, c := range connCounts.counts { + require.Equal(t, 1, c) + } } diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 87d392e399cc..5ac9d68768be 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -57,6 +57,10 @@ func (n Node) RangeFeed(_ *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFee panic("unimplemented") } +func (n Node) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("unimplemented") +} + func (n Node) GossipSubscription( _ *roachpb.GossipSubscriptionRequest, _ roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 7c5e07a899ba..ec3610569db2 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -187,6 +187,12 @@ func (m *mockInternalClient) RangeFeed( return nil, fmt.Errorf("unsupported RangeFeed call") } +func (m *mockInternalClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + return nil, fmt.Errorf("unsupported MuxRangeFeed call") +} + // GossipSubscription is part of the roachpb.InternalClient interface. func (m *mockInternalClient) GossipSubscription( ctx context.Context, args *roachpb.GossipSubscriptionRequest, _ ...grpc.CallOption, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 68da6429a4f8..cf2a33c3d466 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -126,7 +126,7 @@ type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb. // ReplicaRangefeedFilter is used in unit tests to modify the request, inject // responses, or return errors from rangefeeds. type ReplicaRangefeedFilter func( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index c3b62db37ef2..b1955eb52f2f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -63,7 +63,7 @@ var RangeFeedRefreshInterval = settings.RegisterDurationSetting( // support for concurrent calls to Send. 
Note that the default implementation of // grpc.Stream is not safe for concurrent calls to Send. type lockedRangefeedStream struct { - wrapped rangefeed.Stream + wrapped roachpb.RangeFeedEventSink sendMu syncutil.Mutex } @@ -134,13 +134,13 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The provided ConcurrentRequestLimiter is used to limit the number // of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream rangefeed.Stream, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream) } func (r *Replica) rangeFeedWithRangeID( - _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream rangefeed.Stream, + _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index d979070a1b66..147226b1c525 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3080,7 +3080,7 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD // the provided stream and returns with an optional error when the rangefeed is // complete. func (s *Store) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 22e74cd48874..4b65fe75a4d1 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -228,7 +228,7 @@ func (ls *Stores) SendWithWriteBytes( // the provided stream and returns with an optional error when the rangefeed is // complete. func (ls *Stores) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { ctx := stream.Context() if args.RangeID == 0 { diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 8bc9e4a47b69..b54401e44213 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -11,6 +11,7 @@ package roachpb import ( + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -23,7 +24,7 @@ import ( "github.com/dustin/go-humanize" ) -//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient +//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient // UserPriority is a custom type for transaction's user priority. type UserPriority float64 @@ -1806,3 +1807,19 @@ const ( // with the SpecificTenantOverrides precedence.. AllTenantsOverrides ) + +// RangeFeedEventSink is an interface for sending a single rangefeed event. +// TODO: Remove once 22.2 is released. +type RangeFeedEventSink interface { + Context() context.Context + Send(*RangeFeedEvent) error +} + +// RangeFeedEventProducer is an adapter for receiving rangefeed events with either +// the legacy RangeFeed RPC, or the MuxRangeFeed RPC. +// TODO: Remove once 23.1 is released. 
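+// The legacy Internal_RangeFeedClient satisfies this interface directly, since
+// its Recv already returns *RangeFeedEvent. A MuxRangeFeed stream presumably
+// gets wrapped in a thin client-side adapter (the demultiplexing code added in
+// dist_sender_mux_rangefeed.go) that strips the StreamID/RangeID envelope from
+// each MuxRangeFeedEvent before handing the event to the consumer.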
+type RangeFeedEventProducer interface { + // Recv receives the next rangefeed event. an io.EOF error indicates that the + // range needs to be restarted. + Recv() (*RangeFeedEvent, error) +} diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 90a9c8e67380..7a1ba58cd744 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2681,6 +2681,9 @@ message RangeFeedRequest { // AdmissionHeader is used only at the start of the range feed stream, since // the initial catch-up scan be expensive. AdmissionHeader admission_header = 4 [(gogoproto.nullable) = false]; + + // StreamID is set by the client issuing MuxRangeFeed requests. + int64 stream_id = 5 [(gogoproto.customname) = "StreamID"]; } // RangeFeedValue is a variant of RangeFeedEvent that represents an update to @@ -2754,6 +2757,14 @@ message RangeFeedEvent { RangeFeedDeleteRange delete_range = 5; } +// MuxRangeFeedEvent is a response generated by MuxRangeFeed RPC. It tags +// the underlying RangeFeedEvent with the ID of the range that produced this event. +message MuxRangeFeedEvent { + RangeFeedEvent event = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + int32 range_id = 2 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // Server echoes back stream_id set by the client. + int64 stream_id = 3 [(gogoproto.customname) = "StreamID"]; +} // ResetQuorumRequest makes a range that is unavailable due to lost quorum // available again, at the cost of losing all of the data in the range. Any @@ -2963,7 +2974,10 @@ message JoinNodeResponse { service Internal { rpc Batch (BatchRequest) returns (BatchResponse) {} rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {} + // RangeFeed RPC is deprecated. Use MuxRangeFeed instead. + // To be removed at version 23.1 rpc RangeFeed (RangeFeedRequest) returns (stream RangeFeedEvent) {} + rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {} rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {} rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {} diff --git a/pkg/roachpb/roachpbmock/BUILD.bazel b/pkg/roachpb/roachpbmock/BUILD.bazel index c50e10c59fe3..9c9d0d0f4f11 100644 --- a/pkg/roachpb/roachpbmock/BUILD.bazel +++ b/pkg/roachpb/roachpbmock/BUILD.bazel @@ -8,6 +8,7 @@ gomock( interfaces = [ "InternalClient", "Internal_RangeFeedClient", + "Internal_MuxRangeFeedClient", ], library = "//pkg/roachpb", package = "roachpbmock", diff --git a/pkg/roachpb/roachpbmock/mocks_generated.go b/pkg/roachpb/roachpbmock/mocks_generated.go index a053cca023de..377c7b7b0fcb 100644 --- a/pkg/roachpb/roachpbmock/mocks_generated.go +++ b/pkg/roachpb/roachpbmock/mocks_generated.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient) // Package roachpbmock is a generated GoMock package. package roachpbmock @@ -137,6 +137,26 @@ func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) } +// MuxRangeFeed mocks base method. 
+func (m *MockInternalClient) MuxRangeFeed(arg0 context.Context, arg1 ...grpc.CallOption) (roachpb.Internal_MuxRangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MuxRangeFeed", varargs...) + ret0, _ := ret[0].(roachpb.Internal_MuxRangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MuxRangeFeed indicates an expected call of MuxRangeFeed. +func (mr *MockInternalClientMockRecorder) MuxRangeFeed(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MuxRangeFeed", reflect.TypeOf((*MockInternalClient)(nil).MuxRangeFeed), varargs...) +} + // RangeFeed mocks base method. func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *roachpb.RangeFeedRequest, arg2 ...grpc.CallOption) (roachpb.Internal_RangeFeedClient, error) { m.ctrl.T.Helper() @@ -379,3 +399,140 @@ func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) } + +// MockInternal_MuxRangeFeedClient is a mock of Internal_MuxRangeFeedClient interface. +type MockInternal_MuxRangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedClientMockRecorder +} + +// MockInternal_MuxRangeFeedClientMockRecorder is the mock recorder for MockInternal_MuxRangeFeedClient. +type MockInternal_MuxRangeFeedClientMockRecorder struct { + mock *MockInternal_MuxRangeFeedClient +} + +// NewMockInternal_MuxRangeFeedClient creates a new mock instance. +func NewMockInternal_MuxRangeFeedClient(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedClient { + mock := &MockInternal_MuxRangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedClient) EXPECT() *MockInternal_MuxRangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_MuxRangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. 
+func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Recv() (*roachpb.MuxRangeFeedEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*roachpb.MuxRangeFeedEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Send(arg0 *roachpb.RangeFeedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. 
+func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Trailer)) +} diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 0f49cf9156e7..6d1e171f400e 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -53,9 +53,8 @@ func (a tenantAuthorizer) authorize( case "/cockroach.roachpb.Internal/RangeLookup": return a.authRangeLookup(tenID, req.(*roachpb.RangeLookupRequest)) - case "/cockroach.roachpb.Internal/RangeFeed": + case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed": return a.authRangeFeed(tenID, req.(*roachpb.RangeFeedRequest)) - case "/cockroach.roachpb.Internal/GossipSubscription": return a.authGossipSubscription(tenID, req.(*roachpb.GossipSubscriptionRequest)) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 26afd43e82b4..5588348fbe31 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -900,6 +900,20 @@ var rangefeedStreamInfo = &grpc.StreamServerInfo{ IsServerStream: true, } +var muxRangeFeedDesc = &grpc.StreamDesc{ + StreamName: "MuxRangeFeed", + ServerStreams: true, + ClientStreams: true, +} + +const muxRangefeedMethodName = "/cockroach.roachpb.Internal/MuxRangeFeed" + +var muxRangefeedStreamInfo = &grpc.StreamServerInfo{ + FullMethod: muxRangefeedMethodName, + IsClientStream: true, + IsServerStream: true, +} + // RangeFeed implements the RestrictedInternalClient interface. func (a internalClientAdapter) RangeFeed( ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, @@ -924,7 +938,7 @@ func (a internalClientAdapter) RangeFeed( // -> grpc.ClientStream implementations provided by client-side interceptors // -> rangeFeedClientAdapter // -> RPC caller - rfPipe := newRangeFeedPipe(grpcutil.NewLocalRequestContext(ctx)) + rfPipe := newRangeFeedPipe(grpcutil.NewLocalRequestContext(ctx), receiveRangeFeedEvent) // Mark this request as originating locally. args.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL @@ -992,12 +1006,125 @@ func (x rangeFeedClientAdapter) Recv() (*roachpb.RangeFeedEvent, error) { return m, nil } +// MuxRangeFeed implements the RestrictedInternalClient interface. +func (a internalClientAdapter) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + ctx, cancel := context.WithCancel(grpcutil.NewLocalRequestContext(ctx)) + + // MuxRangeFeed consists of two streams: the client stream and the server + // stream. Those two streams are stitched together by the goroutine below + // which sends client messages to the server channel, and server responses to + // the client. + requestPipe := newRangeFeedPipe(ctx, receiveRangeFeedRequest) + responsePipe := newRangeFeedPipe(ctx, receiveMuxRangeFeedEvent) + + go func() { + defer cancel() + + // Handler adapts the ServerStream to the typed interface expected by the + // RPC handler (Node.MuxRangeFeed). `stream` might be `requestPipe` which we + // pass to the interceptor chain below, or it might be another + // implementation of `ServerStream` that wraps it; in practice it will be + // tracing.grpcinterceptor.StreamServerInterceptor.
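+		// The two pipes give this local bi-directional stream its two directions:
+		// requestPipe carries RangeFeedRequests from the client to the server
+		// handler, while responsePipe carries MuxRangeFeedEvents back. The server
+		// adapter below therefore reads from requestPipe and writes to responsePipe;
+		// the client adapter returned at the end reads from responsePipe and sends
+		// through the client interceptor chain, which bottoms out at requestPipe.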
+ adapter := muxRangeFeedServerAdapter{ + localStream: localStream{ctx: ctx}, + pipeReader: requestPipe, + pipeWriter: responsePipe, + } + handler := func(srv interface{}, stream grpc.ServerStream) error { + return a.server.MuxRangeFeed(adapter) + } + // Run the server interceptors, which will bottom out by running `handler` + // (defined just above), which runs Node.MuxRangeFeed (our RPC handler). + // This call is blocking. + err := a.serverStreamInterceptors.run(a.server, requestPipe, muxRangefeedStreamInfo, handler) + if err == nil { + err = io.EOF + } + requestPipe.errC <- err + }() + + // Run the client-side interceptors, which produce a grpc.ClientStream. + // clientStream might end up being requestPipe, or it might end up being another + // grpc.ClientStream implementation that wraps it. + // + // NOTE: For actual RPCs, going to a remote node, there's a tracing client + // interceptor producing a tracing.grpcinterceptor.tracingClientStream + // implementation of ClientStream. That client interceptor does not run for + // these local requests handled by the internalClientAdapter (as opposed to + // the tracing server interceptor, which does run). + clientStream, err := a.clientStreamInterceptors.run(ctx, muxRangeFeedDesc, nil /* ClientConn */, muxRangefeedMethodName, + // This function runs at the bottom of the client interceptor stack, + // pretending to actually make an RPC call. We don't make any calls, but + // return the pipe on which messages from the server will come. + func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + return requestPipe, nil + }, + opts...) + if err != nil { + return nil, err + } + + return muxRangeFeedClientAdapter{ + localStream: localStream{ctx: ctx}, + pipeReader: responsePipe, + pipeWriter: clientStream, + }, nil +} + +type muxRangeFeedClientAdapter struct { + localStream + pipeWriter + pipeReader +} + +func (a muxRangeFeedClientAdapter) Send(request *roachpb.RangeFeedRequest) error { + // Mark this request as originating locally. + request.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL + return a.pipeWriter.SendMsg(request) +} + +func (a muxRangeFeedClientAdapter) Recv() (*roachpb.MuxRangeFeedEvent, error) { + m := new(roachpb.MuxRangeFeedEvent) + if err := a.pipeReader.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _ roachpb.Internal_MuxRangeFeedClient = muxRangeFeedClientAdapter{} + +type muxRangeFeedServerAdapter struct { + localStream + pipeWriter + pipeReader +} + +var _ roachpb.Internal_MuxRangeFeedServer = muxRangeFeedServerAdapter{} + +func (a muxRangeFeedServerAdapter) Send(event *roachpb.MuxRangeFeedEvent) error { + return a.pipeWriter.SendMsg(event) +} + +func (a muxRangeFeedServerAdapter) Recv() (*roachpb.RangeFeedRequest, error) { + m := new(roachpb.RangeFeedRequest) + if err := a.pipeReader.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // rangeFeedPipe is a (uni-directional) pipe of *RangeFeedEvent that implements // the grpc.ClientStream and grpc.ServerStream interfaces.
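// With this change the pipe is no longer tied to *RangeFeedEvent: the receiver
// function injected via newRangeFeedPipe performs the typed copy, which lets
// the MuxRangeFeed adapter above reuse the same pipe for *RangeFeedRequest
// (client to server) and *MuxRangeFeedEvent (server to client).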
type rangeFeedPipe struct { - ctx context.Context - respC chan interface{} - errC chan error + localStream + respC chan interface{} + errC chan error + doneC <-chan struct{} + receiver func(interface{}, interface{}) } var _ grpc.ClientStream = &rangeFeedPipe{} @@ -1007,35 +1134,52 @@ var _ grpc.ServerStream = &rangeFeedPipe{} // for convenience, because it's used as a grpc.ClientStream and // grpc.ServerStream, and these interfaces are implemented on the pointer // receiver. -func newRangeFeedPipe(ctx context.Context) *rangeFeedPipe { +func newRangeFeedPipe(ctx context.Context, receiver rangeFeedEventReceiver) *rangeFeedPipe { return &rangeFeedPipe{ - ctx: ctx, - respC: make(chan interface{}, 128), - errC: make(chan error, 1), + localStream: localStream{ctx: ctx}, + respC: make(chan interface{}, 128), + errC: make(chan error, 1), + doneC: ctx.Done(), + receiver: receiver, } } +// localStream implements the grpc.ClientStream, grpc.ServerStream and grpc.Stream +// interfaces for the purpose of intercepting RPC requests to the local node. +// This type simply provides access to the context (as required by the grpc.Stream +// interface) while all other methods panic. It is expected to be embedded inside other types. +type localStream struct { + ctx context.Context +} + // grpc.ClientStream methods. -func (*rangeFeedPipe) Header() (metadata.MD, error) { panic("unimplemented") } -func (*rangeFeedPipe) Trailer() metadata.MD { panic("unimplemented") } -func (*rangeFeedPipe) CloseSend() error { panic("unimplemented") } +func (localStream) Header() (metadata.MD, error) { panic("unimplemented") } +func (localStream) Trailer() metadata.MD { panic("unimplemented") } +func (localStream) CloseSend() error { panic("unimplemented") } // grpc.ServerStream methods. -func (*rangeFeedPipe) SetHeader(metadata.MD) error { panic("unimplemented") } -func (*rangeFeedPipe) SendHeader(metadata.MD) error { panic("unimplemented") } -func (*rangeFeedPipe) SetTrailer(metadata.MD) { panic("unimplemented") } +func (localStream) SetHeader(metadata.MD) error { panic("unimplemented") } +func (localStream) SendHeader(metadata.MD) error { panic("unimplemented") } +func (localStream) SetTrailer(metadata.MD) { panic("unimplemented") } // Common grpc.{Client,Server}Stream methods. -func (p *rangeFeedPipe) Context() context.Context { return p.ctx } +func (s localStream) Context() context.Context { return s.ctx } + +type pipeWriter interface { + SendMsg(m interface{}) error +} + +type pipeReader interface { + RecvMsg(m interface{}) error +} // SendMsg is part of the grpc.ServerStream interface. It is also part of the -// grpc.ClientStream interface but, in the case of the RangeFeed RPC (which is -// only server-streaming, not bi-directional), only the server sends. +// grpc.ClientStream interface. func (p *rangeFeedPipe) SendMsg(m interface{}) error { select { case p.respC <- m: return nil - case <-p.ctx.Done(): + case <-p.doneC: return p.ctx.Err() } } @@ -1045,12 +1189,11 @@ func (p *rangeFeedPipe) SendMsg(m interface{}) error { // (which is only server-streaming, not bi-directional), only the client // receives.
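// In the MuxRangeFeed case the pipe is used in both directions: the client
// adapter receives *MuxRangeFeedEvent and the server adapter receives
// *RangeFeedRequest, both via RecvMsg below; the receiver installed by
// newRangeFeedPipe performs the type-specific copy. Illustrative sketch only:
//
//	var ev roachpb.MuxRangeFeedEvent
//	err := responsePipe.RecvMsg(&ev) // receiveMuxRangeFeedEvent copies the event into &ev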
func (p *rangeFeedPipe) RecvMsg(m interface{}) error { - out := m.(*roachpb.RangeFeedEvent) msg, err := p.recvInternal() if err != nil { return err } - *out = *msg.(*roachpb.RangeFeedEvent) + p.receiver(m, msg) return nil } @@ -1070,9 +1213,30 @@ func (p *rangeFeedPipe) recvInternal() (interface{}, error) { default: return nil, err } + case <-p.doneC: + return nil, p.ctx.Err() } } +// rangeFeedEventReceiver is an adapter function that converts a message from the +// generic interface type to the specific message type. +type rangeFeedEventReceiver func(interface{}, interface{}) + +// receiveRangeFeedEvent is the roachpb.RangeFeedEvent receiver. +func receiveRangeFeedEvent(dst interface{}, src interface{}) { + *dst.(*roachpb.RangeFeedEvent) = *src.(*roachpb.RangeFeedEvent) +} + +// receiveMuxRangeFeedEvent is the roachpb.MuxRangeFeedEvent receiver. +func receiveMuxRangeFeedEvent(dst interface{}, src interface{}) { + *dst.(*roachpb.MuxRangeFeedEvent) = *src.(*roachpb.MuxRangeFeedEvent) +} + +// receiveRangeFeedRequest is a roachpb.RangeFeedRequest receiver. +func receiveRangeFeedRequest(dst interface{}, src interface{}) { + *dst.(*roachpb.RangeFeedRequest) = *src.(*roachpb.RangeFeedRequest) +} + // rangeFeedServerAdapter adapts an untyped ServerStream to the typed // roachpb.Internal_RangeFeedServer interface, expected by the RangeFeed RPC // handler. diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 32fd988ad609..58a9102040e3 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -324,6 +324,11 @@ type internalServer struct { serverStream roachpb.Internal_RangeFeedServer } +func (s *internalServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + //TODO implement me + panic("implement me") +} + func (*internalServer) Batch( context.Context, *roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index 6ab800c44b89..3d5756dd205c 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -571,6 +571,10 @@ func (*internalServer) RangeFeed( panic("unimplemented") } +func (s *internalServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("implement me") +} + func (*internalServer) GossipSubscription( *roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/rpc/restricted_internal_client.go b/pkg/rpc/restricted_internal_client.go index 1f7b480c3eae..808067578c4b 100644 --- a/pkg/rpc/restricted_internal_client.go +++ b/pkg/rpc/restricted_internal_client.go @@ -24,4 +24,5 @@ import ( type RestrictedInternalClient interface { Batch(ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption) (*roachpb.BatchResponse, error) RangeFeed(ctx context.Context, in *roachpb.RangeFeedRequest, opts ...grpc.CallOption) (roachpb.Internal_RangeFeedClient, error) + MuxRangeFeed(ctx context.Context, opts ...grpc.CallOption) (roachpb.Internal_MuxRangeFeedClient, error) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 898a13950e34..d550bc726c96 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -227,6 +227,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/contextutil", + "//pkg/util/ctxgroup", "//pkg/util/envutil", "//pkg/util/errorutil", "//pkg/util/goschedstats", diff --git a/pkg/server/node.go b/pkg/server/node.go index 88ef7b5abbec..8ea9c3aa6f4f 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@
-46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1236,6 +1237,12 @@ func (n *Node) RangeLookup( // RangeFeed implements the roachpb.InternalServer interface. func (n *Node) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) error { + return n.singleRangeFeed(args, stream) +} + +func (n *Node) singleRangeFeed( + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) error { pErr := n.stores.RangeFeed(args, stream) if pErr != nil { @@ -1248,6 +1255,62 @@ func (n *Node) RangeFeed( return nil } +// setRangeIDEventSink annotates each response with range and stream IDs. +// This is used by MuxRangeFeed. +// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and +// the old style RangeFeed deprecated. +type setRangeIDEventSink struct { + ctx context.Context + rangeID roachpb.RangeID + streamID int64 + wrapped roachpb.Internal_MuxRangeFeedServer +} + +func (s *setRangeIDEventSink) Context() context.Context { + return s.ctx +} + +func (s *setRangeIDEventSink) Send(event *roachpb.RangeFeedEvent) error { + response := &roachpb.MuxRangeFeedEvent{ + RangeFeedEvent: *event, + RangeID: s.rangeID, + StreamID: s.streamID, + } + return s.wrapped.Send(response) +} + +var _ roachpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) + +func (n *Node) asyncRangeFeed( + args roachpb.RangeFeedRequest, stream roachpb.Internal_MuxRangeFeedServer, +) func(ctx context.Context) error { + return func(ctx context.Context) error { + sink := setRangeIDEventSink{ + ctx: ctx, + rangeID: args.RangeID, + streamID: args.StreamID, + wrapped: stream, + } + return n.singleRangeFeed(&args, &sink) + } +} + +// MuxRangeFeed implements the roachpb.InternalServer interface. +func (n *Node) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + ctx, cancelFeeds := n.stopper.WithCancelOnQuiesce(stream.Context()) + defer cancelFeeds() + rfGrp := ctxgroup.WithContext(ctx) + + for { + req, err := stream.Recv() + if err != nil { + cancelFeeds() + return errors.CombineErrors(err, rfGrp.Wait()) + } + rfGrp.GoCtx(n.asyncRangeFeed(*req, stream)) + } +} + // ResetQuorum implements the roachpb.InternalServer interface. func (n *Node) ResetQuorum( ctx context.Context, req *roachpb.ResetQuorumRequest,