From 8029f27118ccf6fa079a029e03055b10d46c852c Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Thu, 27 Apr 2023 01:16:29 -0400
Subject: [PATCH] kvserver,kvflowcontrol: integrate flow control

Part of #95563. This PR integrates various kvflowcontrol components into the
critical path for replication traffic. It does so by introducing two
"integration interfaces" in the kvserver package to intercept various points
of a replica's lifecycle, using them to manage the underlying replication
streams and flow tokens. The integration is mediated through two cluster
settings:

- kvadmission.flow_control.enabled
  This is a top-level kill-switch to revert to pre-kvflowcontrol behavior
  where follower writes unilaterally deducted IO tokens without blocking.

- kvadmission.flow_control.mode
  It can take on one of two settings, each exercising the flow control
  machinery to varying degrees.

  - apply_to_elastic
    Only applies admission delays to elastic traffic.
  - apply_to_all
    Applies admission delays to {regular,elastic} traffic.

  When the mode is changed, we simply admit all waiting requests. This risks
  over-admitting work, but that's ok -- we assume these mode changes are rare
  events and done under supervision.

These settings are consulted in the kvadmission and kvflowcontroller packages.
As for the actual integration interfaces in kvserver, they are:

- replicaFlowControlIntegration: used to integrate with replication flow
  control. It intercepts various points in a replica's lifecycle, like
  acquiring or losing raft leadership, or its raft membership changing.
  Accessing it requires Replica.mu to be held exclusively (this is asserted
  in the canonical implementation).

    type replicaFlowControlIntegration interface {
      handle() (kvflowcontrol.Handle, bool)
      onBecameLeader(context.Context)
      onBecameFollower(context.Context)
      onDescChanged(context.Context)
      onFollowersPaused(context.Context)
      onReplicaDestroyed(context.Context)
      onProposalQuotaUpdated(context.Context)
    }

- replicaForFlowControl abstracts the interface of an individual Replica, as
  needed by replicaFlowControlIntegration; its definition follows the sketch
  below.
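Both interfaces are driven from existing replica lifecycle events, with
Replica.mu held exclusively across each call. As a rough sketch only (the hook
name below is hypothetical; the actual call sites are spread across the
replica_*.go changes in this diff), a leadership-change event is plumbed
through roughly as:

    // Hypothetical hook, for illustration; not part of this diff.
    func (r *Replica) onRaftLeadershipChange(ctx context.Context, isLeader bool) {
      r.mu.Lock()
      defer r.mu.Unlock()
      if isLeader {
        // Instantiates the underlying kvflowcontrol.Handle (see I5 in
        // kvflowcontrol/doc.go) and connects the initial streams.
        r.mu.replicaFlowControlIntegration.onBecameLeader(ctx)
      } else {
        // Closes the handle, releasing all held flow tokens.
        r.mu.replicaFlowControlIntegration.onBecameFollower(ctx)
      }
    }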
type replicaForFlowControl interface { assertLocked() annotateCtx(context.Context) context.Context getTenantID() roachpb.TenantID getReplicaID() roachpb.ReplicaID getRangeID() roachpb.RangeID getDescriptor() *roachpb.RangeDescriptor pausedFollowers() map[roachpb.ReplicaID]struct{} isFollowerActive(context.Context, roachpb.ReplicaID) bool appliedLogPosition() kvflowcontrolpb.RaftLogPosition withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress)) } Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 8 + pkg/kv/kvserver/BUILD.bazel | 3 + pkg/kv/kvserver/flow_control_replica.go | 107 +++++ .../flow_control_replica_integration.go | 432 ++++++++++++++++++ pkg/kv/kvserver/flow_control_stores.go | 43 +- pkg/kv/kvserver/kvadmission/BUILD.bazel | 3 + pkg/kv/kvserver/kvadmission/kvadmission.go | 92 +++- pkg/kv/kvserver/kvflowcontrol/BUILD.bazel | 1 + pkg/kv/kvserver/kvflowcontrol/doc.go | 15 + .../kvserver/kvflowcontrol/kvflowcontrol.go | 99 +++- .../kvflowcontroller/kvflowcontroller.go | 23 +- .../kvflowcontroller_metrics.go | 27 +- .../kvflowcontrol/kvflowcontrolpb/BUILD.bazel | 7 +- ...{raft_log_position.go => kvflowcontrol.go} | 15 +- .../kvflowcontrolpb/kvflowcontrol.proto | 2 + .../kvflowcontrol/kvflowhandle/BUILD.bazel | 2 + .../kvflowhandle/kvflowhandle.go | 91 +++- .../kvflowhandle/kvflowhandle_metrics.go | 30 +- .../kvflowhandle/kvflowhandle_test.go | 124 ++++- .../kvflowcontrol/kvflowhandle/noop.go | 63 +++ .../kvflowsimulator/simulation_test.go | 6 +- .../testdata/handle_stream_disconnection | 4 +- .../kvflowtokentracker/BUILD.bazel | 1 + .../kvflowtokentracker/tracker.go | 41 +- .../kvflowtokentracker/tracker_test.go | 4 +- pkg/kv/kvserver/raftlog/encoding_test.go | 2 +- pkg/kv/kvserver/replica.go | 10 + pkg/kv/kvserver/replica_app_batch.go | 7 +- pkg/kv/kvserver/replica_application_cmd.go | 6 + pkg/kv/kvserver/replica_application_result.go | 4 + pkg/kv/kvserver/replica_destroy.go | 3 +- pkg/kv/kvserver/replica_init.go | 5 + pkg/kv/kvserver/replica_init_test.go | 1 + pkg/kv/kvserver/replica_proposal.go | 11 + pkg/kv/kvserver/replica_proposal_buf.go | 105 ++++- pkg/kv/kvserver/replica_proposal_buf_test.go | 46 ++ pkg/kv/kvserver/replica_proposal_quota.go | 11 +- pkg/kv/kvserver/replica_raft.go | 79 +++- pkg/kv/kvserver/replica_raft_overload.go | 1 + pkg/kv/kvserver/store.go | 13 +- pkg/roachpb/metadata_replicas.go | 20 + pkg/roachpb/metadata_replicas_test.go | 40 ++ pkg/server/BUILD.bazel | 5 + pkg/server/admission.go | 55 +++ pkg/server/node.go | 8 +- pkg/server/server.go | 66 ++- pkg/util/admission/granter_test.go | 4 +- .../replicated_write_admission_test.go | 2 +- pkg/util/admission/work_queue.go | 38 +- pkg/util/admission/work_queue_test.go | 2 +- pkg/util/metric/registry.go | 4 + 53 files changed, 1667 insertions(+), 128 deletions(-) create mode 100644 pkg/kv/kvserver/flow_control_replica.go create mode 100644 pkg/kv/kvserver/flow_control_replica_integration.go rename pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/{raft_log_position.go => kvflowcontrol.go} (72%) create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go create mode 100644 pkg/server/admission.go diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 177e2e491a57..e8ca9b68e80a 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -300,4 +300,4 @@ 
trace.opentelemetry.collector string address of an OpenTelemetry trace collector
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez tenant-rw
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. tenant-rw
-version version 1000023.1-8 set the active cluster version in the format '<major>.<minor>' tenant-rw
+version version 1000023.1-10 set the active cluster version in the format '<major>.<minor>' tenant-rw
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 986931b1a29e..ed142364fa63 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -254,6 +254,6 @@
trace.snapshot.rate
duration0sif non-zero, interval at which background trace snapshots are capturedServerless/Dedicated/Self-Hosted
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracezServerless/Dedicated/Self-Hosted
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.Serverless/Dedicated/Self-Hosted -
version
version1000023.1-8set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted +
version
version1000023.1-10set the active cluster version in the format '<major>.<minor>'Serverless/Dedicated/Self-Hosted diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 5e30740f1e9b..3c61b43e0956 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -538,6 +538,10 @@ const ( // the system tenant. V23_2_EnableRangeCoalescingForSystemTenant + // V23_2_UseACRaftEntryEntryEncodings gates the use of raft entry encodings + // that (optionally) embed below-raft admission data. + V23_2_UseACRaftEntryEntryEncodings + // ************************************************* // Step (1) Add new versions here. // Do not add new versions to a patch release. @@ -935,6 +939,10 @@ var rawVersionsSingleton = keyedVersions{ Key: V23_2_EnableRangeCoalescingForSystemTenant, Version: roachpb.Version{Major: 23, Minor: 1, Internal: 8}, }, + { + Key: V23_2_UseACRaftEntryEntryEncodings, + Version: roachpb.Version{Major: 23, Minor: 1, Internal: 10}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e647abef2b17..21e78a41824e 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -13,6 +13,8 @@ go_library( "debug_print.go", "doc.go", "flow_control_raft_transport.go", + "flow_control_replica.go", + "flow_control_replica_integration.go", "flow_control_stores.go", "lease_history.go", "markers.go", @@ -141,6 +143,7 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", + "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/kvstorage", diff --git a/pkg/kv/kvserver/flow_control_replica.go b/pkg/kv/kvserver/flow_control_replica.go new file mode 100644 index 000000000000..0f73e3241995 --- /dev/null +++ b/pkg/kv/kvserver/flow_control_replica.go @@ -0,0 +1,107 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "go.etcd.io/raft/v3" + rafttracker "go.etcd.io/raft/v3/tracker" +) + +// replicaForFlowControl abstracts the interface of an individual Replica, as +// needed by replicaFlowControlIntegration. 
+type replicaForFlowControl interface { + annotateCtx(context.Context) context.Context + getTenantID() roachpb.TenantID + getReplicaID() roachpb.ReplicaID + getRangeID() roachpb.RangeID + getDescriptor() *roachpb.RangeDescriptor + getAppliedLogPosition() kvflowcontrolpb.RaftLogPosition + getPausedFollowers() map[roachpb.ReplicaID]struct{} + isFollowerLive(context.Context, roachpb.ReplicaID) bool + isRaftTransportConnectedTo(roachpb.StoreID) bool + withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress)) + + assertLocked() // only affects test builds +} + +// replicaFlowControl is a concrete implementation of the replicaForFlowControl +// interface. +type replicaFlowControl Replica + +var _ replicaForFlowControl = &replicaFlowControl{} + +func (rf *replicaFlowControl) assertLocked() { + rf.mu.AssertHeld() +} + +func (rf *replicaFlowControl) annotateCtx(ctx context.Context) context.Context { + return rf.AnnotateCtx(ctx) +} + +func (rf *replicaFlowControl) getTenantID() roachpb.TenantID { + rf.assertLocked() + return rf.mu.tenantID +} + +func (rf *replicaFlowControl) getReplicaID() roachpb.ReplicaID { + return rf.replicaID +} + +func (rf *replicaFlowControl) getRangeID() roachpb.RangeID { + return rf.RangeID +} + +func (rf *replicaFlowControl) getDescriptor() *roachpb.RangeDescriptor { + rf.assertLocked() + r := (*Replica)(rf) + return r.descRLocked() +} + +func (rf *replicaFlowControl) getPausedFollowers() map[roachpb.ReplicaID]struct{} { + rf.assertLocked() + return rf.mu.pausedFollowers +} + +func (rf *replicaFlowControl) isFollowerLive(ctx context.Context, replID roachpb.ReplicaID) bool { + rf.mu.AssertHeld() + return rf.mu.lastUpdateTimes.isFollowerActiveSince( + ctx, + replID, + timeutil.Now(), + rf.store.cfg.RangeLeaseDuration, + ) +} + +func (rf *replicaFlowControl) isRaftTransportConnectedTo(storeID roachpb.StoreID) bool { + rf.mu.AssertHeld() + return rf.store.cfg.Transport.isConnectedTo(storeID) +} + +func (rf *replicaFlowControl) getAppliedLogPosition() kvflowcontrolpb.RaftLogPosition { + rf.mu.AssertHeld() + status := rf.mu.internalRaftGroup.BasicStatus() + return kvflowcontrolpb.RaftLogPosition{ + Term: status.Term, + Index: status.Applied, + } +} + +func (rf *replicaFlowControl) withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress)) { + rf.mu.AssertHeld() + rf.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress rafttracker.Progress) { + f(roachpb.ReplicaID(id), progress) + }) +} diff --git a/pkg/kv/kvserver/flow_control_replica_integration.go b/pkg/kv/kvserver/flow_control_replica_integration.go new file mode 100644 index 000000000000..051fad507fcb --- /dev/null +++ b/pkg/kv/kvserver/flow_control_replica_integration.go @@ -0,0 +1,432 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/log" + rafttracker "go.etcd.io/raft/v3/tracker" +) + +// TODO(irfansharif): Write integration tests, walking through +// kvflowcontrol/doc.go. Do this as part of #95563. 
Randomize an in-memory +// workload with various chaos events, like nodes dying, streams breaking, +// splits, merges, etc. and assert stable flow tokens. Leader replica removing +// itself from descriptor. Uninitialized replica below raft for which we've +// deducted flow tokens for (dealt with by looking at StateReplicate). Dropped +// proposals -- we should only be deducting tokens once submitting to raft. But +// if leader's raft messages to follower get dropped (and not vice versa), +// leader will still see follower as active and not disconnect streams. Has this +// changed with us upgrading asymmetric partitions to bidirectional ones? + +// TODO(irfansharif): Write data-driven unit tests for this interface. + +// replicaFlowControlIntegration is used to integrate with replication flow +// control. It's intercepts various points in a replica's lifecycle, like it +// acquiring raft leadership or losing it, or its raft membership changing, etc. +// +// Accessing it requires Replica.mu to be held, exclusively (this is asserted on +// in the canonical implementation). +type replicaFlowControlIntegration interface { + handle() (kvflowcontrol.Handle, bool) + onBecameLeader(context.Context) + onBecameFollower(context.Context) + onDescChanged(context.Context) + onFollowersPaused(context.Context) + onReplicaDestroyed(context.Context) + onProposalQuotaUpdated(context.Context) + onRaftTransportDisconnected(context.Context, ...roachpb.StoreID) +} + +var _ replicaFlowControlIntegration = &replicaFlowControlIntegrationImpl{} + +type replicaFlowControlIntegrationImpl struct { + replicaForFlowControl replicaForFlowControl + handleFactory kvflowcontrol.HandleFactory + + innerHandle kvflowcontrol.Handle + lastKnownReplicas roachpb.ReplicaSet + disconnectedStreams map[roachpb.ReplicaID]kvflowcontrol.Stream +} + +func newReplicaFlowControlIntegration( + replicaForFlowControl replicaForFlowControl, handleFactory kvflowcontrol.HandleFactory, +) *replicaFlowControlIntegrationImpl { + return &replicaFlowControlIntegrationImpl{ + replicaForFlowControl: replicaForFlowControl, + handleFactory: handleFactory, + } +} + +func (f *replicaFlowControlIntegrationImpl) handle() (kvflowcontrol.Handle, bool) { + f.replicaForFlowControl.assertLocked() + return f.innerHandle, f.innerHandle != nil +} + +func (f *replicaFlowControlIntegrationImpl) onBecameLeader(ctx context.Context) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle != nil { + log.Fatal(ctx, "flow control handle was not nil before becoming the leader") + } + if !f.replicaForFlowControl.getTenantID().IsSet() { + log.Fatal(ctx, "unset tenant ID") + } + + // See I5 from kvflowcontrol/doc.go. The per-replica kvflowcontrol.Handle is + // tied to the lifetime of a leaseholder replica having raft leadership. + f.innerHandle = f.handleFactory.NewHandle( + f.replicaForFlowControl.getRangeID(), + f.replicaForFlowControl.getTenantID(), + ) + f.lastKnownReplicas = f.replicaForFlowControl.getDescriptor().Replicas() + f.disconnectedStreams = make(map[roachpb.ReplicaID]kvflowcontrol.Stream) + + appliedLogPosition := f.replicaForFlowControl.getAppliedLogPosition() + for _, desc := range f.replicaForFlowControl.getDescriptor().Replicas().Descriptors() { + // Start off every remote stream as disconnected. Later we'll try to + // reconnect them. 
+ stream := kvflowcontrol.Stream{ + TenantID: f.replicaForFlowControl.getTenantID(), + StoreID: desc.StoreID, + } + if f.replicaForFlowControl.getReplicaID() != desc.ReplicaID { + f.disconnectedStreams[desc.ReplicaID] = stream + continue + } + // Connect to the local stream. + f.innerHandle.ConnectStream(ctx, appliedLogPosition, stream) + } + f.tryReconnect(ctx) + + if log.V(1) { + var disconnected []kvflowcontrol.Stream + for _, stream := range f.disconnectedStreams { + disconnected = append(disconnected, stream) + } + sort.Slice(disconnected, func(i, j int) bool { + return disconnected[i].StoreID < disconnected[j].StoreID + }) + log.Infof(ctx, "assumed raft leadership: initializing flow handle for %s starting at %s (disconnected streams: %s)", + f.replicaForFlowControl.getDescriptor(), appliedLogPosition, disconnected) + } +} + +func (f *replicaFlowControlIntegrationImpl) onBecameFollower(ctx context.Context) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle == nil { + return + } + + // See I5 from kvflowcontrol/doc.go. The per-replica kvflowcontrol.Handle is + // tied to the lifetime of a leaseholder replica having raft leadership. + // When leadership is lost, or the lease changes hands, we release all held + // flow tokens. Here we're not dealing with prolonged leaseholder != leader + // scenarios. + log.VInfof(ctx, 1, "lost raft leadership: releasing flow tokens and closing handle for %s", + f.replicaForFlowControl.getDescriptor()) + f.innerHandle.Close(ctx) + f.innerHandle = nil + f.lastKnownReplicas = roachpb.MakeReplicaSet(nil) + f.disconnectedStreams = nil +} + +func (f *replicaFlowControlIntegrationImpl) onDescChanged(ctx context.Context) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle == nil { + return // nothing to do + } + + addedReplicas, removedReplicas := f.lastKnownReplicas.Difference( + f.replicaForFlowControl.getDescriptor().Replicas(), + ) + for _, repl := range removedReplicas { + if repl.ReplicaID == f.replicaForFlowControl.getReplicaID() { + // We're observing ourselves get removed from the raft group, but + // are still retaining raft leadership. Close the underlying handle + // and bail. + // + // TODO(irfansharif): Is this even possible? + f.innerHandle.Close(ctx) + f.innerHandle = nil + f.lastKnownReplicas = roachpb.MakeReplicaSet(nil) + f.disconnectedStreams = nil + return + } + // See I10 from kvflowcontrol/doc.go. We stop deducting flow tokens for + // replicas that are no longer part of the raft group, free-ing up all + // held tokens. + f.innerHandle.DisconnectStream(ctx, kvflowcontrol.Stream{ + TenantID: f.replicaForFlowControl.getTenantID(), + StoreID: repl.StoreID, + }) + delete(f.disconnectedStreams, repl.ReplicaID) + } + + for _, repl := range addedReplicas { + // Start off new replicas as disconnected. We'll subsequently try to + // re-add them, once we know their log positions and consider them + // sufficiently caught up. See I3a from kvflowcontrol/doc.go. 
+ if repl.ReplicaID == f.replicaForFlowControl.getReplicaID() { + log.Fatalf(ctx, "observed replica adding itself to the range descriptor") + } + if _, found := f.disconnectedStreams[repl.ReplicaID]; found { + continue // already disconnected, nothing to do + } + stream := kvflowcontrol.Stream{ + TenantID: f.replicaForFlowControl.getTenantID(), + StoreID: repl.StoreID, + } + f.disconnectedStreams[repl.ReplicaID] = stream + } + if len(addedReplicas) > 0 || len(removedReplicas) > 0 { + log.VInfof(ctx, 1, "desc changed from %s to %s: added=%s removed=%s", + f.lastKnownReplicas, f.replicaForFlowControl.getDescriptor(), addedReplicas, removedReplicas, + ) + } + f.lastKnownReplicas = f.replicaForFlowControl.getDescriptor().Replicas() +} + +func (f *replicaFlowControlIntegrationImpl) onFollowersPaused(ctx context.Context) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle == nil { + return // nothing to do + } + + var toDisconnect []roachpb.ReplicaDescriptor + // See I3 from kvflowcontrol/doc.go. We don't deduct flow tokens for + // replication traffic that's not headed to paused replicas. + for replID := range f.replicaForFlowControl.getPausedFollowers() { + repl, ok := f.lastKnownReplicas.GetReplicaDescriptorByID(replID) + if !ok { + // As of 4/23, we don't make any strong guarantees around the set of + // paused followers we're tracking, nothing that ensures that what's + // tracked is guaranteed to be a member of the range descriptor. We + // treat the range descriptor derived state as authoritative. + continue + } + if repl.ReplicaID == f.replicaForFlowControl.getReplicaID() { + log.Fatalf(ctx, "observed replica pausing replication traffic to itself") + } + toDisconnect = append(toDisconnect, repl) + } + + f.disconnectStreams(ctx, toDisconnect, "paused followers") + f.tryReconnect(ctx) +} + +func (f *replicaFlowControlIntegrationImpl) onReplicaDestroyed(ctx context.Context) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle == nil { + return // nothing to do + } + + // During merges, the context might have the subsuming range, so we + // explicitly (re-)annotate it here. + ctx = f.replicaForFlowControl.annotateCtx(ctx) + + // See I6, I9 from kvflowcontrol/doc.go. We want to free up all held flow + // tokens when a replica is being removed, for example when it's being + // rebalanced away, is no longer part of the raft group, is being GC-ed, + // destroyed as part of the EndTxn merge trigger, or subsumed if applying + // the merge as part of an incoming snapshot. + f.innerHandle.Close(ctx) + f.innerHandle = nil + f.lastKnownReplicas = roachpb.MakeReplicaSet(nil) + f.disconnectedStreams = nil +} + +func (f *replicaFlowControlIntegrationImpl) onProposalQuotaUpdated(ctx context.Context) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle == nil { + return // nothing to do + } + + var toDisconnect []roachpb.ReplicaDescriptor + + // Disconnect any recently inactive followers. + // + // TODO(irfansharif): Experimentally this gets triggered quite often. It + // might be too sensitive and may result in ineffective flow control as + // a result. Fix as part of #95563. + for _, repl := range f.lastKnownReplicas.Descriptors() { + if f.replicaForFlowControl.isFollowerLive(ctx, repl.ReplicaID) { + continue // nothing to do + } + if repl.ReplicaID == f.replicaForFlowControl.getReplicaID() { + // NB: We ignore ourselves from this last-updated map. 
For followers + // we update the timestamps when we step a message from them into + // the local raft group, but for the leader we only update it + // whenever it ticks. So in workloads where the leader only sees + // occasional writes, it could see itself as non-live. This is + // likely unintentional, but we paper over it here. + continue // nothing to do + } + toDisconnect = append(toDisconnect, repl) + } + f.disconnectStreams(ctx, toDisconnect, "inactive followers") + + // Disconnect any streams we're not actively replicating to. + toDisconnect = nil + for _, replID := range f.notActivelyReplicatingTo() { + repl, ok := f.lastKnownReplicas.GetReplicaDescriptorByID(replID) + if !ok { + continue + } + if repl.ReplicaID == f.replicaForFlowControl.getReplicaID() { + log.Fatalf(ctx, "leader replica observed that it was not being actively replicated to") + } + toDisconnect = append(toDisconnect, repl) + } + f.disconnectStreams(ctx, toDisconnect, "not actively replicating") + + f.tryReconnect(ctx) +} + +// notActivelyReplicatingTo lists the replicas that aren't actively receiving +// log entries to append to its log, from raft's perspective (i.e. this is +// unrelated to CRDB-level follower pausing). This encompasses newly added +// replicas that we're still probing to figure out its last index, replicas +// that are pending raft snapshots because the leader has truncated away entries +// higher than its last position, and replicas we're not currently connected to +// via the raft transport. +func (f *replicaFlowControlIntegrationImpl) notActivelyReplicatingTo() []roachpb.ReplicaID { + var res []roachpb.ReplicaID + f.replicaForFlowControl.withReplicaProgress(func(replID roachpb.ReplicaID, progress rafttracker.Progress) { + if replID == f.replicaForFlowControl.getReplicaID() { + return + } + repl, ok := f.lastKnownReplicas.GetReplicaDescriptorByID(replID) + if !ok { + return + } + + if progress.State != rafttracker.StateReplicate { + res = append(res, replID) + // TODO(irfansharif): Integrating with these other progress fields + // from raft. For replicas exiting rafttracker.StateProbe, perhaps + // compare progress.Match against status.Commit to make sure it's + // sufficiently caught up with respect to its raft log before we + // start deducting tokens for it (lest we run into I3a from + // kvflowcontrol/doc.go). To play well with the replica-level + // proposal quota pool, maybe we also factor its base index? + // Replicas that crashed and came back could come back in + // StateReplicate but be behind on their logs. If we're deducting + // tokens right away for subsequent proposals, it would take some + // time for it to catch up and then later return those tokens to us. + // This is I3a again; do it as part of #95563. 
+ _ = progress.RecentActive + _ = progress.MsgAppFlowPaused + _ = progress.Match + return + } + + if !f.replicaForFlowControl.isRaftTransportConnectedTo(repl.StoreID) { + res = append(res, replID) + } + }) + return res +} + +func (f *replicaFlowControlIntegrationImpl) disconnectStreams( + ctx context.Context, toDisconnect []roachpb.ReplicaDescriptor, reason string, +) { + for _, repl := range toDisconnect { + if _, found := f.disconnectedStreams[repl.ReplicaID]; found { + continue // already disconnected, nothing to do + } + stream := kvflowcontrol.Stream{ + TenantID: f.replicaForFlowControl.getTenantID(), + StoreID: repl.StoreID, + } + f.innerHandle.DisconnectStream(ctx, stream) + f.disconnectedStreams[repl.ReplicaID] = stream + log.VInfof(ctx, 1, "tracked disconnected stream: %s (reason: %s)", stream, reason) + } +} + +func (f *replicaFlowControlIntegrationImpl) onRaftTransportDisconnected( + ctx context.Context, storeIDs ...roachpb.StoreID, +) { + f.replicaForFlowControl.assertLocked() + if f.innerHandle == nil { + return // nothing to do + } + + disconnectedStores := make(map[roachpb.StoreID]struct{}) + for _, storeID := range storeIDs { + disconnectedStores[storeID] = struct{}{} + } + + var toDisconnect []roachpb.ReplicaDescriptor + for _, repl := range f.lastKnownReplicas.Descriptors() { + if _, found := disconnectedStores[repl.StoreID]; found { + toDisconnect = append(toDisconnect, repl) + } + } + f.disconnectStreams(ctx, toDisconnect, "raft transport disconnected") + f.tryReconnect(ctx) +} + +func (f *replicaFlowControlIntegrationImpl) tryReconnect(ctx context.Context) { + // Try reconnecting streams we disconnected. + pausedFollowers := f.replicaForFlowControl.getPausedFollowers() + notActivelyReplicatingTo := f.notActivelyReplicatingTo() + appliedLogPosition := f.replicaForFlowControl.getAppliedLogPosition() + + var disconnectedRepls []roachpb.ReplicaID + for replID := range f.disconnectedStreams { + disconnectedRepls = append(disconnectedRepls, replID) + } + sort.Slice(disconnectedRepls, func(i, j int) bool { // for determinism in tests + return disconnectedRepls[i] < disconnectedRepls[j] + }) + for _, replID := range disconnectedRepls { + stream := f.disconnectedStreams[replID] + if _, ok := pausedFollowers[replID]; ok { + continue // still paused, nothing to reconnect + } + + repl, ok := f.lastKnownReplicas.GetReplicaDescriptorByID(replID) + if !ok { + log.Fatalf(ctx, "%s: tracking %s in disconnected streams despite it not being in descriptor: %s", + f.replicaForFlowControl.getReplicaID(), replID, f.lastKnownReplicas) + } + if !f.replicaForFlowControl.isFollowerLive(ctx, replID) { + continue // still inactive, nothing to reconnect + } + + notReplicatedTo := false + for _, notReplicatedToRepl := range notActivelyReplicatingTo { + if replID == notReplicatedToRepl { + notReplicatedTo = true + break + } + } + if notReplicatedTo { + continue // not actively replicated to, yet; nothing to reconnect + } + + if !f.replicaForFlowControl.isRaftTransportConnectedTo(repl.StoreID) { + continue // not connected to via raft transport + } + + // See I1, I2, I3, I3a, I4 from kvflowcontrol/doc.go. Replica is + // connected to via the RaftTransport (I1), on a live node (I2), not + // paused (I3), and is being actively replicated to through log entries + // (I3a, I4). Re-connect so we can start deducting tokens for it. 
+ f.innerHandle.ConnectStream(ctx, appliedLogPosition, stream) + delete(f.disconnectedStreams, replID) + } +} diff --git a/pkg/kv/kvserver/flow_control_stores.go b/pkg/kv/kvserver/flow_control_stores.go index fe0d14ad69ac..98ca2dcc4e93 100644 --- a/pkg/kv/kvserver/flow_control_stores.go +++ b/pkg/kv/kvserver/flow_control_stores.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -104,14 +105,22 @@ func (sh *storeForFlowControl) Lookup( if repl == nil { return nil, false } - return nil, false // TODO(irfansharif): Fill this in. + + repl.mu.Lock() + defer repl.mu.Unlock() + return repl.mu.replicaFlowControlIntegration.handle() } // ResetStreams is part of the StoresForFlowControl interface. func (sh *storeForFlowControl) ResetStreams(ctx context.Context) { s := (*Store)(sh) s.VisitReplicas(func(r *Replica) (wantMore bool) { - // TODO(irfansharif): Fill this in. + r.mu.Lock() + defer r.mu.Unlock() + handle, found := r.mu.replicaFlowControlIntegration.handle() + if found { + handle.ResetStreams(ctx) + } return true }) } @@ -123,10 +132,38 @@ func (sh *storeForFlowControl) OnRaftTransportDisconnected( ) { s := (*Store)(sh) s.mu.replicasByRangeID.Range(func(replica *Replica) { - // TODO(irfansharif): Fill this in. + replica.mu.Lock() + defer replica.mu.Unlock() + replica.mu.replicaFlowControlIntegration.onRaftTransportDisconnected(ctx, storeIDs...) }) } +// storeFlowControlHandleFactory is a concrete implementation of +// kvflowcontrol.HandleFactory. +type storeFlowControlHandleFactory Store + +var _ kvflowcontrol.HandleFactory = &storeFlowControlHandleFactory{} + +// makeStoreFlowControlHandleFactory returns a new storeFlowControlHandleFactory +// instance. +func makeStoreFlowControlHandleFactory(store *Store) *storeFlowControlHandleFactory { + return (*storeFlowControlHandleFactory)(store) +} + +// NewHandle is part of the kvflowcontrol.HandleFactory interface. +func (shf *storeFlowControlHandleFactory) NewHandle( + rangeID roachpb.RangeID, tenantID roachpb.TenantID, +) kvflowcontrol.Handle { + s := (*Store)(shf) + return kvflowhandle.New( + s.cfg.KVFlowController, + s.cfg.KVFlowHandleMetrics, + s.cfg.Clock, + rangeID, + tenantID, + ) +} + // NoopStoresFlowControlIntegration is a no-op implementation of the // StoresForFlowControl interface. 
type NoopStoresFlowControlIntegration struct{} diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 175b864bfe01..fdf83611cc78 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -7,8 +7,11 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission", visibility = ["//visibility:public"], deps = [ + "//pkg/base", + "//pkg/clusterversion", "//pkg/kv/kvpb", "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/raftlog", "//pkg/roachpb", "//pkg/settings", diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 0ebb62537f20..f0a61eea5aa9 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -18,8 +18,11 @@ import ( "sync" "time" + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -182,13 +185,18 @@ type TenantWeightsForStore struct { // controllerImpl implements Controller interface. type controllerImpl struct { + nodeID *base.NodeIDContainer + // Admission control queues and coordinators. All three should be nil or // non-nil. kvAdmissionQ *admission.WorkQueue storeGrantCoords *admission.StoreGrantCoordinators elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator - settings *cluster.Settings - every log.EveryN + kvflowController kvflowcontrol.Controller + kvflowHandles kvflowcontrol.Handles + + settings *cluster.Settings + every log.EveryN } var _ Controller = &controllerImpl{} @@ -202,23 +210,42 @@ type Handle struct { tenantID roachpb.TenantID storeAdmissionQ *admission.StoreWorkQueue storeWorkHandle admission.StoreWorkHandle - ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle + elasticCPUWorkHandle *admission.ElasticCPUWorkHandle + raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta callAdmittedWorkDoneOnKVAdmissionQ bool } +// AnnotateCtx annotates the given context with request-scoped admission +// data, plumbed through the KV stack using context.Contexts. +func (h *Handle) AnnotateCtx(ctx context.Context) context.Context { + if h.elasticCPUWorkHandle != nil { + ctx = admission.ContextWithElasticCPUWorkHandle(ctx, h.elasticCPUWorkHandle) + } + if h.raftAdmissionMeta != nil { + ctx = kvflowcontrol.ContextWithMeta(ctx, h.raftAdmissionMeta) + } + return ctx +} + // MakeController returns a Controller. All three parameters must together be // nil or non-nil. 
func MakeController( + nodeID *base.NodeIDContainer, kvAdmissionQ *admission.WorkQueue, elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator, storeGrantCoords *admission.StoreGrantCoordinators, + kvflowController kvflowcontrol.Controller, + kvflowHandles kvflowcontrol.Handles, settings *cluster.Settings, ) Controller { return &controllerImpl{ + nodeID: nodeID, kvAdmissionQ: kvAdmissionQ, storeGrantCoords: storeGrantCoords, elasticCPUGrantCoordinator: elasticCPUGrantCoordinator, + kvflowController: kvflowController, + kvflowHandles: kvflowHandles, settings: settings, every: log.Every(10 * time.Second), } @@ -273,22 +300,44 @@ func (n *controllerImpl) AdmitKVWork( // to continue even when throttling since there are often significant // number of tokens available. if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) - if storeAdmissionQ != nil { - storeWorkHandle, err := storeAdmissionQ.Admit( - ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo}) - if err != nil { + if !bypassAdmission && + kvflowcontrol.Enabled.Get(&n.settings.SV) && + n.settings.Version.IsActive(ctx, clusterversion.V23_2_UseACRaftEntryEntryEncodings) { + kvflowHandle, found := n.kvflowHandles.Lookup(ba.RangeID) + if !found { + return Handle{}, nil + } + if err := kvflowHandle.Admit(ctx, admissionInfo.Priority, timeutil.FromUnixNanos(createTime)); err != nil { return Handle{}, err } - admissionEnabled = storeWorkHandle.UseAdmittedWorkDone() - if admissionEnabled { - defer func() { - if retErr != nil { - // No bytes were written. - _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{}) - } - }() - ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle + // NB: It's possible for us to be waiting for available flow tokens + // for a different set of streams that the ones we'll eventually + // deduct tokens from, if the range experiences a split between now + // and the point of deduction. That's ok, there's no strong + // synchronization needed between these two points. + ah.raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + AdmissionPriority: int32(admissionInfo.Priority), + AdmissionCreateTime: admissionInfo.CreateTime, + AdmissionOriginNode: n.nodeID.Get(), + } + } else { + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) + if storeAdmissionQ != nil { + storeWorkHandle, err := storeAdmissionQ.Admit( + ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo}) + if err != nil { + return Handle{}, err + } + admissionEnabled = storeWorkHandle.UseAdmittedWorkDone() + if admissionEnabled { + defer func() { + if retErr != nil { + // No bytes were written. + _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{}) + } + }() + ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle + } } } } @@ -312,11 +361,11 @@ func (n *controllerImpl) AdmitKVWork( if err != nil { return Handle{}, err } - ah.ElasticCPUWorkHandle = elasticWorkHandle + ah.elasticCPUWorkHandle = elasticWorkHandle defer func() { if retErr != nil { // No elastic work was done. - n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle) } }() } else { @@ -332,7 +381,7 @@ func (n *controllerImpl) AdmitKVWork( // AdmittedKVWorkDone implements the Controller interface. 
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) { - n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.elasticCPUWorkHandle) if ah.callAdmittedWorkDoneOnKVAdmissionQ { n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) } @@ -466,10 +515,11 @@ func (n *controllerImpl) AdmitRaftEntry( } if log.V(1) { - log.Infof(ctx, "decoded raft admission meta below-raft: pri=%s create-time=%d proposer=n%s receiver=[n?,s%s] tenant=t%d tokens≈%d sideloaded=%t raft-entry=%d/%d", + log.Infof(ctx, "decoded raft admission meta below-raft: pri=%s create-time=%d proposer=n%s receiver=[n%d,s%s] tenant=t%d tokens≈%d sideloaded=%t raft-entry=%d/%d", admissionpb.WorkPriority(meta.AdmissionPriority), meta.AdmissionCreateTime, meta.AdmissionOriginNode, + n.nodeID.Get(), storeID, tenantID.ToUint64(), kvflowcontrol.Tokens(len(entry.Data)), diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel index 23b2271aa510..2665a779d145 100644 --- a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/base", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/roachpb", + "//pkg/settings", "//pkg/util/admission/admissionpb", "@com_github_cockroachdb_redact//:redact", "@com_github_dustin_go_humanize//:go-humanize", diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go index 49910b2790d5..28ccfa514c17 100644 --- a/pkg/kv/kvserver/kvflowcontrol/doc.go +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -426,6 +426,21 @@ package kvflowcontrol // - Because the fan-in effects of epoch-LIFO are not well understood (by this // author at least), we just disable it below-raft. // +// I13. What happens when a range {un,}quiesces? +// - Quiescing a range only prevents its internal raft group from being ticked, +// which stops it from issuing MsgHeartbeats or calling elections. Quiesced +// ranges still have a raft leader and/or a leaseholder. Any raft operation +// (for example, proposals) on any replica ends up unquiescing the range, +// typically under stable raft leadership. As far as flow tokens are +// concerned: +// - Quiesced ranges have no steady stream of RaftTransport messages, which we +// use to piggyback flow token returns. But we guarantee timely delivery +// even without messages to piggyback on top of. See I8 above. +// - When returning flow tokens to a quiesced range's leaseholder, that's ok, +// we're able to look up the right kvflowcontrol.Handle since the replica is +// still around. When quiescing a range, we don't need to release all-held +// tokens, or wait until there are no held flow tokens. 
+// // --- // // [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go index 1e95f1277c8a..05f8b481ff25 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -18,11 +18,72 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/redact" "github.com/dustin/go-humanize" ) +// Enabled determines whether we use flow control for replication traffic in KV. +var Enabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kvadmission.flow_control.enabled", + "determines whether we use flow control for replication traffic in KV", + true, +) + +// Mode determines the 'mode' of flow control we use for replication traffic in +// KV, if enabled. +var Mode = settings.RegisterEnumSetting( + settings.SystemOnly, + "kvadmission.flow_control.mode", + "determines the 'mode' of flow control we use for replication traffic in KV, if enabled", + ApplyToAll.String(), + map[int64]string{ + int64(ApplyToElastic): modeDict[ApplyToElastic], + int64(ApplyToAll): modeDict[ApplyToAll], + }, +) + +var modeDict = map[ModeT]string{ + ApplyToElastic: "apply_to_elastic", + ApplyToAll: "apply_to_all", +} + +// ModeT represents the various modes of flow control for replication traffic. +type ModeT int64 + +const ( + // ApplyToElastic uses flow control for only elastic traffic, i.e. only + // elastic work will wait for flow tokens to be available. All work is + // virtually enqueued in below-raft admission queues and dequeued in + // priority order, but only empty elastic flow token buckets above-raft will + // block further elastic traffic from being admitted. + // + // TODO(irfansharif): We're potentially risking OOMs doing all this tracking + // for regular work, without coalescing state. With a bit of plumbing, for + // requests that bypass flow control we could fallback to using the non-AC + // raft encodings and avoid the potential OOMs. Address this as part of + // #95563. + ApplyToElastic ModeT = iota + // ApplyToAll uses flow control for both elastic and regular traffic, + // i.e. all work will wait for flow tokens to be available. + ApplyToAll +) + +func (m ModeT) String() string { + return redact.StringWithoutMarkers(m) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (m ModeT) SafeFormat(p redact.SafePrinter, verb rune) { + if s, ok := modeDict[m]; ok { + p.Print(s) + return + } + p.Print("unknown-mode") +} + // Stream models the stream over which we replicate data traffic, the // transmission for which we regulate using flow control. It's segmented by the // specific store the traffic is bound for and the tenant driving it. Despite @@ -127,7 +188,8 @@ type Handle interface { // DisconnectStream disconnects a stream from the handle. When disconnecting // a stream, (a) all previously held flow tokens are released and (b) we // unblock all requests waiting in Admit() for this stream's flow tokens in - // particular. + // particular. It's a no-op if disconnecting something we're not connected + // to. 
// // This is typically used when we're no longer replicating data to a member // of the raft group, because (a) it crashed, (b) it's no longer part of the @@ -137,6 +199,11 @@ type Handle interface { // AdmittedRaftLogEntries between what it admitted last and its latest // RaftLogPosition. DisconnectStream(context.Context, Stream) + // ResetStreams resets all connected streams, i.e. it disconnects and + // re-connects to each one. It effectively unblocks all requests waiting in + // Admit(). It's only used when cluster settings change, settings that + // affect all work waiting for flow tokens. + ResetStreams(ctx context.Context) // Close closes the handle and returns all held tokens back to the // underlying controller. Typically used when the replica loses its lease // and/or raft leadership, or ends up getting GC-ed (if it's being @@ -149,6 +216,12 @@ type Handle interface { // they're uniquely identified by their range ID. type Handles interface { Lookup(roachpb.RangeID) (Handle, bool) + ResetStreams(ctx context.Context) +} + +// HandleFactory is used to construct new Handles. +type HandleFactory interface { + NewHandle(roachpb.RangeID, roachpb.TenantID) Handle } // Dispatch is used (i) to dispatch information about admitted raft log entries @@ -205,3 +278,27 @@ func (s Stream) SafeFormat(p redact.SafePrinter, verb rune) { } p.Printf("t%s/s%s", tenantSt, s.StoreID.String()) } + +type raftAdmissionMetaKey struct{} + +// ContextWithMeta returns a Context wrapping the supplied raft admission meta, +// if any. +// +// TODO(irfansharif): This causes a heap allocation. Revisit as part of #95563. +func ContextWithMeta(ctx context.Context, meta *kvflowcontrolpb.RaftAdmissionMeta) context.Context { + if meta != nil { + ctx = context.WithValue(ctx, raftAdmissionMetaKey{}, meta) + } + return ctx +} + +// MetaFromContext returns the raft admission meta embedded in the Context, if +// any. +func MetaFromContext(ctx context.Context) *kvflowcontrolpb.RaftAdmissionMeta { + val := ctx.Value(raftAdmissionMetaKey{}) + h, ok := val.(*kvflowcontrolpb.RaftAdmissionMeta) + if !ok { + return nil + } + return h +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go index a18472414715..073aa741e153 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go @@ -65,8 +65,9 @@ type Controller struct { // minutes), clear these out. buckets map[kvflowcontrol.Stream]bucket } - metrics *metrics - clock *hlc.Clock + metrics *metrics + clock *hlc.Clock + settings *cluster.Settings } var _ kvflowcontrol.Controller = &Controller{} @@ -74,7 +75,8 @@ var _ kvflowcontrol.Controller = &Controller{} // New constructs a new Controller. func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock) *Controller { c := &Controller{ - clock: clock, + clock: clock, + settings: settings, } regularTokens := kvflowcontrol.Tokens(regularTokensPerStream.Get(&settings.SV)) @@ -115,6 +117,10 @@ func New(registry *metric.Registry, settings *cluster.Settings, clock *hlc.Clock return c } +func (c *Controller) mode() kvflowcontrol.ModeT { + return kvflowcontrol.ModeT(kvflowcontrol.Mode.Get(&c.settings.SV)) +} + // Admit is part of the kvflowcontrol.Controller interface. It blocks until // there are flow tokens available for replication over the given stream for // work of the given priority. 
@@ -135,10 +141,15 @@ func (c *Controller) Admit( tokens := b.tokens[class] c.mu.Unlock() - if tokens > 0 { + if tokens > 0 || + // In addition to letting requests through when there are tokens + // being available, we'll also let them through if we're not + // applying flow control to their specific work class. + c.mode() == kvflowcontrol.ApplyToElastic && class == admissionpb.RegularWorkClass { + if log.ExpensiveLogEnabled(ctx, 2) { - log.Infof(ctx, "flow tokens available (pri=%s stream=%s tokens=%s wait-duration=%s)", - pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart)) + log.Infof(ctx, "admitted request (pri=%s stream=%s tokens=%s wait-duration=%s mode=%s)", + pri, connection.Stream(), tokens, c.clock.PhysicalTime().Sub(tstart), c.mode()) } // TODO(irfansharif): Right now we continue forwarding admission diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go index 41c2561575ed..2882ce56fbc2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go @@ -11,11 +11,14 @@ package kvflowcontroller import ( + "context" "fmt" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" ) @@ -197,17 +200,39 @@ func newMetrics(c *Controller) *metrics { return int64(len(c.mu.buckets)) }, ) + + var blockedStreamLogger = log.Every(30 * time.Second) + var buf strings.Builder m.BlockedStreamCount[wc] = metric.NewFunctionalGauge( annotateMetricTemplateWithWorkClass(wc, blockedStreamCount), func() int64 { + shouldLog := blockedStreamLogger.ShouldLog() + count := int64(0) c.mu.Lock() defer c.mu.Unlock() - for _, wbc := range c.mu.buckets { + + for s, wbc := range c.mu.buckets { if wbc.tokens[wc] <= 0 { count += 1 + + if shouldLog { + if count > 10 { + continue // cap output to 10 blocked streams + } + if count == 1 { + buf.Reset() + } + if count > 1 { + buf.WriteString(", ") + } + buf.WriteString(s.String()) + } } } + if shouldLog && count > 0 { + log.Warningf(context.Background(), "%d blocked %s replication stream(s): %s", count, wc, buf.String()) + } return count }, ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel index 0a2fd87eda43..e73bf09982a5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel @@ -25,11 +25,14 @@ go_proto_library( go_library( name = "kvflowcontrolpb", - srcs = ["raft_log_position.go"], + srcs = ["kvflowcontrol.go"], embed = [":kvflowcontrolpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", visibility = ["//visibility:public"], - deps = ["@com_github_cockroachdb_redact//:redact"], + deps = [ + "//pkg/util/admission/admissionpb", + "@com_github_cockroachdb_redact//:redact", + ], ) get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go similarity index 72% rename from pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go rename to 
pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go index c363012cc24d..cb1c7f0a7621 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.go @@ -10,7 +10,10 @@ package kvflowcontrolpb -import "github.com/cockroachdb/redact" +import ( + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/redact" +) func (p RaftLogPosition) String() string { return redact.StringWithoutMarkers(p) @@ -40,3 +43,13 @@ func (p *RaftLogPosition) Less(o RaftLogPosition) bool { func (p *RaftLogPosition) LessEq(o RaftLogPosition) bool { return p.Less(o) || p.Equal(o) } + +func (a AdmittedRaftLogEntries) String() string { + return redact.StringWithoutMarkers(a) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (a AdmittedRaftLogEntries) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("admitted-entries (r%s s%s pri=%s up-to-%s)", + a.RangeID, a.StoreID, admissionpb.WorkPriority(a.AdmissionPriority), a.UpToRaftLogPosition) +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto index 890209973ab6..d0a0d6830bf8 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto @@ -82,6 +82,8 @@ message RaftAdmissionMeta { // this particular "replication stream" (i.e. flowing to a particular store, // remote or otherwise). message AdmittedRaftLogEntries { + option (gogoproto.goproto_stringer) = false; + // RangeID of the raft group these entries belong to. This is the range on // whose behalf work was admitted. int64 range_id = 1 [(gogoproto.customname) = "RangeID", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel index 0b4379c2a72b..269aaf135876 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "connected_stream.go", "kvflowhandle.go", "kvflowhandle_metrics.go", + "noop.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle", visibility = ["//visibility:public"], @@ -15,6 +16,7 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker", + "//pkg/roachpb", "//pkg/util/admission/admissionpb", "//pkg/util/hlc", "//pkg/util/log", diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go index 0ff5ebf18a10..d9aea13641bd 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -31,6 +32,8 @@ type Handle struct { controller kvflowcontrol.Controller metrics *Metrics 
clock *hlc.Clock + rangeID roachpb.RangeID + tenantID roachpb.TenantID mu struct { syncutil.Mutex @@ -45,11 +48,22 @@ type Handle struct { } // New constructs a new Handle. -func New(controller kvflowcontrol.Controller, metrics *Metrics, clock *hlc.Clock) *Handle { +func New( + controller kvflowcontrol.Controller, + metrics *Metrics, + clock *hlc.Clock, + rangeID roachpb.RangeID, + tenantID roachpb.TenantID, +) *Handle { + if metrics == nil { // only nil in tests + metrics = NewMetrics(nil) + } h := &Handle{ controller: controller, metrics: metrics, clock: clock, + rangeID: rangeID, + tenantID: tenantID, } h.mu.perStreamTokenTracker = map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker{} return h @@ -126,9 +140,12 @@ func (h *Handle) deductTokensForInner( } for _, c := range h.mu.connections { - h.controller.DeductTokens(ctx, pri, tokens, c.Stream()) - h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) - streams = append(streams, c.Stream()) + if h.mu.perStreamTokenTracker[c.Stream()].Track(ctx, pri, tokens, pos) { + // Only deduct tokens if we're able to track them for subsequent + // returns. We risk leaking flow tokens otherwise. + h.controller.DeductTokens(ctx, pri, tokens, c.Stream()) + streams = append(streams, c.Stream()) + } } return streams } @@ -160,6 +177,14 @@ func (h *Handle) ReturnTokensUpto( return } + if !stream.TenantID.IsSet() { + // NB: The tenant ID is set in the local fast path for token returns, + // through the kvflowcontrol.Dispatch. Tecnically we could set the + // tenant ID by looking up the local replica and reading it, but it's + // easier to do it this way having captured it when the handle was + // instantiated. + stream.TenantID = h.tenantID + } h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { @@ -175,6 +200,13 @@ func (h *Handle) ReturnTokensUpto( func (h *Handle) ConnectStream( ctx context.Context, pos kvflowcontrolpb.RaftLogPosition, stream kvflowcontrol.Stream, ) { + if !stream.TenantID.IsSet() { + // See comment in (*Handle).ReturnTokensUpto above where this same check + // exists. The callers here do typically have this set, but it doesn't + // hurt to be defensive. + stream.TenantID = h.tenantID + } + h.mu.Lock() defer h.mu.Unlock() if h.mu.closed { @@ -182,6 +214,12 @@ func (h *Handle) ConnectStream( return } + h.connectStreamLocked(ctx, pos, stream) +} + +func (h *Handle) connectStreamLocked( + ctx context.Context, pos kvflowcontrolpb.RaftLogPosition, stream kvflowcontrol.Stream, +) { if _, ok := h.mu.perStreamTokenTracker[stream]; ok { log.Fatalf(ctx, "reconnecting already connected stream: %s", stream) } @@ -195,23 +233,54 @@ func (h *Handle) ConnectStream( // that case, this sorting will help avoid deadlocks. return h.mu.connections[i].Stream().StoreID < h.mu.connections[j].Stream().StoreID }) - h.mu.perStreamTokenTracker[stream] = kvflowtokentracker.New(pos, nil /* knobs */) + h.mu.perStreamTokenTracker[stream] = kvflowtokentracker.New(pos, stream, nil /* knobs */) + h.metrics.StreamsConnected.Inc(1) + log.VInfof(ctx, 1, "connected to stream: %s", stream) } // DisconnectStream is part of the kvflowcontrol.Handle interface. func (h *Handle) DisconnectStream(ctx context.Context, stream kvflowcontrol.Stream) { + if !stream.TenantID.IsSet() { + // See comment in (*Handle).ReturnTokensUpto above where this same check + // exists. The callers here do typically have this set, but it doesn't + // hurt to be defensive. 
+ stream.TenantID = h.tenantID + } h.mu.Lock() defer h.mu.Unlock() h.disconnectStreamLocked(ctx, stream) } +// ResetStreams is part of the kvflowcontrol.Handle interface. +func (h *Handle) ResetStreams(ctx context.Context) { + h.mu.Lock() + defer h.mu.Unlock() + if h.mu.closed { + log.Errorf(ctx, "operating on a closed handle") + return + } + + var streams []kvflowcontrol.Stream + var lowerBounds []kvflowcontrolpb.RaftLogPosition + for stream, tracker := range h.mu.perStreamTokenTracker { + streams = append(streams, stream) + lowerBounds = append(lowerBounds, tracker.LowerBound()) + } + for i := range streams { + h.disconnectStreamLocked(ctx, streams[i]) + } + for i := range streams { + h.connectStreamLocked(ctx, lowerBounds[i], streams[i]) + } +} + func (h *Handle) disconnectStreamLocked(ctx context.Context, stream kvflowcontrol.Stream) { if h.mu.closed { log.Errorf(ctx, "operating on a closed handle") return } if _, ok := h.mu.perStreamTokenTracker[stream]; !ok { - log.Fatalf(ctx, "disconnecting non-existent stream: %s", stream) + return } h.mu.perStreamTokenTracker[stream].Iter(ctx, @@ -232,6 +301,8 @@ func (h *Handle) disconnectStreamLocked(ctx context.Context, stream kvflowcontro connection.Disconnect() h.mu.connections = append(h.mu.connections[:streamIdx], h.mu.connections[streamIdx+1:]...) + log.VInfof(ctx, 1, "disconnected stream: %s", stream) + h.metrics.StreamsDisconnected.Inc(1) // TODO(irfansharif): Optionally record lower bound raft log positions for // disconnected streams to guard against regressions when (re-)connecting -- // it must be done with higher positions. @@ -250,8 +321,12 @@ func (h *Handle) Close(ctx context.Context) { return } - for _, connection := range h.mu.connections { - h.disconnectStreamLocked(ctx, connection.Stream()) + var streams []kvflowcontrol.Stream + for stream := range h.mu.perStreamTokenTracker { + streams = append(streams, stream) + } + for _, stream := range streams { + h.disconnectStreamLocked(ctx, stream) } h.mu.closed = true } diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_metrics.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_metrics.go index 656805d39b66..5006781b53e8 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_metrics.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_metrics.go @@ -20,6 +20,20 @@ import ( ) var ( + streamsConnected = metric.Metadata{ + Name: "kvadmission.flow_handle.streams_connected", + Help: "Number of times we've connected to a stream, at the handle level", + Measurement: "Streams", + Unit: metric.Unit_COUNT, + } + + streamsDisconnected = metric.Metadata{ + Name: "kvadmission.flow_handle.streams_disconnected", + Help: "Number of times we've disconnected from a stream, at the handle level", + Measurement: "Streams", + Unit: metric.Unit_COUNT, + } + requestsWaiting = metric.Metadata{ Name: "kvadmission.flow_handle.%s_requests_waiting", Help: "Number of %s requests waiting for flow tokens, at the handle level", @@ -62,17 +76,23 @@ func annotateMetricTemplateWithWorkClass( // Metrics is a metric.Struct for all kvflowcontrol.Handles. 
type Metrics struct { - RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge - RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter - RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter - WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram + StreamsConnected *metric.Counter + StreamsDisconnected *metric.Counter + RequestsWaiting [admissionpb.NumWorkClasses]*metric.Gauge + RequestsAdmitted [admissionpb.NumWorkClasses]*metric.Counter + RequestsErrored [admissionpb.NumWorkClasses]*metric.Counter + WaitDuration [admissionpb.NumWorkClasses]metric.IHistogram } var _ metric.Struct = &Metrics{} // NewMetrics returns a new instance of Metrics. func NewMetrics(registry *metric.Registry) *Metrics { - m := &Metrics{} + m := &Metrics{ + StreamsConnected: metric.NewCounter(streamsConnected), + StreamsDisconnected: metric.NewCounter(streamsDisconnected), + } + for _, wc := range []admissionpb.WorkClass{ admissionpb.RegularWorkClass, admissionpb.ElasticWorkClass, diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go index a4f1182ac19f..f9a7dda82cc9 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle_test.go @@ -32,7 +32,8 @@ import ( // TestHandleAdmit tests the blocking behavior of Handle.Admit(): // - we block until there are flow tokens available; // - we unblock when streams without flow tokens are disconnected; -// - we unblock when the handle is closed. +// - we unblock when the handle is closed; +// - we unblock when the handle is reset. func TestHandleAdmit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -69,12 +70,30 @@ func TestHandleAdmit(t *testing.T) { handle.Close(ctx) }, }, + { + name: "unblocked-when-reset", + unblockFn: func(ctx context.Context, handle kvflowcontrol.Handle) { + // Reset all streams on the handle; the call to .Admit() should + // unblock. + handle.ResetStreams(ctx) + }, + }, } { t.Run(tc.name, func(t *testing.T) { registry := metric.NewRegistry() clock := hlc.NewClockForTesting(nil) - controller := kvflowcontroller.New(registry, cluster.MakeTestingClusterSettings(), clock) - handle := kvflowhandle.New(controller, kvflowhandle.NewMetrics(registry), clock) + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + kvflowcontrol.Mode.Override(ctx, &st.SV, int64(kvflowcontrol.ApplyToAll)) + + controller := kvflowcontroller.New(registry, st, clock) + handle := kvflowhandle.New( + controller, + kvflowhandle.NewMetrics(registry), + clock, + roachpb.RangeID(1), + roachpb.SystemTenantID, + ) // Connect a single stream at pos=0 and deplete all 16MiB of regular // tokens at pos=1. 
@@ -105,3 +124,102 @@ func TestHandleAdmit(t *testing.T) { }) } } + +func TestFlowControlMode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stream := kvflowcontrol.Stream{ + TenantID: roachpb.MustMakeTenantID(42), + StoreID: roachpb.StoreID(42), + } + pos := func(d uint64) kvflowcontrolpb.RaftLogPosition { + return kvflowcontrolpb.RaftLogPosition{Term: 1, Index: d} + } + + for _, tc := range []struct { + mode kvflowcontrol.ModeT + blocks, ignores []admissionpb.WorkClass + }{ + { + mode: kvflowcontrol.ApplyToElastic, + blocks: []admissionpb.WorkClass{ + admissionpb.ElasticWorkClass, + }, + ignores: []admissionpb.WorkClass{ + admissionpb.RegularWorkClass, + }, + }, + { + mode: kvflowcontrol.ApplyToAll, + blocks: []admissionpb.WorkClass{ + admissionpb.ElasticWorkClass, admissionpb.RegularWorkClass, + }, + ignores: []admissionpb.WorkClass{}, + }, + } { + t.Run(tc.mode.String(), func(t *testing.T) { + registry := metric.NewRegistry() + clock := hlc.NewClockForTesting(nil) + st := cluster.MakeTestingClusterSettings() + kvflowcontrol.Enabled.Override(ctx, &st.SV, true) + kvflowcontrol.Mode.Override(ctx, &st.SV, int64(tc.mode)) + + controller := kvflowcontroller.New(registry, st, clock) + handle := kvflowhandle.New( + controller, + kvflowhandle.NewMetrics(registry), + clock, + roachpb.RangeID(1), + roachpb.SystemTenantID, + ) + defer handle.Close(ctx) + + // Connect a single stream at pos=0 and deplete all 16MiB of regular + // tokens at pos=1. It also puts elastic tokens in the -ve. + handle.ConnectStream(ctx, pos(0), stream) + handle.DeductTokensFor(ctx, admissionpb.NormalPri, pos(1), kvflowcontrol.Tokens(16<<20 /* 16MiB */)) + + // Invoke .Admit() for {regular,elastic} work in a separate + // goroutines, and test below whether the goroutines are blocked. + regularAdmitCh := make(chan struct{}) + elasticAdmitCh := make(chan struct{}) + go func() { + require.NoError(t, handle.Admit(ctx, admissionpb.NormalPri, time.Time{})) + close(regularAdmitCh) + }() + go func() { + require.NoError(t, handle.Admit(ctx, admissionpb.BulkNormalPri, time.Time{})) + close(elasticAdmitCh) + }() + + for _, ignoredClass := range tc.ignores { // work should not block + classAdmitCh := regularAdmitCh + if ignoredClass == admissionpb.ElasticWorkClass { + classAdmitCh = elasticAdmitCh + } + + select { + case <-classAdmitCh: + case <-time.After(5 * time.Second): + t.Fatalf("%s work didn't get admitted", ignoredClass) + } + } + + for _, blockedClass := range tc.blocks { // work should get blocked + classAdmitCh := regularAdmitCh + if blockedClass == admissionpb.ElasticWorkClass { + classAdmitCh = elasticAdmitCh + } + + select { + case <-classAdmitCh: + t.Fatalf("unexpectedly admitted %s work", blockedClass) + case <-time.After(10 * time.Millisecond): + } + } + }) + } + +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go new file mode 100644 index 000000000000..5a311b265fa0 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowhandle/noop.go @@ -0,0 +1,63 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package kvflowhandle + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +// Noop is a no-op implementation of the kvflowcontrol.Handle interface. +type Noop struct{} + +var _ kvflowcontrol.Handle = Noop{} + +// Admit is part of the kvflowcontrol.Handle interface. +func (n Noop) Admit(ctx context.Context, priority admissionpb.WorkPriority, time time.Time) error { + return nil +} + +// DeductTokensFor is part of the kvflowcontrol.Handle interface. +func (n Noop) DeductTokensFor( + ctx context.Context, + priority admissionpb.WorkPriority, + position kvflowcontrolpb.RaftLogPosition, + tokens kvflowcontrol.Tokens, +) { +} + +// ReturnTokensUpto is part of the kvflowcontrol.Handle interface. +func (n Noop) ReturnTokensUpto( + ctx context.Context, + priority admissionpb.WorkPriority, + position kvflowcontrolpb.RaftLogPosition, + stream kvflowcontrol.Stream, +) { +} + +// ConnectStream is part of the kvflowcontrol.Handle interface. +func (n Noop) ConnectStream( + ctx context.Context, position kvflowcontrolpb.RaftLogPosition, stream kvflowcontrol.Stream, +) { +} + +// DisconnectStream is part of the kvflowcontrol.Handle interface. +func (n Noop) DisconnectStream(ctx context.Context, stream kvflowcontrol.Stream) {} + +// ResetStreams is part of the kvflowcontrol.Handle interface. +func (n Noop) ResetStreams(ctx context.Context) {} + +// Close is part of the kvflowcontrol.Handle interface. +func (n Noop) Close(ctx context.Context) {} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go index d898c39ce43a..7bb66c32d45d 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/simulation_test.go @@ -176,7 +176,7 @@ func TestUsingSimulation(t *testing.T) { for _, line := range strings.Split(d.Input, "\n") { name := strings.TrimPrefix(strings.TrimSpace(line), "handle=") replicaHandles[name] = &replicaHandle{ - handle: kvflowhandle.New(controller, hmetrics, clock), + handle: kvflowhandle.New(controller, hmetrics, clock, roachpb.RangeID(0), roachpb.TenantID{}), deductionTracker: make(map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker), outstandingReturns: make(map[kvflowcontrol.Stream]kvflowcontrol.Tokens), snapshots: make([]snapshot, 0), @@ -632,7 +632,7 @@ func (h *replicaHandle) deductTokens( h.quorumLogPosition.Index += 1 streams := h.handle.TestingDeductTokensForInner(ctx, pri, h.quorumLogPosition, tokens) for _, stream := range streams { - h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) + _ = h.deductionTracker[stream].Track(ctx, pri, tokens, h.quorumLogPosition) } } @@ -988,7 +988,7 @@ func (ht *handleOpTicker) tick(ctx context.Context, t time.Time) { case "connect": ht.replicaHandle.quorumLogPosition = ht.position ht.replicaHandle.handle.ConnectStream(ctx, ht.position, ht.stream) - ht.replicaHandle.deductionTracker[ht.stream] = kvflowtokentracker.New(ht.position, nil /* knobs */) + ht.replicaHandle.deductionTracker[ht.stream] = kvflowtokentracker.New(ht.position, ht.stream, nil /* knobs */) case "snapshot": ht.replicaHandle.snapshots = append(ht.replicaHandle.snapshots, snapshot{ time: t, diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_stream_disconnection 
b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_stream_disconnection index 60bbf31d8d9f..290b409a57cf 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_stream_disconnection +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_stream_disconnection @@ -31,13 +31,13 @@ simulate # # As soon as s3 is disconnected, we see a release of 16MiB of held tokens back # into the node-level controller (32MiB -> 48MiB). We see a burst in the number -# of stream-specific/controller-lvel requests bypassing Admit() due to the +# of stream-specific/controller-level requests bypassing Admit() due to the # stream disconnecting. At the handle-level this just appears as a burst in # admitted requests. After s3 disconnects, the handle-level admission rate goes # back to what it was before traffic was shaped by s3. # # TODO(irfansharif): The post-stream disconnection burst might lead to -# severe over-admission since it may have been long since we observed availble +# severe over-admission since it may have been long since we observed available # tokens for the still connected streams. In fact, many requests that started # waiting on the soon-to-be-disconnected-stream are in the same boat, all of # which will now get admitted. One thing we could do is to try and observe diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/BUILD.bazel index bd3536c6fec0..3c1bb6a39379 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/kv/kvserver/kvflowcontrol", "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/util/admission/admissionpb", + "//pkg/util/buildutil", "//pkg/util/log", ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go index 5afc6aff7068..73578a1de596 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -32,6 +33,8 @@ type Tracker struct { // we ignore token deductions. lowerBound kvflowcontrolpb.RaftLogPosition + stream kvflowcontrol.Stream // used for logging only + knobs *kvflowcontrol.TestingKnobs } @@ -45,7 +48,11 @@ type tracked struct { // New constructs a new Tracker with the given lower bound raft log position // (below which we're not allowed to deduct tokens). 
-func New(lb kvflowcontrolpb.RaftLogPosition, knobs *kvflowcontrol.TestingKnobs) *Tracker { +func New( + lb kvflowcontrolpb.RaftLogPosition, + stream kvflowcontrol.Stream, + knobs *kvflowcontrol.TestingKnobs, +) *Tracker { if knobs == nil { knobs = &kvflowcontrol.TestingKnobs{} } @@ -53,6 +60,7 @@ func New(lb kvflowcontrolpb.RaftLogPosition, knobs *kvflowcontrol.TestingKnobs) trackedM: make(map[admissionpb.WorkPriority][]tracked), lowerBound: lb, knobs: knobs, + stream: stream, } } @@ -63,7 +71,7 @@ func (dt *Tracker) Track( pri admissionpb.WorkPriority, tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition, -) { +) bool { if !(dt.lowerBound.Less(pos)) { // We're trying to track a token deduction at a position less than the // stream's lower-bound. Shout loudly but ultimately no-op. This @@ -78,23 +86,37 @@ func (dt *Tracker) Track( // Handle.ConnectStream). // - token returns upto some log position don't precede deductions at // lower log positions (see Handle.ReturnTokensUpto); - log.Errorf(ctx, "observed raft log position less than per-stream lower bound (%s <= %s)", + logFn := log.Errorf + if buildutil.CrdbTestBuild { + logFn = log.Fatalf + } + logFn(ctx, "observed raft log position less than per-stream lower bound (%s <= %s)", pos, dt.lowerBound) - return + return false } dt.lowerBound = pos if len(dt.trackedM[pri]) >= 1 { last := dt.trackedM[pri][len(dt.trackedM[pri])-1] if !last.position.Less(pos) { - log.Fatalf(ctx, "expected in order tracked log positions (%s < %s)", + logFn := log.Errorf + if buildutil.CrdbTestBuild { + logFn = log.Fatalf + } + logFn(ctx, "expected in order tracked log positions (%s < %s)", last.position, pos) + return false } } dt.trackedM[pri] = append(dt.trackedM[pri], tracked{ tokens: tokens, position: pos, }) + if log.ExpensiveLogEnabled(ctx, 1) { + log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s", + tokens, pri, dt.stream, pos) + } + return true } // Untrack all token deductions of the given priority that have log positions @@ -136,8 +158,8 @@ func (dt *Tracker) Untrack( if len(dt.trackedM[pri]) > 0 { remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens) } - log.VInfof(ctx, 1, "released flow control tokens for %d/%d pri=%s tracked deductions, upto %s; %d tracked deduction(s) remain%s", - untracked, trackedBefore, pri, upto, len(dt.trackedM[pri]), remaining) + log.Infof(ctx, "released %s flow control tokens for %d out of %d tracked deductions for pri=%s stream=%s, up to %s; %d tracked deduction(s) remain%s", + tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri]), remaining) } if len(dt.trackedM[pri]) == 0 { delete(dt.trackedM, pri) @@ -161,6 +183,11 @@ func (dt *Tracker) Iter(_ context.Context, f func(admissionpb.WorkPriority, kvfl } } +// LowerBound returns the log position below which we ignore token deductions. +func (dt *Tracker) LowerBound() kvflowcontrolpb.RaftLogPosition { + return dt.lowerBound +} + // TestingIter is a testing-only re-implementation of Iter. It iterates through // all tracked token deductions, invoking the provided callback with tracked // pri<->token<->position triples. 
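The tracker changes above hinge on a simple invariant: per work priority, deductions are recorded in increasing raft-log-position order, and returns release every deduction at or below a given position; Track now reports whether the deduction was actually recorded so callers can skip the controller-level deduction otherwise. Below is a minimal, self-contained Go sketch of that bookkeeping. It is an illustration only: Tokens, Priority, and LogPosition are simplified stand-ins for the kvflowcontrol types, and the logic is deliberately reduced to the ordering/return behavior.

package main

import "fmt"

// Simplified stand-ins for the kvflowcontrol types (illustration only).
type Tokens int64
type Priority int
type LogPosition struct{ Term, Index uint64 }

func (p LogPosition) Less(o LogPosition) bool {
	if p.Term != o.Term {
		return p.Term < o.Term
	}
	return p.Index < o.Index
}

type deduction struct {
	tokens Tokens
	pos    LogPosition
}

// tracker mirrors the shape of kvflowtokentracker.Tracker: per-priority
// deductions kept in increasing log-position order, above a lower bound.
type tracker struct {
	lowerBound LogPosition
	tracked    map[Priority][]deduction
}

// track appends a deduction, refusing positions at or below the lower bound.
// A false return tells the caller to skip the controller-level token
// deduction, mirroring the Handle.deductTokensForInner change above.
func (t *tracker) track(pri Priority, tokens Tokens, pos LogPosition) bool {
	if !t.lowerBound.Less(pos) {
		return false
	}
	t.lowerBound = pos
	t.tracked[pri] = append(t.tracked[pri], deduction{tokens, pos})
	return true
}

// untrack releases every deduction of the given priority with a position at
// or below upto, returning the sum of released tokens.
func (t *tracker) untrack(pri Priority, upto LogPosition) Tokens {
	var released Tokens
	var remaining []deduction
	for _, d := range t.tracked[pri] {
		if d.pos.Less(upto) || d.pos == upto {
			released += d.tokens
		} else {
			remaining = append(remaining, d)
		}
	}
	t.tracked[pri] = remaining
	return released
}

func main() {
	t := &tracker{tracked: map[Priority][]deduction{}}
	t.track(0, 1<<20, LogPosition{Term: 1, Index: 5})
	t.track(0, 2<<20, LogPosition{Term: 1, Index: 6})
	fmt.Println(t.untrack(0, LogPosition{Term: 1, Index: 5})) // 1048576
}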
diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go index 7793501e0b75..58d4ed4063b5 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker_test.go @@ -39,7 +39,7 @@ func TestTracker(t *testing.T) { datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - tracker = New(kvflowcontrolpb.RaftLogPosition{Term: 1, Index: 0}, knobs) + tracker = New(kvflowcontrolpb.RaftLogPosition{Term: 1, Index: 0}, kvflowcontrol.Stream{}, knobs) return "" case "track": @@ -81,7 +81,7 @@ func TestTracker(t *testing.T) { t.Fatalf("unrecognized prefix: %s", parts[i]) } } - tracker.Track(ctx, pri, tokens, logPosition) + require.True(t, tracker.Track(ctx, pri, tokens, logPosition)) } return "" diff --git a/pkg/kv/kvserver/raftlog/encoding_test.go b/pkg/kv/kvserver/raftlog/encoding_test.go index 0e5734d9d5bd..1d22660bf411 100644 --- a/pkg/kv/kvserver/raftlog/encoding_test.go +++ b/pkg/kv/kvserver/raftlog/encoding_test.go @@ -93,7 +93,7 @@ func BenchmarkRaftAdmissionMetaOverhead(b *testing.B) { // 4. If using below-raft admission, decode the raft // metadata. if withRaftAdmissionMeta { - _, err = DecodeRaftAdmissionMeta(encodingBuf) + _, err := DecodeRaftAdmissionMeta(encodingBuf) require.NoError(b, err) } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 97b5637832a5..0a3645cd89ba 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -801,6 +801,16 @@ type Replica struct { pausedFollowers map[roachpb.ReplicaID]struct{} slowProposalCount int64 // updated in refreshProposalsLocked + + // replicaFlowControlIntegration is used to interface with replication flow + // control. It's backed by the node-level kvflowcontrol.Controller that + // manages flow tokens on a per-stream basis, which it + // interfaces with through a replica-level kvflowcontrol.Handle. It's + // actively used on replicas initiating replication traffic, i.e. those + // that are both the leaseholder and raft leader. + // + // Accessing it requires Replica.mu to be held, exclusively. + replicaFlowControlIntegration replicaFlowControlIntegration } // The raft log truncations that are pending. Access is protected by its own diff --git a/pkg/kv/kvserver/replica_app_batch.go b/pkg/kv/kvserver/replica_app_batch.go index 706d945e80b1..6e588aac1c12 100644 --- a/pkg/kv/kvserver/replica_app_batch.go +++ b/pkg/kv/kvserver/replica_app_batch.go @@ -250,7 +250,12 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly( // We don't track these stats in standalone log application since they depend // on whether the proposer is still waiting locally, and this concept does not // apply in a standalone context. - if !cmd.IsLocal() { + // + // TODO(irfansharif): This code block can be removed once below-raft + // admission control is the only form of IO admission control. It pre-dates + // it -- these stats were previously used to deduct IO tokens for follower + // writes/ingests without waiting.
+ if !cmd.IsLocal() && !cmd.ApplyAdmissionControl() { writeBytes, ingestedBytes := cmd.getStoreWriteByteSizes() b.followerStoreWriteBytes.NumEntries++ b.followerStoreWriteBytes.WriteBytes += writeBytes diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index d82eddffdc86..ae2cc0a228f3 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -71,6 +71,12 @@ func (c *replicatedCmd) IsLocal() bool { return c.proposal != nil } +// ApplyAdmissionControl indicates whether the command should be +// subject to replication admission control. +func (c *replicatedCmd) ApplyAdmissionControl() bool { + return c.Entry.ApplyAdmissionControl +} + // Ctx implements apply.Command. func (c *replicatedCmd) Ctx() context.Context { return c.ctx diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go index aac8f3ae81b2..769e85bc6fde 100644 --- a/pkg/kv/kvserver/replica_application_result.go +++ b/pkg/kv/kvserver/replica_application_result.go @@ -281,6 +281,10 @@ func (r *Replica) tryReproposeWithNewLeaseIndex( // Some tests check for this log message in the trace. log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex") + // See I7 from kvflowcontrol/doc.go: we don't re-deduct flow tokens on + // reproposals. + p.raftAdmissionMeta = nil + pErr := r.propose(ctx, p, tok.Move(ctx)) if pErr != nil { return pErr diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go index 26a3ed2a6ed6..8a836067ac66 100644 --- a/pkg/kv/kvserver/replica_destroy.go +++ b/pkg/kv/kvserver/replica_destroy.go @@ -162,7 +162,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb // disconnectReplicationRaftMuLocked is called when a Replica is being removed. // It cancels all outstanding proposals, closes the proposalQuota if there -// is one, and removes the in-memory raft state. +// is one, releases all held flow tokens, and removes the in-memory raft state. func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { r.raftMu.AssertHeld() r.mu.Lock() @@ -174,6 +174,7 @@ func (r *Replica) disconnectReplicationRaftMuLocked(ctx context.Context) { if pq := r.mu.proposalQuota; pq != nil { pq.Close("destroyed") } + r.mu.replicaFlowControlIntegration.onReplicaDestroyed(ctx) r.mu.proposalBuf.FlushLockedWithoutProposing(ctx) for _, p := range r.mu.proposals { r.cleanupFailedProposalLocked(p) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 66560723ee4d..c4d183bf3ae2 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -161,6 +161,10 @@ func newUninitializedReplica( r.breaker = newReplicaCircuitBreaker( store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset, ) + r.mu.replicaFlowControlIntegration = newReplicaFlowControlIntegration( + (*replicaFlowControl)(r), + makeStoreFlowControlHandleFactory(r.store), + ) return r } @@ -395,6 +399,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey)) r.concMgr.OnRangeDescUpdated(desc) r.mu.state.Desc = desc + r.mu.replicaFlowControlIntegration.onDescChanged(ctx) // Give the liveness and meta ranges high priority in the Raft scheduler, to // avoid head-of-line blocking and high scheduling latency. 
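The replica_init.go and replica_destroy.go changes above follow a lifecycle-hook pattern: the integration object is constructed once when the replica is created, and thereafter the replica invokes narrowly scoped callbacks (onDescChanged, onReplicaDestroyed, and so on) at the relevant points while holding its mutex. The following is a rough, self-contained sketch of that pattern under assumed, illustrative names (integration, loggingIntegration, replica); it is not the actual kvserver wiring.

package main

import (
	"context"
	"fmt"
	"sync"
)

// integration is a cut-down analogue of replicaFlowControlIntegration: a set
// of lifecycle hooks invoked with the owner's mutex held.
type integration interface {
	onBecameLeader(context.Context)
	onDescChanged(context.Context)
	onDestroyed(context.Context)
}

type loggingIntegration struct{}

func (loggingIntegration) onBecameLeader(context.Context) { fmt.Println("became leader") }
func (loggingIntegration) onDescChanged(context.Context)  { fmt.Println("desc changed") }
func (loggingIntegration) onDestroyed(context.Context)    { fmt.Println("destroyed") }

// replica is an illustrative stand-in for kvserver.Replica.
type replica struct {
	mu struct {
		sync.Mutex
		integration integration
	}
}

func newReplica() *replica {
	r := &replica{}
	r.mu.integration = loggingIntegration{} // wired up once, at construction
	return r
}

func (r *replica) setDesc(ctx context.Context) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// ... update the descriptor, then notify the integration layer, mirroring
	// the onDescChanged call added in setDescLockedRaftMuLocked above.
	r.mu.integration.onDescChanged(ctx)
}

func main() {
	r := newReplica()
	r.setDesc(context.Background())
}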
diff --git a/pkg/kv/kvserver/replica_init_test.go b/pkg/kv/kvserver/replica_init_test.go index cc2cae9389a6..c3109cfa39c3 100644 --- a/pkg/kv/kvserver/replica_init_test.go +++ b/pkg/kv/kvserver/replica_init_test.go @@ -64,6 +64,7 @@ func TestReplicaUpdateLastReplicaAdded(t *testing.T) { var r Replica r.mu.state.Desc = &c.oldDesc r.mu.lastReplicaAdded = c.lastReplicaAdded + r.mu.replicaFlowControlIntegration = newReplicaFlowControlIntegration((*replicaFlowControl)(&r), nil) r.store = tc.store r.concMgr = tc.repl.concMgr r.setDescRaftMuLocked(context.Background(), &c.newDesc) diff --git a/pkg/kv/kvserver/replica_proposal.go b/pkg/kv/kvserver/replica_proposal.go index ea03b019b5d7..e0e3e7aa5e22 100644 --- a/pkg/kv/kvserver/replica_proposal.go +++ b/pkg/kv/kvserver/replica_proposal.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" @@ -125,6 +126,16 @@ type ProposalData struct { // tok identifies the request to the propBuf. Once the proposal is made, the // token will be used to stop tracking this request. tok TrackedRequestToken + + // raftAdmissionMeta captures the metadata we encode as part of the command + // when first proposed for replication admission control. + raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta +} + +// useReplicationAdmissionControl indicates whether this raft command should +// be subject to replication admission control. +func (proposal *ProposalData) useReplicationAdmissionControl() bool { + return proposal.raftAdmissionMeta != nil } // finishApplication is called when a command application has finished. 
The diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index ee232f57ac93..d97220444277 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -12,16 +12,21 @@ package kvserver import ( "context" + "fmt" "sync" "sync/atomic" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/errorutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -132,10 +137,16 @@ type rangeLeaderInfo struct { leaderEligibleForLease bool } +type admitEntHandle struct { + handle *kvflowcontrolpb.RaftAdmissionMeta + pCtx context.Context +} + // A proposer is an object that uses a propBuf to coordinate Raft proposals. type proposer interface { locker() sync.Locker rlocker() sync.Locker + // The following require the proposer to hold (at least) a shared lock. getReplicaID() roachpb.ReplicaID destroyed() destroyStatus @@ -146,6 +157,7 @@ type proposer interface { leaderStatus(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo ownsValidLease(ctx context.Context, now hlc.ClockTimestamp) bool shouldCampaignOnRedirect(raftGroup proposerRaft) bool + flowControlHandle(ctx context.Context) kvflowcontrol.Handle // The following require the proposer to hold an exclusive lock. withGroupLocked(func(proposerRaft) error) error @@ -407,6 +419,11 @@ func (b *propBuf) FlushLockedWithRaftGroup( buf := b.arr.asSlice()[:used] ents := make([]raftpb.Entry, 0, used) + // Use this slice to track, for each entry that's proposed to raft, whether + // it's subject to replication admission control. Updated in tandem with + // slice above. + admitHandles := make([]admitEntHandle, 0, used) + // Compute the closed timestamp target, which will be used to assign a closed // timestamp to all proposals in this batch. closedTSTarget := b.p.closedTimestampTarget() @@ -490,11 +507,17 @@ func (b *propBuf) FlushLockedWithRaftGroup( // Flush any previously batched (non-conf change) proposals to // preserve the correct ordering or proposals. Later proposals // will start a new batch. 
- if err := proposeBatch(raftGroup, b.p.getReplicaID(), ents); err != nil { - firstErr = err + propErr, dropped := proposeBatch(raftGroup, b.p.getReplicaID(), ents) + if propErr != nil { + firstErr = propErr continue } + if !dropped { + b.maybeDeductFlowTokens(ctx, admitHandles, ents) + } + ents = ents[len(ents):] + admitHandles = admitHandles[len(admitHandles):] confChangeCtx := kvserverpb.ConfChangeContext{ CommandID: string(p.idKey), @@ -536,12 +559,38 @@ func (b *propBuf) FlushLockedWithRaftGroup( Data: p.encodedCommand, }) log.VEvent(p.ctx, 2, "flushing proposal to Raft") + + // We don't want to deduct flow tokens for reproposed commands, nor, of + // course, for proposals that didn't integrate with kvflowcontrol. + shouldAdmit := p.createdAtTicks == p.proposedAtTicks && !reproposal && p.raftAdmissionMeta != nil + if !shouldAdmit { + admitHandles = append(admitHandles, admitEntHandle{}) + } else { + admitHandles = append(admitHandles, admitEntHandle{ + handle: p.raftAdmissionMeta, + pCtx: p.ctx, + }) + } } } if firstErr != nil { return 0, firstErr } - return used, proposeBatch(raftGroup, b.p.getReplicaID(), ents) + + propErr, dropped := proposeBatch(raftGroup, b.p.getReplicaID(), ents) + if propErr == nil && !dropped { + // Now that we know what raft log position[1] this proposal is to end up + // in, deduct flow tokens for it. This is done without blocking (we've + // already waited for available flow tokens pre-evaluation). The tokens + // will later be returned once we're informed of the entry being + // admitted below raft. + // + // [1]: We're relying on an undocumented side effect of upstream raft + // API where it populates the index and term for the passed in + // slice of entries. See etcd-io/raft#57. + b.maybeDeductFlowTokens(ctx, admitHandles, ents) + } + return used, propErr } // maybeRejectUnsafeProposalLocked conditionally rejects proposals that are @@ -900,9 +949,11 @@ func (b *propBuf) forwardClosedTimestampLocked(closedTS hlc.Timestamp) bool { return b.assignedClosedTimestamp.Forward(closedTS) } -func proposeBatch(raftGroup proposerRaft, replID roachpb.ReplicaID, ents []raftpb.Entry) error { +func proposeBatch( + raftGroup proposerRaft, replID roachpb.ReplicaID, ents []raftpb.Entry, +) (_ error, dropped bool) { if len(ents) == 0 { - return nil + return nil, false } if err := raftGroup.Step(raftpb.Message{ Type: raftpb.MsgProp, @@ -913,11 +964,41 @@ func proposeBatch(raftGroup proposerRaft, replID roachpb.ReplicaID, ents []raftp // ignored prior to the introduction of ErrProposalDropped). // TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849 - return nil + return nil, true } else if err != nil { - return err + return err, false + } + return nil, false +} + +func (b *propBuf) maybeDeductFlowTokens( + ctx context.Context, admitHandles []admitEntHandle, ents []raftpb.Entry, +) { + if len(admitHandles) != len(ents) || cap(admitHandles) != cap(ents) { + panic( + fmt.Sprintf("mismatched slice sizes: len(admit)=%d len(ents)=%d cap(admit)=%d cap(ents)=%d", + len(admitHandles), len(ents), cap(admitHandles), cap(ents)), + ) + } + for i, admitHandle := range admitHandles { + if admitHandle.handle == nil { + continue // nothing to do + } + log.VInfof(ctx, 1, "bound index/log terms for proposal entry: %s", + raft.DescribeEntry(ents[i], func(bytes []byte) string { + return "" + }), + ) + b.p.flowControlHandle(ctx).DeductTokensFor( + admitHandle.pCtx, + admissionpb.WorkPriority(admitHandle.handle.AdmissionPriority), + kvflowcontrolpb.RaftLogPosition{ + Term: ents[i].Term, + Index: ents[i].Index, + }, + kvflowcontrol.Tokens(int64(len(ents[i].Data))), + ) } - return nil } // FlushLockedWithoutProposing is like FlushLockedWithRaftGroup but it does not @@ -1259,6 +1340,14 @@ func (rp *replicaProposer) shouldCampaignOnRedirect(raftGroup proposerRaft) bool ) } +func (rp *replicaProposer) flowControlHandle(ctx context.Context) kvflowcontrol.Handle { + handle, found := rp.mu.replicaFlowControlIntegration.handle() + if !found { + return kvflowhandle.Noop{} + } + return handle +} + // rejectProposalWithRedirectLocked is part of the proposer interface. func (rp *replicaProposer) rejectProposalWithRedirectLocked( ctx context.Context, prop *ProposalData, redirectTo roachpb.ReplicaID, diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 8cf5bb470540..161b0892d25d 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -21,12 +21,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -139,6 +142,9 @@ func (t *testProposer) locker() sync.Locker { func (t *testProposer) rlocker() sync.Locker { return t.RWMutex.RLocker() } +func (t *testProposer) flowControlHandle(ctx context.Context) kvflowcontrol.Handle { + return &testFlowTokenHandle{} +} func (t *testProposer) getReplicaID() roachpb.ReplicaID { return 1 @@ -1047,3 +1053,43 @@ func (t mockTracker) Count() int { } var _ tracker.Tracker = mockTracker{} + +type testFlowTokenHandle struct{} + +var _ kvflowcontrol.Handle = &testFlowTokenHandle{} + +func (t *testFlowTokenHandle) 
Admit( + ctx context.Context, priority admissionpb.WorkPriority, t2 time.Time, +) error { + return nil +} + +func (t *testFlowTokenHandle) DeductTokensFor( + ctx context.Context, + priority admissionpb.WorkPriority, + position kvflowcontrolpb.RaftLogPosition, + tokens kvflowcontrol.Tokens, +) { +} + +func (t *testFlowTokenHandle) ReturnTokensUpto( + ctx context.Context, + priority admissionpb.WorkPriority, + position kvflowcontrolpb.RaftLogPosition, + stream kvflowcontrol.Stream, +) { +} + +func (t *testFlowTokenHandle) ConnectStream( + ctx context.Context, position kvflowcontrolpb.RaftLogPosition, stream kvflowcontrol.Stream, +) { +} + +func (t *testFlowTokenHandle) DisconnectStream(ctx context.Context, stream kvflowcontrol.Stream) { +} + +func (t *testFlowTokenHandle) ResetStreams(ctx context.Context) { +} + +func (t *testFlowTokenHandle) Close(ctx context.Context) { +} diff --git a/pkg/kv/kvserver/replica_proposal_quota.go b/pkg/kv/kvserver/replica_proposal_quota.go index 33b1b5f9adc1..fb61391d8bf1 100644 --- a/pkg/kv/kvserver/replica_proposal_quota.go +++ b/pkg/kv/kvserver/replica_proposal_quota.go @@ -120,6 +120,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( ) r.mu.lastUpdateTimes = make(map[roachpb.ReplicaID]time.Time) r.mu.lastUpdateTimes.updateOnBecomeLeader(r.mu.state.Desc.Replicas().Descriptors(), timeutil.Now()) + r.mu.replicaFlowControlIntegration.onBecameLeader(ctx) r.mu.lastProposalAtTicks = r.mu.ticks // delay imminent quiescence } else if r.mu.proposalQuota != nil { // We're becoming a follower. @@ -130,6 +131,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( r.mu.quotaReleaseQueue = nil r.mu.proposalQuota = nil r.mu.lastUpdateTimes = nil + r.mu.replicaFlowControlIntegration.onBecameFollower(ctx) } return } else if r.mu.proposalQuota == nil { @@ -162,10 +164,10 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // Only consider followers that are active. Inactive ones don't decrease // minIndex - i.e. they don't hold up releasing quota. // - // The policy for determining who's active is more strict than the one used - // for purposes of quiescing. Failure to consider a dead/stuck node as such - // for the purposes of releasing quota can have bad consequences (writes - // will stall), whereas for quiescing the downside is lower. + // The policy for determining who's active is stricter than the one used + // for purposes of quiescing. Failure to consider a dead/stuck node as + // such for the purposes of releasing quota can have bad consequences + // (writes will stall), whereas for quiescing the downside is lower. 
if !r.mu.lastUpdateTimes.isFollowerActiveSince( ctx, rep.ReplicaID, now, r.store.cfg.RangeLeaseDuration, @@ -260,4 +262,5 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( r.mu.proposalQuotaBaseIndex, len(r.mu.quotaReleaseQueue), releasableIndex, status.Applied) } + r.mu.replicaFlowControlIntegration.onProposalQuotaUpdated(ctx) } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index eaab212932f2..298aa3db3261 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -23,6 +23,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" @@ -33,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -227,6 +230,10 @@ func (r *Replica) evalAndPropose( // Continue with proposal... } + if meta := kvflowcontrol.MetaFromContext(ctx); meta != nil { + proposal.raftAdmissionMeta = meta + } + // Attach information about the proposer's lease to the command, for // verification below raft. Lease requests are special since they are not // necessarily proposed under a valid lease (by necessity). Instead, they @@ -377,6 +384,9 @@ func (r *Replica) propose( // Determine the encoding style for the Raft command. prefix := true entryEncoding := raftlog.EntryEncodingStandardWithoutAC + if p.useReplicationAdmissionControl() { + entryEncoding = raftlog.EntryEncodingStandardWithAC + } if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil { // EndTxnRequest with a ChangeReplicasTrigger is special because Raft // needs to understand it; it cannot simply be an opaque command. To @@ -451,6 +461,9 @@ func (r *Replica) propose( } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") entryEncoding = raftlog.EntryEncodingSideloadedWithoutAC + if p.useReplicationAdmissionControl() { + entryEncoding = raftlog.EntryEncodingSideloadedWithAC + } r.store.metrics.AddSSTableProposals.Inc(1) if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { @@ -460,12 +473,25 @@ func (r *Replica) propose( log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary()) } + // NB: If (significantly) re-working how raft commands are encoded, make the + // equivalent change in raftlog.BenchmarkRaftAdmissionMetaOverhead. + // Create encoding buffer. preLen := 0 if prefix { preLen = raftlog.RaftCommandPrefixLen } - cmdLen := p.command.Size() + + raftAdmissionMeta := &kvflowcontrolpb.RaftAdmissionMeta{} + var admissionMetaLen int + if p.useReplicationAdmissionControl() { + // Encode admission metadata data at the start, right after the command + // prefix. 
+ raftAdmissionMeta = p.raftAdmissionMeta + admissionMetaLen = raftAdmissionMeta.Size() + } + + cmdLen := p.command.Size() + admissionMetaLen // Allocate the data slice with enough capacity to eventually hold the two // "footers" that are filled later. needed := preLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize() @@ -474,9 +500,40 @@ func (r *Replica) propose( if prefix { raftlog.EncodeRaftCommandPrefix(data, entryEncoding, p.idKey) } - // Encode body of command. + + // Encode the body of the command. data = data[:preLen+cmdLen] - if _, err := protoutil.MarshalTo(p.command, data[preLen:]); err != nil { + + // Encode below-raft admission data, if any. + if p.useReplicationAdmissionControl() { + if !prefix { + panic("expected to encode prefix for raft commands using replication admission control") + } + if buildutil.CrdbTestBuild { + if p.raftAdmissionMeta.AdmissionOriginNode == roachpb.NodeID(0) { + log.Fatalf(ctx, "missing origin node for flow token returns") + } + } + if _, err := protoutil.MarshalTo( + raftAdmissionMeta, + data[preLen:preLen+admissionMetaLen], + ); err != nil { + return kvpb.NewError(err) + } + log.VInfof(ctx, 1, "encoded raft admission meta: pri=%s create-time=%d proposer=n%s", + admissionpb.WorkPriority(raftAdmissionMeta.AdmissionPriority), + raftAdmissionMeta.AdmissionCreateTime, + raftAdmissionMeta.AdmissionOriginNode, + ) + // Zero out what we've already encoded and marshaled, out of an + // abundance of paranoia. + p.command.AdmissionPriority = 0 + p.command.AdmissionCreateTime = 0 + p.command.AdmissionOriginNode = 0 + } + + // Encode the rest of the command. + if _, err := protoutil.MarshalTo(p.command, data[preLen+admissionMetaLen:]); err != nil { return kvpb.NewError(err) } p.encodedCommand = data @@ -982,6 +1039,20 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } m := logstore.MakeMsgStorageAppend(msgStorageAppend) cb := (*replicaSyncCallback)(r) + if r.IsInitialized() && r.store.cfg.KVAdmissionController != nil { + // Enqueue raft log entries into admission queues. This is + // non-blocking; actual admission happens asynchronously. + tenantID, _ := r.TenantID() + for _, entry := range msgStorageAppend.Entries { + if len(entry.Data) == 0 { + continue // nothing to do + } + r.store.cfg.KVAdmissionController.AdmitRaftEntry( + ctx, tenantID, r.StoreID(), r.RangeID, entry, + ) + } + } + if state, err = s.StoreEntries(ctx, state, m, cb, &stats.append); err != nil { return stats, errors.Wrap(err, "while storing log entries") } @@ -1438,7 +1509,7 @@ func (r *Replica) refreshProposalsLocked( return } - log.VInfof(ctx, 1, + log.VInfof(ctx, 2, "pending commands: reproposing %d (at applied index %d, lease applied index %d) %s", len(reproposals), r.mu.state.RaftAppliedIndex, r.mu.state.LeaseAppliedIndex, reason) diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go index a3ab9ee30757..04d829151d3b 100644 --- a/pkg/kv/kvserver/replica_raft_overload.go +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -374,4 +374,5 @@ func (r *Replica) updatePausedFollowersLocked(ctx context.Context, ioThresholdMa // with more wasted work. 
r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) } + r.mu.replicaFlowControlIntegration.onFollowersPaused(ctx) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index da9ad9a3a504..e5b7da4adf43 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -43,6 +43,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/idalloc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" @@ -1143,8 +1145,15 @@ type StoreConfig struct { // SpanConfigsDisabled is unset. SpanConfigSubscriber spanconfig.KVSubscriber - // KVAdmissionController is an optional field used for admission control. + // KVAdmissionController is used for admission control. KVAdmissionController kvadmission.Controller + // KVFlowController is used for replication admission control. + KVFlowController kvflowcontrol.Controller + // KVFlowHandles is used for replication admission control. + KVFlowHandles kvflowcontrol.Handles + // KVFlowHandleMetrics is a shared metrics struct for all + // kvflowcontrol.Handles. + KVFlowHandleMetrics *kvflowhandle.Metrics // SchedulerLatencyListener listens in on scheduling latencies, information // that's then used to adjust various admission control components (like how @@ -2572,7 +2581,7 @@ func ReadMaxHLCUpperBound(ctx context.Context, engines []storage.Engine) (int64, // GetReplica fetches a replica by Range ID. Returns an error if no replica is found. // -// See also GetReplicaIfExists for a more perfomant version. +// See also GetReplicaIfExists for a more performant version. func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) { if r := s.GetReplicaIfExists(rangeID); r != nil { return r, nil diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 1217ed878167..9c764568c9e0 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -483,6 +483,26 @@ func (d ReplicaSet) ReplicationTargets() (out []ReplicationTarget) { return out } +// Difference compares two sets of replicas, returning the replica descriptors +// that were added and removed when going from one to the other. 'd' is the before +// state, 'o' is the one after. +func (d ReplicaSet) Difference(o ReplicaSet) (added, removed []ReplicaDescriptor) { + return o.Subtract(d), d.Subtract(o) +} + +// Subtract one set of replicas from another, returning the replica +// descriptors that were present in the original and not the other. 'd' is the +// original set of descriptors, 'o' is the other. +func (d ReplicaSet) Subtract(o ReplicaSet) []ReplicaDescriptor { + var repls []ReplicaDescriptor + for _, repl := range d.Descriptors() { + if _, found := o.GetReplicaDescriptorByID(repl.ReplicaID); !found { + repls = append(repls, repl) + } + } + return repls +} + // IsAddition returns true if `c` refers to a replica addition operation.
func (c ReplicaChangeType) IsAddition() bool { switch c { diff --git a/pkg/roachpb/metadata_replicas_test.go b/pkg/roachpb/metadata_replicas_test.go index c63d877e0a98..90472940af9f 100644 --- a/pkg/roachpb/metadata_replicas_test.go +++ b/pkg/roachpb/metadata_replicas_test.go @@ -371,3 +371,43 @@ func TestReplicaDescriptorsCanMakeProgressRandom(t *testing.T) { log.Infof(ctx, "progress: %d cases. no progress: %d cases. skipped: %d cases.", progress, noProgress, skipped) } + +func TestReplicaSetOperations(t *testing.T) { + rs := func(ids ...uint64) ReplicaSet { + replicas := make([]ReplicaDescriptor, 0, len(ids)) + for _, id := range ids { + replicas = append(replicas, rd(VOTER_FULL, id)) + } + return MakeReplicaSet(replicas) + } + var empty []ReplicaDescriptor + t.Run("subtract", func(t *testing.T) { + require.Equal(t, rs(1, 2, 3).Subtract(rs(2, 3)), rs(1).Descriptors()) + require.Equal(t, rs(1, 2, 3).Subtract(rs()), rs(1, 2, 3).Descriptors()) + require.Equal(t, rs(1, 2, 3).Subtract(rs(4, 5, 6)), rs(1, 2, 3).Descriptors()) + require.Equal(t, rs(1, 2).Subtract(rs(6, 1)), rs(2).Descriptors()) + require.Equal(t, rs().Subtract(rs(6, 1)), empty) + }) + t.Run("difference", func(t *testing.T) { + { // {1,2,3}.difference({2,3,4}) + added, removed := rs(1, 2, 3).Difference(rs(2, 3, 4)) + require.Equal(t, added, rs(4).Descriptors()) + require.Equal(t, removed, rs(1).Descriptors()) + } + { // {1,2,3}.difference({1,2,3}) + added, removed := rs(1, 2, 3).Difference(rs(1, 2, 3)) + require.Equal(t, added, empty) + require.Equal(t, removed, empty) + } + { // {}.difference({1,2,3}) + added, removed := rs().Difference(rs(1, 2, 3)) + require.Equal(t, added, rs(1, 2, 3).Descriptors()) + require.Equal(t, removed, empty) + } + { // {1,2,3}.difference({}) + added, removed := rs(1, 2, 3).Difference(rs()) + require.Equal(t, added, empty) + require.Equal(t, removed, rs(1, 2, 3).Descriptors()) + } + }) +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 219b1e095f71..1b2314975c67 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "addjoin.go", "admin.go", "admin_test_utils.go", + "admission.go", "api_v2.go", "api_v2_auth.go", "api_v2_error.go", @@ -126,7 +127,11 @@ go_library( "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/closedts/sidetransport", "//pkg/kv/kvserver/kvadmission", + "//pkg/kv/kvserver/kvflowcontrol", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch", + "//pkg/kv/kvserver/kvflowcontrol/kvflowhandle", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/kvstorage", diff --git a/pkg/server/admission.go b/pkg/server/admission.go new file mode 100644 index 000000000000..e738c6ad1eb4 --- /dev/null +++ b/pkg/server/admission.go @@ -0,0 +1,55 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package server + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +type admittedLogEntryAdaptor struct { + dispatchWriter kvflowcontrol.DispatchWriter +} + +var _ admission.OnLogEntryAdmitted = &admittedLogEntryAdaptor{} + +func newAdmittedLogEntryAdaptor( + dispatchWriter kvflowcontrol.DispatchWriter, +) *admittedLogEntryAdaptor { + return &admittedLogEntryAdaptor{ + dispatchWriter: dispatchWriter, + } +} + +// AdmittedLogEntry implements the admission.OnLogEntryAdmitted interface. +func (a *admittedLogEntryAdaptor) AdmittedLogEntry( + ctx context.Context, + origin roachpb.NodeID, + pri admissionpb.WorkPriority, + storeID roachpb.StoreID, + rangeID roachpb.RangeID, + pos admission.LogPosition, +) { + a.dispatchWriter.Dispatch(ctx, origin, kvflowcontrolpb.AdmittedRaftLogEntries{ + RangeID: rangeID, + AdmissionPriority: int32(pri), + UpToRaftLogPosition: kvflowcontrolpb.RaftLogPosition{ + Term: pos.Term, + Index: pos.Index, + }, + StoreID: storeID, + }) +} diff --git a/pkg/server/node.go b/pkg/server/node.go index 33a795f8d16f..bdd44d6c1a3e 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -433,10 +433,6 @@ func NewNode( testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent, spanStatsCollector: spanstatscollector.New(cfg.Settings), } - n.storeCfg.KVAdmissionController = kvadmission.MakeController( - kvAdmissionQ, elasticCPUGrantCoord, storeGrantCoords, cfg.Settings, - ) - n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores) return n } @@ -1176,9 +1172,7 @@ func (n *Node) batchInternal( if err != nil { return nil, err } - if handle.ElasticCPUWorkHandle != nil { - ctx = admission.ContextWithElasticCPUWorkHandle(ctx, handle.ElasticCPUWorkHandle) - } + ctx = handle.AnnotateCtx(ctx) var writeBytes *kvadmission.StoreWriteBytes defer func() { diff --git a/pkg/server/server.go b/pkg/server/server.go index 9fd7b2dd7924..f6c4d714ad39 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -43,7 +43,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontroller" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowhandle" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -271,7 +275,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if opts, ok := cfg.TestingKnobs.AdmissionControl.(*admission.Options); ok { admissionOptions.Override(opts) } - gcoords := admission.NewGrantCoordinators(cfg.AmbientCtx, st, admissionOptions, registry, 
&admission.NoopOnLogEntryAdmitted{}) engines, err := cfg.CreateEngines(ctx) if err != nil { @@ -307,7 +310,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { rpcCtxOpts := rpc.ContextOptions{ TenantID: roachpb.SystemTenantID, UseNodeAuth: true, - NodeID: cfg.IDContainer, + NodeID: nodeIDContainer, StorageClusterID: cfg.ClusterIDContainer, Config: cfg.Config, Clock: clock.WallClock(), @@ -455,17 +458,10 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender) - cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad) - stopper.AddCloser(stop.CloserFn(func() { - goschedstats.UnregisterRunnableCountCallback(cbID) - })) - stopper.AddCloser(gcoords) - dbCtx := kv.DefaultDBContext(stopper) dbCtx.NodeID = idContainer dbCtx.Stopper = stopper db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx) - db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork) nlActive, nlRenewal := cfg.NodeLivenessDurations() if knobs := cfg.TestingKnobs.NodeLiveness; knobs != nil { @@ -550,6 +546,47 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { storesForFlowControl := kvserver.MakeStoresForFlowControl(stores) kvflowTokenDispatch := kvflowdispatch.New(registry, storesForFlowControl, nodeIDContainer) + admittedEntryAdaptor := newAdmittedLogEntryAdaptor(kvflowTokenDispatch) + gcoords := admission.NewGrantCoordinators( + cfg.AmbientCtx, + st, + admissionOptions, + registry, + admittedEntryAdaptor, + ) + db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork) + cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad) + stopper.AddCloser(stop.CloserFn(func() { + goschedstats.UnregisterRunnableCountCallback(cbID) + })) + stopper.AddCloser(gcoords) + + var admissionControl struct { + schedulerLatencyListener admission.SchedulerLatencyListener + kvflowController kvflowcontrol.Controller + kvflowTokenDispatch kvflowcontrol.Dispatch + kvAdmissionController kvadmission.Controller + storesFlowControl kvserver.StoresForFlowControl + kvFlowHandleMetrics *kvflowhandle.Metrics + } + admissionControl.schedulerLatencyListener = gcoords.Elastic.SchedulerLatencyListener + admissionControl.kvflowController = kvflowcontroller.New(registry, st, clock) + admissionControl.kvflowTokenDispatch = kvflowTokenDispatch + admissionControl.storesFlowControl = storesForFlowControl + admissionControl.kvAdmissionController = kvadmission.MakeController( + nodeIDContainer, + gcoords.Regular.GetWorkQueue(admission.KVWork), + gcoords.Elastic, + gcoords.Stores, + admissionControl.kvflowController, + admissionControl.storesFlowControl, + cfg.Settings, + ) + admissionControl.kvFlowHandleMetrics = kvflowhandle.NewMetrics(registry) + kvflowcontrol.Mode.SetOnChange(&st.SV, func(ctx context.Context) { + admissionControl.storesFlowControl.ResetStreams(ctx) + }) + raftTransport := kvserver.NewRaftTransport( cfg.AmbientCtx, st, @@ -557,9 +594,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { nodeDialer, grpcServer.Server, stopper, - kvflowTokenDispatch, - storesForFlowControl, - storesForFlowControl, + admissionControl.kvflowTokenDispatch, + admissionControl.storesFlowControl, + admissionControl.storesFlowControl, nil, /* knobs */ ) registry.AddMetricStruct(raftTransport.Metrics()) @@ -798,6 +835,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { 
         SnapshotApplyLimit:       cfg.SnapshotApplyLimit,
         SnapshotSendLimit:        cfg.SnapshotSendLimit,
         RangeLogWriter:           rangeLogWriter,
+        KVAdmissionController:    admissionControl.kvAdmissionController,
+        KVFlowController:         admissionControl.kvflowController,
+        KVFlowHandles:            admissionControl.storesFlowControl,
+        KVFlowHandleMetrics:      admissionControl.kvFlowHandleMetrics,
+        SchedulerLatencyListener: admissionControl.schedulerLatencyListener,
     }
     if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil {
         storeCfg.TestingKnobs = *storeTestingKnobs.(*kvserver.StoreTestingKnobs)
diff --git a/pkg/util/admission/granter_test.go b/pkg/util/admission/granter_test.go
index 911fcf4d2312..cb7f1732af80 100644
--- a/pkg/util/admission/granter_test.go
+++ b/pkg/util/admission/granter_test.go
@@ -98,7 +98,7 @@ func TestGranterBasic(t *testing.T) {
             return req
         }
         delayForGrantChainTermination = 0
-        coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{})
+        coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &noopOnLogEntryAdmitted{})
         defer coords.Close()
         coord = coords.Regular
         return flushAndReset()
@@ -288,7 +288,7 @@ func TestStoreCoordinators(t *testing.T) {
             return str
         },
     }
-    coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &NoopOnLogEntryAdmitted{})
+    coords := NewGrantCoordinators(ambientCtx, settings, opts, registry, &noopOnLogEntryAdmitted{})
     // There is only 1 KVWork requester at this point in initialization, for the
     // Regular GrantCoordinator.
     require.Equal(t, 1, len(requesters))
diff --git a/pkg/util/admission/replicated_write_admission_test.go b/pkg/util/admission/replicated_write_admission_test.go
index 4d1e368922dd..7fd37551e8fd 100644
--- a/pkg/util/admission/replicated_write_admission_test.go
+++ b/pkg/util/admission/replicated_write_admission_test.go
@@ -121,7 +121,7 @@ func TestReplicatedWriteAdmission(t *testing.T) {
                 tg[admissionpb.RegularWorkClass],
                 tg[admissionpb.ElasticWorkClass],
             },
-            st, metrics, opts, knobs, &NoopOnLogEntryAdmitted{}, &mockCoordMu,
+            st, metrics, opts, knobs, &noopOnLogEntryAdmitted{}, &mockCoordMu,
         ).(*StoreWorkQueue)
         tg[admissionpb.RegularWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.RegularWorkClass]
         tg[admissionpb.ElasticWorkClass].r = storeWorkQueue.getRequesters()[admissionpb.ElasticWorkClass]
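
Illustrative note (not part of the diff): the test files above and below now construct a package-private &noopOnLogEntryAdmitted{}, while the work_queue.go hunk that follows deletes the exported NoopOnLogEntryAdmitted now that the server wires in the real adaptor. The unexported replacement is defined elsewhere in pkg/util/admission (not shown in this excerpt); presumably it simply mirrors the deleted type, roughly:

    // noopOnLogEntryAdmitted ignores below-raft admission callbacks; tests
    // that don't exercise replication flow control can pass it wherever an
    // OnLogEntryAdmitted is required.
    type noopOnLogEntryAdmitted struct{}

    var _ OnLogEntryAdmitted = &noopOnLogEntryAdmitted{}

    func (n *noopOnLogEntryAdmitted) AdmittedLogEntry(
        context.Context, roachpb.NodeID, admissionpb.WorkPriority,
        roachpb.StoreID, roachpb.RangeID, LogPosition,
    ) {
    }
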
diff --git a/pkg/util/admission/work_queue.go b/pkg/util/admission/work_queue.go
index 5d3ca55c968e..d7d2b344b731 100644
--- a/pkg/util/admission/work_queue.go
+++ b/pkg/util/admission/work_queue.go
@@ -627,8 +627,17 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
             // by either adding more synchronization, getting rid of this
             // fast path, or swapping this entry from the top-most one in
             // the waiting heap (and fixing the heap).
+            if log.V(1) {
+                log.Infof(ctx, "fast-path: admitting t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
+                    tenantID, info.Priority,
+                    info.ReplicatedWorkInfo.RangeID,
+                    info.ReplicatedWorkInfo.Origin,
+                    info.ReplicatedWorkInfo.LogPosition.String(),
+                    info.ReplicatedWorkInfo.Ingested,
+                )
+            }
             q.onAdmittedReplicatedWork.admittedReplicatedWork(
-                roachpb.MustMakeTenantID(tenant.id),
+                roachpb.MustMakeTenantID(tenantID),
                 info.Priority,
                 info.ReplicatedWorkInfo,
                 info.RequestedCount,
@@ -722,6 +731,17 @@ func (q *WorkQueue) Admit(ctx context.Context, info WorkInfo) (enabled bool, err
 
     q.metrics.recordStartWait(info.Priority)
     if info.ReplicatedWorkInfo.Enabled {
+        if log.V(1) {
+            log.Infof(ctx, "async-path: len(waiting-work)=%d: enqueued t%d pri=%s r%s origin=n%s log-position=%s ingested=%t",
+                tenant.waitingWorkHeap.Len(),
+                tenant.id, info.Priority,
+                info.ReplicatedWorkInfo.RangeID,
+                info.ReplicatedWorkInfo.Origin,
+                info.ReplicatedWorkInfo.LogPosition,
+                info.ReplicatedWorkInfo.Ingested,
+            )
+        }
+
         return // return without waiting (admission is asynchronous)
     }
 
@@ -2001,22 +2021,6 @@ type OnLogEntryAdmitted interface {
     )
 }
 
-// NoopOnLogEntryAdmitted is a no-op implementation of the OnLogEntryAdmitted
-// interface.
-type NoopOnLogEntryAdmitted struct{}
-
-var _ OnLogEntryAdmitted = &NoopOnLogEntryAdmitted{}
-
-func (n *NoopOnLogEntryAdmitted) AdmittedLogEntry(
-    context.Context,
-    roachpb.NodeID,
-    admissionpb.WorkPriority,
-    roachpb.StoreID,
-    roachpb.RangeID,
-    LogPosition,
-) {
-}
-
 // AdmittedWorkDone indicates to the queue that the admitted work has completed.
 // It's used for the legacy above-raft admission control where we Admit()
 // upfront, with just an estimate of the write size, and after the write is
diff --git a/pkg/util/admission/work_queue_test.go b/pkg/util/admission/work_queue_test.go
index 66da1387e973..bdb287c81a6f 100644
--- a/pkg/util/admission/work_queue_test.go
+++ b/pkg/util/admission/work_queue_test.go
@@ -536,7 +536,7 @@ func TestStoreWorkQueueBasic(t *testing.T) {
             tg[admissionpb.RegularWorkClass],
             tg[admissionpb.ElasticWorkClass],
         },
-        st, metrics, opts, nil /* testing knobs */, &NoopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue)
+        st, metrics, opts, nil /* testing knobs */, &noopOnLogEntryAdmitted{}, &mockCoordMu).(*StoreWorkQueue)
     tg[admissionpb.RegularWorkClass].r = q.getRequesters()[admissionpb.RegularWorkClass]
     tg[admissionpb.ElasticWorkClass].r = q.getRequesters()[admissionpb.ElasticWorkClass]
     wrkMap.resetMap()
diff --git a/pkg/util/metric/registry.go b/pkg/util/metric/registry.go
index 8a9e071328c9..c665ab54a7e4 100644
--- a/pkg/util/metric/registry.go
+++ b/pkg/util/metric/registry.go
@@ -90,6 +90,10 @@ func (r *Registry) AddMetric(metric Iterable) {
 // AddMetricStruct examines all fields of metricStruct and adds
 // all Iterable or metric.Struct objects to the registry.
 func (r *Registry) AddMetricStruct(metricStruct interface{}) {
+    if r == nil { // for testing convenience
+        return
+    }
+
     ctx := context.TODO()
     v := reflect.ValueOf(metricStruct)
     if v.Kind() == reflect.Ptr {