From 0e933eab8699cdc4f7b20aa5c895bfbe8b614d6d Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 25 Jan 2022 07:52:45 -0500
Subject: [PATCH] kv: Introduce MuxRangeFeed bi-directional RPC.

As demonstrated in #74219, running a rangefeed over a large enough table, or not consuming rangefeed events quickly enough, may cause undesirable memory blowup, up to and including OOMing the node. Since rangefeeds tend to run on (almost) every node, there is a possibility that the entire cluster becomes destabilized.

One of the reasons for this is that the memory used by the low(er)-level HTTP/2 RPC transport is not currently tracked -- nor do we have a mechanism to add such tracking. #74456 provided partial mitigation for this issue, restricting how much memory a single RangeFeed stream may use. Nonetheless, the possibility of excessive memory usage remains, since the number of rangefeed streams in the system can be very large.

This PR introduces a new bi-directional streaming RPC called `MuxRangeFeed`. In the uni-directional streaming RPC, the client establishes a connection to the KV server and requests events for a single span; a separate RPC stream is created for each span. In contrast, `MuxRangeFeed` is a bi-directional RPC: the client connects to the KV server and requests as many spans as it wishes to receive from that server. The server multiplexes all events onto a single stream, and the client de-multiplexes those events appropriately on the receiving end.

Note: just like in the uni-directional implementation, each call to the `MuxRangeFeed` method (i.e. a logical rangefeed established by the client) is treated independently. That is, even though it is possible to multiplex all logical rangefeed streams onto a single bi-directional stream to a single KV node, this optimization is not currently implemented, as it raises questions about scheduling and fairness. If we need to further reduce the number of bi-directional streams in the future, from `number_logical_feeds*number_of_nodes` down to `number_of_nodes`, such an optimization can be added at a later time.

Multiplexing all of the spans hosted on a node onto a single bi-directional stream reduces the number of such streams from `number_of_ranges` down to `number_of_nodes` (per logical rangefeed). This, in turn, enables the client to reserve the worst-case amount of memory before starting the rangefeed. That amount is bounded by the number of nodes in the cluster times the per-stream maximum memory -- the default is `2MB`, but this can be changed via the dedicated rangefeed connection class.

The server side of the equation may also benefit from knowing how many rangefeeds are currently running. Even though the server is somewhat protected from memory blowup via HTTP/2 pushback, we may want to manage server-side memory better as well. Memory accounting for the client (and possibly the server) will be added in follow-on PRs.

The new RPC is turned on by default, since the intention is to retire the uni-directional rangefeed in a later release. However, this default may be disabled (and the rangefeed reverted to the old implementation) by setting the `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable to `false` and restarting the cluster.

Release note (enterprise change): Rangefeeds (which are used internally, e.g. by changefeeds) now use a common HTTP/2 stream per client for all range replicas on a node, instead of one per replica. This significantly reduces network buffer memory usage, which could previously cause nodes to run out of memory if a client was slow to consume events. The rangefeed implementation can be reverted to its old behavior by restarting the cluster with the `COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED` environment variable set to `false`.
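For illustration only, here is a minimal sketch of how a client might drive the new `MuxRangeFeed` stream for several spans on a single node. This is not the `DistSender` code added by this patch: request routing fields (`RangeID`, `Replica`), retries, and memory budgeting are elided, and the `RequestID` assignment below is a hypothetical placeholder; the request and event shapes follow the code in the diff.

package example // hypothetical package for this sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// muxRangeFeedSketch requests every span on one bi-directional stream and
// fans the multiplexed events back out to eventCh.
func muxRangeFeedSketch(
	ctx context.Context,
	client roachpb.InternalClient, // e.g. obtained from the node dialer
	spans []roachpb.Span,
	startFrom hlc.Timestamp,
	eventCh chan<- *roachpb.RangeFeedEvent,
) error {
	stream, err := client.MuxRangeFeed(ctx)
	if err != nil {
		return err
	}
	// One Send per span; RequestID ties each response back to its request.
	for i, sp := range spans {
		req := roachpb.RangeFeedRequest{
			Span:      sp,
			Header:    roachpb.Header{Timestamp: startFrom},
			RequestID: int64(i + 1), // hypothetical id assignment
		}
		if err := stream.Send(&req); err != nil {
			return err
		}
	}
	// Events for all requested spans arrive interleaved on this one stream.
	for {
		ev, err := stream.Recv()
		if err != nil {
			return err // io.EOF means the node-level stream terminated
		}
		if ev.Error != nil {
			// Per-range error; the real code classifies it and retries or
			// restarts just that range (see handleRangeError in the diff).
			return ev.Error.Error.GoError()
		}
		_ = ev.RequestID // identifies which requested span produced the event
		select {
		case eventCh <- &ev.RangeFeedEvent:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

With this shape, the worst-case client-side buffering for a logical rangefeed is bounded by the number of nodes times the per-stream budget: for example, a hypothetical 10-node cluster with the default 2MB per-stream budget needs roughly 20MB reserved, independent of how many ranges the feed spans.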
---
 .../settings/settings-for-tenants.txt | 2 +-
 docs/generated/settings/settings.html | 2 +-
 pkg/clusterversion/cockroach_versions.go | 6 +
 pkg/clusterversion/key_string.go | 5 +-
 pkg/kv/kvclient/kvcoord/BUILD.bazel | 7 +
 .../kvcoord/dist_sender_mux_rangefeed.go | 419 +++++++++++++++++
 .../kvclient/kvcoord/dist_sender_rangefeed.go | 324 +++++++++----
 .../dist_sender_rangefeed_mock_test.go | 177 +++++++
 .../kvcoord/dist_sender_rangefeed_test.go | 445 +++++++++++++-----
 pkg/kv/kvclient/kvcoord/send_test.go | 4 +
 pkg/kv/kvclient/kvcoord/transport_test.go | 6 +
 pkg/kv/kvserver/kvserverbase/base.go | 2 +-
 pkg/kv/kvserver/replica_rangefeed.go | 8 +-
 pkg/kv/kvserver/store.go | 2 +-
 pkg/kv/kvserver/stores.go | 2 +-
 pkg/roachpb/api.go | 7 +
 pkg/roachpb/api.proto | 14 +
 pkg/roachpb/mocks_generated.go | 293 +++++++++++-
 pkg/rpc/auth_tenant.go | 3 +-
 pkg/rpc/context.go | 129 ++++-
 pkg/rpc/context_test.go | 4 +
 pkg/server/BUILD.bazel | 1 +
 pkg/server/node.go | 59 +++
 23 files changed, 1668 insertions(+), 253 deletions(-)
 create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
 create mode 100644 pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 115a3027bc5b..d5673df271b0 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -177,4 +177,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
 trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used.
 trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used.
 trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used.
-version version 21.2-50 set the active cluster version in the format '.'
+version version 21.2-52 set the active cluster version in the format '.'
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 3ca78510e519..ee96273653a4 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -185,6 +185,6 @@
 trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used.
 trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used.
 trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used.
-versionversion21.2-50set the active cluster version in the format '.'
+versionversion21.2-52set the active cluster version in the format '.'
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 1cc066ae2a2f..e2b940acac5f 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -251,6 +251,8 @@ const ( // EnableProtectedTimestampsForTenant enables the use of protected timestamps // in secondary tenants. EnableProtectedTimestampsForTenant + // Rangefeed use 1 stream per node + RangefeedUseOneStreamPerNode // ************************************************* // Step (1): Add new versions here. @@ -387,6 +389,10 @@ var versionsSingleton = keyedVersions{ Key: EnableProtectedTimestampsForTenant, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50}, }, + { + Key: RangefeedUseOneStreamPerNode, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 52}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 1f9bfd1df5b4..6fa398b7123d 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -34,11 +34,12 @@ func _() { _ = x[UnsafeLossOfQuorumRecoveryRangeLog-23] _ = x[AlterSystemProtectedTimestampAddColumn-24] _ = x[EnableProtectedTimestampsForTenant-25] + _ = x[RangefeedUseOneStreamPerNode-26] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenant" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantRangefeedUseOneStreamPerNode" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 670} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index e7f17acd46db..55b1a504aa4b 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "batch.go", "condensable_span_set.go", "dist_sender.go", + "dist_sender_mux_rangefeed.go", "dist_sender_rangefeed.go", "doc.go", "local_test_cluster_util.go", @@ -36,6 +37,7 @@ go_library( visibility = ["//visibility:public"], deps = [ 
"//pkg/base", + "//pkg/clusterversion", "//pkg/gossip", "//pkg/keys", "//pkg/kv", @@ -112,6 +114,7 @@ go_test( srcs = [ "batch_test.go", "condensable_span_set_test.go", + "dist_sender_rangefeed_mock_test.go", "dist_sender_rangefeed_test.go", "dist_sender_server_test.go", "dist_sender_test.go", @@ -143,6 +146,7 @@ go_test( deps = [ "//build/bazelutil:noop", "//pkg/base", + "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", "//pkg/gossip", @@ -164,6 +168,7 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/storage", @@ -174,9 +179,11 @@ go_test( "//pkg/testutils/localtestcluster", "//pkg/testutils/serverutils", "//pkg/testutils/skip", + "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/caller", + "//pkg/util/ctxgroup", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go new file mode 100644 index 000000000000..91da3da53d1e --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -0,0 +1,419 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvcoord + +import ( + "context" + "fmt" + "io" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/grpcutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +type rfStream struct { + g ctxgroup.Group + ds *DistSender + rr *rangeFeedRegistry + eventsCh chan<- *roachpb.RangeFeedEvent + withDiff bool + + mu struct { + syncutil.Mutex + + // Currently running MuxRangeFeed streams. + streams map[roachpb.NodeID]*streamState + + // Counter identifying particular RangeFeed stream. + rangeFeedID int64 + } +} + +// startMuxRangeFeed starts rangefeed using streaming RangeFeedStream RPC. +func (ds *DistSender) startMuxRangeFeed( + ctx context.Context, + spans []roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + rr *rangeFeedRegistry, + eventCh chan<- *roachpb.RangeFeedEvent, +) error { + rSpans, err := spansToResolvedSpans(spans) + if err != nil { + return err + } + + rfs := rfStream{ + g: ctxgroup.WithContext(ctx), + ds: ds, + rr: rr, + eventsCh: eventCh, + withDiff: withDiff, + } + + // Kick off the initial set of ranges. 
+ for i := range rSpans { + rSpan := rSpans[i] + rfs.g.GoCtx(func(ctx context.Context) error { + return divideAndIterateRSpan(ctx, ds, rSpan, + func(rs roachpb.RSpan, token rangecache.EvictionToken) error { + return rfs.startRangeFeedStream(ctx, rs, startFrom, token) + }, + ) + }) + } + + return rfs.g.Wait() +} + +type rangeState struct { + *activeRangeFeed + startFrom hlc.Timestamp + token rangecache.EvictionToken + rSpan roachpb.RSpan +} + +func (rs *rangeState) restartFrom() hlc.Timestamp { + restartFrom := rs.activeRangeFeed.snapshot().Resolved + if restartFrom.Less(rs.startFrom) { + // RangeFeed happily forwards any closed timestamps it receives as + // soon as there are no outstanding intents under them. + // There it's possible that the resolved timestamp observed by this stream + // might be lower than the initial startFrom. + restartFrom = rs.startFrom + } + return restartFrom +} + +type rangeStateKey struct { + requestID int64 + rangeID roachpb.RangeID +} + +type streamState struct { + nodeID roachpb.NodeID + stream roachpb.Internal_MuxRangeFeedClient + + mu struct { + syncutil.Mutex + activeFeeds map[rangeStateKey]rangeState + } +} + +func (ss *streamState) getRangeState(key rangeStateKey) rangeState { + ss.mu.Lock() + defer ss.mu.Unlock() + return ss.mu.activeFeeds[key] +} + +func (ss *streamState) clearActive(key rangeStateKey) error { + ss.mu.Lock() + defer ss.mu.Unlock() + if rs, found := ss.mu.activeFeeds[key]; found { + rs.activeRangeFeed.release() + delete(ss.mu.activeFeeds, key) + return nil + } + return errors.AssertionFailedf("expected to find active range feed %s, found none", key) +} + +func (ss *streamState) numActiveFeeds() int { + ss.mu.Lock() + defer ss.mu.Unlock() + return len(ss.mu.activeFeeds) +} + +func (ss *streamState) markActive( + rr *rangeFeedRegistry, + nodeID roachpb.NodeID, + requestID int64, + token rangecache.EvictionToken, + rSpan roachpb.RSpan, + startFrom hlc.Timestamp, +) rangeStateKey { + ss.mu.Lock() + defer ss.mu.Unlock() + + if ss.mu.activeFeeds == nil { + ss.mu.activeFeeds = make(map[rangeStateKey]rangeState) + } + + rangeID := token.Desc().RangeID + span := rSpan.AsRawSpanWithNoLocals() + + active := newActiveRangeFeed(rr, span, startFrom) + active.setNodeID(nodeID) + active.setRangeID(rangeID) + + key := rangeStateKey{ + requestID: requestID, + rangeID: rangeID, + } + + ss.mu.activeFeeds[key] = rangeState{ + activeRangeFeed: active, + token: token, + rSpan: rSpan, + startFrom: startFrom, + } + return key +} + +// nextRequestID returns the next id to use when issuing RangeFeed calls on MuxStream. +func (rfs *rfStream) nextRequestID() int64 { + rfs.mu.Lock() + defer rfs.mu.Unlock() + rfs.mu.rangeFeedID++ + return rfs.mu.rangeFeedID +} + +// startRangeFeedStreem opens RangeFeedStream connection to a node that can serve +// rangefeed for the one of the replicas in token range descriptor. +func (rfs *rfStream) startRangeFeedStream( + ctx context.Context, rs roachpb.RSpan, startFrom hlc.Timestamp, token rangecache.EvictionToken, +) error { + if ctx.Err() != nil { + // Don't bother starting stream if we are already cancelled. 
+ return ctx.Err() + } + + for r := retry.StartWithCtx(ctx, rfs.ds.rpcRetryOptions); r.Next(); { + if !token.Valid() { + var err error + ri, err := rfs.ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false) + if err != nil { + log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err) + if !rangecache.IsRangeLookupErrorRetryable(err) { + return err + } + continue + } + token = ri + } + + ss, replica, mustStartConsumer, err := rfs.lookupOrCreateStream(ctx, token) + + if err != nil { + if IsSendError(err) { + token.Evict(ctx) + continue + } + return err + } + + if mustStartConsumer { + rfs.g.GoCtx(func(ctx context.Context) error { + if ss == nil { + panic("ss is nil") + } + return rfs.rfStreamEventProcessor(ctx, ss) + }) + } + + rangeID := token.Desc().RangeID + activeKey := ss.markActive(rfs.rr, replica.NodeID, rfs.nextRequestID(), token, rs, startFrom) + + req := roachpb.RangeFeedRequest{ + Span: rs.AsRawSpanWithNoLocals(), + Header: roachpb.Header{ + Timestamp: startFrom, + RangeID: rangeID, + }, + WithDiff: rfs.withDiff, + RequestID: activeKey.requestID, + } + req.Replica = replica + err = ss.stream.Send(&req) + if err == nil { + return nil + } + + log.VErrEventf(ctx, 2, "RPC error: %s", err) + if grpcutil.IsAuthError(err) { + // Authentication or authorization error. Propagate. + return err + } + // Otherwise, fallback to retry. + if err := ss.clearActive(activeKey); err != nil { + return err + } + } + + return ctx.Err() +} + +func (rfs *rfStream) lookupOrCreateStream( + ctx context.Context, token rangecache.EvictionToken, +) (*streamState, roachpb.ReplicaDescriptor, bool, error) { + transport, replicas, err := rfs.ds.prepareTransportForDescriptor(ctx, token.Desc()) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, false, err + } + defer transport.Release() + + for !transport.IsExhausted() { + ss, replica, mustStartConsumer, err := rfs.lookupOrCreateStreamForNextReplica(ctx, transport) + if err == nil { + return ss, replica, mustStartConsumer, err + } + } + + return nil, roachpb.ReplicaDescriptor{}, false, newSendError( + fmt.Sprintf("sending to all %d replicas failed", len(replicas))) +} + +func (rfs *rfStream) lookupOrCreateStreamForNextReplica( + ctx context.Context, transport Transport, +) (*streamState, roachpb.ReplicaDescriptor, bool, error) { + replica := transport.NextReplica() + nodeID := replica.NodeID + + rfs.mu.Lock() + defer rfs.mu.Unlock() + + if rfs.mu.streams == nil { + rfs.mu.streams = make(map[roachpb.NodeID]*streamState) + } + if ss, ok := rfs.mu.streams[nodeID]; ok { + return ss, replica, false, nil + } + clientCtx, client, err := transport.NextInternalClient(ctx) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, false, err + } + stream, err := client.MuxRangeFeed(clientCtx) + if err != nil { + return nil, roachpb.ReplicaDescriptor{}, false, err + } + + ss := &streamState{ + nodeID: nodeID, + stream: stream, + } + + rfs.mu.streams[nodeID] = ss + return ss, replica, true, nil +} + +func (rfs *rfStream) restartAllActive(ctx context.Context, ss *streamState) error { + rfs.mu.Lock() + defer rfs.mu.Unlock() + + if rfs.mu.streams[ss.nodeID] != ss { + panic("corrupt rangefeed state") + } + + delete(rfs.mu.streams, ss.nodeID) + + ss.mu.Lock() + defer ss.mu.Unlock() + + for _, active := range ss.mu.activeFeeds { + if err := rfs.restartRange(ctx, active.rSpan, active.restartFrom()); err != nil { + return err + } + } + + return nil +} + +func (rfs *rfStream) restartRange( + ctx context.Context, rs roachpb.RSpan, restartFrom 
hlc.Timestamp, +) error { + return divideAndIterateRSpan(ctx, rfs.ds, rs, + func(rs roachpb.RSpan, token rangecache.EvictionToken) error { + return rfs.startRangeFeedStream(ctx, rs, restartFrom, token) + }, + ) +} + +// rfStreamEventProcessor is responsible for processing rangefeed events from a node. +func (rfs *rfStream) rfStreamEventProcessor(ctx context.Context, ss *streamState) error { + for { + event, eventErr := ss.stream.Recv() + if eventErr == io.EOF { + if log.V(1) { + log.Infof(ctx, "RangeFeed for node %d disconnected. Restarting %d ranges", + ss.nodeID, ss.numActiveFeeds()) + } + return rfs.restartAllActive(ctx, ss) + } + + // Get range state associated with this rangefeed. + var rs rangeState + if eventErr == nil { + if event == nil { + return errors.AssertionFailedf("unexpected state: expected non nil event") + } + rsKey := rangeStateKey{requestID: event.RequestID, rangeID: event.RangeID} + rs = ss.getRangeState(rsKey) + if rs.activeRangeFeed == nil { + return errors.AssertionFailedf( + "unexpected state: expected to find active range feed for %v, found none", + rsKey) + } + } + + if eventErr == nil && event.Error != nil { + eventErr = event.Error.Error.GoError() + } + + if eventErr == nil { + // Dispatch event to the caller. + rs.onRangeEvent(&event.RangeFeedEvent) + + select { + case rfs.eventsCh <- &event.RangeFeedEvent: + case <-ctx.Done(): + return ctx.Err() + } + continue + } + + errDisposition, err := handleRangeError(eventErr) + if err != nil { + return err + } + + // Clear active range feed -- it will be re-added when we retry/restart. + if err := ss.clearActive( + rangeStateKey{requestID: event.RequestID, rangeID: event.RangeID}, + ); err != nil { + return err + } + + restartFrom := rs.restartFrom() + if log.V(1) { + log.Infof(ctx, "Transient error (%v) for rangefeed from node %d for range %d: %s %s@%s", + eventErr, ss.nodeID, event.RangeID, errDisposition, rs.rSpan, restartFrom) + } + + switch errDisposition { + case retryRange: + if err := rfs.startRangeFeedStream(ctx, rs.rSpan, restartFrom, rs.token); err != nil { + return err + } + case restartRange: + if err := rfs.restartRange(ctx, rs.rSpan, restartFrom); err != nil { + return err + } + default: + panic("unexpected error disposition") + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 75c916399539..320da7f6dc96 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -18,6 +18,7 @@ import ( "time" "unsafe" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -25,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" @@ -51,6 +53,9 @@ var useDedicatedRangefeedConnectionClass = settings.RegisterBoolSetting( "kv.rangefeed.use_dedicated_connection_class.enabled", false), ) +// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. 
+var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) + // RangeFeed divides a RangeFeed request on range boundaries and establishes a // RangeFeed to each of the individual ranges. It streams back results on the // provided channel. @@ -76,6 +81,28 @@ func (ds *DistSender) RangeFeed( ds.activeRangeFeeds.Store(rr, nil) defer ds.activeRangeFeeds.Delete(rr) + if ds.st.Version.IsActive(ctx, clusterversion.RangefeedUseOneStreamPerNode) && + enableMuxRangeFeed { + return ds.startMuxRangeFeed(ctx, spans, startFrom, withDiff, rr, eventCh) + } + + return ds.startRangeFeed(ctx, spans, startFrom, withDiff, rr, eventCh) +} + +// TODO(yevgeniy): Deprecate and remove non-streaming implementation in 22.2 +func (ds *DistSender) startRangeFeed( + ctx context.Context, + spans []roachpb.Span, + startFrom hlc.Timestamp, + withDiff bool, + rr *rangeFeedRegistry, + eventCh chan<- *roachpb.RangeFeedEvent, +) error { + rSpans, err := spansToResolvedSpans(spans) + if err != nil { + return err + } + g := ctxgroup.WithContext(ctx) // Goroutine that processes subdivided ranges and creates a rangefeed for // each. @@ -95,18 +122,29 @@ func (ds *DistSender) RangeFeed( }) // Kick off the initial set of ranges. - for _, span := range spans { - rs, err := keys.SpanAddr(span) - if err != nil { - return err - } + for i := range rSpans { + rs := rSpans[i] g.GoCtx(func(ctx context.Context) error { - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh) + return divideAndSendRangeFeedToRanges(ctx, ds, rs, startFrom, rangeCh) }) } + return g.Wait() } +func spansToResolvedSpans(spans []roachpb.Span) (rSpans []roachpb.RSpan, _ error) { + var sg roachpb.SpanGroup + sg.Add(spans...) + for _, sp := range sg.Slice() { + rs, err := keys.SpanAddr(sp) + if err != nil { + return nil, err + } + rSpans = append(rSpans, rs) + } + return rSpans, nil +} + // RangeFeedContext is the structure containing arguments passed to // RangeFeed call. It functions as a kind of key for an active range feed. type RangeFeedContext struct { @@ -138,17 +176,11 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr const continueIter = true const stopIter = false - partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { - active.Lock() - defer active.Unlock() - return active.PartialRangeFeed - } - ds.activeRangeFeeds.Range(func(k, v interface{}) bool { r := k.(*rangeFeedRegistry) r.ranges.Range(func(k, v interface{}) bool { active := k.(*activeRangeFeed) - if err := fn(r.RangeFeedContext, partialRangeFeed(active)); err != nil { + if err := fn(r.RangeFeedContext, active.snapshot()); err != nil { iterErr = err return stopIter } @@ -166,23 +198,50 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr // activeRangeFeed is a thread safe PartialRangeFeed. 
type activeRangeFeed struct { - syncutil.Mutex - PartialRangeFeed + release func() + mu struct { + syncutil.Mutex + PartialRangeFeed + } } -func (a *activeRangeFeed) onRangeEvent( - nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *roachpb.RangeFeedEvent, -) { - a.Lock() - defer a.Unlock() +func newActiveRangeFeed( + rr *rangeFeedRegistry, span roachpb.Span, startTS hlc.Timestamp, +) *activeRangeFeed { + a := &activeRangeFeed{} + a.mu.PartialRangeFeed.Span = span + a.mu.StartTS = startTS + a.release = func() { rr.ranges.Delete(a) } + rr.ranges.Store(a, nil) + return a +} + +func (a *activeRangeFeed) snapshot() PartialRangeFeed { + a.mu.Lock() + defer a.mu.Unlock() + return a.mu.PartialRangeFeed +} + +func (a *activeRangeFeed) setNodeID(nodeID roachpb.NodeID) { + a.mu.Lock() + defer a.mu.Unlock() + a.mu.NodeID = nodeID +} + +func (a *activeRangeFeed) setRangeID(rangeID roachpb.RangeID) { + a.mu.Lock() + defer a.mu.Unlock() + a.mu.RangeID = rangeID +} + +func (a *activeRangeFeed) onRangeEvent(event *roachpb.RangeFeedEvent) { + a.mu.Lock() + defer a.mu.Unlock() if event.Val != nil || event.SST != nil { - a.LastValueReceived = timeutil.Now() + a.mu.LastValueReceived = timeutil.Now() } else if event.Checkpoint != nil { - a.Resolved = event.Checkpoint.ResolvedTS + a.mu.Resolved = event.Checkpoint.ResolvedTS } - - a.NodeID = nodeID - a.RangeID = rangeID } // rangeFeedRegistry is responsible for keeping track of currently executing @@ -209,8 +268,11 @@ func newRangeFeedRegistry( return rr } -func (ds *DistSender) divideAndSendRangeFeedToRanges( - ctx context.Context, rs roachpb.RSpan, startFrom hlc.Timestamp, rangeCh chan<- singleRangeInfo, +func divideAndIterateRSpan( + ctx context.Context, + ds *DistSender, + rs roachpb.RSpan, + fn func(rs roachpb.RSpan, token rangecache.EvictionToken) error, ) error { // As RangeIterator iterates, it can return overlapping descriptors (and // during splits, this happens frequently), but divideAndSendRangeFeedToRanges @@ -226,14 +288,8 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return err } nextRS.Key = partialRS.EndKey - select { - case rangeCh <- singleRangeInfo{ - rs: partialRS, - startFrom: startFrom, - token: ri.Token(), - }: - case <-ctx.Done(): - return ctx.Err() + if err := fn(partialRS, ri.Token()); err != nil { + return err } if !ri.NeedAnother(nextRS) { break @@ -242,6 +298,29 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges( return ri.Error() } +func divideAndSendRangeFeedToRanges( + ctx context.Context, + ds *DistSender, + rs roachpb.RSpan, + startFrom hlc.Timestamp, + rangeCh chan<- singleRangeInfo, +) error { + return divideAndIterateRSpan( + ctx, ds, rs, + func(rs roachpb.RSpan, token rangecache.EvictionToken) error { + select { + case rangeCh <- singleRangeInfo{ + rs: rs, + startFrom: startFrom, + token: token, + }: + return nil + case <-ctx.Done(): + return ctx.Err() + } + }) +} + // partialRangeFeed establishes a RangeFeed to the range specified by desc. It // manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry @@ -260,14 +339,8 @@ func (ds *DistSender) partialRangeFeed( span := rs.AsRawSpanWithNoLocals() // Register partial range feed with registry. 
- active := &activeRangeFeed{ - PartialRangeFeed: PartialRangeFeed{ - Span: span, - StartTS: startFrom, - }, - } - rr.ranges.Store(active, nil) - defer rr.ranges.Delete(active) + active := newActiveRangeFeed(rr, span, startFrom) + defer active.release() // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { @@ -286,7 +359,7 @@ func (ds *DistSender) partialRangeFeed( } // Establish a RangeFeed for a single Range. - maxTS, err := ds.singleRangeFeed(ctx, span, startFrom, withDiff, token.Desc(), eventCh, active.onRangeEvent) + maxTS, err := ds.singleRangeFeed(ctx, span, startFrom, withDiff, token.Desc(), active, eventCh) // Forward the timestamp in case we end up sending it again. startFrom.Forward(maxTS) @@ -296,55 +369,26 @@ func (ds *DistSender) partialRangeFeed( log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v", span, timeutil.Since(startFrom.GoTime()), err) } - switch { - case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) || - errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)): - // These errors are likely to be unique to the replica that - // reported them, so no action is required before the next - // retry. - case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): - // Evict the descriptor from the cache and reload on next attempt. - token.Evict(ctx) - token = rangecache.EvictionToken{} - continue - case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)): + errDisposition, err := handleRangeError(err) + if err != nil { + return err + } + + switch errDisposition { + case restartRange: // Evict the descriptor from the cache. token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh) - case errors.HasType(err, (*roachpb.RangeFeedRetryError)(nil)): - var t *roachpb.RangeFeedRetryError - if ok := errors.As(err, &t); !ok { - return errors.AssertionFailedf("wrong error type: %T", err) - } - switch t.Reason { - case roachpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, - roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, - roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, - roachpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: - // Try again with same descriptor. These are transient - // errors that should not show up again. - continue - case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, - roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, - roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the descriptor from the cache. - token.Evict(ctx) - return ds.divideAndSendRangeFeedToRanges(ctx, rs, startFrom, rangeCh) - default: - return errors.AssertionFailedf("unrecognized retryable error type: %T", err) - } + return divideAndSendRangeFeedToRanges(ctx, ds, rs, startFrom, rangeCh) + case retryRange: + // Nothing -- fallback to retry loop. default: - return err + panic("unexpected error disposition") } } } return ctx.Err() } -// onRangeEventCb is invoked for each non-error range event. -// nodeID identifies the node ID which generated the event. -type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *roachpb.RangeFeedEvent) - // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed // RPC call. Results will be sent on the provided channel. 
Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish @@ -357,8 +401,8 @@ func (ds *DistSender) singleRangeFeed( startFrom hlc.Timestamp, withDiff bool, desc *roachpb.RangeDescriptor, + active *activeRangeFeed, eventCh chan<- *roachpb.RangeFeedEvent, - onRangeEvent onRangeEventCb, ) (hlc.Timestamp, error) { args := roachpb.RangeFeedRequest{ Span: span, @@ -368,20 +412,9 @@ func (ds *DistSender) singleRangeFeed( }, WithDiff: withDiff, } + active.setRangeID(desc.RangeID) - var latencyFn LatencyFunc - if ds.rpcContext != nil { - latencyFn = ds.rpcContext.RemoteClocks.Latency - } - replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) - if err != nil { - return args.Timestamp, err - } - replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) - // The RangeFeed is not used for system critical traffic so use a DefaultClass - // connection regardless of the range. - opts := SendOptions{class: connectionClass(&ds.st.SV)} - transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + transport, replicas, err := ds.prepareTransportForDescriptor(ctx, desc) if err != nil { return args.Timestamp, err } @@ -394,6 +427,8 @@ func (ds *DistSender) singleRangeFeed( } args.Replica = transport.NextReplica() + active.setNodeID(args.Replica.NodeID) + clientCtx, client, err := transport.NextInternalClient(ctx) if err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) @@ -426,7 +461,7 @@ func (ds *DistSender) singleRangeFeed( log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) return args.Timestamp, t.Error.GoError() } - onRangeEvent(args.Replica.NodeID, desc.RangeID, event) + active.onRangeEvent(event) select { case eventCh <- event: @@ -437,6 +472,97 @@ func (ds *DistSender) singleRangeFeed( } } +type errorDisposition int + +const ( + // abortRange is a sentinel indicating rangefeed should be aborted because + // of an error. + abortRange errorDisposition = iota + + // restartRange indicates that the rangefeed for the range should be restarted. + // this includes updating routing information, splitting range on range boundaries + // and re-establishing rangefeeds for 1 or more ranges. + restartRange + + // retryRange indicates that the rangefeed should be simply retried. + retryRange +) + +func (d errorDisposition) String() string { + switch d { + case restartRange: + return "restart" + case retryRange: + return "retry" + default: + return "abort" + } +} + +// handleRangeError classifies rangefeed error and returns error disposition to the caller +// indicating how such error should be handled. +func handleRangeError(err error) (errorDisposition, error) { + switch { + case errors.HasType(err, (*roachpb.StoreNotFoundError)(nil)) || + errors.HasType(err, (*roachpb.NodeUnavailableError)(nil)): + // These errors are likely to be unique to the replica that + // reported them, so no action is required before the next + // retry. 
+ return retryRange, nil + case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)): + return restartRange, nil + case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)): + return restartRange, nil + case errors.HasType(err, (*roachpb.RangeFeedRetryError)(nil)): + var t *roachpb.RangeFeedRetryError + if ok := errors.As(err, &t); !ok { + return abortRange, errors.AssertionFailedf("wrong error type: %T", err) + } + switch t.Reason { + case roachpb.RangeFeedRetryError_REASON_REPLICA_REMOVED, + roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT, + roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING, + roachpb.RangeFeedRetryError_REASON_SLOW_CONSUMER: + // Try again with same descriptor. These are transient + // errors that should not show up again. + return retryRange, nil + case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, + roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, + roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: + return restartRange, nil + default: + return abortRange, errors.AssertionFailedf("unrecognized retryable error type: %T", err) + } + default: + return abortRange, err + } +} + +// prepareTransportForDescriptor creates and configures RPC transport for the specified +// descriptor. Returns the transport, which the caller is expected to release along with the +// replicas slice used to configure the transport. +func (ds *DistSender) prepareTransportForDescriptor( + ctx context.Context, desc *roachpb.RangeDescriptor, +) (Transport, ReplicaSlice, error) { + var latencyFn LatencyFunc + if ds.rpcContext != nil { + latencyFn = ds.rpcContext.RemoteClocks.Latency + } + replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, nil, AllExtantReplicas) + if err != nil { + return nil, nil, err + } + replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn) + // The RangeFeed is not used for system critical traffic so use a DefaultClass + // connection regardless of the range. + opts := SendOptions{class: connectionClass(&ds.st.SV)} + transport, err := ds.transportFactory(opts, ds.nodeDialer, replicas) + if err != nil { + return nil, nil, err + } + return transport, replicas, nil +} + func connectionClass(sv *settings.Values) rpc.ConnectionClass { if useDedicatedRangefeedConnectionClass.Get(sv) { return rpc.RangefeedClass diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go new file mode 100644 index 000000000000..bf1fd4ecf013 --- /dev/null +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go @@ -0,0 +1,177 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvcoord + +import ( + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/retry" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + gomock "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" +) + +func TestingSetEnableStreamingRangefeed(enabled bool) func() { + old := enableMuxRangeFeed + enableMuxRangeFeed = enabled + return func() { + enableMuxRangeFeed = old + } +} + +// Tests that the range feed handles transport errors appropriately. In +// particular, that when encountering other decommissioned nodes it will refresh +// its range descriptor and retry, but if this node is decommissioned it will +// bail out. Regression test for: +// https://github.com/cockroachdb/cockroach/issues/66636 +func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + for _, useStreamingRangefeed := range []bool{false, true} { + enableMuxRangeFeed = useStreamingRangefeed + + for _, spec := range []struct { + errorCode codes.Code + expectRetry bool + }{ + {codes.FailedPrecondition, true}, // target node is decommissioned; retry + {codes.PermissionDenied, false}, // this node is decommissioned; abort + {codes.Unauthenticated, false}, // this node is not part of cluster; abort + } { + t.Run(fmt.Sprintf("streaming=%t/%s", useStreamingRangefeed, spec.errorCode), + func(t *testing.T) { + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + g := makeGossip(t, stopper, rpcContext) + + desc := roachpb.RangeDescriptor{ + RangeID: 1, + Generation: 1, + StartKey: roachpb.RKeyMin, + EndKey: roachpb.RKeyMax, + InternalReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + } + for _, repl := range desc.InternalReplicas { + require.NoError(t, g.AddInfoProto( + gossip.MakeNodeIDKey(repl.NodeID), + newNodeDesc(repl.NodeID), + gossip.NodeDescriptorTTL, + )) + } + + ctrl := gomock.NewController(t) + transport := NewMockTransport(ctrl) + rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl) + + // We start off with a cached lease on r1. + cachedLease := roachpb.Lease{ + Replica: desc.InternalReplicas[0], + Sequence: 1, + } + + // All nodes return the specified error code. We expect the range feed to + // keep trying all replicas in sequence regardless of error. 
+ for _, repl := range desc.InternalReplicas { + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(repl) + transport.EXPECT().NextInternalClient(gomock.Any()).Return( + ctx, nil, grpcstatus.Error(spec.errorCode, "")) + } + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // Once all replicas have failed, it should try to refresh the lease using + // the range cache. We let this succeed once. + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + + // It then tries the replicas again. This time we just report the + // transport as exhausted immediately. + transport.EXPECT().IsExhausted().Return(true) + transport.EXPECT().Release() + + // This invalidates the cache yet again. This time we error. + rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) + + // If we expect a range lookup retry, allow the retry to succeed by + // returning a range descriptor and a client that immediately + // cancels the context and closes the range feed stream. + if spec.expectRetry { + rangeDB.EXPECT().FirstRange().Return(&desc, nil) + client := roachpb.NewMockInternalClient(ctrl) + + if useStreamingRangefeed { + stream := roachpb.NewMockInternal_MuxRangeFeedClient(ctrl) + stream.EXPECT().Send(gomock.Any()).Return(nil) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().MuxRangeFeed(gomock.Any()).Return(stream, nil) + } else { + stream := roachpb.NewMockInternal_RangeFeedClient(ctrl) + stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) + client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) + } + + transport.EXPECT().IsExhausted().Return(false) + transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) + transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil) + transport.EXPECT().Release() + } + + ds := NewDistSender(DistSenderConfig{ + AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()}, + Clock: clock, + NodeDescs: g, + RPCRetryOptions: &retry.Options{MaxRetries: 10}, + RPCContext: rpcContext, + TestingKnobs: ClientTestingKnobs{ + TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { + return transport, nil + }, + }, + RangeDescriptorDB: rangeDB, + NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), + Settings: cluster.MakeTestingClusterSettings(), + }) + ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: desc, + Lease: cachedLease, + }) + + err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) + require.Error(t, err) + }) + } + } +} diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 0d757bacb013..dec047ea1e14 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -8,146 +8,359 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package kvcoord +package kvcoord_test import ( "context" - "io" "testing" - "time" - "github.com/cockroachdb/cockroach/pkg/gossip" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/retry" - "github.com/cockroachdb/cockroach/pkg/util/stop" - gomock "github.com/golang/mock/gomock" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - grpcstatus "google.golang.org/grpc/status" + "google.golang.org/grpc" ) -// Tests that the range feed handles transport errors appropriately. In -// particular, that when encountering other decommissioned nodes it will refresh -// its range descriptor and retry, but if this node is decommissioned it will -// bail out. 
Regression test for: -// https://github.com/cockroachdb/cockroach/issues/66636 -func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) +type testRangefeedClient struct { + roachpb.InternalClient + muxRangeFeedEnabled bool + count func() +} - for _, spec := range []struct { - errorCode codes.Code - expectRetry bool - }{ - {codes.FailedPrecondition, true}, // target node is decommissioned; retry - {codes.PermissionDenied, false}, // this node is decommissioned; abort - {codes.Unauthenticated, false}, // this node is not part of cluster; abort - } { - t.Run(spec.errorCode.String(), func(t *testing.T) { - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - g := makeGossip(t, stopper, rpcContext) - - desc := roachpb.RangeDescriptor{ - RangeID: 1, - Generation: 1, - StartKey: roachpb.RKeyMin, - EndKey: roachpb.RKeyMax, - InternalReplicas: []roachpb.ReplicaDescriptor{ - {NodeID: 1, StoreID: 1, ReplicaID: 1}, - {NodeID: 2, StoreID: 2, ReplicaID: 2}, - }, - } - for _, repl := range desc.InternalReplicas { - require.NoError(t, g.AddInfoProto( - gossip.MakeNodeIDKey(repl.NodeID), - newNodeDesc(repl.NodeID), - gossip.NodeDescriptorTTL, - )) - } +func (c *testRangefeedClient) RangeFeed( + ctx context.Context, args *roachpb.RangeFeedRequest, opts ...grpc.CallOption, +) (roachpb.Internal_RangeFeedClient, error) { + defer c.count() - ctrl := gomock.NewController(t) - transport := NewMockTransport(ctrl) - rangeDB := rangecache.NewMockRangeDescriptorDB(ctrl) + if c.muxRangeFeedEnabled { + panic(errors.AssertionFailedf("unexpected call to RangeFeed")) + } + return c.InternalClient.RangeFeed(ctx, args, opts...) +} - // We start off with a cached lease on r1. - cachedLease := roachpb.Lease{ - Replica: desc.InternalReplicas[0], - Sequence: 1, - } +func (c *testRangefeedClient) RangeFeedStream( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + defer c.count() + + if !c.muxRangeFeedEnabled { + panic(errors.AssertionFailedf("unexpected call to RangeFeedStream")) + } + return c.InternalClient.MuxRangeFeed(ctx, opts...) +} + +type internalClientCounts struct { + syncutil.Mutex + counts map[roachpb.InternalClient]int +} + +func (c *internalClientCounts) Inc(ic roachpb.InternalClient) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + c.counts[ic]++ +} + +type countConnectionsTransport struct { + wrapped kvcoord.Transport + counts *internalClientCounts + rfStreamEnabled bool +} + +func (c *countConnectionsTransport) IsExhausted() bool { + return c.wrapped.IsExhausted() +} + +func (c *countConnectionsTransport) SendNext( + ctx context.Context, request roachpb.BatchRequest, +) (*roachpb.BatchResponse, error) { + return c.wrapped.SendNext(ctx, request) +} + +type testFeedCtxKey struct{} + +func (c *countConnectionsTransport) NextInternalClient( + ctx context.Context, +) (context.Context, roachpb.InternalClient, error) { + ctx, client, err := c.wrapped.NextInternalClient(ctx) + if err != nil { + return ctx, client, err + } + + // Count rangefeed calls but only for feeds started by this test. 
+ countFn := func() {} + if ctx.Value(testFeedCtxKey{}) != nil { + countFn = func() { + c.counts.Inc(client) + } + } + + tc := &testRangefeedClient{ + InternalClient: client, + muxRangeFeedEnabled: c.rfStreamEnabled, + count: countFn, + } + return ctx, tc, nil +} + +func (c *countConnectionsTransport) NextReplica() roachpb.ReplicaDescriptor { + return c.wrapped.NextReplica() +} + +func (c *countConnectionsTransport) SkipReplica() { + c.wrapped.SkipReplica() +} + +func (c *countConnectionsTransport) MoveToFront(descriptor roachpb.ReplicaDescriptor) { + c.wrapped.MoveToFront(descriptor) +} + +func (c *countConnectionsTransport) Release() { + c.wrapped.Release() +} + +var _ kvcoord.Transport = (*countConnectionsTransport)(nil) + +func makeTransportFactory( + rfStreamEnabled bool, counts *internalClientCounts, +) kvcoord.TransportFactory { + return func( + options kvcoord.SendOptions, + dialer *nodedialer.Dialer, + slice kvcoord.ReplicaSlice, + ) (kvcoord.Transport, error) { + transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) + if err != nil { + return nil, err + } + countingTransport := &countConnectionsTransport{ + wrapped: transport, + rfStreamEnabled: rfStreamEnabled, + counts: counts, + } + return countingTransport, nil + } +} - // All nodes return the specified error code. We expect the range feed to - // keep trying all replicas in sequence regardless of error. - for _, repl := range desc.InternalReplicas { - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(repl) - transport.EXPECT().NextInternalClient(gomock.Any()).Return( - ctx, nil, grpcstatus.Error(spec.errorCode, "")) +// rangeFeed is a helper to execute rangefeed. We are not using rangefeed library +// here because of circular dependencies. +func rangeFeed( + dsI interface{}, + sp roachpb.Span, + startFrom hlc.Timestamp, + onValue func(event *roachpb.RangeFeedEvent), + onError func(err error), +) func() error { + ds := dsI.(*kvcoord.DistSender) + events := make(chan *roachpb.RangeFeedEvent) + ctx, cancel := context.WithCancel(context.WithValue(context.Background(), testFeedCtxKey{}, struct{}{})) + withCancel := func(err error) error { + cancel() + onError(err) + return err + } + + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) (err error) { + return withCancel(ds.RangeFeed(ctx, []roachpb.Span{sp}, startFrom, false, events)) + }) + g.GoCtx(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return withCancel(ctx.Err()) + case ev := <-events: + onValue(ev) } - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // Once all replicas have failed, it should try to refresh the lease using - // the range cache. We let this succeed once. - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - - // It then tries the replicas again. This time we just report the - // transport as exhausted immediately. - transport.EXPECT().IsExhausted().Return(true) - transport.EXPECT().Release() - - // This invalidates the cache yet again. This time we error. - rangeDB.EXPECT().FirstRange().Return(nil, grpcstatus.Error(spec.errorCode, "")) - - // If we expect a range lookup retry, allow the retry to succeed by - // returning a range descriptor and a client that immediately - // cancels the context and closes the range feed stream. 
- if spec.expectRetry { - rangeDB.EXPECT().FirstRange().Return(&desc, nil) - stream := roachpb.NewMockInternal_RangeFeedClient(ctrl) - stream.EXPECT().Recv().Do(cancel).Return(nil, io.EOF) - client := roachpb.NewMockInternalClient(ctrl) - client.EXPECT().RangeFeed(gomock.Any(), gomock.Any()).Return(stream, nil) - transport.EXPECT().IsExhausted().Return(false) - transport.EXPECT().NextReplica().Return(desc.InternalReplicas[0]) - transport.EXPECT().NextInternalClient(gomock.Any()).Return(ctx, client, nil) - transport.EXPECT().Release() + } + }) + + return func() error { + cancel() + if err := g.Wait(); !errors.Is(err, context.Canceled) { + return err + } + return nil + } +} + +// rangeFeedOnError returns a error channel and an onError handler for rangefeed call. +func rangeFeedOnError() (chan error, func(err error)) { + errCh := make(chan error, 1) + return errCh, func(err error) { + select { + case errCh <- err: + default: + } + } +} + +// observeNValues returns on value handler which expects to see N rangefeed values, +// along with the channel which gets closed when requisite count of events has been seen. +func observeNValues(n int) (chan struct{}, func(ev *roachpb.RangeFeedEvent)) { + var count = struct { + syncutil.Mutex + c int + }{} + allSeen := make(chan struct{}) + return allSeen, func(ev *roachpb.RangeFeedEvent) { + if ev.Val != nil { + count.Lock() + count.c++ + if count.c == n { + close(allSeen) } + count.Unlock() + } + } +} - ds := NewDistSender(DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: g, - RPCRetryOptions: &retry.Options{MaxRetries: 10}, - RPCContext: rpcContext, - TestingKnobs: ClientTestingKnobs{ - TransportFactory: func(SendOptions, *nodedialer.Dialer, ReplicaSlice) (Transport, error) { - return transport, nil +func TestBiDirectionalRangefeedNotUsedUntilUpgradeFinalilzed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + startServerAtVer := func(ver roachpb.Version) (*testcluster.TestCluster, func()) { + st := cluster.MakeTestingClusterSettingsWithVersions(ver, ver, true) + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Settings: st, + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(false, nil), + }, + + Server: &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: ver, }, }, - RangeDescriptorDB: rangeDB, - NodeDialer: nodedialer.New(rpcContext, gossip.AddressResolver(g)), - Settings: cluster.MakeTestingClusterSettings(), - }) - ds.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: desc, - Lease: cachedLease, - }) - - err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, false, nil) - require.Error(t, err) + }, }) + return tc, func() { tc.Stopper().Stop(ctx) } + } + + // Create a small table; run rangefeed. The transport factory we injected above verifies + // that we use the old rangefeed implementation. 
+ runRangeFeed := func(tc *testcluster.TestCluster) { + ts := tc.Server(0) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + sqlDB.Exec(t, `CREATE TABLE foo (key INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`) + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + errCh, onError := rangeFeedOnError() + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, onError) + select { + case <-allSeen: + require.NoError(t, closeFeed()) + case err := <-errCh: + require.NoError(t, err) + } + } + + t.Run("rangefeed-stream-disabled-prior-to-version-upgrade", func(t *testing.T) { + noRfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode - 1) + tc, cleanup := startServerAtVer(noRfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) + + t.Run("rangefeed-stream-disabled-via-environment", func(t *testing.T) { + defer kvcoord.TestingSetEnableStreamingRangefeed(false)() + // Even though we could use rangefeed stream, it's disable via kill switch. + rfStreamVer := clusterversion.ByKey(clusterversion.RangefeedUseOneStreamPerNode) + tc, cleanup := startServerAtVer(rfStreamVer) + defer cleanup() + runRangeFeed(tc) + }) +} + +func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + + connCounts := &internalClientCounts{counts: make(map[roachpb.InternalClient]int)} + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, // Turn off replication queues. + + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + TransportFactory: makeTransportFactory(true, connCounts), + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + ts := tc.Server(0) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + startTime := ts.Clock().Now() + + // Create a table, and split it so that we have multiple ranges, distributed across + // test cluster nodes. + sqlDB.Exec(t, ` +SET CLUSTER SETTING kv.rangefeed.enabled = true; +ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1; +CREATE TABLE foo (key INT PRIMARY KEY); +INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000); +ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100)); +`) + + for i := 100; i <= 900; i += 100 { + storeID := 1 + i%3 + rowID := i + sqlDB.Exec(t, "ALTER TABLE foo EXPERIMENTAL_RELOCATE VALUES (ARRAY[$1], $2)", storeID, rowID) + } + + fooDesc := desctestutils.TestingGetPublicTableDescriptor( + ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo") + fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + + allSeen, onValue := observeNValues(1000) + errCh, onError := rangeFeedOnError() + closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, onValue, onError) + select { + case <-allSeen: + require.NoError(t, closeFeed()) + case err := <-errCh: + require.NoError(t, err) + } + + // Verify we connected to each node once. 
+ connCounts.Lock() + defer connCounts.Unlock() + for _, c := range connCounts.counts { + require.Equal(t, 1, c) } } diff --git a/pkg/kv/kvclient/kvcoord/send_test.go b/pkg/kv/kvclient/kvcoord/send_test.go index 20fada847ebf..bcd9891abc0d 100644 --- a/pkg/kv/kvclient/kvcoord/send_test.go +++ b/pkg/kv/kvclient/kvcoord/send_test.go @@ -57,6 +57,10 @@ func (n Node) RangeFeed(_ *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFee panic("unimplemented") } +func (n Node) MuxRangeFeed(server roachpb.Internal_MuxRangeFeedServer) error { + panic("unimplemented") +} + func (n Node) GossipSubscription( _ *roachpb.GossipSubscriptionRequest, _ roachpb.Internal_GossipSubscriptionServer, ) error { diff --git a/pkg/kv/kvclient/kvcoord/transport_test.go b/pkg/kv/kvclient/kvcoord/transport_test.go index 6e8509365170..2253b8264600 100644 --- a/pkg/kv/kvclient/kvcoord/transport_test.go +++ b/pkg/kv/kvclient/kvcoord/transport_test.go @@ -178,6 +178,12 @@ func (m *mockInternalClient) RangeFeed( return nil, fmt.Errorf("unsupported RangeFeed call") } +func (m *mockInternalClient) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + return nil, fmt.Errorf("unsupported RangeFeedStream call") +} + // GossipSubscription is part of the roachpb.InternalClient interface. func (m *mockInternalClient) GossipSubscription( ctx context.Context, args *roachpb.GossipSubscriptionRequest, _ ...grpc.CallOption, diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 11fa732c79f7..8481023f218d 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -124,7 +124,7 @@ type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb. // ReplicaRangefeedFilter is used in unit tests to modify the request, inject // responses, or return errors from rangefeeds. type ReplicaRangefeedFilter func( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error // ContainsKey returns whether this range contains the specified key. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 9aca30b5ddaa..657fa35b699a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -72,7 +72,7 @@ var RangefeedTBIEnabled = settings.RegisterBoolSetting( // support for concurrent calls to Send. Note that the default implementation of // grpc.Stream is not safe for concurrent calls to Send. type lockedRangefeedStream struct { - wrapped roachpb.Internal_RangeFeedServer + wrapped roachpb.RangeFeedEventSink sendMu syncutil.Mutex } @@ -143,15 +143,13 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The provided ConcurrentRequestLimiter is used to limit the number // of rangefeeds using catch-up iterators at the same time. 
func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream) } func (r *Replica) rangeFeedWithRangeID( - _forStacks roachpb.RangeID, - args *roachpb.RangeFeedRequest, - stream roachpb.Internal_RangeFeedServer, + _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e727704211a7..3c68c3f28952 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2990,7 +2990,7 @@ func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreD // the provided stream and returns with an optional error when the rangefeed is // complete. func (s *Store) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 9d4bf5f5c9e4..f09d40184bec 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -199,7 +199,7 @@ func (ls *Stores) Send( // the provided stream and returns with an optional error when the rangefeed is // complete. func (ls *Stores) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { ctx := stream.Context() if args.RangeID == 0 { diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 14a3aee00368..1ece8b30ca2b 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -11,6 +11,7 @@ package roachpb import ( + "context" "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" @@ -1696,3 +1697,9 @@ const ( // with the SpecificTenantOverrides precedence.. AllTenantsOverrides ) + +// RangeFeedEventSink is an interface for sending a single rangefeed event. +type RangeFeedEventSink interface { + Context() context.Context + Send(*RangeFeedEvent) error +} diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index b18423086599..6893d5a6a443 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2557,6 +2557,9 @@ message RangeFeedRequest { // AdmissionHeader is used only at the start of the range feed stream, since // the initial catch-up scan be expensive. AdmissionHeader admission_header = 4 [(gogoproto.nullable) = false]; + + // RequestID is set by the client issuing MuxRangeFeed requests. + int64 request_id = 5 [(gogoproto.customname) = "RequestID"]; } // RangeFeedValue is a variant of RangeFeedEvent that represents an update to @@ -2621,6 +2624,14 @@ message RangeFeedEvent { RangeFeedSSTable sst = 4 [(gogoproto.customname) = "SST"]; } +// MuxRangeFeedResponse is a response generated by RangeFeedStream RPC. It tags +// the underlying RangeFeedEvent with the ID of the range that produced this event. +message MuxRangeFeedResponse { + RangeFeedEvent event = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; + int32 range_id = 2 [(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"]; + // Server echoes back request_id set by the client. 
+ int64 request_id = 3 [(gogoproto.customname) = "RequestID"]; +} // ResetQuorumRequest makes a range that is unavailable due to lost quorum // available again, at the cost of losing all of the data in the range. Any @@ -2830,7 +2841,10 @@ message JoinNodeResponse { service Internal { rpc Batch (BatchRequest) returns (BatchResponse) {} rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {} + // RangeFeed RPC is deprecated. Use MuxRangeFeed instead. + // To be removed at version 22.2 rpc RangeFeed (RangeFeedRequest) returns (stream RangeFeedEvent) {} + rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedResponse) {} rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {} rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {} diff --git a/pkg/roachpb/mocks_generated.go b/pkg/roachpb/mocks_generated.go index 9aa7e547d2e7..54ba15b6b788 100644 --- a/pkg/roachpb/mocks_generated.go +++ b/pkg/roachpb/mocks_generated.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient) +// Source: github.com/cockroachdb/cockroach/pkg/roachpb (interfaces: InternalClient,Internal_RangeFeedClient,Internal_MuxRangeFeedClient,Internal_MuxRangeFeedServer) // Package roachpb is a generated GoMock package. package roachpb @@ -136,6 +136,26 @@ func (mr *MockInternalClientMockRecorder) Join(arg0, arg1 interface{}, arg2 ...i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Join", reflect.TypeOf((*MockInternalClient)(nil).Join), varargs...) } +// MuxRangeFeed mocks base method. +func (m *MockInternalClient) MuxRangeFeed(arg0 context.Context, arg1 ...grpc.CallOption) (Internal_MuxRangeFeedClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MuxRangeFeed", varargs...) + ret0, _ := ret[0].(Internal_MuxRangeFeedClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MuxRangeFeed indicates an expected call of MuxRangeFeed. +func (mr *MockInternalClientMockRecorder) MuxRangeFeed(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MuxRangeFeed", reflect.TypeOf((*MockInternalClient)(nil).MuxRangeFeed), varargs...) +} + // RangeFeed mocks base method. func (m *MockInternalClient) RangeFeed(arg0 context.Context, arg1 *RangeFeedRequest, arg2 ...grpc.CallOption) (Internal_RangeFeedClient, error) { m.ctrl.T.Helper() @@ -398,3 +418,274 @@ func (mr *MockInternal_RangeFeedClientMockRecorder) Trailer() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_RangeFeedClient)(nil).Trailer)) } + +// MockInternal_MuxRangeFeedClient is a mock of Internal_MuxRangeFeedClient interface. +type MockInternal_MuxRangeFeedClient struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedClientMockRecorder +} + +// MockInternal_MuxRangeFeedClientMockRecorder is the mock recorder for MockInternal_MuxRangeFeedClient. +type MockInternal_MuxRangeFeedClientMockRecorder struct { + mock *MockInternal_MuxRangeFeedClient +} + +// NewMockInternal_MuxRangeFeedClient creates a new mock instance. 
+func NewMockInternal_MuxRangeFeedClient(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedClient { + mock := &MockInternal_MuxRangeFeedClient{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedClient) EXPECT() *MockInternal_MuxRangeFeedClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockInternal_MuxRangeFeedClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Recv() (*MuxRangeFeedResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*MuxRangeFeedResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Send(arg0 *RangeFeedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. 
+func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockInternal_MuxRangeFeedClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockInternal_MuxRangeFeedClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockInternal_MuxRangeFeedClient)(nil).Trailer)) +} + +// MockInternal_MuxRangeFeedServer is a mock of Internal_MuxRangeFeedServer interface. +type MockInternal_MuxRangeFeedServer struct { + ctrl *gomock.Controller + recorder *MockInternal_MuxRangeFeedServerMockRecorder +} + +// MockInternal_MuxRangeFeedServerMockRecorder is the mock recorder for MockInternal_MuxRangeFeedServer. +type MockInternal_MuxRangeFeedServerMockRecorder struct { + mock *MockInternal_MuxRangeFeedServer +} + +// NewMockInternal_MuxRangeFeedServer creates a new mock instance. +func NewMockInternal_MuxRangeFeedServer(ctrl *gomock.Controller) *MockInternal_MuxRangeFeedServer { + mock := &MockInternal_MuxRangeFeedServer{ctrl: ctrl} + mock.recorder = &MockInternal_MuxRangeFeedServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockInternal_MuxRangeFeedServer) EXPECT() *MockInternal_MuxRangeFeedServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockInternal_MuxRangeFeedServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).Context)) +} + +// Recv mocks base method. +func (m *MockInternal_MuxRangeFeedServer) Recv() (*RangeFeedRequest, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*RangeFeedRequest) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockInternal_MuxRangeFeedServer) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. 
+func (mr *MockInternal_MuxRangeFeedServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockInternal_MuxRangeFeedServer) Send(arg0 *MuxRangeFeedResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SendMsg), arg0) +} + +// SetHeader mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockInternal_MuxRangeFeedServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. 
+func (mr *MockInternal_MuxRangeFeedServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockInternal_MuxRangeFeedServer)(nil).SetTrailer), arg0) +} diff --git a/pkg/rpc/auth_tenant.go b/pkg/rpc/auth_tenant.go index 56078743ebad..c11b3da0e6a8 100644 --- a/pkg/rpc/auth_tenant.go +++ b/pkg/rpc/auth_tenant.go @@ -52,9 +52,8 @@ func (a tenantAuthorizer) authorize( case "/cockroach.roachpb.Internal/RangeLookup": return a.authRangeLookup(tenID, req.(*roachpb.RangeLookupRequest)) - case "/cockroach.roachpb.Internal/RangeFeed": + case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed": return a.authRangeFeed(tenID, req.(*roachpb.RangeFeedRequest)) - case "/cockroach.roachpb.Internal/GossipSubscription": return a.authGossipSubscription(tenID, req.(*roachpb.GossipSubscriptionRequest)) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 19c2a15548eb..b3fd50e26063 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -627,30 +627,40 @@ type respStreamClientAdapter struct { errC chan error } -func makeRespStreamClientAdapter(ctx context.Context) respStreamClientAdapter { - return respStreamClientAdapter{ - ctx: ctx, - respC: make(chan interface{}, 128), - errC: make(chan error, 1), - } +type localStream struct { + ctx context.Context } // grpc.ClientStream methods. -func (respStreamClientAdapter) Header() (metadata.MD, error) { panic("unimplemented") } -func (respStreamClientAdapter) Trailer() metadata.MD { panic("unimplemented") } -func (respStreamClientAdapter) CloseSend() error { panic("unimplemented") } +func (localStream) Header() (metadata.MD, error) { panic("unimplemented") } +func (localStream) Trailer() metadata.MD { panic("unimplemented") } +func (localStream) CloseSend() error { panic("unimplemented") } // grpc.ServerStream methods. -func (respStreamClientAdapter) SetHeader(metadata.MD) error { panic("unimplemented") } -func (respStreamClientAdapter) SendHeader(metadata.MD) error { panic("unimplemented") } -func (respStreamClientAdapter) SetTrailer(metadata.MD) { panic("unimplemented") } +func (localStream) SetHeader(metadata.MD) error { panic("unimplemented") } +func (localStream) SendHeader(metadata.MD) error { panic("unimplemented") } +func (localStream) SetTrailer(metadata.MD) { panic("unimplemented") } // grpc.Stream methods. -func (a respStreamClientAdapter) Context() context.Context { return a.ctx } -func (respStreamClientAdapter) SendMsg(m interface{}) error { panic("unimplemented") } -func (respStreamClientAdapter) RecvMsg(m interface{}) error { panic("unimplemented") } +func (a localStream) Context() context.Context { return a.ctx } +func (localStream) SendMsg(m interface{}) error { panic("unimplemented") } +func (localStream) RecvMsg(m interface{}) error { panic("unimplemented") } + +type channelStream struct { + localStream + respC chan interface{} + errC chan error +} -func (a respStreamClientAdapter) recvInternal() (interface{}, error) { +func makeChannelStream(ctx context.Context) channelStream { + return channelStream{ + localStream: localStream{ctx: ctx}, + respC: make(chan interface{}, 128), + errC: make(chan error, 1), + } +} + +func (a channelStream) recvInternal() (interface{}, error) { // Prioritize respC. Both channels are buffered and the only guarantee we // have is that once an error is sent on errC no other events will be sent // on respC again. 
@@ -665,10 +675,12 @@ func (a respStreamClientAdapter) recvInternal() (interface{}, error) { default: return nil, err } + case <-a.ctx.Done(): + return nil, a.ctx.Err() } } -func (a respStreamClientAdapter) sendInternal(e interface{}) error { +func (a channelStream) sendInternal(e interface{}) error { select { case a.respC <- e: return nil @@ -678,7 +690,7 @@ func (a respStreamClientAdapter) sendInternal(e interface{}) error { } type rangeFeedClientAdapter struct { - respStreamClientAdapter + channelStream } // roachpb.Internal_RangeFeedServer methods. @@ -705,7 +717,7 @@ func (a internalClientAdapter) RangeFeed( ctx, cancel := context.WithCancel(ctx) ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/RangeFeed") rfAdapter := rangeFeedClientAdapter{ - respStreamClientAdapter: makeRespStreamClientAdapter(ctx), + channelStream: makeChannelStream(ctx), } // Mark this as originating locally. @@ -723,8 +735,79 @@ func (a internalClientAdapter) RangeFeed( return rfAdapter, nil } +type muxRangeFeedClientAdapter struct { + localStream + sendHandler func(interface{}) error + recvHandler func() (interface{}, error) +} + +func (a muxRangeFeedClientAdapter) Send(r *roachpb.RangeFeedRequest) error { + return a.sendHandler(r) +} + +func (a muxRangeFeedClientAdapter) Recv() (*roachpb.MuxRangeFeedResponse, error) { + e, err := a.recvHandler() + if err != nil { + return nil, err + } + return e.(*roachpb.MuxRangeFeedResponse), nil +} + +var _ roachpb.Internal_MuxRangeFeedClient = muxRangeFeedClientAdapter{} + +type muxRangeFeedServerAdapter struct { + localStream + sendHandler func(interface{}) error + recvHandler func() (interface{}, error) +} + +func (a muxRangeFeedServerAdapter) Send(event *roachpb.MuxRangeFeedResponse) error { + return a.sendHandler(event) +} + +func (a muxRangeFeedServerAdapter) Recv() (*roachpb.RangeFeedRequest, error) { + e, err := a.recvHandler() + if err != nil { + return nil, err + } + return e.(*roachpb.RangeFeedRequest), nil +} + +var _ roachpb.Internal_MuxRangeFeedServer = muxRangeFeedServerAdapter{} + +func (a internalClientAdapter) MuxRangeFeed( + ctx context.Context, opts ...grpc.CallOption, +) (roachpb.Internal_MuxRangeFeedClient, error) { + ctx, cancel := context.WithCancel(ctx) + + serverStream := makeChannelStream(ctx) + clientStream := makeChannelStream(ctx) + + go func() { + defer cancel() + adapter := muxRangeFeedServerAdapter{ + localStream: localStream{ctx: ctx}, + sendHandler: serverStream.sendInternal, + recvHandler: clientStream.recvInternal, + } + err := a.server.MuxRangeFeed(adapter) + if err == nil { + err = io.EOF + } + serverStream.errC <- err + }() + + adapter := muxRangeFeedClientAdapter{ + localStream: localStream{ctx: ctx}, + sendHandler: clientStream.sendInternal, + recvHandler: serverStream.recvInternal, + } + // rfAdapter.client.Send() -> rfAdapter.server.respC + return adapter, nil +} + type gossipSubscriptionClientAdapter struct { - respStreamClientAdapter + channelStream } // roachpb.Internal_GossipSubscriptionServer methods. 
@@ -751,7 +834,7 @@ func (a internalClientAdapter) GossipSubscription(
 	ctx, cancel := context.WithCancel(ctx)
 	ctx, sp := tracing.ChildSpan(ctx, "/cockroach.roachpb.Internal/GossipSubscription")
 	gsAdapter := gossipSubscriptionClientAdapter{
-		respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
+		channelStream: makeChannelStream(ctx),
 	}
 
 	go func() {
@@ -768,7 +851,7 @@ func (a internalClientAdapter) GossipSubscription(
 	}
 
 type tenantSettingsClientAdapter struct {
-	respStreamClientAdapter
+	channelStream
 }
 
 // roachpb.Internal_TenantSettingsServer methods.
@@ -794,7 +877,7 @@ func (a internalClientAdapter) TenantSettings(
 ) (roachpb.Internal_TenantSettingsClient, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	gsAdapter := tenantSettingsClientAdapter{
-		respStreamClientAdapter: makeRespStreamClientAdapter(ctx),
+		channelStream: makeChannelStream(ctx),
 	}
 
 	go func() {
diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go
index 9807247e14d4..a1499346c0bf 100644
--- a/pkg/rpc/context_test.go
+++ b/pkg/rpc/context_test.go
@@ -240,6 +240,10 @@ func (*internalServer) RangeFeed(
 	panic("unimplemented")
 }
 
+func (*internalServer) MuxRangeFeed(roachpb.Internal_MuxRangeFeedServer) error {
+	panic("unimplemented")
+}
+
 func (*internalServer) GossipSubscription(
 	*roachpb.GossipSubscriptionRequest, roachpb.Internal_GossipSubscriptionServer,
 ) error {
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 93a7f8bc5d6c..58acaaa38670 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -188,6 +188,7 @@ go_library(
         "//pkg/util",
         "//pkg/util/admission",
         "//pkg/util/contextutil",
+        "//pkg/util/ctxgroup",
         "//pkg/util/envutil",
         "//pkg/util/errorutil",
         "//pkg/util/goschedstats",
diff --git a/pkg/server/node.go b/pkg/server/node.go
index c766ad0ac9f5..a72fcc4d7a4d 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -43,6 +43,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -1153,6 +1154,12 @@ func (n *Node) RangeLookup(
 
 // RangeFeed implements the roachpb.InternalServer interface.
func (n *Node) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) error { + return n.singleRangeFeed(args, stream) +} + +func (n *Node) singleRangeFeed( + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) error { pErr := n.stores.RangeFeed(args, stream) if pErr != nil { @@ -1165,6 +1172,58 @@ func (n *Node) RangeFeed( return nil } +type setRangeIDEventSink struct { + ctx context.Context + rangeID roachpb.RangeID + requestID int64 + wrapped roachpb.Internal_MuxRangeFeedServer +} + +func (s *setRangeIDEventSink) Context() context.Context { + return s.ctx +} + +func (s *setRangeIDEventSink) Send(event *roachpb.RangeFeedEvent) error { + response := &roachpb.MuxRangeFeedResponse{ + RangeFeedEvent: *event, + RangeID: s.rangeID, + RequestID: s.requestID, + } + return s.wrapped.Send(response) +} + +var _ roachpb.RangeFeedEventSink = (*setRangeIDEventSink)(nil) + +func (n *Node) asyncRangeFeed( + args roachpb.RangeFeedRequest, stream roachpb.Internal_MuxRangeFeedServer, +) func(ctx context.Context) error { + return func(ctx context.Context) error { + sink := setRangeIDEventSink{ + ctx: ctx, + rangeID: args.RangeID, + requestID: args.RequestID, + wrapped: stream, + } + return n.singleRangeFeed(&args, &sink) + } +} + +// MuxRangeFeed implements the roachpb.InternalServer interface. +func (n *Node) MuxRangeFeed(stream roachpb.Internal_MuxRangeFeedServer) error { + ctx, cancelFeeds := context.WithCancel(stream.Context()) + defer cancelFeeds() + rfGrp := ctxgroup.WithContext(ctx) + + for { + req, err := stream.Recv() + if err != nil { + cancelFeeds() + return errors.CombineErrors(err, rfGrp.Wait()) + } + rfGrp.GoCtx(n.asyncRangeFeed(*req, stream)) + } +} + // ResetQuorum implements the roachpb.InternalServer interface. func (n *Node) ResetQuorum( ctx context.Context, req *roachpb.ResetQuorumRequest,
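For illustration only, and not part of the patch itself: a minimal sketch of how a caller of the new RPC might subscribe to several spans over a single MuxRangeFeed stream and then demultiplex the responses by RequestID. The InternalClient, RangeFeedRequest, and MuxRangeFeedResponse types and fields used below are the ones added or referenced in this diff; the helper name consumeMuxRangeFeed, its signature, and the handler wiring are hypothetical, and a real caller would also fill in the request Header, handle per-range errors, and restart individual feeds, none of which is shown here.

package example // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// consumeMuxRangeFeed opens one bi-directional MuxRangeFeed stream, registers a
// RangeFeedRequest per span, and dispatches every incoming event to the caller
// keyed by the RequestID that the server echoes back.
func consumeMuxRangeFeed(
	ctx context.Context,
	client roachpb.InternalClient,
	spans []roachpb.Span,
	onEvent func(requestID int64, rangeID roachpb.RangeID, ev *roachpb.RangeFeedEvent),
) error {
	stream, err := client.MuxRangeFeed(ctx)
	if err != nil {
		return err
	}
	// One request per span; RequestID is how the client tells the interleaved
	// responses apart on the shared stream. (Header, RangeID, etc. omitted.)
	for i, sp := range spans {
		req := &roachpb.RangeFeedRequest{Span: sp, RequestID: int64(i + 1)}
		if err := stream.Send(req); err != nil {
			return err
		}
	}
	// Demultiplex: each MuxRangeFeedResponse carries the originating RequestID
	// and RangeID alongside the embedded RangeFeedEvent.
	for {
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		onEvent(resp.RequestID, resp.RangeID, &resp.RangeFeedEvent)
	}
}

Because the server simply echoes RequestID back in every MuxRangeFeedResponse, the client never has to inspect event contents to decide which logical feed an event belongs to, which is what makes a single shared stream per node workable.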