Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: use replicasByKey addition func in snapshot path #96870

Merged
merged 4 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ func (s *Store) FindTargetAndTransferLease(
func (s *Store) AddReplica(repl *Replica) error {
s.mu.Lock()
defer s.mu.Unlock()
repl.mu.RLock()
defer repl.mu.RUnlock()
if err := s.addToReplicasByRangeIDLocked(repl); err != nil {
return err
}
if err := s.addToReplicasByKeyLocked(repl); err != nil {
} else if err := s.addToReplicasByKeyLockedReplicaRLocked(repl); err != nil {
return err
}
s.metrics.ReplicaCount.Inc(1)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ type Replica struct {
// mergeTxnID contains the ID of the in-progress merge transaction, if a
// merge is currently in progress. Otherwise, the ID is empty.
mergeTxnID uuid.UUID
// The state of the Raft state machine.
// The state of the Raft state machine. Updated only when raftMu and mu are
// both held.
state kvserverpb.ReplicaState
// Last index/term written to the raft log (not necessarily durable locally
// or committed by the group). Note that lastTermNotDurable may be 0 (and
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestStoreCheckpointSpans(t *testing.T) {
r.isInitialized.Set(desc.IsInitialized())
require.NoError(t, s.addToReplicasByRangeIDLocked(r))
if r.IsInitialized() {
require.NoError(t, s.addToReplicasByKeyLocked(r))
require.NoError(t, s.addToReplicasByKeyLockedReplicaRLocked(r))
descs = append(descs, desc)
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1879,7 +1879,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// TODO(pavelkalinnikov): hide these in Store's replica create functions.
err = s.addToReplicasByRangeIDLocked(rep)
if err == nil {
err = s.addToReplicasByKeyLocked(rep)
// NB: no locking of the Replica is needed since it's being created, but
// just in case.
rep.mu.RLock()
err = s.addToReplicasByKeyLockedReplicaRLocked(rep)
rep.mu.RUnlock()
}
s.mu.Unlock()
if err != nil {
Expand Down
42 changes: 17 additions & 25 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,28 +249,26 @@ func fromReplicaIsTooOldRLocked(toReplica *Replica, fromReplica *roachpb.Replica
return !found && fromReplica.ReplicaID < desc.NextReplicaID
}

// addToReplicasByKeyLocked adds the replica to the replicasByKey btree. The
// replica must already be in replicasByRangeID. Requires that Store.mu is held.
//
// Returns an error if a different replica with the same range ID, or an
// overlapping replica or placeholder exists in this Store.
func (s *Store) addToReplicasByKeyLocked(repl *Replica) error {
if !repl.IsInitialized() {
return errors.Errorf("attempted to add uninitialized replica %s", repl)
// addToReplicasByKeyLockedReplicaRLocked adds the replica to the replicasByKey
// btree. The replica must already be in replicasByRangeID. Returns an error if
// a different replica with the same range ID, or an overlapping replica or
// placeholder exists in this Store. Replica.mu must be at least read-locked.
func (s *Store) addToReplicasByKeyLockedReplicaRLocked(repl *Replica) error {
desc := repl.descRLocked()
if !desc.IsInitialized() {
return errors.Errorf("%s: attempted to add uninitialized replica %s", s, repl)
}
if got := s.GetReplicaIfExists(repl.RangeID); got != repl { // NB: got can be nil too
return errors.Errorf("replica %s not in replicasByRangeID; got %s", repl, got)
return errors.Errorf("%s: replica %s not in replicasByRangeID; got %s", s, repl, got)
}

if it := s.getOverlappingKeyRangeLocked(repl.Desc()); it.item != nil {
return errors.Errorf("%s: cannot addToReplicasByKeyLocked; range %s has overlapping range %s", s, repl, it.Desc())
if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil {
return errors.Errorf(
"%s: cannot add to replicasByKey: range %s overlaps with %s", s, repl, it.Desc())
}

if it := s.mu.replicasByKey.ReplaceOrInsertReplica(context.Background(), repl); it.item != nil {
return errors.Errorf("%s: cannot addToReplicasByKeyLocked; range for key %v already exists in replicasByKey btree", s,
it.item.key())
return errors.Errorf(
"%s: cannot add to replicasByKey: key %v already exists in the btree", s, it.item.key())
}

return nil
}

Expand Down Expand Up @@ -319,16 +317,11 @@ func (s *Store) maybeMarkReplicaInitializedLockedReplLocked(
}
delete(s.mu.uninitReplicas, rangeID)

if it := s.getOverlappingKeyRangeLocked(desc); it.item != nil {
return errors.AssertionFailedf("%s: cannot initialize replica; %s has overlapping range %s",
s, desc, it.Desc())
}

// Copy of the start key needs to be set before inserting into replicasByKey.
lockedRepl.setStartKeyLocked(desc.StartKey)
if it := s.mu.replicasByKey.ReplaceOrInsertReplica(ctx, lockedRepl); it.item != nil {
return errors.AssertionFailedf("range for key %v already exists in replicasByKey btree: %+v",
it.item.key(), it)

if err := s.addToReplicasByKeyLockedReplicaRLocked(lockedRepl); err != nil {
return err
}

// Unquiesce the replica. We don't allow uninitialized replicas to unquiesce,
Expand Down Expand Up @@ -357,7 +350,6 @@ func (s *Store) maybeMarkReplicaInitializedLockedReplLocked(
if !lockedRepl.maybeUnquiesceWithOptionsLocked(false /* campaignOnWake */) {
return errors.AssertionFailedf("expected replica %s to unquiesce after initialization", desc)
}

// Add the range to metrics and maybe gossip on capacity change.
s.metrics.ReplicaCount.Inc(1)
s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeAddEvent)
Expand Down
43 changes: 21 additions & 22 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,34 +239,29 @@ func prepareRightReplicaForSplit(
minValidObservedTS := r.mu.minValidObservedTimestamp
r.mu.RUnlock()

// The right hand side of the split was already created (and its raftMu
// acquired) in Replica.acquireSplitLock. It must be present here.
// If the RHS replica of the split is not removed, then it has been obtained
// (and its raftMu acquired) in Replica.acquireSplitLock.
rightRepl := r.store.GetReplicaIfExists(split.RightDesc.RangeID)
// If the RHS replica at the point of the split was known to be removed
// during the application of the split then we may not find it here. That's
// fine, carry on. See also:
// If the RHS replica of the split has been removed then we either not find it
// here, or find a one with a later replica ID. In this case we also know that
// its data has already been removed by splitPreApply, so we skip initializing
// this replica. See also:
_, _ = r.acquireSplitLock, splitPostApply
if rightRepl == nil {
if rightRepl == nil || rightRepl.isNewerThanSplit(split) {
return nil
}
// Finish initialization of the RHS replica.

state, err := kvstorage.LoadReplicaState(
ctx, r.Engine(), r.StoreID(), &split.RightDesc, rightRepl.replicaID)
if err != nil {
log.Fatalf(ctx, "%v", err)
}

// Already holding raftMu, see above.
rightRepl.mu.Lock()
defer rightRepl.mu.Unlock()

// If we know that the RHS has already been removed at this replica ID
// then we also know that its data has already been removed by the preApply
// so we skip initializing it as the RHS of the split.
if rightRepl.isNewerThanSplit(split) {
return nil
}

// Finish initialization of the RHS.
if state, err := kvstorage.LoadReplicaState(
ctx, r.Engine(), r.StoreID(), &split.RightDesc, rightRepl.replicaID,
); err != nil {
log.Fatalf(ctx, "%v", err)
} else if err := rightRepl.initRaftMuLockedReplicaMuLocked(state); err != nil {
if err := rightRepl.initRaftMuLockedReplicaMuLocked(state); err != nil {
log.Fatalf(ctx, "%v", err)
}

Expand Down Expand Up @@ -355,16 +350,20 @@ func (s *Store) SplitRange(
// assumption that distribution across all tracked load stats is
// identical.
leftRepl.loadStats.Split(rightRepl.loadStats)
if err := s.addToReplicasByKeyLocked(rightRepl); err != nil {
rightRepl.mu.RLock()
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
if err := s.addToReplicasByKeyLockedReplicaRLocked(rightRepl); err != nil {
rightRepl.mu.RUnlock()
return errors.Wrapf(err, "unable to add replica %v", rightRepl)
}
rightRepl.mu.RUnlock()

// Update the replica's cached byte thresholds. This is a no-op if the system
// config is not available, in which case we rely on the next gossip update
// to perform the update.
if err := rightRepl.updateRangeInfo(ctx, rightRepl.Desc()); err != nil {
if err := rightRepl.updateRangeInfo(ctx, rightDesc); err != nil {
return err
}

// Add the range to metrics and maybe gossip on capacity change.
s.metrics.ReplicaCount.Inc(1)
s.storeGossip.MaybeGossipOnCapacityChange(ctx, RangeAddEvent)
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,9 @@ func TestReplicasByKey(t *testing.T) {
expectedErrorOnAdd string
}{
// [a,c) is contained in [KeyMin, e)
{nil, 2, roachpb.RKey("a"), roachpb.RKey("c"), ".*has overlapping range"},
{nil, 2, roachpb.RKey("a"), roachpb.RKey("c"), "overlaps with"},
// [c,f) partially overlaps with [KeyMin, e)
{nil, 3, roachpb.RKey("c"), roachpb.RKey("f"), ".*has overlapping range"},
{nil, 3, roachpb.RKey("c"), roachpb.RKey("f"), "overlaps with"},
// [e, f) is disjoint from [KeyMin, e)
{nil, 4, roachpb.RKey("e"), roachpb.RKey("f"), ""},
}
Expand Down Expand Up @@ -875,8 +875,9 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) {
}()

store.mu.uninitReplicas[newRangeID] = r
require.NoError(t, store.addToReplicasByRangeIDLocked(r))

expectedResult = ".*cannot initialize replica.*"
expectedResult = "overlaps with"
func() {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down