Skip to content

Commit

Permalink
Merge #94943
Browse files Browse the repository at this point in the history
94943: kvserver: don't load uninitialized replicas r=tbg a=pavelkalinnikov

This change stops using `loadRaftMuLockedReplicaMuLocked` when creating an uninitialized replica, to make it clearer what happens in this case. It turns out that what it did for uninitialized desc is equivalent to setting an empty `ReplicaState` and asserting the match between in-memory and on-disk state.

Informs #94912, #93898
Epic: CRDB-220
Release note: None

Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
craig[bot] and pav-kv committed Jan 16, 2023
2 parents 4ed157d + 7edd80c commit 761cf72
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 66 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ type Replica struct {
// Note that there are two StateLoaders, in raftMu and mu,
// depending on which lock is being held.
stateLoader stateloader.StateLoader
// on-disk storage for sideloaded SSTables. nil when there's no ReplicaID.
// on-disk storage for sideloaded SSTables. Always non-nil.
// TODO(pavelkalinnikov): remove sideloaded == nil checks.
sideloaded logstore.SideloadStorage
// stateMachine is used to apply committed raft entries.
stateMachine replicaStateMachine
Expand Down
97 changes: 55 additions & 42 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,39 +47,50 @@ const (
func newReplica(
ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
) (*Replica, error) {
repl := newUnloadedReplica(ctx, desc, store, replicaID)
repl := newUnloadedReplica(ctx, desc.RangeID, store, replicaID)
repl.raftMu.Lock()
defer repl.raftMu.Unlock()
repl.mu.Lock()
defer repl.mu.Unlock()

// TODO(pavelkalinnikov): this path is taken only in tests. Remove it and
// assert desc.IsInitialized().
if !desc.IsInitialized() {
repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, store.Engine())
return repl, nil
}

if err := repl.loadRaftMuLockedReplicaMuLocked(desc); err != nil {
return nil, err
}
return repl, nil
}

// newUnloadedReplica partially constructs a replica. The primary reason this
// function exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is
// to avoid attempting to fully constructing a Replica prior to proving that it
// can exist during the delicate synchronization dance that occurs in
// newUnloadedReplica partially constructs a Replica. The returned replica is
// assumed to be uninitialized, until Replica.loadRaftMuLockedReplicaMuLocked()
// is called with the correct descriptor. The primary reason this function
// exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is to avoid
// attempting to fully construct a Replica and load it from storage prior to
// proving that it can exist during the delicate synchronization dance in
// Store.tryGetOrCreateReplica(). A Replica returned from this function must not
// be used in any way until it's load() method has been called.
// be used in any way until the load method has been called.
func newUnloadedReplica(
ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
ctx context.Context, rangeID roachpb.RangeID, store *Store, replicaID roachpb.ReplicaID,
) *Replica {
if replicaID == 0 {
log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", desc.RangeID)
log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", rangeID)
}
uninitState := stateloader.UninitializedReplicaState(rangeID)
r := &Replica{
AmbientContext: store.cfg.AmbientCtx,
RangeID: desc.RangeID,
RangeID: rangeID,
replicaID: replicaID,
creationTime: timeutil.Now(),
store: store,
abortSpan: abortspan.New(desc.RangeID),
abortSpan: abortspan.New(rangeID),
concMgr: concurrency.NewManager(concurrency.Config{
NodeDesc: store.nodeDesc,
RangeDesc: desc,
RangeDesc: uninitState.Desc,
Settings: store.ClusterSettings(),
DB: store.DB(),
Clock: store.Clock(),
Expand All @@ -91,8 +102,10 @@ func newUnloadedReplica(
TxnWaitKnobs: store.TestingKnobs().TxnWaitKnobs,
}),
}
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, rangeID)

r.mu.pendingLeaseRequest = makePendingLeaseRequest(r)
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.stateLoader = stateloader.Make(rangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
split.Init(&r.loadBasedSplitter, store.cfg.Settings, split.GlobalRandSource(), func() float64 {
Expand All @@ -114,15 +127,24 @@ func newUnloadedReplica(
r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
}

// Init rangeStr with the range ID.
r.rangeStr.store(replicaID, &roachpb.RangeDescriptor{RangeID: desc.RangeID})
// NB: the state will be loaded when the replica gets initialized.
r.mu.state = uninitState
r.rangeStr.store(replicaID, uninitState.Desc)
// Add replica log tag - the value is rangeStr.String().
r.AmbientContext.AddLogTag("r", &r.rangeStr)
r.raftCtx = logtags.AddTag(r.AnnotateCtx(context.Background()), "raft", nil /* value */)
// Add replica pointer value. NB: this was historically useful for debugging
// replica GC issues, but is a distraction at the moment.
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))
r.raftMu.stateLoader = stateloader.Make(desc.RangeID)

r.raftMu.stateLoader = stateloader.Make(rangeID)
r.raftMu.sideloaded = logstore.NewDiskSideloadStorage(
store.cfg.Settings,
rangeID,
store.engine.GetAuxiliaryDir(),
store.limiters.BulkIOWriteRate,
store.engine,
)

r.splitQueueThrottle = util.Every(splitQueueThrottleDuration)
r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration)
Expand Down Expand Up @@ -155,28 +177,27 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) {
r.startKey = startKey
}

// loadRaftMuLockedReplicaMuLocked will load the state of the replica from disk.
// If desc is initialized, the Replica will be initialized when this method
// returns. An initialized Replica may not be reloaded. If this method is called
// with an uninitialized desc it may be called again later with an initialized
// desc.
// loadRaftMuLockedReplicaMuLocked loads the state of the initialized replica
// from storage. After this method returns, Replica is initialized, and can not
// be loaded again.
//
// This method is called in three places:
// This method is called in two places:
//
// 1. newReplica - used when the store is initializing and during testing
// 2. tryGetOrCreateReplica - see newUnloadedReplica
// 3. splitPostApply - this call initializes a previously uninitialized Replica.
// 2. splitPostApply - this call initializes a previously uninitialized Replica.
func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error {
ctx := r.AnnotateCtx(context.TODO())
if r.mu.state.Desc != nil && r.IsInitialized() {
log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
if !desc.IsInitialized() {
return errors.AssertionFailedf("r%d: cannot load an uninitialized replica", desc.RangeID)
}
if r.IsInitialized() {
return errors.AssertionFailedf("r%d: cannot reinitialize an initialized replica", desc.RangeID)
} else if r.replicaID == 0 {
// NB: This is just a defensive check as r.mu.replicaID should never be 0.
log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID)
}
if desc.IsInitialized() {
r.setStartKeyLocked(desc.StartKey)
return errors.AssertionFailedf("r%d: cannot initialize replica without a replicaID",
desc.RangeID)
}
r.setStartKeyLocked(desc.StartKey)

// Clear the internal raft group in case we're being reset. Since we're
// reloading the raft state below, it isn't safe to use the existing raft
Expand All @@ -195,14 +216,15 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)

// Ensure that we're not trying to load a replica with a different ID than
// was used to construct this Replica.
replicaID := r.replicaID
var replicaID roachpb.ReplicaID
if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found {
replicaID = replicaDesc.ReplicaID
} else if desc.IsInitialized() {
log.Fatalf(ctx, "r%d: cannot initialize replica which is not in descriptor %v", desc.RangeID, desc)
} else {
return errors.AssertionFailedf("r%d: cannot initialize replica which is not in descriptor %v",
desc.RangeID, desc)
}
if r.replicaID != replicaID {
log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
return errors.AssertionFailedf("attempting to initialize a replica which has ID %d with ID %d",
r.replicaID, replicaID)
}

Expand All @@ -220,17 +242,8 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
}

