diff --git a/build/bazelutil/check.sh b/build/bazelutil/check.sh index 49cd804695dd..886986eeb7fc 100755 --- a/build/bazelutil/check.sh +++ b/build/bazelutil/check.sh @@ -22,7 +22,7 @@ pkg/kv/kvclient/rangecache/range_cache.go://go:generate mockgen -package=rangeca pkg/kv/kvclient/rangefeed/rangefeed.go://go:generate mockgen -destination=mocks_generated_test.go --package=rangefeed . DB pkg/kv/kvserver/concurrency/lock_table.go://go:generate ../../../util/interval/generic/gen.sh *lockState concurrency pkg/kv/kvserver/spanlatch/manager.go://go:generate ../../../util/interval/generic/gen.sh *latch spanlatch -pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient +pkg/roachpb/api.go://go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient pkg/roachpb/batch.go://go:generate go run gen/main.go --filename batch_generated.go *.pb.go pkg/security/certmgr/cert.go://go:generate mockgen -package=certmgr -destination=mocks_generated_test.go . Cert pkg/security/securitytest/securitytest.go://go:generate go-bindata -mode 0600 -modtime 1400000000 -pkg securitytest -o embedded.go -ignore README.md -ignore regenerate.sh test_certs diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 69fe43009f8c..e407aec68a8a 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -290,4 +290,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-58 set the active cluster version in the format '.' +version version 22.1-60 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 9aefdc75ff19..0e6f4205ea66 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -221,6 +221,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-58set the active cluster version in the format '.' +versionversion22.1-60set the active cluster version in the format '.' diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index f99318e85075..1cbb341a5805 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -392,6 +392,7 @@ func (f rawEventFeed) run( spans []kvcoord.SpanTimePair, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error { var startAfter hlc.Timestamp for _, s := range spans { diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index 0ba41d5b7a5c..9b5daab3eef7 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -38,6 +38,7 @@ type rangefeedFactory func( spans []kvcoord.SpanTimePair, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error type rangefeed struct { diff --git a/pkg/ccl/kvccl/kvtenantccl/connector_test.go b/pkg/ccl/kvccl/kvtenantccl/connector_test.go index 672bd70e920b..4212f1d44a19 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector_test.go @@ -90,6 +90,10 @@ func (*mockServer) RangeFeed(*roachpb.RangeFeedRequest, roachpb.Internal_RangeFe panic("unimplemented") } +func (m *mockServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("implement me") +} + func (*mockServer) Join( context.Context, *roachpb.JoinNodeRequest, ) (*roachpb.JoinNodeResponse, error) { diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 7bafd825f66d..a977e5800d59 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -316,6 +316,8 @@ const ( // WaitedForDelRangeInGCJob corresponds to the migration which waits for // the GC jobs to adopt the use of DelRange with tombstones. WaitedForDelRangeInGCJob + // RangefeedUseOneStreamPerNode changes rangefeed implementation to use 1 RPC stream per node. + RangefeedUseOneStreamPerNode // ************************************************* // Step (1): Add new versions here. @@ -530,6 +532,11 @@ var versionsSingleton = keyedVersions{ Key: WaitedForDelRangeInGCJob, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 58}, }, + { + Key: RangefeedUseOneStreamPerNode, + Version: roachpb.Version{Major: 22, Minor: 1, Internal: 60}, + }, + // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 0ca285cfc113..01cd9fb62302 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -52,11 +52,12 @@ func _() { _ = x[SetRoleOptionsUserIDColumnNotNull-41] _ = x[UseDelRangeInGCJob-42] _ = x[WaitedForDelRangeInGCJob-43] + _ = x[RangefeedUseOneStreamPerNode-44] } -const _Key_name = "V21_2Start22_1ProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreEnablePebbleFormatVersionBlockPropertiesEnableLeaseHolderRemovalChangefeedIdlenessEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsEnableNewChangefeedOptionsV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJob" +const _Key_name = "V21_2Start22_1ProbeRequestPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreEnablePebbleFormatVersionBlockPropertiesEnableLeaseHolderRemovalChangefeedIdlenessEnableNewStoreRebalancerClusterLocksVirtualTableAutoStatsTableSettingsEnableNewChangefeedOptionsV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNode" -var _Key_index = [...]uint16{0, 5, 14, 26, 54, 84, 112, 133, 173, 197, 215, 239, 263, 285, 311, 316, 325, 340, 380, 414, 448, 470, 490, 509, 542, 561, 581, 602, 637, 671, 701, 754, 768, 789, 820, 853, 884, 918, 940, 969, 996, 1027, 1060, 1078, 1102} +var _Key_index = [...]uint16{0, 5, 14, 26, 54, 84, 112, 133, 173, 197, 215, 239, 263, 285, 311, 316, 325, 340, 380, 414, 448, 470, 490, 509, 542, 561, 581, 602, 637, 671, 701, 754, 768, 789, 820, 853, 884, 918, 940, 969, 996, 1027, 1060, 1078, 1102, 1130} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 8db3465220c2..84f8c486de26 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "batch.go", "condensable_span_set.go", "dist_sender.go", + "dist_sender_mux_rangefeed.go", "dist_sender_rangefeed.go", "doc.go", "local_test_cluster_util.go", @@ -39,6 +40,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/gossip", "//pkg/keys", "//pkg/kv", @@ -111,6 +113,7 @@ go_test( srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_mock_test.go", "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", @@ -143,6 +146,7 @@ go_test( deps = [ "//build/bazelutil:noop", "//pkg/base", + "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", "//pkg/gossip", @@ -166,6 +170,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/storage", @@ -175,9 +180,11 @@ go_test( "//pkg/testutils/localtestcluster", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/caller", + "//pkg/util/ctxgroup", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go new file mode 100644 index 000000000000..e74f78d9b38e --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -0,0 +1,288 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// rangefeedMuxer is responsible for coordination and management of mux +// rangefeeds. rangefeedMuxer caches MuxRangeFeed stream per node, and executes +// each range feed request on an appropriate node. +type rangefeedMuxer struct { + // eventCh receives events from all active muxStreams. + eventCh chan *roachpb.MuxRangeFeedEvent + + // Context group controlling execution of MuxRangeFeed calls. + g ctxgroup.Group + + // State pertaining to actively executing MuxRangeFeeds. + mu struct { + syncutil.Mutex + + // terminalErr set when a terminal error occurs. + // Subsequent calls to startMuxRangeFeed will return this error. + terminalErr error + + // Each call to start new range feed gets a unique ID which is echoed back + // by MuxRangeFeed rpc. This is done as a safety mechanism to make sure + // that we always send the event to the correct consumer -- even if the + // range feed is terminated and re-established rapidly. + nextStreamID int64 + + // muxClient contains a nodeID->MuxRangeFeedClient. + muxClients map[roachpb.NodeID]*muxClientState + // producers maps streamID to the event producer -- data sent back to the + // consumer of range feed events. + producers map[int64]*channelRangeFeedEventProducer + } +} + +type muxClientState struct { + client roachpb.Internal_MuxRangeFeedClient + streams util.FastIntSet + cancel context.CancelFunc +} + +func newRangefeedMuxer(g ctxgroup.Group) *rangefeedMuxer { + m := &rangefeedMuxer{ + eventCh: make(chan *roachpb.MuxRangeFeedEvent), + g: g, + } + m.mu.muxClients = make(map[roachpb.NodeID]*muxClientState) + m.mu.producers = make(map[int64]*channelRangeFeedEventProducer) + + m.g.GoCtx(m.demuxLoop) + + return m +} + +// channelRangeFeedEventProducer is a rangeFeedEventProducer which receives +// events on input channel, and returns events when Recv is called. +type channelRangeFeedEventProducer struct { + ctx context.Context + termErrCh chan struct{} // Signalled to propagate terminal error the consumer. + termErr error // Set when terminal error occurs. + eventCh chan *roachpb.RangeFeedEvent +} + +// Recv implements rangeFeedEventProducer interface. +func (c *channelRangeFeedEventProducer) Recv() (*roachpb.RangeFeedEvent, error) { + select { + case <-c.ctx.Done(): + return nil, c.ctx.Err() + case <-c.termErrCh: + return nil, c.termErr + case e := <-c.eventCh: + return e, nil + } +} + +var _ roachpb.RangeFeedEventProducer = (*channelRangeFeedEventProducer)(nil) + +// startMuxRangeFeed begins the execution of rangefeed for the specified +// RangeFeedRequest. +// The passed in client is only needed to establish MuxRangeFeed RPC. +func (m *rangefeedMuxer) startMuxRangeFeed( + ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, +) (_ roachpb.RangeFeedEventProducer, cleanup func(), _ error) { + producer, rpcClient, streamID, cleanup, err := m.connect(ctx, client, req.Replica.NodeID) + if err != nil { + return nil, cleanup, err + } + req.StreamID = streamID + if err := rpcClient.Send(req); err != nil { + return nil, cleanup, err + } + return producer, cleanup, nil +} + +// connect establishes MuxRangeFeed connection for the specified node, re-using +// the existing one if one exists. Returns event producer, RPC client to send +// requests on, the streamID which should be used when sending request, and a +// cleanup function. Cleanup function never nil, and must always be invoked, +// even if error is returned. +func (m *rangefeedMuxer) connect( + ctx context.Context, client rpc.RestrictedInternalClient, nodeID roachpb.NodeID, +) ( + _ roachpb.RangeFeedEventProducer, + _ roachpb.Internal_MuxRangeFeedClient, + streamID int64, + cleanup func(), + _ error, +) { + m.mu.Lock() + defer m.mu.Unlock() + + streamID = m.mu.nextStreamID + m.mu.nextStreamID++ + + cleanup = func() { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.mu.producers, streamID) + // Cleanup mux state if it exists; it may be nil if this function exits + // early (for example if MuxRangeFeed call fails, before muxState + // initialized). + muxState := m.mu.muxClients[nodeID] + if muxState != nil { + muxState.streams.Remove(int(streamID)) + if muxState.streams.Len() == 0 { + // This node no longer has any active streams. + // Delete node from the muxClient list, and gracefully + // shutdown consumer go routine. + delete(m.mu.muxClients, nodeID) + muxState.cancel() + } + } + } + + if m.mu.terminalErr != nil { + return nil, nil, streamID, cleanup, m.mu.terminalErr + } + + var found bool + ms, found := m.mu.muxClients[nodeID] + + if !found { + ctx, cancel := context.WithCancel(ctx) + stream, err := client.MuxRangeFeed(ctx) + if err != nil { + cancel() + return nil, nil, streamID, cleanup, err + } + + ms = &muxClientState{client: stream, cancel: cancel} + m.mu.muxClients[nodeID] = ms + m.g.GoCtx(func(ctx context.Context) error { + defer cancel() + return m.receiveEventsFromNode(ctx, nodeID, stream) + }) + } + + // Start RangeFeed for this request. + ms.streams.Add(int(streamID)) + + producer := &channelRangeFeedEventProducer{ + ctx: ctx, + termErrCh: make(chan struct{}), + eventCh: make(chan *roachpb.RangeFeedEvent), + } + m.mu.producers[streamID] = producer + return producer, ms.client, streamID, cleanup, nil +} + +// demuxLoop de-multiplexes events and sends them to appropriate rangefeed event +// consumer. +func (m *rangefeedMuxer) demuxLoop(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e := <-m.eventCh: + m.mu.Lock() + producer, found := m.mu.producers[e.StreamID] + m.mu.Unlock() + + if !found { + return m.shutdownWithError(errors.AssertionFailedf( + "expected to find consumer for streamID=%d", e.StreamID), + ) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case producer.eventCh <- &e.RangeFeedEvent: + } + } + } +} + +// receiveEventsFromNode receives mux rangefeed events, and forwards them to the +// consumer channel. +func (m *rangefeedMuxer) receiveEventsFromNode( + ctx context.Context, nodeID roachpb.NodeID, stream roachpb.Internal_MuxRangeFeedClient, +) error { + for { + event, streamErr := stream.Recv() + + if streamErr != nil { + m.propagateStreamTerminationErrorToConsumers(nodeID, streamErr) + // Since the stream error is handled above, we return nil to gracefully shut down + // this go routine. + return nil //nolint:returnerrcheck + } + + select { + case <-ctx.Done(): + return ctx.Err() + case m.eventCh <- event: + } + } +} + +// propagateStreamTerminationErrorToConsumers called when mux stream running on +// a node encountered an error. All consumers will receive the stream +// termination error and will handle it appropriately. +func (m *rangefeedMuxer) propagateStreamTerminationErrorToConsumers( + nodeID roachpb.NodeID, streamErr error, +) { + // Grab muxStream associated with the node, and clear it out. + m.mu.Lock() + defer m.mu.Unlock() + + ms, streamFound := m.mu.muxClients[nodeID] + delete(m.mu.muxClients, nodeID) + // Note: it's okay if the stream is not found; this can happen if the + // nodeID was already removed from muxClients because we're trying to gracefully + // shutdown receiveEventsFromNode go routine. + if streamFound { + ms.streams.ForEach(func(streamID int) { + p := m.mu.producers[int64(streamID)] + delete(m.mu.producers, int64(streamID)) + if p != nil { + p.termErr = streamErr + close(p.termErrCh) + } + }) + } + +} + +// shutdownWithError terminates this rangefeedMuxer with a terminal error +// (usually an assertion failure). It's a bit unwieldy way to propagate this +// error to the caller, but as soon as one of consumers notices this terminal +// error, the context should be cancelled. +// Returns the terminal error passed in. +func (m *rangefeedMuxer) shutdownWithError(terminalErr error) error { + // Okay to run this under the lock since as soon as a consumer sees this terminal + // error, the whole rangefeed should terminate. + m.mu.Lock() + defer m.mu.Unlock() + + m.mu.terminalErr = terminalErr + for id, p := range m.mu.producers { + p.termErr = terminalErr + close(p.termErrCh) + delete(m.mu.producers, id) + } + + return terminalErr +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 3f09a3334424..8ea19348e216 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -19,6 +19,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -26,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -69,6 +71,29 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { return int(l) } +type rangeFeedConfig struct { + useMuxRangeFeed bool +} + +// RangeFeedOption configures a RangeFeed. +type RangeFeedOption interface { + set(*rangeFeedConfig) +} + +type optionFunc func(*rangeFeedConfig) + +func (o optionFunc) set(c *rangeFeedConfig) { o(c) } + +// WithMuxRangeFeed configures range feed to use MuxRangeFeed RPC. +func WithMuxRangeFeed() RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.useMuxRangeFeed = true + }) +} + +// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. +var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -88,6 +113,7 @@ func (ds *DistSender) RangeFeed( startAfter hlc.Timestamp, // exclusive withDiff bool, eventCh chan<- RangeFeedMessage, + opts ...RangeFeedOption, ) error { timedSpans := make([]SpanTimePair, 0, len(spans)) for _, sp := range spans { @@ -96,7 +122,7 @@ func (ds *DistSender) RangeFeed( StartAfter: startAfter, }) } - return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh) + return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh, opts...) } // SpanTimePair is a pair of span along with its starting time. The starting @@ -110,12 +136,21 @@ type SpanTimePair struct { // RangeFeedSpans is similar to RangeFeed but allows specification of different // starting time for each span. func (ds *DistSender) RangeFeedSpans( - ctx context.Context, spans []SpanTimePair, withDiff bool, eventCh chan<- RangeFeedMessage, + ctx context.Context, + spans []SpanTimePair, + withDiff bool, + eventCh chan<- RangeFeedMessage, + opts ...RangeFeedOption, ) error { if len(spans) == 0 { return errors.AssertionFailedf("expected at least 1 span, got none") } + var cfg rangeFeedConfig + for _, opt := range opts { + opt.set(&cfg) + } + ctx = ds.AnnotateCtx(ctx) ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender") defer sp.Finish() @@ -128,6 +163,16 @@ func (ds *DistSender) RangeFeedSpans( "distSenderCatchupLimit", maxConcurrentCatchupScans(&ds.st.SV)) g := ctxgroup.WithContext(ctx) + + var eventProducer rangeFeedEventProducerFactory + if ds.st.Version.IsActive(ctx, clusterversion.RangefeedUseOneStreamPerNode) && + enableMuxRangeFeed && cfg.useMuxRangeFeed { + m := newRangefeedMuxer(g) + eventProducer = m.startMuxRangeFeed + } else { + eventProducer = legacyRangeFeedEventProducer + } + // Goroutine that processes subdivided ranges and creates a rangefeed for // each. rangeCh := make(chan singleRangeInfo, 16) @@ -137,7 +182,8 @@ func (ds *DistSender) RangeFeedSpans( case sri := <-rangeCh: // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, sri.token, withDiff, &catchupSem, rangeCh, eventCh) + return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter, + sri.token, withDiff, &catchupSem, rangeCh, eventCh) }) case <-ctx.Done(): return ctx.Err() @@ -293,6 +339,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( func (ds *DistSender) partialRangeFeed( ctx context.Context, rr *rangeFeedRegistry, + streamProducerFactory rangeFeedEventProducerFactory, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, @@ -334,8 +381,9 @@ func (ds *DistSender) partialRangeFeed( } // Establish a RangeFeed for a single Range. - maxTS, err := ds.singleRangeFeed(ctx, span, startAfter, withDiff, token.Desc(), - catchupSem, eventCh, active.onRangeEvent) + maxTS, err := ds.singleRangeFeed( + ctx, span, startAfter, withDiff, token.Desc(), + catchupSem, eventCh, streamProducerFactory, active.onRangeEvent) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) @@ -408,6 +456,7 @@ func (ds *DistSender) singleRangeFeed( desc *roachpb.RangeDescriptor, catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, + streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, ) (hlc.Timestamp, error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. @@ -432,8 +481,6 @@ func (ds *DistSender) singleRangeFeed( return args.Timestamp, err } replicas.OptimizeReplicaOrder(ds.getNodeID(), latencyFn, ds.locality) - // The RangeFeed is not used for system critical traffic so use a DefaultClass - // connection regardless of the range. opts := SendOptions{class: connectionClass(&ds.st.SV)} transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) if err != nil { @@ -459,11 +506,21 @@ func (ds *DistSender) singleRangeFeed( // cleanup catchup reservation in case of early termination. defer finishCatchupScan() + var streamCleanup func() + maybeCleanupStream := func() { + if streamCleanup != nil { + streamCleanup() + streamCleanup = nil + } + } + defer maybeCleanupStream() + for { if transport.IsExhausted() { return args.Timestamp, newSendError( fmt.Sprintf("sending to all %d replicas failed", len(replicas))) } + maybeCleanupStream() args.Replica = transport.NextReplica() client, err := transport.NextInternalClient(ctx) @@ -473,7 +530,9 @@ func (ds *DistSender) singleRangeFeed( } log.VEventf(ctx, 3, "attempting to create a RangeFeed over replica %s", args.Replica) - stream, err := client.RangeFeed(ctx, &args) + + var stream roachpb.RangeFeedEventProducer + stream, streamCleanup, err = streamProducerFactory(ctx, client, &args) if err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) if grpcutil.IsAuthError(err) { @@ -526,3 +585,19 @@ func connectionClass(sv *settings.Values) rpc.ConnectionClass { } return rpc.DefaultClass } + +type rangeFeedEventProducerFactory func( + ctx context.Context, + client rpc.RestrictedInternalClient, + req *roachpb.RangeFeedRequest, +) (roachpb.RangeFeedEventProducer, func(), error) + +// legacyRangeFeedEventProducer is a rangeFeedEventProducerFactory using +// legacy RangeFeed RPC. +func legacyRangeFeedEventProducer( + ctx context.Context, client rpc.RestrictedInternalClient, req *roachpb.RangeFeedRequest, +) (producer roachpb.RangeFeedEventProducer, cleanup func(), err error) { + cleanup = func() {} + producer, err = client.RangeFeed(ctx, req) + return producer, cleanup, err +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go new file mode 100644 index 000000000000..9d311da46b95 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -0,0 +1,182 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/roachpb/roachpbmock" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +// TestingSetEnableMuxRangeFeed adjusts enable rangefeed env variable +// for testing. +func TestingSetEnableMuxRangeFeed(enabled bool) func() { + old := enableMuxRangeFeed + enableMuxRangeFeed = enabled + return func() { + enableMuxRangeFeed = old + } +} + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, useMuxRangeFeed := range []bool{false, true} { + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(fmt.Sprintf("mux=%t/%s", useMuxRangeFeed, spec.errorCode), + func(t *testing.T) { + clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. + for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. + if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + client := roachpbmock.NewMockInternalClient(ctrl) + + if useMuxRangeFeed { + stream := roachpbmock.NewMockInternal_MuxRangeFeedClient(ctrl) + stream.EXPECT().Send(gomock.Any()).Return(nil) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) + } else { + stream := roachpbmock.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + } + + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) + transport.EXPECT().Release() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + var opts []RangeFeedOption + if useMuxRangeFeed { + opts = append(opts, WithMuxRangeFeed()) + } + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, + false, nil, opts...) + require.Error(t, err) + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index f3fefa0c403c..1cfe6a93e2eb 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -8,147 +8,356 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package kvcoord +package kvcoord_test import ( "context" - "io" "testing" "time" - "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache/rangecachemock" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/roachpb/roachpbmock" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/stop" - gomock "github.com/golang/mock/gomock" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - grpcstatus "google.golang.org/grpc/status" + "google.golang.org/grpc" ) -// Tests that the range feed handles transport errors appropriately. In -// particular, that when encountering other decommissioned nodes it will refresh -// its range descriptor and retry, but if this node is decommissioned it will -// bail out. Regression test for: -// https://github.com/cockroachdb/cockroach/issues/66636 -func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) +type testRangefeedClient struct { + rpc.RestrictedInternalClient + muxRangeFeedEnabled bool + count func() +} - for _, spec := range []struct { - errorCode codes.Code - expectRetry bool - }{ - {codes.FailedPrecondition, true}, // target node is decommissioned; retry - {codes.PermissionDenied, false}, // this node is decommissioned; abort - {codes.Unauthenticated, false}, // this node is not part of cluster; abort - } { - t.Run(spec.errorCode.String(), func(t *testing.T) { - clock := hlc.NewClockWithSystemTimeSource(time.Nanosecond /* maxOffset */) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - g := makeGossip(t, stopper, rpcContext) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - Generation: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1, ReplicaID: 1}, - {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, - } - for _, repl := range desc.InternalReplicas { - require.NoError(t, g.AddInfoProto( - gossip.MakeNodeIDKey(repl.NodeID), - newNodeDesc(repl.NodeID), - gossip.NodeDescriptorTTL, - )) - } +func (c *testRangefeedClient) RangeFeed( + ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, +) (roachpb.Internal_RangeFeedClient, error) { + defer c.count() - ctrl := gomock.NewController(t) - transport := NewMockTransport(ctrl) - rangeDB := rangecachemock.NewMockRangeDescriptorDB(ctrl) + if c.muxRangeFeedEnabled && ctx.Value(useMuxRangeFeedCtxKey{}) != nil { + panic(errors.AssertionFailedf("unexpected call to RangeFeed")) + } + return c.RestrictedInternalClient.RangeFeed(ctx, args, opts...) +} - // We start off with a cached lease on r1. - cachedLease := roachpb.Lease{ - Replica: desc.InternalReplicas[0], - Sequence: 1, - } +func (c *testRangefeedClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + defer c.count() + + if !c.muxRangeFeedEnabled || ctx.Value(useMuxRangeFeedCtxKey{}) == nil { + panic(errors.AssertionFailedf("unexpected call to MuxRangeFeed")) + } + return c.RestrictedInternalClient.MuxRangeFeed(ctx, opts...) +} + +type internalClientCounts struct { + syncutil.Mutex + counts map[rpc.RestrictedInternalClient]int +} + +func (c *internalClientCounts) Inc(ic rpc.RestrictedInternalClient) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + c.counts[ic]++ +} + +type countConnectionsTransport struct { + wrapped kvcoord.Transport + counts *internalClientCounts + rfStreamEnabled bool +} + +var _ kvcoord.Transport = (*countConnectionsTransport)(nil) + +func (c *countConnectionsTransport) IsExhausted() bool { + return c.wrapped.IsExhausted() +} + +func (c *countConnectionsTransport) SendNext( + ctx context.Context, request roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return c.wrapped.SendNext(ctx, request) +} + +type testFeedCtxKey struct{} +type useMuxRangeFeedCtxKey struct{} + +func (c *countConnectionsTransport) NextInternalClient( + ctx context.Context, +) (rpc.RestrictedInternalClient, error) { + client, err := c.wrapped.NextInternalClient(ctx) + if err != nil { + return nil, err + } + + tc := &testRangefeedClient{ + RestrictedInternalClient: client, + muxRangeFeedEnabled: c.rfStreamEnabled, + } + // Count rangefeed calls but only for feeds started by this test. + if ctx.Value(testFeedCtxKey{}) != nil { + tc.count = func() { + c.counts.Inc(tc) + } + } else { + tc.count = func() {} + } + + return tc, nil +} + +func (c *countConnectionsTransport) NextReplica() roachpb.ReplicaDescriptor { + return c.wrapped.NextReplica() +} + +func (c *countConnectionsTransport) SkipReplica() { + c.wrapped.SkipReplica() +} + +func (c *countConnectionsTransport) MoveToFront(descriptor roachpb.ReplicaDescriptor) bool { + return c.wrapped.MoveToFront(descriptor) +} + +func (c *countConnectionsTransport) Release() { + c.wrapped.Release() +} + +func makeTransportFactory( + rfStreamEnabled bool, counts *internalClientCounts, +) kvcoord.TransportFactory { + return func( + options kvcoord.SendOptions, + dialer *nodedialer.Dialer, + slice kvcoord.ReplicaSlice, + ) (kvcoord.Transport, error) { + transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) + if err != nil { + return nil, err + } + countingTransport := &countConnectionsTransport{ + wrapped: transport, + rfStreamEnabled: rfStreamEnabled, + counts: counts, + } + return countingTransport, nil + } +} + +// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library +// here because of circular dependencies. +func rangeFeed( + dsI interface{}, + sp roachpb.Span, + startFrom hlc.Timestamp, + onValue func(event kvcoord.RangeFeedMessage), + useMuxRangeFeed bool, +) func() { + ds := dsI.(*kvcoord.DistSender) + events := make(chan kvcoord.RangeFeedMessage) + ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testFeedCtxKey{}, struct{}{})) - // All nodes return the specified error code. We expect the range feed to - // keep trying all replicas in sequence regardless of error. - for _, repl := range desc.InternalReplicas { - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(repl) - transport.EXPECT().NextInternalClient(gomock.Any()).Return( - nil, grpcstatus.Error(spec.errorCode, "")) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) (err error) { + var opts []kvcoord.RangeFeedOption + if useMuxRangeFeed { + opts = append(opts, kvcoord.WithMuxRangeFeed()) + ctx = context.WithValue(ctx, useMuxRangeFeedCtxKey{}, struct{}{}) + } + return ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events, opts...) + }) + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case ev := <-events: + onValue(ev) } - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // Once all replicas have failed, it should try to refresh the lease using - // the range cache. We let this succeed once. - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - - // It then tries the replicas again. This time we just report the - // transport as exhausted immediately. - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // This invalidates the cache yet again. This time we error. - rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) - - // If we expect a range lookup retry, allow the retry to succeed by - // returning a range descriptor and a client that immediately - // cancels the context and closes the range feed stream. - if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - stream := roachpbmock.NewMockInternal_RangeFeedClient(ctrl) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) - client := roachpbmock.NewMockInternalClient(ctrl) - client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) - transport.EXPECT().NextInternalClient(gomock.Any()).Return(client, nil) - transport.EXPECT().Release() + } + }) + + return func() { + cancel() + _ = g.Wait() + } +} + +// observeNValues returns on value handler which expects to see N rangefeed values, +// along with the channel which gets closed when requisite count of events has been seen. +func observeNValues(n int) (chan struct{}, func(ev kvcoord.RangeFeedMessage)) { + var count = struct { + syncutil.Mutex + c int + }{} + allSeen := make(chan struct{}) + return allSeen, func(ev kvcoord.RangeFeedMessage) { + if ev.Val != nil { + count.Lock() + defer count.Unlock() + count.c++ + log.Infof(context.Background(), "Waiting N values: saw %d, want %d; current=%s", count.c, n, ev.Val.Key) + if count.c == n { + close(allSeen) } + } + } +} - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: g, - RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { - return transport, nil +func channelWaitWithTimeout(t *testing.T, ch chan struct{}) { + t.Helper() + timeOut := 30 * time.Second + if util.RaceEnabled { + timeOut *= 10 + } + select { + case <-ch: + case <-time.After(timeOut): + t.Fatal("test timed out") + } +} +func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + startServerAtVer := func(ver roachpb.Version) (*testcluster.TestCluster, func()) { + st := cluster.MakeTestingClusterSettingsWithVersions(ver, ver, true) + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(false, nil), + }, + + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: make(chan struct{}), + BinaryVersionOverride: ver, }, }, - RangeDescriptorDB: rangeDB, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), - }) - ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: desc, - Lease: cachedLease, - }) - - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) - require.Error(t, err) + }, + }) + return tc, func() { tc.Stopper().Stop(ctx) } + } + + // Create a small table; run rangefeed. The transport factory we injected above verifies + // that we use the old rangefeed implementation. + runRangeFeed := func(tc *testcluster.TestCluster, opts ...kvcoord.RangeFeedOption) { + ts := tc.Server(0) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, false) + channelWaitWithTimeout(t, allSeen) + closeFeed() + } + + t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) { + noRfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode - 1) + tc, cleanup := startServerAtVer(noRfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) + + t.Run("rangefeed-stream-disabled-via-environment", func(t *testing.T) { + defer kvcoord.TestingSetEnableMuxRangeFeed(false)() + // Even though we could use rangefeed stream, it's disabled via kill switch. + rfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode) + tc, cleanup := startServerAtVer(rfStreamVer) + defer cleanup() + runRangeFeed(tc, kvcoord.WithMuxRangeFeed()) + }) +} + +func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + connCounts := &internalClientCounts{counts: make(map[rpc.RestrictedInternalClient]int)} + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(true, connCounts), + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + // Create a table, and split it so that we have multiple ranges, distributed across + // test cluster nodes. + sqlDB.ExecMultiple(t, + `SET CLUSTER SETTING kv.rangefeed.enabled = true`, + `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`, + `CREATE TABLE foo (key INT PRIMARY KEY)`, + `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`, + `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`, + ) + + for i := 100; i <= 900; i += 100 { + storeID := 1 + i%3 + rowID := i + testutils.SucceedsSoon(t, func() error { + _, err := sqlDB.DB.ExecContext(context.Background(), + "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", storeID, rowID) + return err }) } + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, true) + channelWaitWithTimeout(t, allSeen) + closeFeed() + + // Verify we connected to each node once. + connCounts.Lock() + defer connCounts.Unlock() + for _, c := range connCounts.counts { + require.Equal(t, 1, c) + } } diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index bcc875850b08..61dcf55d812b 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -57,6 +57,10 @@ func (n Node) RangeFeed(_ *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFee panic("unimplemented") } +func (n Node) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("unimplemented") +} + func (n Node) GossipSubscription( _ *roachpb.GossipSubscriptionRequest, _ roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 7c5e07a899ba..ec3610569db2 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -187,6 +187,12 @@ func (m *mockInternalClient) RangeFeed( return nil, fmt.Errorf("unsupported RangeFeed call") } +func (m *mockInternalClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + return nil, fmt.Errorf("unsupported MuxRangeFeed call") +} + // GossipSubscription is part of the roachpb.InternalClient interface. func (m *mockInternalClient) GossipSubscription( ctx context.Context, args *roachpb.GossipSubscriptionRequest, _ ...grpc.CallOption, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 9daa9d43464e..35c55dca2cfe 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -117,7 +117,7 @@ type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb. // ReplicaRangefeedFilter is used in unit tests to modify the request, inject // responses, or return errors from rangefeeds. type ReplicaRangefeedFilter func( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index c3b62db37ef2..b1955eb52f2f 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -63,7 +63,7 @@ var RangeFeedRefreshInterval = settings.RegisterDurationSetting( // support for concurrent calls to Send. Note that the default implementation of // grpc.Stream is not safe for concurrent calls to Send. type lockedRangefeedStream struct { - wrapped rangefeed.Stream + wrapped roachpb.RangeFeedEventSink sendMu syncutil.Mutex } @@ -134,13 +134,13 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The provided ConcurrentRequestLimiter is used to limit the number // of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream rangefeed.Stream, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream) } func (r *Replica) rangeFeedWithRangeID( - _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream rangefeed.Stream, + _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index cb5769755027..b11490877a89 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3090,7 +3090,7 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD // the provided stream and returns with an optional error when the rangefeed is // complete. func (s *Store) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 9dd1fd81d859..fde593a1c248 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -234,7 +234,7 @@ func (ls *Stores) SendWithWriteBytes( // the provided stream and returns with an optional error when the rangefeed is // complete. func (ls *Stores) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { ctx := stream.Context() if args.RangeID == 0 { diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 602e1fdf877c..4aec679f5751 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -11,6 +11,7 @@ package roachpb import ( + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -23,7 +24,7 @@ import ( "github.com/dustin/go-humanize" ) -//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient +//go:generate mockgen -package=roachpbmock -destination=roachpbmock/mocks_generated.go . InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient // UserPriority is a custom type for transaction's user priority. type UserPriority float64 @@ -1847,3 +1848,17 @@ const ( // with the SpecificTenantOverrides precedence.. AllTenantsOverrides ) + +// RangeFeedEventSink is an interface for sending a single rangefeed event. +type RangeFeedEventSink interface { + Context() context.Context + Send(*RangeFeedEvent) error +} + +// RangeFeedEventProducer is an adapter for receiving rangefeed events with either +// the legacy RangeFeed RPC, or the MuxRangeFeed RPC. +type RangeFeedEventProducer interface { + // Recv receives the next rangefeed event. an io.EOF error indicates that the + // range needs to be restarted. + Recv() (*RangeFeedEvent, error) +} diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index be7d760b89ae..d67d25980e2f 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2722,6 +2722,9 @@ message RangeFeedRequest { // AdmissionHeader is used only at the start of the range feed stream, since // the initial catch-up scan be expensive. AdmissionHeader admission_header = 4 [(gogoproto.nullable) = false]; + + // StreamID is set by the client issuing MuxRangeFeed requests. + int64 stream_id = 5 [(gogoproto.customname) = "StreamID"]; } // RangeFeedValue is a variant of RangeFeedEvent that represents an update to @@ -2795,6 +2798,14 @@ message RangeFeedEvent { RangeFeedDeleteRange delete_range = 5; } +// MuxRangeFeedEvent is a response generated by MuxRangeFeed RPC. It tags +// the underlying RangeFeedEvent with the ID of the range that produced this event. +message MuxRangeFeedEvent { + RangeFeedEvent event = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + int32 range_id = 2 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // Server echoes back stream_id set by the client. + int64 stream_id = 3 [(gogoproto.customname) = "StreamID"]; +} // ResetQuorumRequest makes a range that is unavailable due to lost quorum // available again, at the cost of losing all of the data in the range. Any @@ -3004,7 +3015,10 @@ message JoinNodeResponse { service Internal { rpc Batch (BatchRequest) returns (BatchResponse) {} rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {} + // RangeFeed RPC is deprecated. Use MuxRangeFeed instead. + // To be removed at version 23.1 rpc RangeFeed (RangeFeedRequest) returns (stream RangeFeedEvent) {} + rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {} rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {} rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {} diff --git a/pkg/roachpb/roachpbmock/BUILD.bazel b/pkg/roachpb/roachpbmock/BUILD.bazel index c50e10c59fe3..9c9d0d0f4f11 100644 --- a/pkg/roachpb/roachpbmock/BUILD.bazel +++ b/pkg/roachpb/roachpbmock/BUILD.bazel @@ -8,6 +8,7 @@ gomock( interfaces = [ "InternalClient", "Internal_RangeFeedClient", + "Internal_MuxRangeFeedClient", ], library = "//pkg/roachpb", package = "roachpbmock", diff --git a/pkg/roachpb/roachpbmock/mocks_generated.go b/pkg/roachpb/roachpbmock/mocks_generated.go index a053cca023de..377c7b7b0fcb 100644 --- a/pkg/roachpb/roachpbmock/mocks_generated.go +++ b/pkg/roachpb/roachpbmock/mocks_generated.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient) // Package roachpbmock is a generated GoMock package. package roachpbmock @@ -137,6 +137,26 @@ func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) } +// MuxRangeFeed mocks base method. +func (m *MockInternalClient) MuxRangeFeed(arg0 context.Context, arg1 ...grpc.CallOption) (roachpb.Internal_MuxRangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MuxRangeFeed", varargs...) + ret0, _ := ret[0].(roachpb.Internal_MuxRangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MuxRangeFeed indicates an expected call of MuxRangeFeed. +func (mr *MockInternalClientMockRecorder) MuxRangeFeed(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MuxRangeFeed", reflect.TypeOf((*MockInternalClient)(nil).MuxRangeFeed), varargs...) +} + // RangeFeed mocks base method. func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *roachpb.RangeFeedRequest, arg2 ...grpc.CallOption) (roachpb.Internal_RangeFeedClient, error) { m.ctrl.T.Helper() @@ -379,3 +399,140 @@ func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) } + +// MockInternal_MuxRangeFeedClient is a mock of Internal_MuxRangeFeedClient interface. +type MockInternal_MuxRangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedClientMockRecorder +} + +// MockInternal_MuxRangeFeedClientMockRecorder is the mock recorder for MockInternal_MuxRangeFeedClient. +type MockInternal_MuxRangeFeedClientMockRecorder struct { + mock *MockInternal_MuxRangeFeedClient +} + +// NewMockInternal_MuxRangeFeedClient creates a new mock instance. +func NewMockInternal_MuxRangeFeedClient(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedClient { + mock := &MockInternal_MuxRangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedClient) EXPECT() *MockInternal_MuxRangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_MuxRangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Recv() (*roachpb.MuxRangeFeedEvent, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*roachpb.MuxRangeFeedEvent) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Send(arg0 *roachpb.RangeFeedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Trailer)) +} diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 0f49cf9156e7..6d1e171f400e 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -53,9 +53,8 @@ func (a tenantAuthorizer) authorize( case "/cockroach.roachpb.Internal/RangeLookup": return a.authRangeLookup(tenID, req.(*roachpb.RangeLookupRequest)) - case "/cockroach.roachpb.Internal/RangeFeed": + case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed": return a.authRangeFeed(tenID, req.(*roachpb.RangeFeedRequest)) - case "/cockroach.roachpb.Internal/GossipSubscription": return a.authGossipSubscription(tenID, req.(*roachpb.GossipSubscriptionRequest)) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index b8ac93a8020e..f1c2257884ea 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -900,6 +900,20 @@ var rangefeedStreamInfo = &grpc.StreamServerInfo{ IsServerStream: true, } +var muxRangeFeedDesc = &grpc.StreamDesc{ + StreamName: "MuxRangeFeed", + ServerStreams: true, + ClientStreams: true, +} + +const muxRangefeedMethodName = "/cockroach.roachpb.Internal/MuxRangeFeed" + +var muxRangefeedStreamInfo = &grpc.StreamServerInfo{ + FullMethod: muxRangefeedMethodName, + IsClientStream: true, + IsServerStream: true, +} + // RangeFeed implements the RestrictedInternalClient interface. func (a internalClientAdapter) RangeFeed( ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, @@ -930,7 +944,7 @@ func (a internalClientAdapter) RangeFeed( args.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL // Spawn a goroutine running the server-side handler. This goroutine - // communicates with the client stream through rangeFeedPipe. + // communicates with the client stream through rfPipe. go func() { // Handler adapts the ServerStream to the typed interface expected by the // RPC handler (Node.RangeFeed). `stream` might be `rfPipe` which we @@ -992,87 +1006,259 @@ func (x rangeFeedClientAdapter) Recv() (*roachpb.RangeFeedEvent, error) { return m, nil } -// rangeFeedPipe is a (uni-directional) pipe of *RangeFeedEvent that implements -// the grpc.ClientStream and grpc.ServerStream interfaces. -type rangeFeedPipe struct { +// MuxRangeFeed implements the RestrictedInternalClient interface. +func (a internalClientAdapter) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + // MuxRangeFeed consists of two streams: the client stream and the server + // stream. Those two streams are stitched together by the go routine below + // which sends client messages to the server channel, and server responses to + // the client. + clientSide, serverSide := newMuxRangeFeedPipes(grpcutil.NewLocalRequestContext(ctx)) + + go func() { + // Handler adapts the ServerStream to the typed interface expected by the + // RPC handler (Node.RangeFeed). `stream` might be `eventPipe` which we + // pass to the interceptor chain below, or it might be another + // implementation of `ServerStream` that wraps it; in practice it will be + // tracing.grpcinterceptor.StreamServerInterceptor. + handler := func(srv interface{}, stream grpc.ServerStream) error { + adapter := muxRangeFeedServerAdapter{ + ServerStream: stream, + } + return a.server.MuxRangeFeed(adapter) + } + // Run the server interceptors, which will bottom out by running `handler` + // (defined just above), which runs Node.MuxRangeFeed (our RPC handler). + // This call is blocking. + err := a.serverStreamInterceptors.run(a.server, serverSide, muxRangefeedStreamInfo, handler) + if err == nil { + err = io.EOF + } + serverSide.errC <- err + }() + + // Run the client-side interceptors, which produce a gprc.ClientStream. + // clientSide might end up being rfPipe, or it might end up being another + // grpc.ClientStream implementation that wraps it. + // + // NOTE: For actual RPCs, going to a remote note, there's a tracing client + // interceptor producing a tracing.grpcinterceptor.tracingClientStream + // implementation of ClientStream. That client interceptor does not run for + // these local requests handled by the internalClientAdapter (as opposed to + // the tracing server interceptor, which does run). + clientStream, err := a.clientStreamInterceptors.run(ctx, muxRangeFeedDesc, nil /* ClientConn */, muxRangefeedMethodName, + // This function runs at the bottom of the client interceptor stack, + // pretending to actually make an RPC call. We don't make any calls, but + // return the pipe on which messages from the server will come. + func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + return clientSide, nil + }, + opts...) + if err != nil { + return nil, err + } + + return muxRangeFeedClientAdapter{ + ClientStream: clientStream, + }, nil +} + +type muxRangeFeedClientAdapter struct { + grpc.ClientStream +} + +var _ roachpb.Internal_MuxRangeFeedClient = muxRangeFeedClientAdapter{} + +func (a muxRangeFeedClientAdapter) Send(request *roachpb.RangeFeedRequest) error { + // Mark this request as originating locally. + request.AdmissionHeader.SourceLocation = roachpb.AdmissionHeader_LOCAL + return a.SendMsg(request) +} + +func (a muxRangeFeedClientAdapter) Recv() (*roachpb.MuxRangeFeedEvent, error) { + m := new(roachpb.MuxRangeFeedEvent) + if err := a.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +type muxRangeFeedServerAdapter struct { + grpc.ServerStream +} + +var _ roachpb.Internal_MuxRangeFeedServer = muxRangeFeedServerAdapter{} + +func (a muxRangeFeedServerAdapter) Send(event *roachpb.MuxRangeFeedEvent) error { + return a.SendMsg(event) +} + +func (a muxRangeFeedServerAdapter) Recv() (*roachpb.RangeFeedRequest, error) { + m := new(roachpb.RangeFeedRequest) + if err := a.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// channelStream implements grpc.ClientStream, grpc.ServerStream and grpc.Stream +// interfaces for the purpose of intercepting RCP requests to local node. +// This type simply provides access to the context (as required by grpc.Stream) interface +// while all other methods panic. Type expected to be embedded inside other types. +// Channel stream exchanges messages via channel. +type channelStream struct { ctx context.Context respC chan interface{} errC chan error + doneC <-chan struct{} } -var _ grpc.ClientStream = &rangeFeedPipe{} -var _ grpc.ServerStream = &rangeFeedPipe{} - -// newRangeFeedPipe creates a rangeFeedPipe. The pipe is returned as a pointer -// for convenience, because it's used as a grpc.ClientStream and -// grpc.ServerStream, and these interfaces are implemented on the pointer -// receiver. -func newRangeFeedPipe(ctx context.Context) *rangeFeedPipe { - return &rangeFeedPipe{ +func makeChannelStream(ctx context.Context) channelStream { + return channelStream{ ctx: ctx, respC: make(chan interface{}, 128), errC: make(chan error, 1), + doneC: ctx.Done(), } } // grpc.ClientStream methods. -func (*rangeFeedPipe) Header() (metadata.MD, error) { panic("unimplemented") } -func (*rangeFeedPipe) Trailer() metadata.MD { panic("unimplemented") } -func (*rangeFeedPipe) CloseSend() error { panic("unimplemented") } +func (*channelStream) Header() (metadata.MD, error) { panic("unimplemented") } +func (*channelStream) Trailer() metadata.MD { panic("unimplemented") } +func (*channelStream) CloseSend() error { panic("unimplemented") } // grpc.ServerStream methods. -func (*rangeFeedPipe) SetHeader(metadata.MD) error { panic("unimplemented") } -func (*rangeFeedPipe) SendHeader(metadata.MD) error { panic("unimplemented") } -func (*rangeFeedPipe) SetTrailer(metadata.MD) { panic("unimplemented") } +func (*channelStream) SetHeader(metadata.MD) error { panic("unimplemented") } +func (*channelStream) SendHeader(metadata.MD) error { panic("unimplemented") } +func (*channelStream) SetTrailer(metadata.MD) { panic("unimplemented") } // Common grpc.{Client,Server}Stream methods. -func (p *rangeFeedPipe) Context() context.Context { return p.ctx } +func (s *channelStream) Context() context.Context { return s.ctx } -// SendMsg is part of the grpc.ServerStream interface. It is also part of the -// grpc.ClientStream interface but, in the case of the RangeFeed RPC (which is -// only server-streaming, not bi-directional), only the server sends. -func (p *rangeFeedPipe) SendMsg(m interface{}) error { +// send is the implementation of SendMsg +func (s *channelStream) send(m interface{}) error { select { - case p.respC <- m: + case s.respC <- m: return nil - case <-p.ctx.Done(): - return p.ctx.Err() - } -} - -// RecvMsg is part of the grpc.ClientStream interface. It is also technically -// part of the grpc.ServerStream interface but, in the case of the RangeFeed RPC -// (which is only server-streaming, not bi-directional), only the client -// receives. -func (p *rangeFeedPipe) RecvMsg(m interface{}) error { - out := m.(*roachpb.RangeFeedEvent) - msg, err := p.recvInternal() - if err != nil { - return err + case <-s.doneC: + return s.ctx.Err() } - *out = *msg.(*roachpb.RangeFeedEvent) - return nil } -// recvInternal is the implementation of RecvMsg. -func (p *rangeFeedPipe) recvInternal() (interface{}, error) { +// recv is the implementation of RecvMsg. +func (s *channelStream) recv() (interface{}, error) { // Prioritize respC. Both channels are buffered and the only guarantee we // have is that once an error is sent on errC no other events will be sent // on respC again. select { - case e := <-p.respC: + case e := <-s.respC: return e, nil - case err := <-p.errC: + case err := <-s.errC: select { - case e := <-p.respC: - p.errC <- err + case e := <-s.respC: + s.errC <- err return e, nil default: return nil, err } + case <-s.doneC: + return nil, s.ctx.Err() } } +// eventPipe is a (uni-directional) pipe of events that implements +// the grpc.ClientStream and grpc.ServerStream interfaces. +// The type of events are abstracted behind receiver and sender callbacks. +type eventPipe struct { + channelStream + receiver func(interface{}) error + sender func(interface{}) error +} + +var _ grpc.ClientStream = &eventPipe{} +var _ grpc.ServerStream = &eventPipe{} + +// SendMsg is part of the grpc.ServerStream interface. It is also part of the +// grpc.ClientStream interface. +func (p *eventPipe) SendMsg(m interface{}) error { + return p.sender(m) +} + +// RecvMsg is part of the grpc.ClientStream interface. It is also technically +// part of the grpc.ServerStream interface but, in the case of the RangeFeed RPC +// (which is only server-streaming, not bi-directional), only the client +// receives. +func (p *eventPipe) RecvMsg(m interface{}) error { + return p.receiver(m) +} + +// newRangeFeedPipe creates a eventPipe for receiving RangeFeedEvent. +// The pipe is returned as a pointer for convenience, because it's used as a +// grpc.ClientStream and grpc.ServerStream, and these interfaces are implemented +// on the pointer receiver. +func newRangeFeedPipe(ctx context.Context) *eventPipe { + cs := makeChannelStream(ctx) + return &eventPipe{ + channelStream: cs, + receiver: func(dst interface{}) error { + msg, err := cs.recv() + if err != nil { + return err + } + *dst.(*roachpb.RangeFeedEvent) = *msg.(*roachpb.RangeFeedEvent) + return nil + }, + sender: cs.send, + } +} + +// newMuxRangeFeedPipes creates a pair of eventPipes for client and server side +// streams of the MuxRangeFeed RPC. +// The pipes are returned as a pointers for convenience, because it's used as a +// grpc.ClientStream and grpc.ServerStream, and these interfaces are implemented +// on the pointer receiver. +func newMuxRangeFeedPipes(ctx context.Context) (clientPipe, serverPipe *eventPipe) { + clientStream := makeChannelStream(ctx) + serverStream := makeChannelStream(ctx) + clientPipe = &eventPipe{ + channelStream: clientStream, + // Client receives MuxRangeFeedEvents from the server. + receiver: func(dst interface{}) error { + msg, err := clientStream.recv() + if err != nil { + return err + } + *dst.(*roachpb.MuxRangeFeedEvent) = *msg.(*roachpb.MuxRangeFeedEvent) + return nil + }, + // Client sends RangeFeedRequests to the server. + sender: func(m interface{}) error { + return serverStream.send(m) + }, + } + serverPipe = &eventPipe{ + channelStream: serverStream, + // Server receives RangeFeedRequests from the client. + receiver: func(dst interface{}) error { + msg, err := serverStream.recv() + if err != nil { + return err + } + *dst.(*roachpb.RangeFeedRequest) = *msg.(*roachpb.RangeFeedRequest) + return nil + }, + // Server sends MuxRangeFeedEvents to the client. + sender: func(m interface{}) error { + return clientStream.send(m) + }, + } + + return clientPipe, serverPipe +} + // rangeFeedServerAdapter adapts an untyped ServerStream to the typed // roachpb.Internal_RangeFeedServer interface, expected by the RangeFeed RPC // handler. diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 8e99e1e2a6da..17ba70200d67 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -319,8 +319,22 @@ var _ roachpb.InternalServer = &internalServer{} type internalServer struct { // rangeFeedEvents are returned on RangeFeed() calls. - rangeFeedEvents []roachpb.RangeFeedEvent - serverStream roachpb.Internal_RangeFeedServer + rangeFeedEvents []roachpb.RangeFeedEvent + rfServerStream roachpb.Internal_RangeFeedServer + muxRfServerStream roachpb.Internal_MuxRangeFeedServer +} + +func (s *internalServer) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + s.muxRfServerStream = stream + for _, ev := range s.rangeFeedEvents { + muxEv := &roachpb.MuxRangeFeedEvent{ + RangeFeedEvent: ev, + } + if err := stream.Send(muxEv); err != nil { + return err + } + } + return nil } func (*internalServer) Batch( @@ -338,7 +352,7 @@ func (*internalServer) RangeLookup( func (s *internalServer) RangeFeed( _ *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) error { - s.serverStream = stream + s.rfServerStream = stream for _, ev := range s.rangeFeedEvents { evCpy := ev if err := stream.Send(&evCpy); err != nil { @@ -539,37 +553,55 @@ func TestInternalClientAdapterWithClientStreamInterceptors(t *testing.T) { serverCtx.NodeID.Set(context.Background(), 1) _ /* server */, serverInterceptors := NewServerEx(serverCtx) + testutils.RunTrueAndFalse(t, "use_mux_rangefeed", func(t *testing.T, useMux bool) { + var clientInterceptors ClientInterceptorInfo + var s *testClientStream + clientInterceptors.StreamInterceptors = append(clientInterceptors.StreamInterceptors, + func( + ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, + method string, streamer grpc.Streamer, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + clientStream, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + return nil, err + } + s = &testClientStream{inner: clientStream} + return s, nil + }) - var clientInterceptors ClientInterceptorInfo - var s *testClientStream - clientInterceptors.StreamInterceptors = append(clientInterceptors.StreamInterceptors, - func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { - clientStream, err := streamer(ctx, desc, cc, method, opts...) - if err != nil { - return nil, err + internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} + serverCtx.SetLocalInternalServer(internal, serverInterceptors, clientInterceptors) + ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) + lic, ok := ic.(internalClientAdapter) + require.True(t, ok) + require.Equal(t, internal, lic.server) + + var receiveEvent func() error + if useMux { + stream, err := lic.MuxRangeFeed(ctx) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + } else { + stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err } - s = &testClientStream{inner: clientStream} - return s, nil - }) - - internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} - serverCtx.SetLocalInternalServer(internal, serverInterceptors, clientInterceptors) - ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) - lic, ok := ic.(internalClientAdapter) - require.True(t, ok) - require.Equal(t, internal, lic.server) - - stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) - require.NoError(t, err) - // Consume the stream. - for { - _, err := stream.Recv() - if err == io.EOF { - break } - require.NoError(t, err) - } - require.Equal(t, len(internal.rangeFeedEvents)+1, s.recvCount) + // Consume the stream. + for { + err := receiveEvent() + if err == io.EOF { + break + } + require.NoError(t, err) + } + require.Equal(t, len(internal.rangeFeedEvents)+1, s.recvCount) + }) } // Test that a server stream interceptor can wrap the ServerStream when the @@ -591,56 +623,78 @@ func TestInternalClientAdapterWithServerStreamInterceptors(t *testing.T) { serverCtx.NodeID.Set(context.Background(), 1) _ /* server */, serverInterceptors := NewServerEx(serverCtx) + testutils.RunTrueAndFalse(t, "use_mux_rangefeed", func(t *testing.T, useMux bool) { + const int1Name = "interceptor 1" + serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, + func( + srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, + ) error { + serverStream := &testServerStream{name: "interceptor 1", inner: ss} + return handler(srv, serverStream) + }) + var secondInterceptorWrapped grpc.ServerStream + const int2Name = "interceptor 2" + serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, + func( + srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, + ) error { + secondInterceptorWrapped = ss + serverStream := &testServerStream{name: int2Name, inner: ss} + return handler(srv, serverStream) + }) - const int1Name = "interceptor 1" - serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, - func( - srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, - ) error { - serverStream := &testServerStream{name: "interceptor 1", inner: ss} - return handler(srv, serverStream) - }) - var secondInterceptorWrapped grpc.ServerStream - const int2Name = "interceptor 2" - serverInterceptors.StreamInterceptors = append(serverInterceptors.StreamInterceptors, - func( - srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, - ) error { - secondInterceptorWrapped = ss - serverStream := &testServerStream{name: int2Name, inner: ss} - return handler(srv, serverStream) - }) + internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} + serverCtx.SetLocalInternalServer(internal, serverInterceptors, ClientInterceptorInfo{}) + ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) + lic, ok := ic.(internalClientAdapter) + require.True(t, ok) + require.Equal(t, internal, lic.server) + + var receiveEvent func() error + if useMux { + stream, err := lic.MuxRangeFeed(ctx) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + } else { + stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) + require.NoError(t, err) + receiveEvent = func() error { + _, err := stream.Recv() + return err + } + } - internal := &internalServer{rangeFeedEvents: []roachpb.RangeFeedEvent{{}, {}}} - serverCtx.SetLocalInternalServer(internal, serverInterceptors, ClientInterceptorInfo{}) - ic := serverCtx.GetLocalInternalClientForAddr(serverCtx.Config.AdvertiseAddr, 1) - lic, ok := ic.(internalClientAdapter) - require.True(t, ok) - require.Equal(t, internal, lic.server) + // Consume the stream. This will synchronize with the server RPC handler + // goroutine, ensuring that the server-side interceptors run. + for { + err := receiveEvent() + if err == io.EOF { + break + } + require.NoError(t, err) + } - stream, err := lic.RangeFeed(ctx, &roachpb.RangeFeedRequest{}) - require.NoError(t, err) + require.IsType(t, &testServerStream{}, secondInterceptorWrapped) - // Consume the stream. This will synchronize with the server RPC handler - // goroutine, ensuring that the server-side interceptors run. - for { - _, err := stream.Recv() - if err == io.EOF { - break + require.Equal(t, int1Name, secondInterceptorWrapped.(*testServerStream).name) + var ss grpc.ServerStream + if useMux { + require.IsType(t, muxRangeFeedServerAdapter{}, internal.muxRfServerStream) + ss = internal.muxRfServerStream.(muxRangeFeedServerAdapter).ServerStream + } else { + require.IsType(t, rangeFeedServerAdapter{}, internal.rfServerStream) + ss = internal.rfServerStream.(rangeFeedServerAdapter).ServerStream } - require.NoError(t, err) - } - - require.IsType(t, &testServerStream{}, secondInterceptorWrapped) - require.Equal(t, int1Name, secondInterceptorWrapped.(*testServerStream).name) - require.IsType(t, rangeFeedServerAdapter{}, internal.serverStream) - ss := internal.serverStream.(rangeFeedServerAdapter).ServerStream - require.IsType(t, &testServerStream{}, ss) - topStream := ss.(*testServerStream) - require.Equal(t, int2Name, topStream.name) - require.IsType(t, &testServerStream{}, topStream.inner) - bottomStream := topStream.inner.(*testServerStream) - require.Equal(t, int1Name, bottomStream.name) + require.IsType(t, &testServerStream{}, ss) + topStream := ss.(*testServerStream) + require.Equal(t, int2Name, topStream.name) + require.IsType(t, &testServerStream{}, topStream.inner) + bottomStream := topStream.inner.(*testServerStream) + require.Equal(t, int1Name, bottomStream.name) + }) } type testClientStream struct { diff --git a/pkg/rpc/nodedialer/nodedialer_test.go b/pkg/rpc/nodedialer/nodedialer_test.go index 6ab800c44b89..3d5756dd205c 100644 --- a/pkg/rpc/nodedialer/nodedialer_test.go +++ b/pkg/rpc/nodedialer/nodedialer_test.go @@ -571,6 +571,10 @@ func (*internalServer) RangeFeed( panic("unimplemented") } +func (s *internalServer) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("implement me") +} + func (*internalServer) GossipSubscription( *roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/rpc/restricted_internal_client.go b/pkg/rpc/restricted_internal_client.go index 1f7b480c3eae..808067578c4b 100644 --- a/pkg/rpc/restricted_internal_client.go +++ b/pkg/rpc/restricted_internal_client.go @@ -24,4 +24,5 @@ import ( type RestrictedInternalClient interface { Batch(ctx context.Context, in *roachpb.BatchRequest, opts ...grpc.CallOption) (*roachpb.BatchResponse, error) RangeFeed(ctx context.Context, in *roachpb.RangeFeedRequest, opts ...grpc.CallOption) (roachpb.Internal_RangeFeedClient, error) + MuxRangeFeed(ctx context.Context, opts ...grpc.CallOption) (roachpb.Internal_MuxRangeFeedClient, error) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index a098516f1ae5..024f9089e0e9 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -232,6 +232,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/contextutil", + "//pkg/util/ctxgroup", "//pkg/util/envutil", "//pkg/util/goschedstats", "//pkg/util/grpcutil", diff --git a/pkg/server/node.go b/pkg/server/node.go index 05542ada7708..aaac487ad8ab 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -1237,6 +1238,12 @@ func (n *Node) RangeLookup( // RangeFeed implements the roachpb.InternalServer interface. func (n *Node) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) error { + return n.singleRangeFeed(args, stream) +} + +func (n *Node) singleRangeFeed( + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) error { pErr := n.stores.RangeFeed(args, stream) if pErr != nil { @@ -1249,6 +1256,78 @@ func (n *Node) RangeFeed( return nil } +// setRangeIDEventSink annotates each response with range and stream IDs. +// This is used by MuxRangeFeed. +// TODO: This code can be removed in 22.2 once MuxRangeFeed is the default, and +// the old style RangeFeed deprecated. +type setRangeIDEventSink struct { + ctx context.Context + rangeID roachpb.RangeID + streamID int64 + wrapped *lockedMuxStream +} + +func (s *setRangeIDEventSink) Context() context.Context { + return s.ctx +} + +func (s *setRangeIDEventSink) Send(event *roachpb.RangeFeedEvent) error { + response := &roachpb.MuxRangeFeedEvent{ + RangeFeedEvent: *event, + RangeID: s.rangeID, + StreamID: s.streamID, + } + return s.wrapped.Send(response) +} + +var _ roachpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) + +// lockedMuxStream provides support for concurrent calls to Send. +// The underlying MuxRangeFeedServer is not safe for concurrent calls to Send. +type lockedMuxStream struct { + wrapped roachpb.Internal_MuxRangeFeedServer + sendMu syncutil.Mutex +} + +func (s *lockedMuxStream) Context() context.Context { + return s.wrapped.Context() +} + +func (s *lockedMuxStream) Send(e *roachpb.MuxRangeFeedEvent) error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + return s.wrapped.Send(e) +} + +// MuxRangeFeed implements the roachpb.InternalServer interface. +func (n *Node) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + ctx, cancelFeeds := n.stopper.WithCancelOnQuiesce(stream.Context()) + defer cancelFeeds() + rfGrp := ctxgroup.WithContext(ctx) + + muxStream := &lockedMuxStream{wrapped: stream} + for { + req, err := stream.Recv() + if err != nil { + cancelFeeds() + return errors.CombineErrors(err, rfGrp.Wait()) + } + + rfGrp.GoCtx(func(ctx context.Context) error { + ctx, span := tracing.ForkSpan(logtags.AddTag(ctx, "r", req.RangeID), "mux-rf") + defer span.Finish() + + sink := setRangeIDEventSink{ + ctx: ctx, + rangeID: req.RangeID, + streamID: req.StreamID, + wrapped: muxStream, + } + return n.singleRangeFeed(req, &sink) + }) + } +} + // ResetQuorum implements the roachpb.InternalServer interface. func (n *Node) ResetQuorum( ctx context.Context, req *roachpb.ResetQuorumRequest,