diff --git a/DEPS.bzl b/DEPS.bzl
index ceb642e80bde..ddf4b851152d 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -1297,10 +1297,10 @@ def go_deps():
         patches = [
             "@cockroach//build/patches:com_github_cockroachdb_pebble.patch",
         ],
-        sha256 = "5b306806fca1c0defafe031c4cd842934aec44394bc44dc24bc805bc3fd609b8",
-        strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20220301234049-69a82fe41c31",
+        sha256 = "cc3201e4197273c3ddc0adf72ab1f800d7b09e2b9d50422cab619a854d8e4e80",
+        strip_prefix = "github.com/cockroachdb/pebble@v0.0.0-20220307192532-e2b7bb844759",
         urls = [
-            "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220301234049-69a82fe41c31.zip",
+            "https://storage.googleapis.com/cockroach-godeps/gomod/github.com/cockroachdb/pebble/com_github_cockroachdb_pebble-v0.0.0-20220307192532-e2b7bb844759.zip",
         ],
     )
     go_repository(
diff --git a/WORKSPACE b/WORKSPACE
index 74a3198740a3..e71ed271947c 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -137,16 +137,35 @@ http_archive(
 load(
     "@io_bazel_rules_go//go:deps.bzl",
     "go_download_sdk",
+    "go_host_sdk",
+    "go_local_sdk",
     "go_register_toolchains",
     "go_rules_dependencies",
 )

+# To point to a mirrored artifact, use:
+#
 go_download_sdk(
     name = "go_sdk",
     urls = ["https://storage.googleapis.com/public-bazel-artifacts/go/{}"],
     version = "1.17.6",
 )

+# To point to a local SDK path, use the following instead. We'll call the
+# directory into which you cloned the Go repository $GODIR[1]. You'll have to
+# first run ./make.bash from $GODIR/src to pick up any custom changes.
+#
+# [1]: https://go.dev/doc/contribute#testing
+#
+# go_local_sdk(
+#     name = "go_sdk",
+#     path = "",
+# )
+
+# To use whatever your local SDK is, use the following instead:
+#
+# go_host_sdk(name = "go_sdk")
+
 go_rules_dependencies()

 go_register_toolchains(nogo = "@cockroach//:crdb_nogo")
diff --git a/docs/tech-notes/change-replicas.md b/docs/tech-notes/change-replicas.md
new file mode 100644
index 000000000000..5de72a42dbcd
--- /dev/null
+++ b/docs/tech-notes/change-replicas.md
@@ -0,0 +1,93 @@
+# Up-replicating replicas with snapshots
+
+This tech note briefly explains the end-to-end process of up-replicating a range, starting from the `replicateQueue`
+and ending with sending Learner snapshots. It is meant to serve as a high-level overview of the code paths taken; more
+detailed descriptions can be found in tech notes specific to each area.
+
+## Introduction
+
+Raft snapshots are necessary when a follower replica is unable to catch up using the existing Raft logs of the leader.
+Such a scenario occurs when an existing replica falls so far behind the rest of the Raft group that we've truncated the
+Raft log above it. On the other hand, Learner snapshots are necessary when we add a new replica (due to a rebalancing
+operation) and the new replica needs a snapshot to up-replicate. Note the distinction: `Learner snapshots` are sent by
+the `replicateQueue` during rebalancing operations, while `Raft snapshots` are sent by the `raftSnapshotQueue` when a
+replica falls behind. However, there is no difference in the way the snapshots are generated, sent or applied. For
+example, when the replication factor is increased, the Raft group needs to create a new replica from a clean slate. To
+up-replicate this replica, we would send it a Learner snapshot of the full range data.
+
+In this note, we will focus on the scenario where a Learner snapshot is needed for the up-replication and rebalancing
+of replicas.
+
+## ReplicateQueue
+
+As a brief overview, the `replicateQueue` manages a queue of replicas and is responsible for processing replicas for
+rebalancing. For each replica that holds the lease, the `replicateQueue` invokes the allocator to decide whether any
+placement changes are needed, and then executes those changes by calling `changeReplicas`; it repeats this until all
+changes are complete.
+
+## ChangeReplicas API
+
+The `AdminChangeReplicas` API exposed by KV is mainly responsible for atomically changing the replicas of a range.
+These changes include non-voter promotions, voter/non-voter swaps, additions and removals. The change is performed in a
+distributed transaction and takes effect when the transaction is committed. There are many details involved in these
+changes, but for the purposes of this note, we will focus on up-replication and the process of sending Learner
+snapshots.
+
+During up-replication, `ChangeReplicas` runs a transaction to add a new learner replica to the range. Learner
+replicas are new replicas that receive Raft traffic but do not participate in quorum, allowing the range to remain
+highly available during the replication process.
+
+Once the learners have been added to the range, they are synchronously sent a Learner snapshot from the leaseholder
+(which is typically, but not necessarily, the Raft leader) to up-replicate. There is some nuance here: Raft snapshots
+are typically sent automatically by the existing Raft snapshot queue, but the `replicateQueue` sends a snapshot
+synchronously in order to catch up learners quickly. To prevent a race with the `raftSnapshotQueue`, we lock snapshots
+to learners and non-voters on the current leaseholder store while processing the change. In addition, we also place a
+lock on log truncation to ensure we don't truncate the Raft log while a snapshot is in flight, preventing wasted
+snapshots. However, both of these locks are best-effort rather than guaranteed, since the lease can be transferred and
+the new leaseholder could still truncate the log.
+
+## Sending Snapshots
+
+The snapshot process itself is broken into three parts: generating, transmitting and applying the snapshot. This
+process is the same whether it is invoked by the `replicateQueue` or the `raftSnapshotQueue`.
+
+### Generating the snapshot
+
+A snapshot is a bulk transfer of all replicated data in a range and everything the replica needs to be a member of a
+Raft group. It consists of a consistent view of the state of some replica of a range as of an applied index. The
+`GetSnapshot` method obtains a storage engine snapshot, which reflects the replicated state as of the applied index at
+which the snapshot is generated, and creates an iterator from that storage snapshot. A storage engine snapshot is
+important to ensure that multiple iterators created at different points in time see a consistent replicated state that
+does not change while the snapshot is streamed. In our current code, the engine snapshot is not necessary for
+correctness, as only one iterator is constructed.
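+
+To make the consistency point concrete, the sketch below reads a replica's data through an iterator built from an
+engine snapshot. This is a minimal illustration only: the `engineSnapshot` and `iterator` interfaces are hypothetical
+stand-ins for the real storage engine types, not CockroachDB's actual `GetSnapshot` code path.
+
+```go
+package snapshotsketch
+
+// engineSnapshot is a hypothetical stand-in for a storage engine snapshot:
+// an immutable, point-in-time view of the engine's data.
+type engineSnapshot interface {
+    NewIter() iterator // iterators observe the state as of the snapshot
+    Close()
+}
+
+// iterator is a hypothetical stand-in for a storage engine iterator.
+type iterator interface {
+    First() bool
+    Next() bool
+    Key() []byte
+    Value() []byte
+    Close()
+}
+
+// visitReplicaState walks the replicated state frozen at the applied index the
+// snapshot was taken at. Because the iterator comes from the engine snapshot,
+// writes applied to the underlying engine after that point are not visible, so
+// the streamed data stays internally consistent.
+func visitReplicaState(snap engineSnapshot, visit func(key, value []byte)) {
+    defer snap.Close()
+    it := snap.NewIter()
+    defer it.Close()
+    for ok := it.First(); ok; ok = it.Next() {
+        visit(it.Key(), it.Value())
+    }
+}
+```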
+
+### Transmitting the snapshot
+
+The snapshot transfer is sent through a bi-directional stream of snapshot requests and snapshot responses. However,
+before the actual range data is sent, the streaming RPC first sends a header message to the recipient store and blocks
+until the store responds to accept or reject the snapshot data.
+
+The recipient checks several conditions before accepting the snapshot. We currently allow only one snapshot
+application on a receiver store at a time. Consequently, if there are snapshot applications already in-flight on the
+receiver, incoming snapshots are throttled, or rejected if they wait too long. The receiver then checks whether its
+store has a compatible replica to which the snapshot can be applied. In the case of up-replication, we expect the
+learner replica to be present on the receiver store. However, if the snapshot overlaps an existing replica or a replica
+placeholder on the receiver, the snapshot will be rejected as well. Once these conditions have been verified, a
+response message is sent back to the sender.
+
+Once the recipient has accepted, the sender uses the iterator to read chunks of size `kv.snapshot_sender.batch_size`
+from storage into memory. The batch size is enforced so that the sender does not hold a significant amount of data in
+memory at any one time. These in-memory batches are then sent to the recipient as streaming gRPC message chunks. Note
+that snapshots are rate-limited based on cluster settings so as not to overload the network. Finally, once all the data
+has been sent, the sender sends a final message to the receiver and waits for a response from the recipient indicating
+whether the snapshot application was a success. On the receiver side, the key-value pairs are used to construct
+multiple SSTs for direct ingestion, which prevents the receiver from holding the entire snapshot in memory.
+
+### Applying the snapshot
+
+Once the snapshot has been fully received, the receiver applies defensive checks to ensure it is the correct snapshot,
+and either finds an initialized learner replica or creates a placeholder to accept the snapshot. Finally, the Raft
+snapshot message is handed to the replica’s Raft node, which will apply the snapshot. The placeholder or learner is
+then converted to a fully initialized replica.
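+
+As a rough summary of the protocol described above, here is a simplified sender-side sketch. The `snapshotStream`
+interface and its method names are hypothetical placeholders for the real bi-directional snapshot request/response
+stream, and the byte-slice payload stands in for the batched key-value data; this illustrates the handshake, not the
+actual implementation.
+
+```go
+package snapshotsketch
+
+import "fmt"
+
+// snapshotStream is a hypothetical stand-in for the bi-directional stream of
+// snapshot requests and responses.
+type snapshotStream interface {
+    SendHeader() error                                     // announce the snapshot to the recipient store
+    RecvAccept() (accepted bool, reason string, err error) // recipient accepts or rejects
+    SendBatch(batch []byte) error                          // one streamed chunk of range data
+    SendFinal() error                                      // all data has been sent
+    RecvStatus() error                                     // did the recipient apply the snapshot successfully?
+}
+
+// sendSnapshot sends a header and blocks until the recipient accepts, streams
+// the data in batches no larger than batchSize, then sends a final message and
+// waits for the recipient to report whether applying the snapshot succeeded.
+func sendSnapshot(stream snapshotStream, data []byte, batchSize int) error {
+    if err := stream.SendHeader(); err != nil {
+        return err
+    }
+    accepted, reason, err := stream.RecvAccept()
+    if err != nil {
+        return err
+    }
+    if !accepted {
+        // For example, the snapshot may overlap an existing replica or a
+        // replica placeholder on the recipient store.
+        return fmt.Errorf("snapshot rejected: %s", reason)
+    }
+    for len(data) > 0 {
+        n := batchSize
+        if n <= 0 || n > len(data) {
+            n = len(data)
+        }
+        if err := stream.SendBatch(data[:n]); err != nil {
+            return err
+        }
+        data = data[n:]
+    }
+    if err := stream.SendFinal(); err != nil {
+        return err
+    }
+    return stream.RecvStatus()
+}
+```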
diff --git a/go.mod b/go.mod index 9235f837007a..8164331538fe 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/cockroachdb/go-test-teamcity v0.0.0-20191211140407-cff980ad0a55 github.com/cockroachdb/gostdlib v1.13.0 github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f - github.com/cockroachdb/pebble v0.0.0-20220301234049-69a82fe41c31 + github.com/cockroachdb/pebble v0.0.0-20220307192532-e2b7bb844759 github.com/cockroachdb/redact v1.1.3 github.com/cockroachdb/returncheck v0.0.0-20200612231554-92cdbca611dd github.com/cockroachdb/stress v0.0.0-20220217190341-94cf65c2a29f diff --git a/go.sum b/go.sum index 4f25f3293c73..b9033e86fce0 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f h1:6jduT9Hfc0n github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= github.com/cockroachdb/panicparse/v2 v2.0.0-20211103220158-604c82a44f1e h1:FrERdkPlRj+v7fc+PGpey3GUiDGuTR5CsmLCA54YJ8I= github.com/cockroachdb/panicparse/v2 v2.0.0-20211103220158-604c82a44f1e/go.mod h1:pMxsKyCewnV3xPaFvvT9NfwvDTcIx2Xqg0qL5Gq0SjM= -github.com/cockroachdb/pebble v0.0.0-20220301234049-69a82fe41c31 h1:hkBQIZdAeGQzF3pVbL1lrPJNx/Hvwiy5HHpRf0Nz6y8= -github.com/cockroachdb/pebble v0.0.0-20220301234049-69a82fe41c31/go.mod h1:buxOO9GBtOcq1DiXDpIPYrmxY020K2A8lOrwno5FetU= +github.com/cockroachdb/pebble v0.0.0-20220307192532-e2b7bb844759 h1:PKDkU1nARLt2TL99CCEukKJIEUo2cgt9AEVlrdtzfuM= +github.com/cockroachdb/pebble v0.0.0-20220307192532-e2b7bb844759/go.mod h1:buxOO9GBtOcq1DiXDpIPYrmxY020K2A8lOrwno5FetU= github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= diff --git a/pkg/cmd/dev/cache.go b/pkg/cmd/dev/cache.go index 118653819624..e6fcea304f03 100644 --- a/pkg/cmd/dev/cache.go +++ b/pkg/cmd/dev/cache.go @@ -61,25 +61,21 @@ func (d *dev) cache(cmd *cobra.Command, _ []string) error { if clean { return d.cleanCache(ctx) } + if down { + return d.tearDownCache(ctx) + } if reset { // Errors here don't really mean much, we can just ignore them. 
err := d.tearDownCache(ctx) if err != nil { log.Printf("%v\n", err) } - bazelRcLine, err := d.setUpCache(ctx) - if bazelRcLine != "" { - fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine) - } - return err - } - if down { - return d.tearDownCache(ctx) } bazelRcLine, err := d.setUpCache(ctx) - if bazelRcLine != "" { - fmt.Printf("Please add `%s` to your ~/.bazelrc\n", bazelRcLine) + if err != nil { + return err } + _, err = d.checkPresenceInBazelRc(bazelRcLine) return err } diff --git a/pkg/cmd/dev/doctor.go b/pkg/cmd/dev/doctor.go index e5c25047682a..9402c5bc1ec1 100644 --- a/pkg/cmd/dev/doctor.go +++ b/pkg/cmd/dev/doctor.go @@ -222,16 +222,10 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace) if err != nil { return err } - homeDir, err := os.UserHomeDir() + success, err = d.checkPresenceInBazelRc(bazelRcLine) if err != nil { return err } - bazelRcContents, err := d.os.ReadFile(filepath.Join(homeDir, ".bazelrc")) - if err != nil || !strings.Contains(bazelRcContents, bazelRcLine) { - log.Printf("Please add the string `%s` to your ~/.bazelrc:\n", bazelRcLine) - log.Printf(" echo \"%s\" >> ~/.bazelrc", bazelRcLine) - success = false - } } if !success { @@ -244,3 +238,31 @@ Please add one of the following to your %s/.bazelrc.user:`, workspace) log.Println("You are ready to build :)") return nil } + +func (d *dev) checkPresenceInBazelRc(expectedBazelRcLine string) (found bool, _ error) { + homeDir, err := os.UserHomeDir() + if err != nil { + return false, err + } + defer func() { + if !found { + log.Printf("Please add the string `%s` to your ~/.bazelrc:\n", expectedBazelRcLine) + log.Printf(" echo \"%s\" >> ~/.bazelrc", expectedBazelRcLine) + } + }() + + bazelRcContents, err := d.os.ReadFile(filepath.Join(homeDir, ".bazelrc")) + if err != nil { + return false, err + } + for _, line := range strings.Split(bazelRcContents, "\n") { + if !strings.Contains(line, expectedBazelRcLine) { + continue + } + if strings.HasPrefix(strings.TrimSpace(line), "#") { + continue + } + return true, nil + } + return false, nil +} diff --git a/pkg/kv/kvserver/allocator.go b/pkg/kv/kvserver/allocator.go index 7eb8af2d03be..e756660ed2ff 100644 --- a/pkg/kv/kvserver/allocator.go +++ b/pkg/kv/kvserver/allocator.go @@ -185,6 +185,17 @@ const ( nonVoterTarget ) +// replicaStatus represents whether a replica is currently alive, +// dead or decommissioning. +type replicaStatus int + +const ( + _ replicaStatus = iota + alive + dead + decommissioning +) + // AddChangeType returns the roachpb.ReplicaChangeType corresponding to the // given targetReplicaType. // diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index e892c0274f64..4bc89e6c0882 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -61,7 +61,6 @@ var MinLeaseTransferInterval = settings.RegisterDurationSetting( settings.NonNegativeDuration, ) -// TODO(aayush): Expand this metric set to include metrics about non-voting replicas. 
var ( metaReplicateQueueAddReplicaCount = metric.Metadata{ Name: "queue.replicate.addreplica", @@ -69,18 +68,72 @@ var ( Measurement: "Replica Additions", Unit: metric.Unit_COUNT, } + metaReplicateQueueAddVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.addvoterreplica", + Help: "Number of voter replica additions attempted by the replicate queue", + Measurement: "Replica Additions", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueAddNonVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.addnonvoterreplica", + Help: "Number of non-voter replica additions attempted by the replicate queue", + Measurement: "Replica Additions", + Unit: metric.Unit_COUNT, + } metaReplicateQueueRemoveReplicaCount = metric.Metadata{ Name: "queue.replicate.removereplica", Help: "Number of replica removals attempted by the replicate queue (typically in response to a rebalancer-initiated addition)", Measurement: "Replica Removals", Unit: metric.Unit_COUNT, } + metaReplicateQueueRemoveVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.removevoterreplica", + Help: "Number of voter replica removals attempted by the replicate queue (typically in response to a rebalancer-initiated addition)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueRemoveNonVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.removenonvoterreplica", + Help: "Number of non-voter replica removals attempted by the replicate queue (typically in response to a rebalancer-initiated addition)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } metaReplicateQueueRemoveDeadReplicaCount = metric.Metadata{ Name: "queue.replicate.removedeadreplica", Help: "Number of dead replica removals attempted by the replicate queue (typically in response to a node outage)", Measurement: "Replica Removals", Unit: metric.Unit_COUNT, } + metaReplicateQueueRemoveDeadVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.removedeadvoterreplica", + Help: "Number of dead voter replica removals attempted by the replicate queue (typically in response to a node outage)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueRemoveDeadNonVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.removedeadnonvoterreplica", + Help: "Number of dead non-voter replica removals attempted by the replicate queue (typically in response to a node outage)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueRemoveDecommissioningReplicaCount = metric.Metadata{ + Name: "queue.replicate.removedecommissioningreplica", + Help: "Number of decommissioning replica removals attempted by the replicate queue (typically in response to a node outage)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueRemoveDecommissioningVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.removedecommissioningvoterreplica", + Help: "Number of decommissioning voter replica removals attempted by the replicate queue (typically in response to a node outage)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueRemoveDecommissioningNonVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.removedecommissioningnonvoterreplica", + Help: "Number of decommissioning non-voter replica removals attempted by the replicate queue (typically in response to a node outage)", + Measurement: "Replica Removals", + Unit: metric.Unit_COUNT, + } 
metaReplicateQueueRemoveLearnerReplicaCount = metric.Metadata{ Name: "queue.replicate.removelearnerreplica", Help: "Number of learner replica removals attempted by the replicate queue (typically due to internal race conditions)", @@ -93,6 +146,18 @@ var ( Measurement: "Replica Additions", Unit: metric.Unit_COUNT, } + metaReplicateQueueRebalanceVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.rebalancevoterreplica", + Help: "Number of voter replica rebalancer-initiated additions attempted by the replicate queue", + Measurement: "Replica Additions", + Unit: metric.Unit_COUNT, + } + metaReplicateQueueRebalanceNonVoterReplicaCount = metric.Metadata{ + Name: "queue.replicate.rebalancenonvoterreplica", + Help: "Number of non-voter replica rebalancer-initiated additions attempted by the replicate queue", + Measurement: "Replica Additions", + Unit: metric.Unit_COUNT, + } metaReplicateQueueTransferLeaseCount = metric.Metadata{ Name: "queue.replicate.transferlease", Help: "Number of range lease transfers attempted by the replicate queue", @@ -133,28 +198,140 @@ func (e *quorumError) Error() string { func (*quorumError) purgatoryErrorMarker() {} // ReplicateQueueMetrics is the set of metrics for the replicate queue. -// TODO(aayush): Track metrics for non-voting replicas separately here. type ReplicateQueueMetrics struct { - AddReplicaCount *metric.Counter - RemoveReplicaCount *metric.Counter - RemoveDeadReplicaCount *metric.Counter - RemoveLearnerReplicaCount *metric.Counter - RebalanceReplicaCount *metric.Counter - TransferLeaseCount *metric.Counter - NonVoterPromotionsCount *metric.Counter - VoterDemotionsCount *metric.Counter + AddReplicaCount *metric.Counter + AddVoterReplicaCount *metric.Counter + AddNonVoterReplicaCount *metric.Counter + RemoveReplicaCount *metric.Counter + RemoveVoterReplicaCount *metric.Counter + RemoveNonVoterReplicaCount *metric.Counter + RemoveDeadReplicaCount *metric.Counter + RemoveDeadVoterReplicaCount *metric.Counter + RemoveDeadNonVoterReplicaCount *metric.Counter + RemoveDecommissioningReplicaCount *metric.Counter + RemoveDecommissioningVoterReplicaCount *metric.Counter + RemoveDecommissioningNonVoterReplicaCount *metric.Counter + RemoveLearnerReplicaCount *metric.Counter + RebalanceReplicaCount *metric.Counter + RebalanceVoterReplicaCount *metric.Counter + RebalanceNonVoterReplicaCount *metric.Counter + TransferLeaseCount *metric.Counter + NonVoterPromotionsCount *metric.Counter + VoterDemotionsCount *metric.Counter } func makeReplicateQueueMetrics() ReplicateQueueMetrics { return ReplicateQueueMetrics{ - AddReplicaCount: metric.NewCounter(metaReplicateQueueAddReplicaCount), - RemoveReplicaCount: metric.NewCounter(metaReplicateQueueRemoveReplicaCount), - RemoveDeadReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaCount), - RemoveLearnerReplicaCount: metric.NewCounter(metaReplicateQueueRemoveLearnerReplicaCount), - RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount), - TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), - NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount), - VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount), + AddReplicaCount: metric.NewCounter(metaReplicateQueueAddReplicaCount), + AddVoterReplicaCount: metric.NewCounter(metaReplicateQueueAddVoterReplicaCount), + AddNonVoterReplicaCount: metric.NewCounter(metaReplicateQueueAddNonVoterReplicaCount), + RemoveReplicaCount: 
metric.NewCounter(metaReplicateQueueRemoveReplicaCount), + RemoveVoterReplicaCount: metric.NewCounter(metaReplicateQueueRemoveVoterReplicaCount), + RemoveNonVoterReplicaCount: metric.NewCounter(metaReplicateQueueRemoveNonVoterReplicaCount), + RemoveDeadReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaCount), + RemoveDeadVoterReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadVoterReplicaCount), + RemoveDeadNonVoterReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDeadNonVoterReplicaCount), + RemoveLearnerReplicaCount: metric.NewCounter(metaReplicateQueueRemoveLearnerReplicaCount), + RemoveDecommissioningReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaCount), + RemoveDecommissioningVoterReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningVoterReplicaCount), + RemoveDecommissioningNonVoterReplicaCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningNonVoterReplicaCount), + RebalanceReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceReplicaCount), + RebalanceVoterReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceVoterReplicaCount), + RebalanceNonVoterReplicaCount: metric.NewCounter(metaReplicateQueueRebalanceNonVoterReplicaCount), + TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount), + NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount), + VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount), + } +} + +// trackAddReplicaCount increases the AddReplicaCount metric and separately +// tracks voter/non-voter metrics given a replica targetType. +func (metrics *ReplicateQueueMetrics) trackAddReplicaCount(targetType targetReplicaType) { + metrics.AddReplicaCount.Inc(1) + switch targetType { + case voterTarget: + metrics.AddVoterReplicaCount.Inc(1) + case nonVoterTarget: + metrics.AddNonVoterReplicaCount.Inc(1) + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", targetType)) + } +} + +// trackRemoveMetric increases total RemoveReplicaCount metrics and +// increments dead/decommissioning metrics depending on replicaStatus. +func (metrics *ReplicateQueueMetrics) trackRemoveMetric( + targetType targetReplicaType, replicaStatus replicaStatus, +) { + metrics.trackRemoveReplicaCount(targetType) + switch replicaStatus { + case dead: + metrics.trackRemoveDeadReplicaCount(targetType) + case decommissioning: + metrics.trackRemoveDecommissioningReplicaCount(targetType) + case alive: + return + default: + panic(fmt.Sprintf("unknown replicaStatus %v", replicaStatus)) + } +} + +// trackRemoveReplicaCount increases the RemoveReplicaCount metric and +// separately tracks voter/non-voter metrics given a replica targetType. +func (metrics *ReplicateQueueMetrics) trackRemoveReplicaCount(targetType targetReplicaType) { + metrics.RemoveReplicaCount.Inc(1) + switch targetType { + case voterTarget: + metrics.RemoveVoterReplicaCount.Inc(1) + case nonVoterTarget: + metrics.RemoveNonVoterReplicaCount.Inc(1) + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", targetType)) + } +} + +// trackRemoveDeadReplicaCount increases the RemoveDeadReplicaCount metric and +// separately tracks voter/non-voter metrics given a replica targetType. 
+func (metrics *ReplicateQueueMetrics) trackRemoveDeadReplicaCount(targetType targetReplicaType) { + metrics.RemoveDeadReplicaCount.Inc(1) + switch targetType { + case voterTarget: + metrics.RemoveDeadVoterReplicaCount.Inc(1) + case nonVoterTarget: + metrics.RemoveDeadNonVoterReplicaCount.Inc(1) + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", targetType)) + } +} + +// trackRemoveDecommissioningReplicaCount increases the +// RemoveDecommissioningReplicaCount metric and separately tracks +// voter/non-voter metrics given a replica targetType. +func (metrics *ReplicateQueueMetrics) trackRemoveDecommissioningReplicaCount( + targetType targetReplicaType, +) { + metrics.RemoveDecommissioningReplicaCount.Inc(1) + switch targetType { + case voterTarget: + metrics.RemoveDecommissioningVoterReplicaCount.Inc(1) + case nonVoterTarget: + metrics.RemoveDecommissioningNonVoterReplicaCount.Inc(1) + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", targetType)) + } +} + +// trackRebalanceReplicaCount increases the RebalanceReplicaCount metric and +// separately tracks voter/non-voter metrics given a replica targetType. +func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount(targetType targetReplicaType) { + metrics.RebalanceReplicaCount.Inc(1) + switch targetType { + case voterTarget: + metrics.RebalanceVoterReplicaCount.Inc(1) + case nonVoterTarget: + metrics.RebalanceNonVoterReplicaCount.Inc(1) + default: + panic(fmt.Sprintf("unsupported targetReplicaType: %v", targetType)) } } @@ -399,9 +576,13 @@ func (rq *replicateQueue) processOneChange( // Add replicas. case AllocatorAddVoter: - return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, dryRun) + return rq.addOrReplaceVoters( + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, alive, dryRun, + ) case AllocatorAddNonVoter: - return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, dryRun) + return rq.addOrReplaceNonVoters( + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, alive, dryRun, + ) // Remove replicas. case AllocatorRemoveVoter: @@ -421,7 +602,8 @@ func (rq *replicateQueue) processOneChange( "dead voter %v unexpectedly not found in %v", deadVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters( + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dead, dryRun) case AllocatorReplaceDeadNonVoter: if len(deadNonVoterReplicas) == 0 { // Nothing to do. @@ -433,7 +615,8 @@ func (rq *replicateQueue) processOneChange( "dead non-voter %v unexpectedly not found in %v", deadNonVoterReplicas[0], nonVoterReplicas) } - return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceNonVoters( + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dead, dryRun) // Replace decommissioning replicas. 
case AllocatorReplaceDecommissioningVoter: @@ -448,7 +631,8 @@ func (rq *replicateQueue) processOneChange( "decommissioning voter %v unexpectedly not found in %v", decommissioningVoterReplicas[0], voterReplicas) } - return rq.addOrReplaceVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceVoters( + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) case AllocatorReplaceDecommissioningNonVoter: decommissioningNonVoterReplicas := rq.allocator.storePool.decommissioningReplicas(nonVoterReplicas) if len(decommissioningNonVoterReplicas) == 0 { @@ -460,7 +644,8 @@ func (rq *replicateQueue) processOneChange( "decommissioning non-voter %v unexpectedly not found in %v", decommissioningNonVoterReplicas[0], nonVoterReplicas) } - return rq.addOrReplaceNonVoters(ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, dryRun) + return rq.addOrReplaceNonVoters( + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, decommissioning, dryRun) // Remove decommissioning replicas. // @@ -530,6 +715,7 @@ func (rq *replicateQueue) addOrReplaceVoters( repl *Replica, liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, + replicaStatus replicaStatus, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -611,7 +797,6 @@ func (rq *replicateQueue) addOrReplaceVoters( return false, errors.Wrap(err, "avoid up-replicating to fragile quorum") } } - rq.metrics.AddReplicaCount.Inc(1) // Figure out whether we should be promoting an existing non-voting replica to // a voting replica or if we ought to be adding a voter afresh. @@ -624,16 +809,23 @@ func (rq *replicateQueue) addOrReplaceVoters( } // If the allocation target has a non-voter already, we will promote it to a // voter. 
- rq.metrics.NonVoterPromotionsCount.Inc(1) + if !dryRun { + rq.metrics.NonVoterPromotionsCount.Inc(1) + } ops = roachpb.ReplicationChangesForPromotion(newVoter) } else { + if !dryRun { + rq.metrics.trackAddReplicaCount(voterTarget) + } ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newVoter) } if removeIdx < 0 { log.VEventf(ctx, 1, "adding voter %+v: %s", newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) } else { - rq.metrics.RemoveReplicaCount.Inc(1) + if !dryRun { + rq.metrics.trackRemoveMetric(voterTarget, replicaStatus) + } removeVoter := existingVoters[removeIdx] log.VEventf(ctx, 1, "replacing voter %s with %+v: %s", removeVoter, newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) @@ -672,6 +864,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( repl *Replica, liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, + replicaStatus replicaStatus, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -681,14 +874,18 @@ func (rq *replicateQueue) addOrReplaceNonVoters( if err != nil { return false, err } - rq.metrics.AddReplicaCount.Inc(1) + if !dryRun { + rq.metrics.trackAddReplicaCount(nonVoterTarget) + } ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter) if removeIdx < 0 { log.VEventf(ctx, 1, "adding non-voter %+v: %s", newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters)) } else { - rq.metrics.RemoveReplicaCount.Inc(1) + if !dryRun { + rq.metrics.trackRemoveMetric(nonVoterTarget, replicaStatus) + } removeNonVoter := existingNonVoters[removeIdx] log.VEventf(ctx, 1, "replacing non-voter %s with %+v: %s", removeNonVoter, newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters)) @@ -871,7 +1068,10 @@ func (rq *replicateQueue) removeVoter( } // Remove a replica. - rq.metrics.RemoveReplicaCount.Inc(1) + if !dryRun { + rq.metrics.trackRemoveMetric(voterTarget, alive) + } + log.VEventf(ctx, 1, "removing voting replica %+v due to over-replication: %s", removeVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) desc := repl.Desc() @@ -901,7 +1101,6 @@ func (rq *replicateQueue) removeNonVoter( existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, dryRun bool, ) (requeue bool, _ error) { - rq.metrics.RemoveReplicaCount.Inc(1) desc, conf := repl.DescAndSpanConfig() removeNonVoter, details, err := rq.allocator.RemoveNonVoter( @@ -915,7 +1114,9 @@ func (rq *replicateQueue) removeNonVoter( if err != nil { return false, err } - + if !dryRun { + rq.metrics.trackRemoveMetric(nonVoterTarget, alive) + } log.VEventf(ctx, 1, "removing non-voting replica %+v due to over-replication: %s", removeNonVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters)) target := roachpb.ReplicationTarget{ @@ -974,7 +1175,9 @@ func (rq *replicateQueue) removeDecommissioning( } // Remove the decommissioning replica. 
- rq.metrics.RemoveReplicaCount.Inc(1) + if !dryRun { + rq.metrics.trackRemoveMetric(targetType, decommissioning) + } log.VEventf(ctx, 1, "removing decommissioning %s %+v from store", targetType, decommissioningReplica) target := roachpb.ReplicationTarget{ NodeID: decommissioningReplica.NodeID, @@ -1013,7 +1216,9 @@ func (rq *replicateQueue) removeDead( return true, nil } deadReplica := deadReplicas[0] - rq.metrics.RemoveDeadReplicaCount.Inc(1) + if !dryRun { + rq.metrics.trackRemoveMetric(targetType, dead) + } log.VEventf(ctx, 1, "removing dead %s %+v from store", targetType, deadReplica) target := roachpb.ReplicationTarget{ NodeID: deadReplica.NodeID, @@ -1050,7 +1255,9 @@ func (rq *replicateQueue) removeLearner( return true, nil } learnerReplica := learnerReplicas[0] - rq.metrics.RemoveLearnerReplicaCount.Inc(1) + if !dryRun { + rq.metrics.RemoveLearnerReplicaCount.Inc(1) + } log.VEventf(ctx, 1, "removing learner replica %+v from store", learnerReplica) target := roachpb.ReplicationTarget{ NodeID: learnerReplica.NodeID, @@ -1139,10 +1346,12 @@ func (rq *replicateQueue) considerRebalance( if err != nil { return false, err } - rq.metrics.RebalanceReplicaCount.Inc(1) - if performingSwap { - rq.metrics.VoterDemotionsCount.Inc(1) - rq.metrics.NonVoterPromotionsCount.Inc(1) + if !dryRun { + rq.metrics.trackRebalanceReplicaCount(rebalanceTargetType) + if performingSwap { + rq.metrics.VoterDemotionsCount.Inc(1) + rq.metrics.NonVoterPromotionsCount.Inc(1) + } } log.VEventf(ctx, 1, diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 5bb943a6e62e..7b9f568e5120 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -388,7 +388,7 @@ func TestReplicateQueueUpAndDownReplicateNonVoters(t *testing.T) { func checkReplicaCount( ctx context.Context, tc *testcluster.TestCluster, - rangeDesc roachpb.RangeDescriptor, + rangeDesc *roachpb.RangeDescriptor, voterCount, nonVoterCount int, ) (bool, error) { err := forceScanOnAllReplicationQueues(tc) @@ -396,7 +396,7 @@ func checkReplicaCount( log.Infof(ctx, "store.ForceReplicationScanAndProcess() failed with: %s", err) return false, err } - rangeDesc, err = tc.LookupRange(rangeDesc.StartKey.AsRawKey()) + *rangeDesc, err = tc.LookupRange(rangeDesc.StartKey.AsRawKey()) if err != nil { return false, err } @@ -426,6 +426,9 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { ReplicationMode: base.ReplicationAuto, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableReplicaRebalancing: true, + }, SpanConfig: &spanconfig.TestingKnobs{ ConfigureScratchRange: true, }, @@ -433,15 +436,18 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { }, }, ) + _, err := tc.ServerConn(0).Exec( + `SET CLUSTER SETTING server.failed_reservation_timeout='1ms'`) + require.NoError(t, err) scratchKey := tc.ScratchRange(t) scratchRange := tc.LookupRangeOrFatal(t, scratchKey) - _, err := tc.ServerConn(0).Exec( + _, err = tc.ServerConn(0).Exec( `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, ) require.NoError(t, err) require.Eventually(t, func() bool { - ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + ok, err := checkReplicaCount(ctx, tc, &scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) if err != nil { log.Errorf(ctx, "error checking replica count: %s", err) return false @@ -460,13 +466,23 @@ func 
TestReplicateQueueDecommissioningNonVoters(t *testing.T) { // Do a fresh look up on the range descriptor. scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) beforeNodeIDs := getNonVoterNodeIDs(scratchRange) + store, err := getLeaseholderStore(tc, scratchRange) + if err != nil { + t.Fatal(err) + } + // Check the value of metrics prior to replacement. + previousAddCount := store.ReplicateQueueMetrics().AddNonVoterReplicaCount.Count() + previousRemovalCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + previousDecommRemovals := + store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count() + // Decommission each of the two nodes that have the non-voters and make sure // that those non-voters are upreplicated elsewhere. require.NoError(t, tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, beforeNodeIDs)) require.Eventually(t, func() bool { - ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + ok, err := checkReplicaCount(ctx, tc, &scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) if err != nil { log.Errorf(ctx, "error checking replica count: %s", err) return false @@ -487,6 +503,26 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { } return true }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + + // replica replacements update the addition/removal metrics as replicas + // are being removed on two decommissioning stores added to other stores. + currentAddCount := store.ReplicateQueueMetrics().AddNonVoterReplicaCount.Count() + currentRemoveCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + currentDecommRemovals := + store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count() + + require.GreaterOrEqualf( + t, currentAddCount, previousAddCount+2, + "expected replica additions to increase by at least 2", + ) + require.GreaterOrEqualf( + t, currentRemoveCount, previousRemovalCount+2, + "expected total replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, currentDecommRemovals, previousDecommRemovals+2, + "expected decommissioning replica removals to increase by at least 2", + ) }) // Check that when we have more non-voters than needed and some of those @@ -512,6 +548,15 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { for _, repl := range scratchRange.Replicas().NonVoterDescriptors() { nonVoterNodeIDs = append(nonVoterNodeIDs, repl.NodeID) } + // Check metrics of leaseholder store prior to removal. + store, err := getLeaseholderStore(tc, scratchRange) + if err != nil { + t.Fatal(err) + } + previousRemovalCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + previousDecommRemovals := + store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count() + require.NoError(t, tc.Server(0).Decommission(ctx, livenesspb.MembershipStatus_DECOMMISSIONING, nonVoterNodeIDs)) @@ -520,21 +565,50 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) { // replicateQueue on and ensure that these redundant non-voters are removed. 
tc.ToggleReplicateQueues(true) require.Eventually(t, func() bool { - ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) + ok, err := checkReplicaCount(ctx, tc, &scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) if err != nil { log.Errorf(ctx, "error checking replica count: %s", err) return false } return ok }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + + currentRemoveCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + currentDecommRemovals := + store.ReplicateQueueMetrics().RemoveDecommissioningNonVoterReplicaCount.Count() + require.GreaterOrEqualf( + t, currentRemoveCount, previousRemovalCount+2, + "expected replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, currentDecommRemovals, previousDecommRemovals+2, + "expected replica removals to increase by at least 2", + ) }) } +// getLeaseholderStore returns the leaseholder store for the given scratchRange. +func getLeaseholderStore( + tc *testcluster.TestCluster, scratchRange roachpb.RangeDescriptor, +) (*kvserver.Store, error) { + leaseHolder, err := tc.FindRangeLeaseHolder(scratchRange, nil) + if err != nil { + return nil, err + } + leaseHolderSrv := tc.Servers[leaseHolder.NodeID-1] + store, err := leaseHolderSrv.Stores().GetStore(leaseHolder.StoreID) + if err != nil { + return nil, err + } + return store, nil +} + // TestReplicateQueueDeadNonVoters is an end to end test ensuring that // non-voting replicas on dead nodes are replaced or removed. func TestReplicateQueueDeadNonVoters(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.WithIssue(t, 76996) skip.UnderRace(t, "takes a long time or times out under race") ctx := context.Background() @@ -545,7 +619,12 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { base.TestClusterArgs{ ReplicationMode: base.ReplicationAuto, ServerArgs: base.TestServerArgs{ + ScanMinIdleTime: time.Millisecond, + ScanMaxIdleTime: time.Millisecond, Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableReplicaRebalancing: true, + }, SpanConfig: &spanconfig.TestingKnobs{ ConfigureScratchRange: true, }, @@ -564,15 +643,19 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { }, }, ) + _, err := tc.ServerConn(0).Exec( + `SET CLUSTER SETTING server.failed_reservation_timeout='1ms'`) + require.NoError(t, err) + // Setup a scratch range on a test cluster with 2 non-voters and 1 voter. scratchKey := tc.ScratchRange(t) scratchRange := tc.LookupRangeOrFatal(t, scratchKey) - _, err := tc.ServerConn(0).Exec( + _, err = tc.ServerConn(0).Exec( `ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 3, num_voters = 1`, ) require.NoError(t, err) require.Eventually(t, func() bool { - ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + ok, err := checkReplicaCount(ctx, tc, &scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) if err != nil { log.Errorf(ctx, "error checking replica count: %s", err) return false @@ -604,10 +687,20 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { tc, scratchRange := setupFn(t) defer tc.Stopper().Stop(ctx) + // Check the value of non-voter metrics from leaseholder store prior to removals. 
+ store, err := getLeaseholderStore(tc, scratchRange) + if err != nil { + t.Fatal(err) + } + + prevAdditions := store.ReplicateQueueMetrics().AddNonVoterReplicaCount.Count() + prevRemovals := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + prevDeadRemovals := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count() + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) markDead(beforeNodeIDs) require.Eventually(t, func() bool { - ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) + ok, err := checkReplicaCount(ctx, tc, &scratchRange, 1 /* voterCount */, 2 /* nonVoterCount */) if err != nil { log.Errorf(ctx, "error checking replica count: %s", err) return false @@ -628,6 +721,23 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { } return true }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + + addCount := store.ReplicateQueueMetrics().AddNonVoterReplicaCount.Count() + removeNonVoterCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + removeDeadNonVoterCount := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count() + + require.GreaterOrEqualf( + t, addCount, prevAdditions+2, + "expected replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, removeNonVoterCount, prevRemovals+2, + "expected replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, removeDeadNonVoterCount, prevDeadRemovals+2, + "expected replica removals to increase by at least 2", + ) }) // This subtest checks that when we have more non-voters than needed and some @@ -653,21 +763,213 @@ func TestReplicateQueueDeadNonVoters(t *testing.T) { "ALTER RANGE default CONFIGURE ZONE USING num_replicas = 1", ) require.NoError(t, err) + + // Check the value of non-voter metrics from leaseholder store prior to removals. 
+ store, err := getLeaseholderStore(tc, scratchRange) + if err != nil { + t.Fatal(err) + } + prevRemovals := store.ReplicateQueueMetrics().RemoveReplicaCount.Count() + prevNonVoterRemovals := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + prevDeadRemovals := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count() + beforeNodeIDs := getNonVoterNodeIDs(scratchRange) markDead(beforeNodeIDs) toggleReplicationQueues(tc, true) require.Eventually(t, func() bool { - ok, err := checkReplicaCount(ctx, tc, scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) + ok, err := checkReplicaCount(ctx, tc, &scratchRange, 1 /* voterCount */, 0 /* nonVoterCount */) if err != nil { log.Errorf(ctx, "error checking replica count: %s", err) return false } return ok }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + + removeCount := store.ReplicateQueueMetrics().RemoveReplicaCount.Count() + removeNonVoterCount := store.ReplicateQueueMetrics().RemoveNonVoterReplicaCount.Count() + removeDeadNonVoterCount := store.ReplicateQueueMetrics().RemoveDeadNonVoterReplicaCount.Count() + require.GreaterOrEqualf( + t, removeCount, prevRemovals+2, + "expected replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, removeNonVoterCount, prevNonVoterRemovals+2, + "expected replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, removeDeadNonVoterCount, prevDeadRemovals+2, + "expected replica removals to increase by at least 2", + ) }) } +// TestReplicateQueueMetrics is an end-to-end test ensuring the replicateQueue +// voter replica metrics will be updated correctly during upreplication and downreplication. +func TestReplicateQueueMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderRace(t, "takes a long time or times out under race") + + ctx := context.Background() + var clusterArgs = base.TestClusterArgs{ + ReplicationMode: base.ReplicationAuto, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + DisableReplicaRebalancing: true, + }, + }, + }, + } + dbName := "testdb" + tableName := "kv" + numNodes := 3 + tc, scratchRange := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes) + defer tc.Stopper().Stop(ctx) + + // Check that the cluster is initialized correctly with 3 voters. + require.Eventually(t, func() bool { + ok, err := checkReplicaCount( + ctx, tc.(*testcluster.TestCluster), + &scratchRange, 3 /* voterCount */, 0, /* nonVoterCount */ + ) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + + // Get a map of voter replica store locations before the zone configuration change. + voterStores := getVoterStores(t, tc.(*testcluster.TestCluster), &scratchRange) + // Check the aggregated voter removal metrics across voter stores. 
+ previousRemoveCount, previousRemoveVoterCount := getAggregateMetricCounts( + ctx, + tc.(*testcluster.TestCluster), + voterStores, + false, /* add */ + ) + + _, err := tc.ServerConn(0).Exec( + `ALTER TABLE testdb.kv CONFIGURE ZONE USING num_replicas = 1`, + ) + require.NoError(t, err) + require.Eventually( + t, func() bool { + ok, err := checkReplicaCount( + ctx, tc.(*testcluster.TestCluster), &scratchRange, 1, 0, + ) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond, + ) + + // Expect the new aggregated voter removal metrics across stores which had + // voters removed increase by at least 2. + currentRemoveCount, currentRemoveVoterCount := getAggregateMetricCounts( + ctx, + tc.(*testcluster.TestCluster), + voterStores, + false, /* add */ + ) + require.GreaterOrEqualf( + t, + currentRemoveCount, + previousRemoveCount+2, + "expected replica removals to increase by at least 2", + ) + require.GreaterOrEqualf( + t, + currentRemoveVoterCount, + previousRemoveVoterCount+2, + "expected replica removals to increase by at least 2", + ) + + scratchRange = tc.LookupRangeOrFatal(t, scratchRange.StartKey.AsRawKey()) + store, err := getLeaseholderStore(tc.(*testcluster.TestCluster), scratchRange) + if err != nil { + t.Fatal(err) + } + // Track add counts on leaseholder before upreplication. + previousAddCount := store.ReplicateQueueMetrics().AddReplicaCount.Count() + previousAddVoterCount := store.ReplicateQueueMetrics().AddVoterReplicaCount.Count() + + _, err = tc.ServerConn(0).Exec( + `ALTER TABLE testdb.kv CONFIGURE ZONE USING num_replicas = 3`, + ) + require.NoError(t, err) + require.Eventually(t, func() bool { + ok, err := checkReplicaCount( + ctx, tc.(*testcluster.TestCluster), &scratchRange, 3, 0, + ) + if err != nil { + log.Errorf(ctx, "error checking replica count: %s", err) + return false + } + return ok + }, testutils.DefaultSucceedsSoonDuration, 100*time.Millisecond) + + // Expect the aggregated voter add metrics across voter stores increase by at least 2. + voterMap := getVoterStores(t, tc.(*testcluster.TestCluster), &scratchRange) + currentAddCount, currentAddVoterCount := getAggregateMetricCounts( + ctx, + tc.(*testcluster.TestCluster), + voterMap, + true, /* add */ + ) + require.GreaterOrEqualf( + t, currentAddCount, previousAddCount+2, + "expected replica additions to increase by at least 2", + ) + require.GreaterOrEqualf( + t, currentAddVoterCount, previousAddVoterCount+2, + "expected voter additions to increase by at least 2", + ) +} + +// getVoterStores returns a mapping of voter nodeIDs to storeIDs. +func getVoterStores( + t *testing.T, tc *testcluster.TestCluster, rangeDesc *roachpb.RangeDescriptor, +) (storeMap map[roachpb.NodeID]roachpb.StoreID) { + *rangeDesc = tc.LookupRangeOrFatal(t, rangeDesc.StartKey.AsRawKey()) + voters := rangeDesc.Replicas().VoterDescriptors() + storeMap = make(map[roachpb.NodeID]roachpb.StoreID) + for i := 0; i < len(voters); i++ { + storeMap[voters[i].NodeID] = voters[i].StoreID + } + return storeMap +} + +// getAggregateMetricCounts adds metric counts from all stores in a given map. +// and returns the totals. 
+func getAggregateMetricCounts( + ctx context.Context, + tc *testcluster.TestCluster, + voterMap map[roachpb.NodeID]roachpb.StoreID, + add bool, +) (currentCount int64, currentVoterCount int64) { + for _, s := range tc.Servers { + if storeId, exists := voterMap[s.NodeID()]; exists { + store, err := s.Stores().GetStore(storeId) + if err != nil { + log.Errorf(ctx, "error finding store: %s", err) + continue + } + if add { + currentCount += store.ReplicateQueueMetrics().AddReplicaCount.Count() + currentVoterCount += store.ReplicateQueueMetrics().AddVoterReplicaCount.Count() + } else { + currentCount += store.ReplicateQueueMetrics().RemoveReplicaCount.Count() + currentVoterCount += store.ReplicateQueueMetrics().RemoveVoterReplicaCount.Count() + } + } + } + return currentCount, currentVoterCount +} func getNonVoterNodeIDs(rangeDesc roachpb.RangeDescriptor) (result []roachpb.NodeID) { for _, repl := range rangeDesc.Replicas().NonVoterDescriptors() { result = append(result, repl.NodeID) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index d1e4b908083d..a2b70a25daf7 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2996,6 +2996,11 @@ func (s *Store) Metrics() *StoreMetrics { return s.metrics } +// ReplicateQueueMetrics returns the store's replicateQueue metric struct. +func (s *Store) ReplicateQueueMetrics() ReplicateQueueMetrics { + return s.replicateQueue.metrics +} + // Descriptor returns a StoreDescriptor including current store // capacity information. func (s *Store) Descriptor(ctx context.Context, useCached bool) (*roachpb.StoreDescriptor, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 98df924dc4b0..4a2269d99d5e 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -417,7 +417,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { registry.AddMetricStruct(nodeLiveness.Metrics()) nodeLivenessFn := kvserver.MakeStorePoolNodeLivenessFunc(nodeLiveness) - if nodeLivenessKnobs, ok := cfg.TestingKnobs.Store.(*kvserver.NodeLivenessTestingKnobs); ok && + if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok && nodeLivenessKnobs.StorePoolNodeLivenessFn != nil { nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d14ed526a22a..7192154bfec7 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -695,7 +695,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { HistogramWindowInterval: cfg.HistogramWindowInterval(), RangeDescriptorCache: cfg.distSender.RangeDescriptorCache(), RoleMemberCache: sql.NewMembershipCache(serverCacheMemoryMonitor.MakeBoundAccount()), - SessionInitCache: sessioninit.NewCache(serverCacheMemoryMonitor.MakeBoundAccount()), + SessionInitCache: sessioninit.NewCache(serverCacheMemoryMonitor.MakeBoundAccount(), cfg.stopper), RootMemoryMonitor: rootSQLMemoryMonitor, TestingKnobs: sqlExecutorTestingKnobs, CompactEngineSpanFunc: compactEngineSpanFunc, diff --git a/pkg/sql/sessioninit/BUILD.bazel b/pkg/sql/sessioninit/BUILD.bazel index 0d0935bb2fef..7efe6aedbab9 100644 --- a/pkg/sql/sessioninit/BUILD.bazel +++ b/pkg/sql/sessioninit/BUILD.bazel @@ -45,6 +45,9 @@ go_library( "//pkg/sql/sqlutil", "//pkg/util/log", "//pkg/util/mon", + "//pkg/util/stop", "//pkg/util/syncutil", + "//pkg/util/syncutil/singleflight", + "@com_github_cockroachdb_logtags//:logtags", ], ) diff --git a/pkg/sql/sessioninit/cache.go 
b/pkg/sql/sessioninit/cache.go index 4efd1b21ef58..5d0263a840ca 100644 --- a/pkg/sql/sessioninit/cache.go +++ b/pkg/sql/sessioninit/cache.go @@ -12,6 +12,7 @@ package sessioninit import ( "context" + "fmt" "unsafe" "github.com/cockroachdb/cockroach/pkg/kv" @@ -25,7 +26,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" + "github.com/cockroachdb/logtags" ) // CacheEnabledSettingName is the name of the CacheEnabled cluster setting. @@ -53,6 +57,10 @@ type Cache struct { authInfoCache map[security.SQLUsername]AuthInfo // settingsCache is a mapping from (dbID, username) to default settings. settingsCache map[SettingsCacheKey][]string + // populateCacheGroup is used to ensure that there is at most one in-flight + // request for populating each cache entry. + populateCacheGroup singleflight.Group + stopper *stop.Stopper } // AuthInfo contains data that is used to perform an authentication attempt. @@ -83,9 +91,10 @@ type SettingsCacheEntry struct { } // NewCache initializes a new sessioninit.Cache. -func NewCache(account mon.BoundAccount) *Cache { +func NewCache(account mon.BoundAccount, stopper *stop.Stopper) *Cache { return &Cache{ boundAccount: account, + stopper: stopper, } } @@ -159,16 +168,19 @@ func (a *Cache) GetAuthInfo( return nil } - // Lookup the data outside the lock. - aInfo, err = readFromSystemTables( - ctx, - txn, - ie, - username, - ) + // Lookup the data outside the lock, with at most one request in-flight + // for each user. The user and role_options table versions are also part + // of the request key so that we don't read data from an old version + // of either table. + val, err := a.loadCacheValue( + ctx, fmt.Sprintf("authinfo-%s-%d-%d", username.Normalized(), usersTableVersion, roleOptionsTableVersion), + func(loadCtx context.Context) (interface{}, error) { + return readFromSystemTables(loadCtx, txn, ie, username) + }) if err != nil { return err } + aInfo = val.(AuthInfo) finishedLoop := a.writeAuthInfoBackToCache( ctx, @@ -200,6 +212,33 @@ func (a *Cache) readAuthInfoFromCache( return ai, foundAuthInfo } +// loadCacheValue loads the value for the given requestKey using the provided +// function. It ensures that there is only at most one in-flight request for +// each key at any time. +func (a *Cache) loadCacheValue( + ctx context.Context, requestKey string, fn func(loadCtx context.Context) (interface{}, error), +) (interface{}, error) { + ch, _ := a.populateCacheGroup.DoChan(requestKey, func() (interface{}, error) { + // Use a different context to fetch, so that it isn't possible for + // one query to timeout and cause all the goroutines that are waiting + // to get a timeout error. + loadCtx, cancel := a.stopper.WithCancelOnQuiesce( + logtags.WithTags(context.Background(), logtags.FromContext(ctx)), + ) + defer cancel() + return fn(loadCtx) + }) + select { + case res := <-ch: + if res.Err != nil { + return AuthInfo{}, res.Err + } + return res.Val, nil + case <-ctx.Done(): + return AuthInfo{}, ctx.Err() + } +} + // writeAuthInfoBackToCache tries to put the fetched AuthInfo into the // authInfoCache, and returns true if it succeeded. 
If the underlying system // tables have been modified since they were read, the authInfoCache is not @@ -318,17 +357,20 @@ func (a *Cache) GetDefaultSettings( return nil } - // Lookup the data outside the lock. - settingsEntries, err = readFromSystemTables( - ctx, - txn, - ie, - username, - databaseID, + // Lookup the data outside the lock, with at most one request in-flight + // for each user+database. The db_role_settings table version is also part + // of the request key so that we don't read data from an old version + // of the table. + val, err := a.loadCacheValue( + ctx, fmt.Sprintf("defaultsettings-%s-%d-%d", username.Normalized(), databaseID, dbRoleSettingsTableVersion), + func(loadCtx context.Context) (interface{}, error) { + return readFromSystemTables(loadCtx, txn, ie, username, databaseID) + }, ) if err != nil { return err } + settingsEntries = val.([]SettingsCacheEntry) finishedLoop := a.writeDefaultSettingsBackToCache( ctx, diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index a7654e7b7fb6..cebdd60791f0 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1648,8 +1648,12 @@ var charts = []sectionDescription{ Organization: [][]string{{ReplicationLayer, "Replicate Queue"}}, Charts: []chartDescription{ { - Title: "Add Replica Count", - Metrics: []string{"queue.replicate.addreplica"}, + Title: "Add Replica Count", + Metrics: []string{ + "queue.replicate.addreplica", + "queue.replicate.addvoterreplica", + "queue.replicate.addnonvoterreplica", + }, }, { Title: "Lease Transfer Count", @@ -1664,8 +1668,12 @@ var charts = []sectionDescription{ Metrics: []string{"queue.replicate.purgatory"}, }, { - Title: "Rebalance Count", - Metrics: []string{"queue.replicate.rebalancereplica"}, + Title: "Rebalance Count", + Metrics: []string{ + "queue.replicate.rebalancereplica", + "queue.replicate.rebalancevoterreplica", + "queue.replicate.rebalancenonvoterreplica", + }, }, { Title: "Demotions of Voters to Non Voters", @@ -1679,8 +1687,15 @@ var charts = []sectionDescription{ Title: "Remove Replica Count", Metrics: []string{ "queue.replicate.removedeadreplica", + "queue.replicate.removedeadvoterreplica", + "queue.replicate.removedeadnonvoterreplica", "queue.replicate.removereplica", + "queue.replicate.removevoterreplica", + "queue.replicate.removenonvoterreplica", "queue.replicate.removelearnerreplica", + "queue.replicate.removedecommissioningreplica", + "queue.replicate.removedecommissioningvoterreplica", + "queue.replicate.removedecommissioningnonvoterreplica", }, }, { diff --git a/vendor b/vendor index 165025f03bad..5d616a24eadd 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit 165025f03bad97bd7f3637a57eefbd9aedf043d4 +Subproject commit 5d616a24eadd1735d65232e38ea2be56ad0e79d3