r.raftMu.sideloaded = logstore.NewDiskSideloadStorage(
r.store.cfg.Settings,
desc.RangeID,
r.Engine().GetAuxiliaryDir(),
r.store.limiters.BulkIOWriteRate,
r.store.engine,
)
r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())

r.sideTransportClosedTimestamp.init(r.store.cfg.ClosedTimestampReceiver, desc.RangeID)

return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/stateloader/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ go_library(
go_test(
name = "stateloader_test",
size = "small",
srcs = ["initial_test.go"],
srcs = [
"initial_test.go",
"stateloader_test.go",
],
args = ["-test.timeout=55s"],
embed = [":stateloader"],
deps = [
Expand All @@ -34,6 +37,7 @@ go_test(
"//pkg/testutils",
"//pkg/util/leaktest",
"//pkg/util/stop",
"@com_github_stretchr_testify//require",
"@io_etcd_go_raft_v3//raftpb",
],
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,20 @@ func (rsl StateLoader) SetVersion(
hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, version)
}

// UninitializedReplicaState returns the ReplicaState of an uninitialized
// Replica with the given range ID. It is equivalent to StateLoader.Load from an
// empty storage.
func UninitializedReplicaState(rangeID roachpb.RangeID) kvserverpb.ReplicaState {
return kvserverpb.ReplicaState{
Desc: &roachpb.RangeDescriptor{RangeID: rangeID},
Lease: &roachpb.Lease{},
TruncatedState: &roachpb.RaftTruncatedState{},
GCThreshold: &hlc.Timestamp{},
Stats: &enginepb.MVCCStats{},
GCHint: &roachpb.GCHint{},
}
}

// The rest is not technically part of ReplicaState.

// SynthesizeRaftState creates a Raft state which synthesizes both a HardState
Expand Down
30 changes: 30 additions & 0 deletions pkg/kv/kvserver/stateloader/stateloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package stateloader

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/stretchr/testify/require"
)

func TestUninitializedReplicaState(t *testing.T) {
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
desc := roachpb.RangeDescriptor{RangeID: 123}
exp, err := Make(desc.RangeID).Load(context.Background(), eng, &desc)
require.NoError(t, err)
act := UninitializedReplicaState(desc.RangeID)
require.Equal(t, exp, act)
}
25 changes: 3 additions & 22 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,34 +222,14 @@ func (s *Store) tryGetOrCreateReplica(
}

// Create a new uninitialized replica and lock it for raft processing.
// TODO(pavelkalinnikov): consolidate an uninitialized Replica creation into a
// single function, now that it is sequential.
uninitializedDesc := &roachpb.RangeDescriptor{
RangeID: rangeID,
// NB: other fields are unknown; need to populate them from
// snapshot.
}
repl := newUnloadedReplica(ctx, uninitializedDesc, s, replicaID)
repl := newUnloadedReplica(ctx, rangeID, s, replicaID)
repl.raftMu.Lock() // not unlocked
// Take out read-only lock. Not strictly necessary here, but follows the
// normal lock protocol for destroyStatus.Set().
repl.readOnlyCmdMu.Lock()
// Grab the internal Replica state lock to ensure nobody mucks with our
// replica even outside of raft processing.
repl.mu.Lock()

// NB: A Replica should never be in the store's replicas map with a nil
// descriptor. Assign it directly here. In the case that the Replica should
// exist (which we confirm with another check of the Tombstone below), we'll
// re-initialize the replica with the same uninitializedDesc.
//
// During short window between here and call to s.unlinkReplicaByRangeIDLocked()
// in the failure branch below, the Replica used to have a nil descriptor and
// was present in the map. While it was the case that the destroy status had
// been set, not every code path which inspects the descriptor checks the
// destroy status.
repl.mu.state.Desc = uninitializedDesc

// Initialize the Replica with the replicaID.
if err := func() error {
// An uninitialized replica should have an empty HardState.Commit at
Expand Down Expand Up @@ -314,7 +294,8 @@ func (s *Store) tryGetOrCreateReplica(
return err
}

return repl.loadRaftMuLockedReplicaMuLocked(uninitializedDesc)
repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, s.Engine())
return nil
}(); err != nil {
// Mark the replica as destroyed and remove it from the replicas maps to
// ensure nobody tries to use it.
Expand Down

0 comments on commit 761cf72

Please sign in to comment.