diff --git a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel index 5264758d4e4e..e44210e9d9d7 100644 --- a/pkg/kv/kvserver/allocator/storepool/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/storepool/BUILD.bazel @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "storepool", srcs = [ + "override_store_pool.go", "store_pool.go", "test_helpers.go", ], @@ -35,7 +36,10 @@ go_library( go_test( name = "storepool_test", - srcs = ["store_pool_test.go"], + srcs = [ + "override_store_pool_test.go", + "store_pool_test.go", + ], args = ["-test.timeout=295s"], embed = [":storepool"], deps = [ diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go new file mode 100644 index 000000000000..cdd3d2f2a07f --- /dev/null +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool.go @@ -0,0 +1,172 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storepool + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// OverrideStorePool is an implementation of AllocatorStorePool that allows +// the ability to override a node's liveness status for the purposes of +// evaluation for the allocator, otherwise delegating to an actual StorePool +// for all its logic, including management and lookup of store descriptors. +// +// The OverrideStorePool is meant to provide a read-only overlay to an +// StorePool, and as such, read-only methods are dispatched to the underlying +// StorePool using the configured NodeLivenessFunc override. Methods that +// mutate the state of the StorePool such as UpdateLocalStoreAfterRebalance +// are instead no-ops. +// +// NB: Despite the fact that StorePool.DetailsMu is held in write mode in +// some of the dispatched functions, these do not mutate the state of the +// underlying StorePool. +type OverrideStorePool struct { + sp *StorePool + + overrideNodeLivenessFn NodeLivenessFunc +} + +var _ AllocatorStorePool = &OverrideStorePool{} + +func NewOverrideStorePool(storePool *StorePool, nl NodeLivenessFunc) *OverrideStorePool { + return &OverrideStorePool{ + sp: storePool, + overrideNodeLivenessFn: nl, + } +} + +func (o *OverrideStorePool) String() string { + return o.sp.statusString(o.overrideNodeLivenessFn) +} + +// IsStoreReadyForRoutineReplicaTransfer implements the AllocatorStorePool interface. +func (o *OverrideStorePool) IsStoreReadyForRoutineReplicaTransfer( + ctx context.Context, targetStoreID roachpb.StoreID, +) bool { + return o.sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID, o.overrideNodeLivenessFn) +} + +// DecommissioningReplicas implements the AllocatorStorePool interface. +func (o *OverrideStorePool) DecommissioningReplicas( + repls []roachpb.ReplicaDescriptor, +) []roachpb.ReplicaDescriptor { + return o.sp.decommissioningReplicasWithLiveness(repls, o.overrideNodeLivenessFn) +} + +// GetStoreList implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStoreList( + filter StoreFilter, +) (StoreList, int, ThrottledStoreReasons) { + o.sp.DetailsMu.Lock() + defer o.sp.DetailsMu.Unlock() + + var storeIDs roachpb.StoreIDSlice + for storeID := range o.sp.DetailsMu.StoreDetails { + storeIDs = append(storeIDs, storeID) + } + return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter) +} + +// GetStoreListFromIDs implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStoreListFromIDs( + storeIDs roachpb.StoreIDSlice, filter StoreFilter, +) (StoreList, int, ThrottledStoreReasons) { + o.sp.DetailsMu.Lock() + defer o.sp.DetailsMu.Unlock() + return o.sp.getStoreListFromIDsLocked(storeIDs, o.overrideNodeLivenessFn, filter) +} + +// LiveAndDeadReplicas implements the AllocatorStorePool interface. +func (o *OverrideStorePool) LiveAndDeadReplicas( + repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool, +) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { + return o.sp.liveAndDeadReplicasWithLiveness(repls, o.overrideNodeLivenessFn, includeSuspectAndDrainingStores) +} + +// ClusterNodeCount implements the AllocatorStorePool interface. +func (o *OverrideStorePool) ClusterNodeCount() int { + return o.sp.ClusterNodeCount() +} + +// IsDeterministic implements the AllocatorStorePool interface. +func (o *OverrideStorePool) IsDeterministic() bool { + return o.sp.deterministic +} + +// Clock implements the AllocatorStorePool interface. +func (o *OverrideStorePool) Clock() *hlc.Clock { + return o.sp.clock +} + +// GetLocalitiesByNode implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetLocalitiesByNode( + replicas []roachpb.ReplicaDescriptor, +) map[roachpb.NodeID]roachpb.Locality { + return o.sp.GetLocalitiesByNode(replicas) +} + +// GetLocalitiesByStore implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetLocalitiesByStore( + replicas []roachpb.ReplicaDescriptor, +) map[roachpb.StoreID]roachpb.Locality { + return o.sp.GetLocalitiesByStore(replicas) +} + +// GetStores implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor { + return o.sp.GetStores() +} + +// GetStoreDescriptor implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GetStoreDescriptor( + storeID roachpb.StoreID, +) (roachpb.StoreDescriptor, bool) { + return o.sp.GetStoreDescriptor(storeID) +} + +// GossipNodeIDAddress implements the AllocatorStorePool interface. +func (o *OverrideStorePool) GossipNodeIDAddress( + nodeID roachpb.NodeID, +) (*util.UnresolvedAddr, error) { + return o.sp.GossipNodeIDAddress(nodeID) +} + +// UpdateLocalStoreAfterRebalance implements the AllocatorStorePool interface. +// This override method is a no-op, as +// StorePool.UpdateLocalStoreAfterRebalance(..) is not a read-only method and +// mutates the state of the held store details. +func (o *OverrideStorePool) UpdateLocalStoreAfterRebalance( + _ roachpb.StoreID, _ allocator.RangeUsageInfo, _ roachpb.ReplicaChangeType, +) { +} + +// UpdateLocalStoresAfterLeaseTransfer implements the AllocatorStorePool interface. +// This override method is a no-op, as +// StorePool.UpdateLocalStoresAfterLeaseTransfer(..) is not a read-only method and +// mutates the state of the held store details. +func (o *OverrideStorePool) UpdateLocalStoresAfterLeaseTransfer( + _ roachpb.StoreID, _ roachpb.StoreID, _ float64, +) { +} + +// UpdateLocalStoreAfterRelocate implements the AllocatorStorePool interface. +// This override method is a no-op, as +// StorePool.UpdateLocalStoreAfterRelocate(..) is not a read-only method and +// mutates the state of the held store details. +func (o *OverrideStorePool) UpdateLocalStoreAfterRelocate( + _, _ []roachpb.ReplicationTarget, _, _ []roachpb.ReplicaDescriptor, _ roachpb.StoreID, _ float64, +) { +} diff --git a/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go new file mode 100644 index 000000000000..155fc1bade06 --- /dev/null +++ b/pkg/kv/kvserver/allocator/storepool/override_store_pool_test.go @@ -0,0 +1,373 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storepool + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +// TestOverrideStorePoolStatusString tests the status string output of the +// store pool implementation, including any liveness overrides. +func TestOverrideStorePoolStatusString(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st, + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if overriddenLiveness, ok := livenessOverrides[nid]; ok { + return overriddenLiveness + } + + return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead) + }) + + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + }, + } + + sg.GossipStores(stores, t) + for i := 1; i <= 5; i++ { + mnl.SetNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) + } + + // Override node 2 as dead. + livenessOverrides[roachpb.NodeID(2)] = livenesspb.NodeLivenessStatus_DEAD + + // Override node 3 as decommissioning. + livenessOverrides[roachpb.NodeID(3)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + + // Mark node 5 as draining. + mnl.SetNodeStatus(5, livenesspb.NodeLivenessStatus_DRAINING) + + require.Equal(t, "1: range-count=0 fraction-used=0.00\n"+ + "2 (status=1): range-count=0 fraction-used=0.00\n"+ + "3 (status=5): range-count=0 fraction-used=0.00\n"+ + "4: range-count=0 fraction-used=0.00\n"+ + "5 (status=7): range-count=0 fraction-used=0.00\n", + sp.String(), + ) +} + +// TestOverrideStorePoolDecommissioningReplicas validates the ability of to use +// both liveness as well as liveness overrides in determining the set of +// decommissioning replicas. +func TestOverrideStorePoolDecommissioningReplicas(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st, + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if overriddenLiveness, ok := livenessOverrides[nid]; ok { + return overriddenLiveness + } + + return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead) + }) + + stores := []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + }, + { + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + }, + { + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + }, + { + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + }, + { + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + }, + } + + replicas := []roachpb.ReplicaDescriptor{ + { + NodeID: 1, + StoreID: 1, + ReplicaID: 1, + }, + { + NodeID: 2, + StoreID: 2, + ReplicaID: 2, + }, + { + NodeID: 3, + StoreID: 3, + ReplicaID: 4, + }, + { + NodeID: 4, + StoreID: 4, + ReplicaID: 4, + }, + { + NodeID: 5, + StoreID: 5, + ReplicaID: 5, + }, + } + + sg.GossipStores(stores, t) + for i := 1; i <= 5; i++ { + mnl.SetNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) + } + + liveReplicas, deadReplicas := sp.LiveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) + require.Equalf(t, 5, len(liveReplicas), "expected five live replicas, found %d (%v)", len(liveReplicas), liveReplicas) + require.Emptyf(t, deadReplicas, "expected no dead replicas initially, found %d (%v)", len(deadReplicas), deadReplicas) + // Mark node 4 as decommissioning. + mnl.SetNodeStatus(4, livenesspb.NodeLivenessStatus_DECOMMISSIONING) + // Mark node 5 as dead. + mnl.SetNodeStatus(5, livenesspb.NodeLivenessStatus_DEAD) + + // Override node 3 as decommissioning. + livenessOverrides[roachpb.NodeID(3)] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + + liveReplicas, deadReplicas = sp.LiveAndDeadReplicas(replicas, false /* includeSuspectAndDrainingStores */) + // Decommissioning replicas are considered live. + if a, e := liveReplicas, replicas[:4]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected live replicas %+v; got %+v", e, a) + } + if a, e := deadReplicas, replicas[4:]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected dead replicas %+v; got %+v", e, a) + } + + decommissioningReplicas := sp.DecommissioningReplicas(replicas) + if a, e := decommissioningReplicas, replicas[2:4]; !reflect.DeepEqual(a, e) { + t.Fatalf("expected decommissioning replicas %+v; got %+v", e, a) + } +} + +// TestOverrideStorePoolGetStoreList tests the functionality of the store list +// with and without liveness overrides. +func TestOverrideStorePoolGetStoreList(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + // We're going to manually mark stores dead in this test. + stopper, g, _, testStorePool, mnl := CreateTestStorePool(ctx, st, + TestTimeUntilStoreDead, false, /* deterministic */ + func() int { return 10 }, /* nodeCount */ + livenesspb.NodeLivenessStatus_DEAD) + defer stopper.Stop(ctx) + sg := gossiputil.NewStoreGossiper(g) + + livenessOverrides := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus) + sp := NewOverrideStorePool(testStorePool, func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if overriddenLiveness, ok := livenessOverrides[nid]; ok { + return overriddenLiveness + } + + return mnl.NodeLivenessFunc(nid, now, timeUntilStoreDead) + }) + + constraints := []roachpb.ConstraintsConjunction{ + { + Constraints: []roachpb.Constraint{ + {Type: roachpb.Constraint_REQUIRED, Value: "ssd"}, + {Type: roachpb.Constraint_REQUIRED, Value: "dc"}, + }, + }, + } + required := []string{"ssd", "dc"} + // Nothing yet. + sl, _, _ := sp.GetStoreList(StoreFilterNone) + sl = sl.ExcludeInvalid(constraints) + require.Emptyf(t, sl.Stores, "expected no stores, instead %+v", sl.Stores) + + matchingStore := roachpb.StoreDescriptor{ + StoreID: 1, + Node: roachpb.NodeDescriptor{NodeID: 1}, + Attrs: roachpb.Attributes{Attrs: required}, + } + supersetStore := roachpb.StoreDescriptor{ + StoreID: 2, + Node: roachpb.NodeDescriptor{NodeID: 2}, + Attrs: roachpb.Attributes{Attrs: append(required, "db")}, + } + unmatchingStore := roachpb.StoreDescriptor{ + StoreID: 3, + Node: roachpb.NodeDescriptor{NodeID: 3}, + Attrs: roachpb.Attributes{Attrs: []string{"ssd", "otherdc"}}, + } + emptyStore := roachpb.StoreDescriptor{ + StoreID: 4, + Node: roachpb.NodeDescriptor{NodeID: 4}, + Attrs: roachpb.Attributes{}, + } + deadStore := roachpb.StoreDescriptor{ + StoreID: 5, + Node: roachpb.NodeDescriptor{NodeID: 5}, + Attrs: roachpb.Attributes{Attrs: required}, + } + decommissioningStore := roachpb.StoreDescriptor{ + StoreID: 6, + Node: roachpb.NodeDescriptor{NodeID: 6}, + Attrs: roachpb.Attributes{Attrs: required}, + } + absentStore := roachpb.StoreDescriptor{ + StoreID: 7, + Node: roachpb.NodeDescriptor{NodeID: 7}, + Attrs: roachpb.Attributes{Attrs: required}, + } + suspectedStore := roachpb.StoreDescriptor{ + StoreID: 8, + Node: roachpb.NodeDescriptor{NodeID: 8}, + Attrs: roachpb.Attributes{Attrs: required}, + } + + // Gossip and mark all alive initially. + sg.GossipStores([]*roachpb.StoreDescriptor{ + &matchingStore, + &supersetStore, + &unmatchingStore, + &emptyStore, + &deadStore, + &decommissioningStore, + &suspectedStore, + // absentStore is purposefully not gossiped. + }, t) + for i := 1; i <= 8; i++ { + mnl.SetNodeStatus(roachpb.NodeID(i), livenesspb.NodeLivenessStatus_LIVE) + } + + // Set deadStore as dead. + livenessOverrides[deadStore.Node.NodeID] = livenesspb.NodeLivenessStatus_DEAD + + // Set decommissioningStore as decommissioning. + livenessOverrides[decommissioningStore.Node.NodeID] = livenesspb.NodeLivenessStatus_DECOMMISSIONING + + // Set suspectedStore as suspected. + testStorePool.DetailsMu.Lock() + testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastAvailable = testStorePool.clock.Now().GoTime() + testStorePool.DetailsMu.StoreDetails[suspectedStore.StoreID].LastUnavailable = testStorePool.clock.Now().GoTime() + testStorePool.DetailsMu.Unlock() + + // No filter or limited set of store IDs. + require.NoError(t, verifyStoreList( + sp, + constraints, + nil, /* storeIDs */ + StoreFilterNone, + []int{ + int(matchingStore.StoreID), + int(supersetStore.StoreID), + int(suspectedStore.StoreID), + }, + /* expectedAliveStoreCount */ 5, + /* expectedThrottledStoreCount */ 1, + )) + + // Filter out suspected stores but don't limit the set of store IDs. + require.NoError(t, verifyStoreList( + sp, + constraints, + nil, /* storeIDs */ + StoreFilterSuspect, + []int{ + int(matchingStore.StoreID), + int(supersetStore.StoreID), + }, + /* expectedAliveStoreCount */ 5, + /* expectedThrottledStoreCount */ 1, + )) + + limitToStoreIDs := roachpb.StoreIDSlice{ + matchingStore.StoreID, + decommissioningStore.StoreID, + absentStore.StoreID, + suspectedStore.StoreID, + } + + // No filter but limited to limitToStoreIDs. + // Note that supersetStore is not included. + require.NoError(t, verifyStoreList( + sp, + constraints, + limitToStoreIDs, + StoreFilterNone, + []int{ + int(matchingStore.StoreID), + int(suspectedStore.StoreID), + }, + /* expectedAliveStoreCount */ 2, + /* expectedThrottledStoreCount */ 1, + )) + + // Filter out suspected stores and limit to limitToStoreIDs. + // Note that suspectedStore is not included. + require.NoError(t, verifyStoreList( + sp, + constraints, + limitToStoreIDs, + StoreFilterSuspect, + []int{ + int(matchingStore.StoreID), + }, + /* expectedAliveStoreCount */ 2, + /* expectedThrottledStoreCount */ 1, + )) +} diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index a3ef04e9c9e2..7ff75c231574 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -334,6 +334,8 @@ type localityWithString struct { // AllocatorStorePool provides an interface for use by the allocator to a list // of all known stores in the cluster and information on their health. type AllocatorStorePool interface { + fmt.Stringer + // ClusterNodeCount returns the number of nodes that are possible allocation // targets. // See comment on StorePool.ClusterNodeCount(). @@ -487,6 +489,10 @@ func NewStorePool( } func (sp *StorePool) String() string { + return sp.statusString(sp.NodeLivenessFn) +} + +func (sp *StorePool) statusString(nl NodeLivenessFunc) string { sp.DetailsMu.RLock() defer sp.DetailsMu.RUnlock() @@ -504,7 +510,7 @@ func (sp *StorePool) String() string { for _, id := range ids { detail := sp.DetailsMu.StoreDetails[id] fmt.Fprintf(&buf, "%d", id) - status := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) + status := detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect) if status != storeStatusAvailable { fmt.Fprintf(&buf, " (status=%d)", status) } @@ -706,6 +712,14 @@ func (sp *StorePool) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreD // from the provided repls and returns them in a slice. func (sp *StorePool) DecommissioningReplicas( repls []roachpb.ReplicaDescriptor, +) (decommissioningReplicas []roachpb.ReplicaDescriptor) { + return sp.decommissioningReplicasWithLiveness(repls, sp.NodeLivenessFn) +} + +// decommissioningReplicasWithLiveness filters out replicas on decommissioning node/store +// from the provided repls and returns them in a slice, using the provided NodeLivenessFunc. +func (sp *StorePool) decommissioningReplicasWithLiveness( + repls []roachpb.ReplicaDescriptor, nl NodeLivenessFunc, ) (decommissioningReplicas []roachpb.ReplicaDescriptor) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() @@ -718,7 +732,7 @@ func (sp *StorePool) DecommissioningReplicas( for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) - switch detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) { + switch detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect) { case storeStatusDecommissioning: decommissioningReplicas = append(decommissioningReplicas, repl) } @@ -776,7 +790,7 @@ func (sp *StorePool) IsDead(storeID roachpb.StoreID) (bool, time.Duration, error // liveness or deadness at the moment) or an error if the store is not found in // the pool. func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) { - status, err := sp.storeStatus(storeID) + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) if err != nil { return false, err } @@ -786,7 +800,7 @@ func (sp *StorePool) IsUnknown(storeID roachpb.StoreID) (bool, error) { // IsDraining returns true if the given store's status is `storeStatusDraining` // or an error if the store is not found in the pool. func (sp *StorePool) IsDraining(storeID roachpb.StoreID) (bool, error) { - status, err := sp.storeStatus(storeID) + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) if err != nil { return false, err } @@ -796,14 +810,16 @@ func (sp *StorePool) IsDraining(storeID roachpb.StoreID) (bool, error) { // IsLive returns true if the node is considered alive by the store pool or an error // if the store is not found in the pool. func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { - status, err := sp.storeStatus(storeID) + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) if err != nil { return false, err } return status == storeStatusAvailable, nil } -func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { +func (sp *StorePool) storeStatus( + storeID roachpb.StoreID, nl NodeLivenessFunc, +) (storeStatus, error) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() @@ -816,7 +832,7 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { now := sp.clock.Now().GoTime() timeUntilStoreDead := TimeUntilStoreDead.Get(&sp.st.SV) timeAfterStoreSuspect := TimeAfterStoreSuspect.Get(&sp.st.SV) - return sd.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect), nil + return sd.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect), nil } // LiveAndDeadReplicas divides the provided repls slice into two slices: the @@ -833,6 +849,16 @@ func (sp *StorePool) storeStatus(storeID roachpb.StoreID) (storeStatus, error) { // they are excluded from the returned slices. func (sp *StorePool) LiveAndDeadReplicas( repls []roachpb.ReplicaDescriptor, includeSuspectAndDrainingStores bool, +) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { + return sp.liveAndDeadReplicasWithLiveness(repls, sp.NodeLivenessFn, includeSuspectAndDrainingStores) +} + +// liveAndDeadReplicasWithLiveness divides the provided repls slice into two slices: the +// first for live replicas, and the second for dead replicas, using the +// provided NodeLivenessFunc. +// See comment on StorePool.LiveAndDeadReplicas(..). +func (sp *StorePool) liveAndDeadReplicasWithLiveness( + repls []roachpb.ReplicaDescriptor, nl NodeLivenessFunc, includeSuspectAndDrainingStores bool, ) (liveReplicas, deadReplicas []roachpb.ReplicaDescriptor) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() @@ -844,7 +870,7 @@ func (sp *StorePool) LiveAndDeadReplicas( for _, repl := range repls { detail := sp.GetStoreDetailLocked(repl.StoreID) // Mark replica as dead if store is dead. - status := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect) + status := detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect) switch status { case storeStatusDead: deadReplicas = append(deadReplicas, repl) @@ -1033,7 +1059,7 @@ func (sp *StorePool) GetStoreList(filter StoreFilter) (StoreList, int, Throttled for storeID := range sp.DetailsMu.StoreDetails { storeIDs = append(storeIDs, storeID) } - return sp.getStoreListFromIDsLocked(storeIDs, filter) + return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter) } // GetStoreListFromIDs is the same function as GetStoreList but only returns stores @@ -1043,13 +1069,13 @@ func (sp *StorePool) GetStoreListFromIDs( ) (StoreList, int, ThrottledStoreReasons) { sp.DetailsMu.Lock() defer sp.DetailsMu.Unlock() - return sp.getStoreListFromIDsLocked(storeIDs, filter) + return sp.getStoreListFromIDsLocked(storeIDs, sp.NodeLivenessFn, filter) } // getStoreListFromIDsRLocked is the same function as GetStoreList but requires // that the detailsMU read lock is held. func (sp *StorePool) getStoreListFromIDsLocked( - storeIDs roachpb.StoreIDSlice, filter StoreFilter, + storeIDs roachpb.StoreIDSlice, nl NodeLivenessFunc, filter StoreFilter, ) (StoreList, int, ThrottledStoreReasons) { if sp.deterministic { sort.Sort(storeIDs) @@ -1071,7 +1097,7 @@ func (sp *StorePool) getStoreListFromIDsLocked( // Do nothing; this store is not in the StorePool. continue } - switch s := detail.status(now, timeUntilStoreDead, sp.NodeLivenessFn, timeAfterStoreSuspect); s { + switch s := detail.status(now, timeUntilStoreDead, nl, timeAfterStoreSuspect); s { case storeStatusThrottled: aliveStoreCount++ throttled = append(throttled, detail.throttledBecause) @@ -1214,13 +1240,13 @@ func (sp *StorePool) IsStoreReadyForRoutineReplicaTransfer( if sp.OverrideIsStoreReadyForRoutineReplicaTransferFn != nil { return sp.OverrideIsStoreReadyForRoutineReplicaTransferFn(ctx, targetStoreID) } - return sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID) + return sp.isStoreReadyForRoutineReplicaTransferInternal(ctx, targetStoreID, sp.NodeLivenessFn) } func (sp *StorePool) isStoreReadyForRoutineReplicaTransferInternal( - ctx context.Context, targetStoreID roachpb.StoreID, + ctx context.Context, targetStoreID roachpb.StoreID, nl NodeLivenessFunc, ) bool { - status, err := sp.storeStatus(targetStoreID) + status, err := sp.storeStatus(targetStoreID, nl) if err != nil { return false } diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go index bdea163c45d1..51c456d5390e 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool_test.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool_test.go @@ -78,7 +78,7 @@ func TestStorePoolGossipUpdate(t *testing.T) { // verifyStoreList ensures that the returned list of stores is correct. func verifyStoreList( - sp *StorePool, + sp AllocatorStorePool, constraints []roachpb.ConstraintsConjunction, storeIDs roachpb.StoreIDSlice, // optional filter StoreFilter,