diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 115a3027bc5b..d5673df271b0 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -177,4 +177,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-50 set the active cluster version in the format '.' +version version 21.2-52 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 3ca78510e519..ee96273653a4 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -185,6 +185,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-50set the active cluster version in the format '.' +versionversion21.2-52set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 1cc066ae2a2f..e2b940acac5f 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -251,6 +251,8 @@ const ( // EnableProtectedTimestampsForTenant enables the use of protected timestamps // in secondary tenants. EnableProtectedTimestampsForTenant + // RangefeedUseOneStreamPerNode enables rangefeeds to use one RPC stream per node instead of one per range. + RangefeedUseOneStreamPerNode // ************************************************* // Step (1): Add new versions here. @@ -387,6 +389,10 @@ var versionsSingleton = keyedVersions{ Key: EnableProtectedTimestampsForTenant, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50}, }, + { + Key: RangefeedUseOneStreamPerNode, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52}, + }, // ************************************************* // Step (2): Add new versions here. 
diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 1f9bfd1df5b4..6fa398b7123d 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -34,11 +34,12 @@ func _() { _ = x[UnsafeLossOfQuorumRecoveryRangeLog-23] _ = x[AlterSystemProtectedTimestampAddColumn-24] _ = x[EnableProtectedTimestampsForTenant-25] + _ = x[RangefeedUseOneStreamPerNode-26] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenant" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantRangefeedUseOneStreamPerNode" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 670} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index e7f17acd46db..55b1a504aa4b 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "batch.go", "condensable_span_set.go", "dist_sender.go", + "dist_sender_mux_rangefeed.go", "dist_sender_rangefeed.go", "doc.go", "local_test_cluster_util.go", @@ -36,6 +37,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/gossip", "//pkg/keys", "//pkg/kv", @@ -112,6 +114,7 @@ go_test( srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_mock_test.go", "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", @@ -143,6 +146,7 @@ go_test( deps = [ "//build/bazelutil:noop", "//pkg/base", + "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", "//pkg/gossip", @@ -164,6 +168,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/storage", @@ -174,9 +179,11 @@ go_test( "//pkg/testutils/localtestcluster", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/caller", + "//pkg/util/ctxgroup", 
"//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go new file mode 100644 index 000000000000..91da3da53d1e --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -0,0 +1,419 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "fmt" + "io" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +type rfStream struct { + g ctxgroup.Group + ds *DistSender + rr *rangeFeedRegistry + eventsCh chan<- *roachpb.RangeFeedEvent + withDiff bool + + mu struct { + syncutil.Mutex + + // Currently running MuxRangeFeed streams. + streams map[roachpb.NodeID]*streamState + + // Counter identifying particular RangeFeed stream. + rangeFeedID int64 + } +} + +// startMuxRangeFeed starts rangefeed using streaming RangeFeedStream RPC. +func (ds *DistSender) startMuxRangeFeed( + ctx context.Context, + spans []roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + rr *rangeFeedRegistry, + eventCh chan<- *roachpb.RangeFeedEvent, +) error { + rSpans, err := spansToResolvedSpans(spans) + if err != nil { + return err + } + + rfs := rfStream{ + g: ctxgroup.WithContext(ctx), + ds: ds, + rr: rr, + eventsCh: eventCh, + withDiff: withDiff, + } + + // Kick off the initial set of ranges. + for i := range rSpans { + rSpan := rSpans[i] + rfs.g.GoCtx(func(ctx context.Context) error { + return divideAndIterateRSpan(ctx, ds, rSpan, + func(rs roachpb.RSpan, token rangecache.EvictionToken) error { + return rfs.startRangeFeedStream(ctx, rs, startFrom, token) + }, + ) + }) + } + + return rfs.g.Wait() +} + +type rangeState struct { + *activeRangeFeed + startFrom hlc.Timestamp + token rangecache.EvictionToken + rSpan roachpb.RSpan +} + +func (rs *rangeState) restartFrom() hlc.Timestamp { + restartFrom := rs.activeRangeFeed.snapshot().Resolved + if restartFrom.Less(rs.startFrom) { + // RangeFeed happily forwards any closed timestamps it receives as + // soon as there are no outstanding intents under them. + // There it's possible that the resolved timestamp observed by this stream + // might be lower than the initial startFrom. 
+ restartFrom = rs.startFrom + } + return restartFrom +} + +type rangeStateKey struct { + requestID int64 + rangeID roachpb.RangeID +} + +type streamState struct { + nodeID roachpb.NodeID + stream roachpb.Internal_MuxRangeFeedClient + + mu struct { + syncutil.Mutex + activeFeeds map[rangeStateKey]rangeState + } +} + +func (ss *streamState) getRangeState(key rangeStateKey) rangeState { + ss.mu.Lock() + defer ss.mu.Unlock() + return ss.mu.activeFeeds[key] +} + +func (ss *streamState) clearActive(key rangeStateKey) error { + ss.mu.Lock() + defer ss.mu.Unlock() + if rs, found := ss.mu.activeFeeds[key]; found { + rs.activeRangeFeed.release() + delete(ss.mu.activeFeeds, key) + return nil + } + return errors.AssertionFailedf("expected to find active range feed %s, found none", key) +} + +func (ss *streamState) numActiveFeeds() int { + ss.mu.Lock() + defer ss.mu.Unlock() + return len(ss.mu.activeFeeds) +} + +func (ss *streamState) markActive( + rr *rangeFeedRegistry, + nodeID roachpb.NodeID, + requestID int64, + token rangecache.EvictionToken, + rSpan roachpb.RSpan, + startFrom hlc.Timestamp, +) rangeStateKey { + ss.mu.Lock() + defer ss.mu.Unlock() + + if ss.mu.activeFeeds == nil { + ss.mu.activeFeeds = make(map[rangeStateKey]rangeState) + } + + rangeID := token.Desc().RangeID + span := rSpan.AsRawSpanWithNoLocals() + + active := newActiveRangeFeed(rr, span, startFrom) + active.setNodeID(nodeID) + active.setRangeID(rangeID) + + key := rangeStateKey{ + requestID: requestID, + rangeID: rangeID, + } + + ss.mu.activeFeeds[key] = rangeState{ + activeRangeFeed: active, + token: token, + rSpan: rSpan, + startFrom: startFrom, + } + return key +} + +// nextRequestID returns the next ID to use when issuing RangeFeed requests on the mux stream. +func (rfs *rfStream) nextRequestID() int64 { + rfs.mu.Lock() + defer rfs.mu.Unlock() + rfs.mu.rangeFeedID++ + return rfs.mu.rangeFeedID +} + +// startRangeFeedStream opens a MuxRangeFeed connection to a node that can serve +// a rangefeed for one of the replicas in the token's range descriptor. +func (rfs *rfStream) startRangeFeedStream( + ctx context.Context, rs roachpb.RSpan, startFrom hlc.Timestamp, token rangecache.EvictionToken, +) error { + if ctx.Err() != nil { + // Don't bother starting the stream if we are already cancelled. 
+ return ctx.Err() + } + + for r := retry.StartWithCtx(ctx, rfs.ds.rpcRetryOptions); r.Next(); { + if !token.Valid() { + var err error + ri, err := rfs.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false) + if err != nil { + log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err) + if !rangecache.IsRangeLookupErrorRetryable(err) { + return err + } + continue + } + token = ri + } + + ss, replica, mustStartConsumer, err := rfs.lookupOrCreateStream(ctx, token) + + if err != nil { + if IsSendError(err) { + token.Evict(ctx) + continue + } + return err + } + + if mustStartConsumer { + rfs.g.GoCtx(func(ctx context.Context) error { + if ss == nil { + panic("ss is nil") + } + return rfs.rfStreamEventProcessor(ctx, ss) + }) + } + + rangeID := token.Desc().RangeID + activeKey := ss.markActive(rfs.rr, replica.NodeID, rfs.nextRequestID(), token, rs, startFrom) + + req := roachpb.RangeFeedRequest{ + Span: rs.AsRawSpanWithNoLocals(), + Header: roachpb.Header{ + Timestamp: startFrom, + RangeID: rangeID, + }, + WithDiff: rfs.withDiff, + RequestID: activeKey.requestID, + } + req.Replica = replica + err = ss.stream.Send(&req) + if err == nil { + return nil + } + + log.VErrEventf(ctx, 2, "RPC error: %s", err) + if grpcutil.IsAuthError(err) { + // Authentication or authorization error. Propagate. + return err + } + // Otherwise, fallback to retry. + if err := ss.clearActive(activeKey); err != nil { + return err + } + } + + return ctx.Err() +} + +func (rfs *rfStream) lookupOrCreateStream( + ctx context.Context, token rangecache.EvictionToken, +) (*streamState, roachpb.ReplicaDescriptor, bool, error) { + transport, replicas, err := rfs.ds.prepareTransportForDescriptor(ctx, token.Desc()) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, false, err + } + defer transport.Release() + + for !transport.IsExhausted() { + ss, replica, mustStartConsumer, err := rfs.lookupOrCreateStreamForNextReplica(ctx, transport) + if err == nil { + return ss, replica, mustStartConsumer, err + } + } + + return nil, roachpb.ReplicaDescriptor{}, false, newSendError( + fmt.Sprintf("sending to all %d replicas failed", len(replicas))) +} + +func (rfs *rfStream) lookupOrCreateStreamForNextReplica( + ctx context.Context, transport Transport, +) (*streamState, roachpb.ReplicaDescriptor, bool, error) { + replica := transport.NextReplica() + nodeID := replica.NodeID + + rfs.mu.Lock() + defer rfs.mu.Unlock() + + if rfs.mu.streams == nil { + rfs.mu.streams = make(map[roachpb.NodeID]*streamState) + } + if ss, ok := rfs.mu.streams[nodeID]; ok { + return ss, replica, false, nil + } + clientCtx, client, err := transport.NextInternalClient(ctx) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, false, err + } + stream, err := client.MuxRangeFeed(clientCtx) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, false, err + } + + ss := &streamState{ + nodeID: nodeID, + stream: stream, + } + + rfs.mu.streams[nodeID] = ss + return ss, replica, true, nil +} + +func (rfs *rfStream) restartAllActive(ctx context.Context, ss *streamState) error { + rfs.mu.Lock() + defer rfs.mu.Unlock() + + if rfs.mu.streams[ss.nodeID] != ss { + panic("corrupt rangefeed state") + } + + delete(rfs.mu.streams, ss.nodeID) + + ss.mu.Lock() + defer ss.mu.Unlock() + + for _, active := range ss.mu.activeFeeds { + if err := rfs.restartRange(ctx, active.rSpan, active.restartFrom()); err != nil { + return err + } + } + + return nil +} + +func (rfs *rfStream) restartRange( + ctx context.Context, rs roachpb.RSpan, restartFrom 
hlc.Timestamp, +) error { + return divideAndIterateRSpan(ctx, rfs.ds, rs, + func(rs roachpb.RSpan, token rangecache.EvictionToken) error { + return rfs.startRangeFeedStream(ctx, rs, restartFrom, token) + }, + ) +} + +// rfStreamEventProcessor is responsible for processing rangefeed events from a node. +func (rfs *rfStream) rfStreamEventProcessor(ctx context.Context, ss *streamState) error { + for { + event, eventErr := ss.stream.Recv() + if eventErr == io.EOF { + if log.V(1) { + log.Infof(ctx, "RangeFeed for node %d disconnected. Restarting %d ranges", + ss.nodeID, ss.numActiveFeeds()) + } + return rfs.restartAllActive(ctx, ss) + } + + // Get range state associated with this rangefeed. + var rs rangeState + if eventErr == nil { + if event == nil { + return errors.AssertionFailedf("unexpected state: expected non nil event") + } + rsKey := rangeStateKey{requestID: event.RequestID, rangeID: event.RangeID} + rs = ss.getRangeState(rsKey) + if rs.activeRangeFeed == nil { + return errors.AssertionFailedf( + "unexpected state: expected to find active range feed for %v, found none", + rsKey) + } + } + + if eventErr == nil && event.Error != nil { + eventErr = event.Error.Error.GoError() + } + + if eventErr == nil { + // Dispatch event to the caller. + rs.onRangeEvent(&event.RangeFeedEvent) + + select { + case rfs.eventsCh <- &event.RangeFeedEvent: + case <-ctx.Done(): + return ctx.Err() + } + continue + } + + errDisposition, err := handleRangeError(eventErr) + if err != nil { + return err + } + + // Clear active range feed -- it will be re-added when we retry/restart. + if err := ss.clearActive( + rangeStateKey{requestID: event.RequestID, rangeID: event.RangeID}, + ); err != nil { + return err + } + + restartFrom := rs.restartFrom() + if log.V(1) { + log.Infof(ctx, "Transient error (%v) for rangefeed from node %d for range %d: %s %s@%s", + eventErr, ss.nodeID, event.RangeID, errDisposition, rs.rSpan, restartFrom) + } + + switch errDisposition { + case retryRange: + if err := rfs.startRangeFeedStream(ctx, rs.rSpan, restartFrom, rs.token); err != nil { + return err + } + case restartRange: + if err := rfs.restartRange(ctx, rs.rSpan, restartFrom); err != nil { + return err + } + default: + panic("unexpected error disposition") + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 75c916399539..320da7f6dc96 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -18,6 +18,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -51,6 +53,9 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting( "kv.rangefeed.use_dedicated_connection_class.enabled", false), ) +// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. 
+var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -76,6 +81,28 @@ func (ds *DistSender) RangeFeed( ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) + if ds.st.Version.IsActive(ctx, clusterversion.RangefeedUseOneStreamPerNode) && + enableMuxRangeFeed { + return ds.startMuxRangeFeed(ctx, spans, startFrom, withDiff, rr, eventCh) + } + + return ds.startRangeFeed(ctx, spans, startFrom, withDiff, rr, eventCh) +} + +// TODO(yevgeniy): Deprecate and remove non-streaming implementation in 22.2 +func (ds *DistSender) startRangeFeed( + ctx context.Context, + spans []roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + rr *rangeFeedRegistry, + eventCh chan<- *roachpb.RangeFeedEvent, +) error { + rSpans, err := spansToResolvedSpans(spans) + if err != nil { + return err + } + g := ctxgroup.WithContext(ctx) // Goroutine that processes subdivided ranges and creates a rangefeed for // each. @@ -95,18 +122,29 @@ func (ds *DistSender) RangeFeed( }) // Kick off the initial set of ranges. - for _, span := range spans { - rs, err := keys.SpanAddr(span) - if err != nil { - return err - } + for i := range rSpans { + rs := rSpans[i] g.GoCtx(func(ctx context.Context) error { - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh) + return divideAndSendRangeFeedToRanges(ctx, ds, rs, startFrom, rangeCh) }) } + return g.Wait() } +func spansToResolvedSpans(spans []roachpb.Span) (rSpans []roachpb.RSpan, _ error) { + var sg roachpb.SpanGroup + sg.Add(spans...) + for _, sp := range sg.Slice() { + rs, err := keys.SpanAddr(sp) + if err != nil { + return nil, err + } + rSpans = append(rSpans, rs) + } + return rSpans, nil +} + // RangeFeedContext is the structure containing arguments passed to // RangeFeed call. It functions as a kind of key for an active range feed. type RangeFeedContext struct { @@ -138,17 +176,11 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr const continueIter = true const stopIter = false - partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { - active.Lock() - defer active.Unlock() - return active.PartialRangeFeed - } - ds.activeRangeFeeds.Range(func(k, v interface{}) bool { r := k.(*rangeFeedRegistry) r.ranges.Range(func(k, v interface{}) bool { active := k.(*activeRangeFeed) - if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { + if err := fn(r.RangeFeedContext, active.snapshot()); err != nil { iterErr = err return stopIter } @@ -166,23 +198,50 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr // activeRangeFeed is a thread safe PartialRangeFeed. 
type activeRangeFeed struct { - syncutil.Mutex - PartialRangeFeed + release func() + mu struct { + syncutil.Mutex + PartialRangeFeed + } } -func (a *activeRangeFeed) onRangeEvent( - nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *roachpb.RangeFeedEvent, -) { - a.Lock() - defer a.Unlock() +func newActiveRangeFeed( + rr *rangeFeedRegistry, span roachpb.Span, startTS hlc.Timestamp, +) *activeRangeFeed { + a := &activeRangeFeed{} + a.mu.PartialRangeFeed.Span = span + a.mu.StartTS = startTS + a.release = func() { rr.ranges.Delete(a) } + rr.ranges.Store(a, nil) + return a +} + +func (a *activeRangeFeed) snapshot() PartialRangeFeed { + a.mu.Lock() + defer a.mu.Unlock() + return a.mu.PartialRangeFeed +} + +func (a *activeRangeFeed) setNodeID(nodeID roachpb.NodeID) { + a.mu.Lock() + defer a.mu.Unlock() + a.mu.NodeID = nodeID +} + +func (a *activeRangeFeed) setRangeID(rangeID roachpb.RangeID) { + a.mu.Lock() + defer a.mu.Unlock() + a.mu.RangeID = rangeID +} + +func (a *activeRangeFeed) onRangeEvent(event *roachpb.RangeFeedEvent) { + a.mu.Lock() + defer a.mu.Unlock() if event.Val != nil || event.SST != nil { - a.LastValueReceived = timeutil.Now() + a.mu.LastValueReceived = timeutil.Now() } else if event.Checkpoint != nil { - a.Resolved = event.Checkpoint.ResolvedTS + a.mu.Resolved = event.Checkpoint.ResolvedTS } - - a.NodeID = nodeID - a.RangeID = rangeID } // rangeFeedRegistry is responsible for keeping track of currently executing @@ -209,8 +268,11 @@ func newRangeFeedRegistry( return rr } -func (ds *DistSender) divideAndSendRangeFeedToRanges( - ctx context.Context, rs roachpb.RSpan, startFrom hlc.Timestamp, rangeCh chan<- singleRangeInfo, +func divideAndIterateRSpan( + ctx context.Context, + ds *DistSender, + rs roachpb.RSpan, + fn func(rs roachpb.RSpan, token rangecache.EvictionToken) error, ) error { // As RangeIterator iterates, it can return overlapping descriptors (and // during splits, this happens frequently), but divideAndSendRangeFeedToRanges @@ -226,14 +288,8 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return err } nextRS.Key = partialRS.EndKey - select { - case rangeCh <- singleRangeInfo{ - rs: partialRS, - startFrom: startFrom, - token: ri.Token(), - }: - case <-ctx.Done(): - return ctx.Err() + if err := fn(partialRS, ri.Token()); err != nil { + return err } if !ri.NeedAnother(nextRS) { break @@ -242,6 +298,29 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return ri.Error() } +func divideAndSendRangeFeedToRanges( + ctx context.Context, + ds *DistSender, + rs roachpb.RSpan, + startFrom hlc.Timestamp, + rangeCh chan<- singleRangeInfo, +) error { + return divideAndIterateRSpan( + ctx, ds, rs, + func(rs roachpb.RSpan, token rangecache.EvictionToken) error { + select { + case rangeCh <- singleRangeInfo{ + rs: rs, + startFrom: startFrom, + token: token, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) +} + // partialRangeFeed establishes a RangeFeed to the range specified by desc. It // manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry @@ -260,14 +339,8 @@ func (ds *DistSender) partialRangeFeed( span := rs.AsRawSpanWithNoLocals() // Register partial range feed with registry. 
- active := &activeRangeFeed{ - PartialRangeFeed: PartialRangeFeed{ - Span: span, - StartTS: startFrom, - }, - } - rr.ranges.Store(active, nil) - defer rr.ranges.Delete(active) + active := newActiveRangeFeed(rr, span, startFrom) + defer active.release() // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { @@ -286,7 +359,7 @@ func (ds *DistSender) partialRangeFeed( } // Establish a RangeFeed for a single Range. - maxTS, err := ds.singleRangeFeed(ctx, span, startFrom, withDiff, token.Desc(), eventCh, active.onRangeEvent) + maxTS, err := ds.singleRangeFeed(ctx, span, startFrom, withDiff, token.Desc(), active, eventCh) // Forward the timestamp in case we end up sending it again. startFrom.Forward(maxTS) @@ -296,55 +369,26 @@ func (ds *DistSender) partialRangeFeed( log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v", span, timeutil.Since(startFrom.GoTime()), err) } - switch { - case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) || - errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)): - // These errors are likely to be unique to the replica that - // reported them, so no action is required before the next - // retry. - case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): - // Evict the descriptor from the cache and reload on next attempt. - token.Evict(ctx) - token = rangecache.EvictionToken{} - continue - case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)): + errDisposition, err := handleRangeError(err) + if err != nil { + return err + } + + switch errDisposition { + case restartRange: // Evict the descriptor from the cache. token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh) - case errors.HasType(err, (*roachpb.RangeFeedRetryError)(nil)): - var t *roachpb.RangeFeedRetryError - if ok := errors.As(err, &t); !ok { - return errors.AssertionFailedf("wrong error type: %T", err) - } - switch t.Reason { - case roachpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, - roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, - roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, - roachpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: - // Try again with same descriptor. These are transient - // errors that should not show up again. - continue - case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, - roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, - roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the descriptor from the cache. - token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh) - default: - return errors.AssertionFailedf("unrecognized retryable error type: %T", err) - } + return divideAndSendRangeFeedToRanges(ctx, ds, rs, startFrom, rangeCh) + case retryRange: + // Nothing -- fallback to retry loop. default: - return err + panic("unexpected error disposition") } } } return ctx.Err() } -// onRangeEventCb is invoked for each non-error range event. -// nodeID identifies the node ID which generated the event. -type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *roachpb.RangeFeedEvent) - // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed // RPC call. Results will be sent on the provided channel. 
Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish @@ -357,8 +401,8 @@ func (ds *DistSender) singleRangeFeed( startFrom hlc.Timestamp, withDiff bool, desc *roachpb.RangeDescriptor, + active *activeRangeFeed, eventCh chan<- *roachpb.RangeFeedEvent, - onRangeEvent onRangeEventCb, ) (hlc.Timestamp, error) { args := roachpb.RangeFeedRequest{ Span: span, @@ -368,20 +412,9 @@ func (ds *DistSender) singleRangeFeed( }, WithDiff: withDiff, } + active.setRangeID(desc.RangeID) - var latencyFn LatencyFunc - if ds.rpcContext != nil { - latencyFn = ds.rpcContext.RemoteClocks.Latency - } - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) - if err != nil { - return args.Timestamp, err - } - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) - // The RangeFeed is not used for system critical traffic so use a DefaultClass - // connection regardless of the range. - opts := SendOptions{class: connectionClass(&ds.st.SV)} - transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + transport, replicas, err := ds.prepareTransportForDescriptor(ctx, desc) if err != nil { return args.Timestamp, err } @@ -394,6 +427,8 @@ func (ds *DistSender) singleRangeFeed( } args.Replica = transport.NextReplica() + active.setNodeID(args.Replica.NodeID) + clientCtx, client, err := transport.NextInternalClient(ctx) if err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) @@ -426,7 +461,7 @@ func (ds *DistSender) singleRangeFeed( log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) return args.Timestamp, t.Error.GoError() } - onRangeEvent(args.Replica.NodeID, desc.RangeID, event) + active.onRangeEvent(event) select { case eventCh <- event: @@ -437,6 +472,97 @@ func (ds *DistSender) singleRangeFeed( } } +type errorDisposition int + +const ( + // abortRange is a sentinel indicating rangefeed should be aborted because + // of an error. + abortRange errorDisposition = iota + + // restartRange indicates that the rangefeed for the range should be restarted. + // this includes updating routing information, splitting range on range boundaries + // and re-establishing rangefeeds for 1 or more ranges. + restartRange + + // retryRange indicates that the rangefeed should be simply retried. + retryRange +) + +func (d errorDisposition) String() string { + switch d { + case restartRange: + return "restart" + case retryRange: + return "retry" + default: + return "abort" + } +} + +// handleRangeError classifies rangefeed error and returns error disposition to the caller +// indicating how such error should be handled. +func handleRangeError(err error) (errorDisposition, error) { + switch { + case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) || + errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)): + // These errors are likely to be unique to the replica that + // reported them, so no action is required before the next + // retry. 
+ return retryRange, nil + case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): + return restartRange, nil + case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)): + return restartRange, nil + case errors.HasType(err, (*roachpb.RangeFeedRetryError)(nil)): + var t *roachpb.RangeFeedRetryError + if ok := errors.As(err, &t); !ok { + return abortRange, errors.AssertionFailedf("wrong error type: %T", err) + } + switch t.Reason { + case roachpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, + roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, + roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, + roachpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: + // Try again with same descriptor. These are transient + // errors that should not show up again. + return retryRange, nil + case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, + roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, + roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: + return restartRange, nil + default: + return abortRange, errors.AssertionFailedf("unrecognized retryable error type: %T", err) + } + default: + return abortRange, err + } +} + +// prepareTransportForDescriptor creates and configures RPC transport for the specified +// descriptor. Returns the transport, which the caller is expected to release along with the +// replicas slice used to configure the transport. +func (ds *DistSender) prepareTransportForDescriptor( + ctx context.Context, desc *roachpb.RangeDescriptor, +) (Transport, ReplicaSlice, error) { + var latencyFn LatencyFunc + if ds.rpcContext != nil { + latencyFn = ds.rpcContext.RemoteClocks.Latency + } + replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) + if err != nil { + return nil, nil, err + } + replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) + // The RangeFeed is not used for system critical traffic so use a DefaultClass + // connection regardless of the range. + opts := SendOptions{class: connectionClass(&ds.st.SV)} + transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + if err != nil { + return nil, nil, err + } + return transport, replicas, nil +} + func connectionClass(sv *settings.Values) rpc.ConnectionClass { if useDedicatedRangefeedConnectionClass.Get(sv) { return rpc.RangefeedClass diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go new file mode 100644 index 000000000000..bf1fd4ecf013 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -0,0 +1,177 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvcoord + +import ( + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +func TestingSetEnableStreamingRangefeed(enabled bool) func() { + old := enableMuxRangeFeed + enableMuxRangeFeed = enabled + return func() { + enableMuxRangeFeed = old + } +} + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, useStreamingRangefeed := range []bool{false, true} { + enableMuxRangeFeed = useStreamingRangefeed + + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(fmt.Sprintf("streaming=%t/%s", useStreamingRangefeed, spec.errorCode), + func(t *testing.T) { + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. 
+ for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + ctx, nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. + if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + client := roachpb.NewMockInternalClient(ctrl) + + if useStreamingRangefeed { + stream := roachpb.NewMockInternal_MuxRangeFeedClient(ctrl) + stream.EXPECT().Send(gomock.Any()).Return(nil) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) + } else { + stream := roachpb.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + } + + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil) + transport.EXPECT().Release() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) + require.Error(t, err) + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 0d757bacb013..dec047ea1e14 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -8,146 +8,359 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package kvcoord +package kvcoord_test import ( "context" - "io" "testing" - "time" - "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/stop" - gomock "github.com/golang/mock/gomock" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - grpcstatus "google.golang.org/grpc/status" + "google.golang.org/grpc" ) -// Tests that the range feed handles transport errors appropriately. In -// particular, that when encountering other decommissioned nodes it will refresh -// its range descriptor and retry, but if this node is decommissioned it will -// bail out. 
Regression test for: -// https://github.com/cockroachdb/cockroach/issues/66636 -func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) +type testRangefeedClient struct { + roachpb.InternalClient + muxRangeFeedEnabled bool + count func() +} - for _, spec := range []struct { - errorCode codes.Code - expectRetry bool - }{ - {codes.FailedPrecondition, true}, // target node is decommissioned; retry - {codes.PermissionDenied, false}, // this node is decommissioned; abort - {codes.Unauthenticated, false}, // this node is not part of cluster; abort - } { - t.Run(spec.errorCode.String(), func(t *testing.T) { - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - g := makeGossip(t, stopper, rpcContext) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - Generation: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1, ReplicaID: 1}, - {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, - } - for _, repl := range desc.InternalReplicas { - require.NoError(t, g.AddInfoProto( - gossip.MakeNodeIDKey(repl.NodeID), - newNodeDesc(repl.NodeID), - gossip.NodeDescriptorTTL, - )) - } +func (c *testRangefeedClient) RangeFeed( + ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, +) (roachpb.Internal_RangeFeedClient, error) { + defer c.count() - ctrl := gomock.NewController(t) - transport := NewMockTransport(ctrl) - rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl) + if c.muxRangeFeedEnabled { + panic(errors.AssertionFailedf("unexpected call to RangeFeed")) + } + return c.InternalClient.RangeFeed(ctx, args, opts...) +} - // We start off with a cached lease on r1. - cachedLease := roachpb.Lease{ - Replica: desc.InternalReplicas[0], - Sequence: 1, - } +func (c *testRangefeedClient) RangeFeedStream( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + defer c.count() + + if !c.muxRangeFeedEnabled { + panic(errors.AssertionFailedf("unexpected call to RangeFeedStream")) + } + return c.InternalClient.MuxRangeFeed(ctx, opts...) +} + +type internalClientCounts struct { + syncutil.Mutex + counts map[roachpb.InternalClient]int +} + +func (c *internalClientCounts) Inc(ic roachpb.InternalClient) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + c.counts[ic]++ +} + +type countConnectionsTransport struct { + wrapped kvcoord.Transport + counts *internalClientCounts + rfStreamEnabled bool +} + +func (c *countConnectionsTransport) IsExhausted() bool { + return c.wrapped.IsExhausted() +} + +func (c *countConnectionsTransport) SendNext( + ctx context.Context, request roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return c.wrapped.SendNext(ctx, request) +} + +type testFeedCtxKey struct{} + +func (c *countConnectionsTransport) NextInternalClient( + ctx context.Context, +) (context.Context, roachpb.InternalClient, error) { + ctx, client, err := c.wrapped.NextInternalClient(ctx) + if err != nil { + return ctx, client, err + } + + // Count rangefeed calls but only for feeds started by this test. 
+ countFn := func() {} + if ctx.Value(testFeedCtxKey{}) != nil { + countFn = func() { + c.counts.Inc(client) + } + } + + tc := &testRangefeedClient{ + InternalClient: client, + muxRangeFeedEnabled: c.rfStreamEnabled, + count: countFn, + } + return ctx, tc, nil +} + +func (c *countConnectionsTransport) NextReplica() roachpb.ReplicaDescriptor { + return c.wrapped.NextReplica() +} + +func (c *countConnectionsTransport) SkipReplica() { + c.wrapped.SkipReplica() +} + +func (c *countConnectionsTransport) MoveToFront(descriptor roachpb.ReplicaDescriptor) { + c.wrapped.MoveToFront(descriptor) +} + +func (c *countConnectionsTransport) Release() { + c.wrapped.Release() +} + +var _ kvcoord.Transport = (*countConnectionsTransport)(nil) + +func makeTransportFactory( + rfStreamEnabled bool, counts *internalClientCounts, +) kvcoord.TransportFactory { + return func( + options kvcoord.SendOptions, + dialer *nodedialer.Dialer, + slice kvcoord.ReplicaSlice, + ) (kvcoord.Transport, error) { + transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) + if err != nil { + return nil, err + } + countingTransport := &countConnectionsTransport{ + wrapped: transport, + rfStreamEnabled: rfStreamEnabled, + counts: counts, + } + return countingTransport, nil + } +} - // All nodes return the specified error code. We expect the range feed to - // keep trying all replicas in sequence regardless of error. - for _, repl := range desc.InternalReplicas { - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(repl) - transport.EXPECT().NextInternalClient(gomock.Any()).Return( - ctx, nil, grpcstatus.Error(spec.errorCode, "")) +// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library +// here because of circular dependencies. +func rangeFeed( + dsI interface{}, + sp roachpb.Span, + startFrom hlc.Timestamp, + onValue func(event *roachpb.RangeFeedEvent), + onError func(err error), +) func() error { + ds := dsI.(*kvcoord.DistSender) + events := make(chan *roachpb.RangeFeedEvent) + ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testFeedCtxKey{}, struct{}{})) + withCancel := func(err error) error { + cancel() + onError(err) + return err + } + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) (err error) { + return withCancel(ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events)) + }) + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return withCancel(ctx.Err()) + case ev := <-events: + onValue(ev) } - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // Once all replicas have failed, it should try to refresh the lease using - // the range cache. We let this succeed once. - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - - // It then tries the replicas again. This time we just report the - // transport as exhausted immediately. - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // This invalidates the cache yet again. This time we error. - rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) - - // If we expect a range lookup retry, allow the retry to succeed by - // returning a range descriptor and a client that immediately - // cancels the context and closes the range feed stream. 
- if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - stream := roachpb.NewMockInternal_RangeFeedClient(ctrl) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) - client := roachpb.NewMockInternalClient(ctrl) - client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) - transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil) - transport.EXPECT().Release() + } + }) + + return func() error { + cancel() + if err := g.Wait(); !errors.Is(err, context.Canceled) { + return err + } + return nil + } +} + +// rangeFeedOnError returns a error channel and an onError handler for rangefeed call. +func rangeFeedOnError() (chan error, func(err error)) { + errCh := make(chan error, 1) + return errCh, func(err error) { + select { + case errCh <- err: + default: + } + } +} + +// observeNValues returns on value handler which expects to see N rangefeed values, +// along with the channel which gets closed when requisite count of events has been seen. +func observeNValues(n int) (chan struct{}, func(ev *roachpb.RangeFeedEvent)) { + var count = struct { + syncutil.Mutex + c int + }{} + allSeen := make(chan struct{}) + return allSeen, func(ev *roachpb.RangeFeedEvent) { + if ev.Val != nil { + count.Lock() + count.c++ + if count.c == n { + close(allSeen) } + count.Unlock() + } + } +} - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: g, - RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { - return transport, nil +func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + startServerAtVer := func(ver roachpb.Version) (*testcluster.TestCluster, func()) { + st := cluster.MakeTestingClusterSettingsWithVersions(ver, ver, true) + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(false, nil), + }, + + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: ver, }, }, - RangeDescriptorDB: rangeDB, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), - }) - ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: desc, - Lease: cachedLease, - }) - - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) - require.Error(t, err) + }, }) + return tc, func() { tc.Stopper().Stop(ctx) } + } + + // Create a small table; run rangefeed. The transport factory we injected above verifies + // that we use the old rangefeed implementation. 
+ runRangeFeed := func(tc *testcluster.TestCluster) { + ts := tc.Server(0) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + errCh, onError := rangeFeedOnError() + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, onError) + select { + case <-allSeen: + require.NoError(t, closeFeed()) + case err := <-errCh: + require.NoError(t, err) + } + } + + t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) { + noRfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode - 1) + tc, cleanup := startServerAtVer(noRfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) + + t.Run("rangefeed-stream-disabled-via-environment", func(t *testing.T) { + defer kvcoord.TestingSetEnableStreamingRangefeed(false)() + // Even though we could use rangefeed stream, it's disable via kill switch. + rfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode) + tc, cleanup := startServerAtVer(rfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) +} + +func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + connCounts := &internalClientCounts{counts: make(map[roachpb.InternalClient]int)} + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(true, connCounts), + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + // Create a table, and split it so that we have multiple ranges, distributed across + // test cluster nodes. + sqlDB.Exec(t, ` +SET CLUSTER SETTING kv.rangefeed.enabled = true; +ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1; +CREATE TABLE foo (key INT PRIMARY KEY); +INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000); +ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100)); +`) + + for i := 100; i <= 900; i += 100 { + storeID := 1 + i%3 + rowID := i + sqlDB.Exec(t, "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", storeID, rowID) + } + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + errCh, onError := rangeFeedOnError() + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, onError) + select { + case <-allSeen: + require.NoError(t, closeFeed()) + case err := <-errCh: + require.NoError(t, err) + } + + // Verify we connected to each node once. 
+ connCounts.Lock() + defer connCounts.Unlock() + for _, c := range connCounts.counts { + require.Equal(t, 1, c) } } diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 20fada847ebf..bcd9891abc0d 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -57,6 +57,10 @@ func (n Node) RangeFeed(_ *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFee panic("unimplemented") } +func (n Node) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("unimplemented") +} + func (n Node) GossipSubscription( _ *roachpb.GossipSubscriptionRequest, _ roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 6e8509365170..2253b8264600 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -178,6 +178,12 @@ func (m *mockInternalClient) RangeFeed( return nil, fmt.Errorf("unsupported RangeFeed call") } +func (m *mockInternalClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + return nil, fmt.Errorf("unsupported RangeFeedStream call") +} + // GossipSubscription is part of the roachpb.InternalClient interface. func (m *mockInternalClient) GossipSubscription( ctx context.Context, args *roachpb.GossipSubscriptionRequest, _ ...grpc.CallOption, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 11fa732c79f7..8481023f218d 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -124,7 +124,7 @@ type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb. // ReplicaRangefeedFilter is used in unit tests to modify the request, inject // responses, or return errors from rangefeeds. type ReplicaRangefeedFilter func( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 9aca30b5ddaa..657fa35b699a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -72,7 +72,7 @@ var RangefeedTBIEnabled = settings.RegisterBoolSetting( // support for concurrent calls to Send. Note that the default implementation of // grpc.Stream is not safe for concurrent calls to Send. type lockedRangefeedStream struct { - wrapped roachpb.Internal_RangeFeedServer + wrapped roachpb.RangeFeedEventSink sendMu syncutil.Mutex } @@ -143,15 +143,13 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The provided ConcurrentRequestLimiter is used to limit the number // of rangefeeds using catch-up iterators at the same time. 
func (r *Replica) RangeFeed(
- args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
+ args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
) *roachpb.Error {
 return r.rangeFeedWithRangeID(r.RangeID, args, stream)
}
func (r *Replica) rangeFeedWithRangeID(
- _forStacks roachpb.RangeID,
- args *roachpb.RangeFeedRequest,
- stream roachpb.Internal_RangeFeedServer,
+ _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
) *roachpb.Error {
 if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
 return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index e727704211a7..3c68c3f28952 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2990,7 +2990,7 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD
 // the provided stream and returns with an optional error when the rangefeed is
 // complete.
 func (s *Store) RangeFeed(
- args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
+ args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
) *roachpb.Error {
 if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil {
diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go
index 9d4bf5f5c9e4..f09d40184bec 100644
--- a/pkg/kv/kvserver/stores.go
+++ b/pkg/kv/kvserver/stores.go
@@ -199,7 +199,7 @@ func (ls *Stores) Send(
 // the provided stream and returns with an optional error when the rangefeed is
 // complete.
 func (ls *Stores) RangeFeed(
- args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
+ args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
) *roachpb.Error {
 ctx := stream.Context()
 if args.RangeID == 0 {
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 14a3aee00368..1ece8b30ca2b 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -11,6 +11,7 @@ package roachpb
 import (
+ "context"
 "fmt"
 "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
@@ -1696,3 +1697,9 @@ const (
 // with the SpecificTenantOverrides precedence..
 AllTenantsOverrides
)
+
+// RangeFeedEventSink is an interface for sending a single rangefeed event.
+type RangeFeedEventSink interface {
+ Context() context.Context
+ Send(*RangeFeedEvent) error
+}
diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto
index b18423086599..6893d5a6a443 100644
--- a/pkg/roachpb/api.proto
+++ b/pkg/roachpb/api.proto
@@ -2557,6 +2557,9 @@ message RangeFeedRequest {
 // AdmissionHeader is used only at the start of the range feed stream, since
 // the initial catch-up scan be expensive.
 AdmissionHeader admission_header = 4 [(gogoproto.nullable) = false];
+
+ // RequestID is set by the client issuing MuxRangeFeed requests.
+ int64 request_id = 5 [(gogoproto.customname) = "RequestID"];
}
// RangeFeedValue is a variant of RangeFeedEvent that represents an update to
@@ -2621,6 +2624,14 @@ message RangeFeedEvent {
 RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"];
}
+// MuxRangeFeedResponse is a response generated by the MuxRangeFeed RPC. It tags
+// the underlying RangeFeedEvent with the ID of the range that produced this event.
+message MuxRangeFeedResponse {
+ RangeFeedEvent event = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
+ int32 range_id = 2 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"];
+ // The server echoes back the request_id set by the client.
+ int64 request_id = 3 [(gogoproto.customname) = "RequestID"]; +} // ResetQuorumRequest makes a range that is unavailable due to lost quorum // available again, at the cost of losing all of the data in the range. Any @@ -2830,7 +2841,10 @@ message JoinNodeResponse { service Internal { rpc Batch (BatchRequest) returns (BatchResponse) {} rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {} + // RangeFeed RPC is deprecated. Use MuxRangeFeed instead. + // To be removed at version 22.2 rpc RangeFeed (RangeFeedRequest) returns (stream RangeFeedEvent) {} + rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedResponse) {} rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {} rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {} diff --git a/pkg/roachpb/mocks_generated.go b/pkg/roachpb/mocks_generated.go index 9aa7e547d2e7..54ba15b6b788 100644 --- a/pkg/roachpb/mocks_generated.go +++ b/pkg/roachpb/mocks_generated.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient,Internal_MuxRangeFeedServer) // Package roachpb is a generated GoMock package. package roachpb @@ -136,6 +136,26 @@ func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) } +// MuxRangeFeed mocks base method. +func (m *MockInternalClient) MuxRangeFeed(arg0 context.Context, arg1 ...grpc.CallOption) (Internal_MuxRangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MuxRangeFeed", varargs...) + ret0, _ := ret[0].(Internal_MuxRangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MuxRangeFeed indicates an expected call of MuxRangeFeed. +func (mr *MockInternalClientMockRecorder) MuxRangeFeed(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MuxRangeFeed", reflect.TypeOf((*MockInternalClient)(nil).MuxRangeFeed), varargs...) +} + // RangeFeed mocks base method. func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *RangeFeedRequest, arg2 ...grpc.CallOption) (Internal_RangeFeedClient, error) { m.ctrl.T.Helper() @@ -398,3 +418,274 @@ func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) } + +// MockInternal_MuxRangeFeedClient is a mock of Internal_MuxRangeFeedClient interface. +type MockInternal_MuxRangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedClientMockRecorder +} + +// MockInternal_MuxRangeFeedClientMockRecorder is the mock recorder for MockInternal_MuxRangeFeedClient. +type MockInternal_MuxRangeFeedClientMockRecorder struct { + mock *MockInternal_MuxRangeFeedClient +} + +// NewMockInternal_MuxRangeFeedClient creates a new mock instance. 
+func NewMockInternal_MuxRangeFeedClient(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedClient { + mock := &MockInternal_MuxRangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedClient) EXPECT() *MockInternal_MuxRangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_MuxRangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Recv() (*MuxRangeFeedResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*MuxRangeFeedResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Send(arg0 *RangeFeedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. 
+func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Trailer)) +} + +// MockInternal_MuxRangeFeedServer is a mock of Internal_MuxRangeFeedServer interface. +type MockInternal_MuxRangeFeedServer struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedServerMockRecorder +} + +// MockInternal_MuxRangeFeedServerMockRecorder is the mock recorder for MockInternal_MuxRangeFeedServer. +type MockInternal_MuxRangeFeedServerMockRecorder struct { + mock *MockInternal_MuxRangeFeedServer +} + +// NewMockInternal_MuxRangeFeedServer creates a new mock instance. +func NewMockInternal_MuxRangeFeedServer(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedServer { + mock := &MockInternal_MuxRangeFeedServer{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedServer) EXPECT() *MockInternal_MuxRangeFeedServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).Context)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedServer) Recv() (*RangeFeedRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*RangeFeedRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedServer) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. 
+func (mr *MockInternal_MuxRangeFeedServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedServer) Send(arg0 *MuxRangeFeedResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SendMsg), arg0) +} + +// SetHeader mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. 
+func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SetTrailer), arg0) +} diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 56078743ebad..c11b3da0e6a8 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -52,9 +52,8 @@ func (a tenantAuthorizer) authorize( case "/cockroach.roachpb.Internal/RangeLookup": return a.authRangeLookup(tenID, req.(*roachpb.RangeLookupRequest)) - case "/cockroach.roachpb.Internal/RangeFeed": + case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed": return a.authRangeFeed(tenID, req.(*roachpb.RangeFeedRequest)) - case "/cockroach.roachpb.Internal/GossipSubscription": return a.authGossipSubscription(tenID, req.(*roachpb.GossipSubscriptionRequest)) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 19c2a15548eb..b3fd50e26063 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -627,30 +627,40 @@ type respStreamClientAdapter struct { errC chan error } -func makeRespStreamClientAdapter(ctx context.Context) respStreamClientAdapter { - return respStreamClientAdapter{ - ctx: ctx, - respC: make(chan interface{}, 128), - errC: make(chan error, 1), - } +type localStream struct { + ctx context.Context } // grpc.ClientStream methods. -func (respStreamClientAdapter) Header() (metadata.MD, error) { panic("unimplemented") } -func (respStreamClientAdapter) Trailer() metadata.MD { panic("unimplemented") } -func (respStreamClientAdapter) CloseSend() error { panic("unimplemented") } +func (localStream) Header() (metadata.MD, error) { panic("unimplemented") } +func (localStream) Trailer() metadata.MD { panic("unimplemented") } +func (localStream) CloseSend() error { panic("unimplemented") } // grpc.ServerStream methods. -func (respStreamClientAdapter) SetHeader(metadata.MD) error { panic("unimplemented") } -func (respStreamClientAdapter) SendHeader(metadata.MD) error { panic("unimplemented") } -func (respStreamClientAdapter) SetTrailer(metadata.MD) { panic("unimplemented") } +func (localStream) SetHeader(metadata.MD) error { panic("unimplemented") } +func (localStream) SendHeader(metadata.MD) error { panic("unimplemented") } +func (localStream) SetTrailer(metadata.MD) { panic("unimplemented") } // grpc.Stream methods. -func (a respStreamClientAdapter) Context() context.Context { return a.ctx } -func (respStreamClientAdapter) SendMsg(m interface{}) error { panic("unimplemented") } -func (respStreamClientAdapter) RecvMsg(m interface{}) error { panic("unimplemented") } +func (a localStream) Context() context.Context { return a.ctx } +func (localStream) SendMsg(m interface{}) error { panic("unimplemented") } +func (localStream) RecvMsg(m interface{}) error { panic("unimplemented") } + +type channelStream struct { + localStream + respC chan interface{} + errC chan error +} -func (a respStreamClientAdapter) recvInternal() (interface{}, error) { +func makeChannelStream(ctx context.Context) channelStream { + return channelStream{ + localStream: localStream{ctx: ctx}, + respC: make(chan interface{}, 128), + errC: make(chan error, 1), + } +} + +func (a channelStream) recvInternal() (interface{}, error) { // Prioritize respC. Both channels are buffered and the only guarantee we // have is that once an error is sent on errC no other events will be sent // on respC again. 
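As an aside on the generated MuxRangeFeed mocks above: they can be scripted with gomock like any other generated stream mock. The sketch below is illustrative only and is not part of this change; the test name and package are hypothetical. It scripts one request followed by io.EOF and then drives the mock the way a server-side handler would.

package roachpb_test

import (
	"context"
	"io"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/golang/mock/gomock"
)

// TestMuxRangeFeedServerMockSketch is an illustrative sketch (not part of this
// change): it scripts a MuxRangeFeed server stream with the generated mock and
// then consumes it the way a server-side handler would.
func TestMuxRangeFeedServerMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	stream := roachpb.NewMockInternal_MuxRangeFeedServer(ctrl)
	stream.EXPECT().Context().Return(context.Background()).AnyTimes()
	// Script the stream: one request tagged RequestID 1, then the client
	// closes its send side.
	gomock.InOrder(
		stream.EXPECT().Recv().Return(&roachpb.RangeFeedRequest{RequestID: 1}, nil),
		stream.EXPECT().Recv().Return(nil, io.EOF),
	)
	stream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()

	// Read requests until the client closes its side, echoing one response per
	// request, tagged with the request's RequestID.
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal(err)
		}
		if err := stream.Send(&roachpb.MuxRangeFeedResponse{RequestID: req.RequestID}); err != nil {
			t.Fatal(err)
		}
	}
}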
@@ -665,10 +675,12 @@ func (a respStreamClientAdapter) recvInternal() (interface{}, error) {
 default:
 return nil, err
 }
+ case <-a.ctx.Done():
+ return nil, a.ctx.Err()
 }
}
-func (a respStreamClientAdapter) sendInternal(e interface{}) error {
+func (a channelStream) sendInternal(e interface{}) error {
 select {
 case a.respC <- e:
 return nil
@@ -678,7 +690,7 @@ func (a respStreamClientAdapter) sendInternal(e interface{}) error {
 }
 type rangeFeedClientAdapter struct {
- respStreamClientAdapter
+ channelStream
}
// roachpb.Internal_RangeFeedServer methods.
@@ -705,7 +717,7 @@ func (a internalClientAdapter) RangeFeed(
 ctx, cancel := context.WithCancel(ctx)
 ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/RangeFeed")
 rfAdapter := rangeFeedClientAdapter{
- respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
+ channelStream: makeChannelStream(ctx),
 }
 // Mark this as originating locally.
@@ -723,8 +735,79 @@ func (a internalClientAdapter) RangeFeed(
 return rfAdapter, nil
}
+type muxRangeFeedClientAdapter struct {
+ localStream
+ sendHandler func(interface{}) error
+ recvHandler func() (interface{}, error)
+}
+
+func (a muxRangeFeedClientAdapter) Send(r *roachpb.RangeFeedRequest) error {
+ return a.sendHandler(r)
+}
+
+func (a muxRangeFeedClientAdapter) Recv() (*roachpb.MuxRangeFeedResponse, error) {
+ e, err := a.recvHandler()
+ if err != nil {
+ return nil, err
+ }
+ return e.(*roachpb.MuxRangeFeedResponse), nil
+}
+
+var _ roachpb.Internal_MuxRangeFeedClient = muxRangeFeedClientAdapter{}
+
+type muxRangeFeedServerAdapter struct {
+ localStream
+ sendHandler func(interface{}) error
+ recvHandler func() (interface{}, error)
+}
+
+func (a muxRangeFeedServerAdapter) Send(event *roachpb.MuxRangeFeedResponse) error {
+ return a.sendHandler(event)
+}
+
+func (a muxRangeFeedServerAdapter) Recv() (*roachpb.RangeFeedRequest, error) {
+ e, err := a.recvHandler()
+ if err != nil {
+ return nil, err
+ }
+ return e.(*roachpb.RangeFeedRequest), nil
+}
+
+var _ roachpb.Internal_MuxRangeFeedServer = muxRangeFeedServerAdapter{}
+
+func (a internalClientAdapter) MuxRangeFeed(
+ ctx context.Context, opts ...grpc.CallOption,
+) (roachpb.Internal_MuxRangeFeedClient, error) {
+ ctx, cancel := context.WithCancel(ctx)
+
+ serverStream := makeChannelStream(ctx)
+ clientStream := makeChannelStream(ctx)
+
+ go func() {
+ defer cancel()
+ adapter := muxRangeFeedServerAdapter{
+ localStream: localStream{ctx: ctx},
+ sendHandler: serverStream.sendInternal,
+ recvHandler: clientStream.recvInternal,
+ }
+ err := a.server.MuxRangeFeed(adapter)
+ if err == nil {
+ err = io.EOF
+ }
+ serverStream.errC <- err
+ }()
+
+ adapter := muxRangeFeedClientAdapter{
+ localStream: localStream{ctx: ctx},
+ sendHandler: clientStream.sendInternal,
+ recvHandler: serverStream.recvInternal,
+ }
+ // Requests sent by the client adapter flow into clientStream, which the
+ // server adapter reads from; responses sent by the server adapter flow into
+ // serverStream, which the client adapter reads from.
+ return adapter, nil
+}
+
 type gossipSubscriptionClientAdapter struct {
- respStreamClientAdapter
+ channelStream
 }
 // roachpb.Internal_GossipSubscriptionServer methods.
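To make the new surface area concrete, here is a minimal, illustrative sketch of how a caller might drive a MuxRangeFeed stream directly. It is not part of this change; the helper and package names are made up, and the per-request channel routing is only meant to show that each request carries a caller-chosen RequestID which the server echoes back on every response. The dist_sender_mux_rangefeed.go file added by this change presumably performs this kind of demultiplexing inside the DistSender.

package rangefeedexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// consumeMuxRangeFeed is an illustrative helper, not part of this change. It
// registers the given single-range requests on one MuxRangeFeed stream and
// routes every response to a per-request channel keyed by the RequestID that
// the server echoes back.
func consumeMuxRangeFeed(
	ctx context.Context, client roachpb.InternalClient, reqs []roachpb.RangeFeedRequest,
) (map[int64]chan *roachpb.MuxRangeFeedResponse, error) {
	stream, err := client.MuxRangeFeed(ctx)
	if err != nil {
		return nil, err
	}

	// Register every single-range feed on the one stream, tagging each with a
	// distinct RequestID.
	out := make(map[int64]chan *roachpb.MuxRangeFeedResponse, len(reqs))
	for i := range reqs {
		reqs[i].RequestID = int64(i)
		out[reqs[i].RequestID] = make(chan *roachpb.MuxRangeFeedResponse, 16)
		if err := stream.Send(&reqs[i]); err != nil {
			return nil, err
		}
	}

	go func() {
		for {
			resp, err := stream.Recv()
			if err != nil {
				// Any error (including io.EOF) terminates the whole stream, and
				// with it every rangefeed multiplexed on it; close all outputs.
				for _, ch := range out {
					close(ch)
				}
				return
			}
			if ch, ok := out[resp.RequestID]; ok {
				ch <- resp
			}
		}
	}()
	return out, nil
}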
@@ -751,7 +834,7 @@ func (a internalClientAdapter) GossipSubscription(
 ctx, cancel := context.WithCancel(ctx)
 ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/GossipSubscription")
 gsAdapter := gossipSubscriptionClientAdapter{
- respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
+ channelStream: makeChannelStream(ctx),
 }
 go func() {
@@ -768,7 +851,7 @@ func (a internalClientAdapter) GossipSubscription(
 }
 type tenantSettingsClientAdapter struct {
- respStreamClientAdapter
+ channelStream
}
// roachpb.Internal_TenantSettingsServer methods.
@@ -794,7 +877,7 @@ func (a internalClientAdapter) TenantSettings(
) (roachpb.Internal_TenantSettingsClient, error) {
 ctx, cancel := context.WithCancel(ctx)
 gsAdapter := tenantSettingsClientAdapter{
- respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
+ channelStream: makeChannelStream(ctx),
 }
 go func() {
diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index 9807247e14d4..a1499346c0bf 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -240,6 +240,10 @@ func (*internalServer) RangeFeed(
 panic("unimplemented")
}
+func (*internalServer) MuxRangeFeed(roachpb.Internal_MuxRangeFeedServer) error {
+ panic("unimplemented")
+}
+
func (*internalServer) GossipSubscription(
 *roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer,
) error {
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 93a7f8bc5d6c..58acaaa38670 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -188,6 +188,7 @@ go_library(
 "//pkg/util",
 "//pkg/util/admission",
 "//pkg/util/contextutil",
+ "//pkg/util/ctxgroup",
 "//pkg/util/envutil",
 "//pkg/util/errorutil",
 "//pkg/util/goschedstats",
diff --git a/pkg/server/node.go b/pkg/server/node.go
index c766ad0ac9f5..a72fcc4d7a4d 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -43,6 +43,7 @@ import (
 "github.com/cockroachdb/cockroach/pkg/storage"
 "github.com/cockroachdb/cockroach/pkg/util"
 "github.com/cockroachdb/cockroach/pkg/util/admission"
+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 "github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 "github.com/cockroachdb/cockroach/pkg/util/hlc"
 "github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1153,6 +1154,12 @@ func (n *Node) RangeLookup(
 // RangeFeed implements the roachpb.InternalServer interface.
func (n *Node) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) error { + return n.singleRangeFeed(args, stream) +} + +func (n *Node) singleRangeFeed( + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) error { pErr := n.stores.RangeFeed(args, stream) if pErr != nil { @@ -1165,6 +1172,58 @@ func (n *Node) RangeFeed( return nil } +type setRangeIDEventSink struct { + ctx context.Context + rangeID roachpb.RangeID + requestID int64 + wrapped roachpb.Internal_MuxRangeFeedServer +} + +func (s *setRangeIDEventSink) Context() context.Context { + return s.ctx +} + +func (s *setRangeIDEventSink) Send(event *roachpb.RangeFeedEvent) error { + response := &roachpb.MuxRangeFeedResponse{ + RangeFeedEvent: *event, + RangeID: s.rangeID, + RequestID: s.requestID, + } + return s.wrapped.Send(response) +} + +var _ roachpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) + +func (n *Node) asyncRangeFeed( + args roachpb.RangeFeedRequest, stream roachpb.Internal_MuxRangeFeedServer, +) func(ctx context.Context) error { + return func(ctx context.Context) error { + sink := setRangeIDEventSink{ + ctx: ctx, + rangeID: args.RangeID, + requestID: args.RequestID, + wrapped: stream, + } + return n.singleRangeFeed(&args, &sink) + } +} + +// MuxRangeFeed implements the roachpb.InternalServer interface. +func (n *Node) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + ctx, cancelFeeds := context.WithCancel(stream.Context()) + defer cancelFeeds() + rfGrp := ctxgroup.WithContext(ctx) + + for { + req, err := stream.Recv() + if err != nil { + cancelFeeds() + return errors.CombineErrors(err, rfGrp.Wait()) + } + rfGrp.GoCtx(n.asyncRangeFeed(*req, stream)) + } +} + // ResetQuorum implements the roachpb.InternalServer interface. func (n *Node) ResetQuorum( ctx context.Context, req *roachpb.ResetQuorumRequest,