diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 8e09f7cc8aaa..4b43d669ce5c 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1280,6 +1280,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/apply:apply", "//pkg/kv/kvserver/apply:apply_test", "//pkg/kv/kvserver/asim/config:config", + "//pkg/kv/kvserver/asim/event:event", "//pkg/kv/kvserver/asim/gen:gen", "//pkg/kv/kvserver/asim/gossip:gossip", "//pkg/kv/kvserver/asim/gossip:gossip_test", diff --git a/pkg/kv/kvserver/asim/BUILD.bazel b/pkg/kv/kvserver/asim/BUILD.bazel index a2d7e959cdc5..ec6c8e127985 100644 --- a/pkg/kv/kvserver/asim/BUILD.bazel +++ b/pkg/kv/kvserver/asim/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/event", "//pkg/kv/kvserver/asim/gossip", "//pkg/kv/kvserver/asim/metrics", "//pkg/kv/kvserver/asim/op", @@ -14,6 +15,7 @@ go_library( "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/storerebalancer", "//pkg/kv/kvserver/asim/workload", + "//pkg/util/log", ], ) diff --git a/pkg/kv/kvserver/asim/asim.go b/pkg/kv/kvserver/asim/asim.go index 2df673ad2834..878eafdab266 100644 --- a/pkg/kv/kvserver/asim/asim.go +++ b/pkg/kv/kvserver/asim/asim.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/event" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op" @@ -22,11 +23,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/storerebalancer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" + "github.com/cockroachdb/cockroach/pkg/util/log" ) // Simulator simulates an entire cluster, and runs the allocator of each store // in that cluster. type Simulator struct { + log.AmbientContext curr time.Time end time.Time // interval is the step between ticks for active simulaton components, such @@ -36,6 +39,7 @@ type Simulator struct { // The simulator can run multiple workload Generators in parallel. generators []workload.Generator + events event.DelayedEventList pacers map[state.StoreID]queue.ReplicaPacer @@ -53,6 +57,8 @@ type Simulator struct { gossip gossip.Gossip shuffler func(n int, swap func(i, j int)) + settings *config.SimulationSettings + metrics *metrics.Tracker history History } @@ -62,6 +68,7 @@ type Simulator struct { // TODO(kvoli): Add a range log like structure to the history. type History struct { Recorded [][]metrics.StoreMetrics + S state.State } // Listen implements the metrics.StoreMetricListener interface. @@ -76,6 +83,7 @@ func NewSimulator( initialState state.State, settings *config.SimulationSettings, m *metrics.Tracker, + events ...event.DelayedEvent, ) *Simulator { pacers := make(map[state.StoreID]queue.ReplicaPacer) rqs := make(map[state.StoreID]queue.RangeQueue) @@ -83,75 +91,93 @@ func NewSimulator( srs := make(map[state.StoreID]storerebalancer.StoreRebalancer) changer := state.NewReplicaChanger() controllers := make(map[state.StoreID]op.Controller) + + s := &Simulator{ + AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(), + curr: settings.StartTime, + end: settings.StartTime.Add(duration), + interval: settings.TickInterval, + generators: wgs, + state: initialState, + changer: changer, + rqs: rqs, + sqs: sqs, + controllers: controllers, + srs: srs, + pacers: pacers, + gossip: gossip.NewGossip(initialState, settings), + metrics: m, + shuffler: state.NewShuffler(settings.Seed), + // TODO(kvoli): Keeping the state around is a bit hacky, find a better + // method of reporting the ranges. + history: History{Recorded: [][]metrics.StoreMetrics{}, S: initialState}, + events: events, + settings: settings, + } + for _, store := range initialState.Stores() { storeID := store.StoreID() - allocator := initialState.MakeAllocator(storeID) - storePool := initialState.StorePool(storeID) - // TODO(kvoli): Instead of passing in individual settings to construct - // the each ticking component, pass a pointer to the simulation - // settings struct. That way, the settings may be adjusted dynamically - // during a simulation. - rqs[storeID] = queue.NewReplicateQueue( - storeID, - changer, - settings.ReplicaChangeDelayFn(), - allocator, - storePool, - settings.StartTime, - ) - sqs[storeID] = queue.NewSplitQueue( - storeID, - changer, - settings.RangeSplitDelayFn(), - settings.RangeSizeSplitThreshold, - settings.StartTime, - ) - pacers[storeID] = queue.NewScannerReplicaPacer( - initialState.NextReplicasFn(storeID), - settings.PacerLoopInterval, - settings.PacerMinIterInterval, - settings.PacerMaxIterIterval, - settings.Seed, - ) - controllers[storeID] = op.NewController( - changer, - allocator, - storePool, - settings, - storeID, - ) - srs[storeID] = storerebalancer.NewStoreRebalancer( - settings.StartTime, - storeID, - controllers[storeID], - allocator, - storePool, - settings, - storerebalancer.GetStateRaftStatusFn(initialState), - ) + s.addStore(storeID, settings.StartTime) } + s.state.RegisterConfigChangeListener(s) - s := &Simulator{ - curr: settings.StartTime, - end: settings.StartTime.Add(duration), - interval: settings.TickInterval, - generators: wgs, - state: initialState, - changer: changer, - rqs: rqs, - sqs: sqs, - controllers: controllers, - srs: srs, - pacers: pacers, - gossip: gossip.NewGossip(initialState, settings), - metrics: m, - shuffler: state.NewShuffler(settings.Seed), - history: History{Recorded: [][]metrics.StoreMetrics{}}, - } m.Register(&s.history) + s.AddLogTag("asim", nil) return s } +// StoreAddNotify notifies that a new store has been added with ID storeID. +func (s *Simulator) StoreAddNotify(storeID state.StoreID, _ state.State) { + s.addStore(storeID, s.curr) +} + +func (s *Simulator) addStore(storeID state.StoreID, tick time.Time) { + allocator := s.state.MakeAllocator(storeID) + storePool := s.state.StorePool(storeID) + // TODO(kvoli): Instead of passing in individual settings to construct + // the each ticking component, pass a pointer to the simulation + // settings struct. That way, the settings may be adjusted dynamically + // during a simulation. + s.rqs[storeID] = queue.NewReplicateQueue( + storeID, + s.changer, + s.settings.ReplicaChangeDelayFn(), + allocator, + storePool, + tick, + ) + s.sqs[storeID] = queue.NewSplitQueue( + storeID, + s.changer, + s.settings.RangeSplitDelayFn(), + s.settings.RangeSizeSplitThreshold, + tick, + ) + s.pacers[storeID] = queue.NewScannerReplicaPacer( + s.state.NextReplicasFn(storeID), + s.settings.PacerLoopInterval, + s.settings.PacerMinIterInterval, + s.settings.PacerMaxIterIterval, + s.settings.Seed, + ) + s.controllers[storeID] = op.NewController( + s.changer, + allocator, + storePool, + s.settings, + storeID, + ) + s.srs[storeID] = storerebalancer.NewStoreRebalancer( + tick, + storeID, + s.controllers[storeID], + allocator, + storePool, + s.settings, + storerebalancer.GetStateRaftStatusFn(s.state), + ) +} + // GetNextTickTime returns a simulated tick time, or an indication that the // simulation is done. func (s *Simulator) GetNextTickTime() (done bool, tick time.Time) { @@ -189,9 +215,15 @@ func (s *Simulator) RunSim(ctx context.Context) { break } + s.AddLogTag("tick", tick.Format(time.StampMilli)) + ctx = s.AmbientContext.AnnotateCtx(ctx) + // Update the store clocks with the current tick time. s.tickStoreClocks(tick) + // Tick any events. + s.tickEvents(ctx, tick) + // Update the state with generated load. s.tickWorkload(ctx, tick) @@ -311,3 +343,22 @@ func (s *Simulator) tickStoreRebalancers(ctx context.Context, tick time.Time, st func (s *Simulator) tickMetrics(ctx context.Context, tick time.Time) { s.metrics.Tick(ctx, tick, s.state) } + +// tickEvents ticks the registered simulation events. +func (s *Simulator) tickEvents(ctx context.Context, tick time.Time) { + var idx int + // Assume the events are in sorted order and the event list is never added + // to. + for i := range s.events { + if !tick.Before(s.events[i].At) { + idx = i + 1 + log.Infof(ctx, "applying event (scheduled=%s tick=%s)", s.events[i].At, tick) + s.events[i].EventFn(ctx, tick, s.state) + } else { + break + } + } + if idx != 0 { + s.events = s.events[idx:] + } +} diff --git a/pkg/kv/kvserver/asim/asim_test.go b/pkg/kv/kvserver/asim/asim_test.go index db05c9ba2f09..0e786eeadbb9 100644 --- a/pkg/kv/kvserver/asim/asim_test.go +++ b/pkg/kv/kvserver/asim/asim_test.go @@ -87,6 +87,6 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) { refRun = history continue } - require.Equal(t, refRun, history) + require.Equal(t, refRun.Recorded, history.Recorded) } } diff --git a/pkg/kv/kvserver/asim/event/BUILD.bazel b/pkg/kv/kvserver/asim/event/BUILD.bazel new file mode 100644 index 000000000000..8e9eb57247a0 --- /dev/null +++ b/pkg/kv/kvserver/asim/event/BUILD.bazel @@ -0,0 +1,9 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "event", + srcs = ["delayed_event.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/event", + visibility = ["//visibility:public"], + deps = ["//pkg/kv/kvserver/asim/state"], +) diff --git a/pkg/kv/kvserver/asim/event/delayed_event.go b/pkg/kv/kvserver/asim/event/delayed_event.go new file mode 100644 index 000000000000..61885ef406b3 --- /dev/null +++ b/pkg/kv/kvserver/asim/event/delayed_event.go @@ -0,0 +1,38 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package event + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" +) + +type DelayedEventList []DelayedEvent + +// Len implements sort.Interface. +func (del DelayedEventList) Len() int { return len(del) } + +// Less implements sort.Interface. +func (del DelayedEventList) Less(i, j int) bool { + return del[i].At.Before(del[j].At) +} + +// Swap implements sort.Interface. +func (del DelayedEventList) Swap(i, j int) { + del[i], del[j] = del[j], del[i] +} + +type DelayedEvent struct { + At time.Time + EventFn func(context.Context, time.Time, state.State) +} diff --git a/pkg/kv/kvserver/asim/gen/BUILD.bazel b/pkg/kv/kvserver/asim/gen/BUILD.bazel index 5b47af5fcc6d..01e79229a76b 100644 --- a/pkg/kv/kvserver/asim/gen/BUILD.bazel +++ b/pkg/kv/kvserver/asim/gen/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/kv/kvserver/asim", "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/event", "//pkg/kv/kvserver/asim/metrics", "//pkg/kv/kvserver/asim/state", "//pkg/kv/kvserver/asim/workload", diff --git a/pkg/kv/kvserver/asim/gen/generator.go b/pkg/kv/kvserver/asim/gen/generator.go index 8ca1a1ac4066..5d2705b061a0 100644 --- a/pkg/kv/kvserver/asim/gen/generator.go +++ b/pkg/kv/kvserver/asim/gen/generator.go @@ -12,10 +12,12 @@ package gen import ( "math/rand" + "sort" "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/event" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" @@ -36,26 +38,53 @@ type LoadGen interface { Generate(seed int64, settings *config.SimulationSettings) []workload.Generator } -// StateGen provides a method to generate a state given a seed and simulation -// settings. -type StateGen interface { - // Generate returns a state that is parameterized randomly by the seed and - // simulation settings provided. +// ClusterGen provides a method to generate the initial cluster state, given a +// seed and simulation settings. The initial cluster state includes: nodes +// (including locality) and stores. +type ClusterGen interface { + // Generate returns a new State that is parameterized randomly by the seed + // and simulation settings provided. Generate(seed int64, settings *config.SimulationSettings) state.State } +// RangeGen provides a method to generate the initial range splits, range +// replica and lease placement within a cluster. +type RangeGen interface { + // Generate returns an updated state, given the initial state, seed and + // simulation settings provided. In the updated state, ranges will have been + // created, replicas and leases assigned to stores in the cluster. + Generate(seed int64, settings *config.SimulationSettings, s state.State) state.State +} + +// EventGen provides a method to generate a list of events that will apply to +// the simulated cluster. Currently, only delayed (fixed time) events are +// supported. +type EventGen interface { + // Generate returns a list of events, which should be exectued at the delay specified. + Generate(seed int64) event.DelayedEventList +} + // GenerateSimulation is a utility function that creates a new allocation // simulation using the provided state, workload, settings generators and seed. func GenerateSimulation( - duration time.Duration, stateGen StateGen, loadGen LoadGen, settingsGen SettingsGen, seed int64, + duration time.Duration, + clusterGen ClusterGen, + rangeGen RangeGen, + loadGen LoadGen, + settingsGen SettingsGen, + eventGen EventGen, + seed int64, ) *asim.Simulator { settings := settingsGen.Generate(seed) + s := clusterGen.Generate(seed, &settings) + s = rangeGen.Generate(seed, &settings, s) return asim.NewSimulator( duration, loadGen.Generate(seed, &settings), - stateGen.Generate(seed, &settings), + s, &settings, metrics.NewTracker(settings.MetricsInterval), + eventGen.Generate(seed)..., ) } @@ -89,6 +118,10 @@ type BasicLoad struct { // SkewedAccess is true. The returned workload generators are seeded with the // provided seed. func (bl BasicLoad) Generate(seed int64, settings *config.SimulationSettings) []workload.Generator { + if bl.Rate == 0 { + return []workload.Generator{} + } + var keyGen workload.KeyGenerator rand := rand.New(rand.NewSource(seed)) if bl.SkewedAccess { @@ -110,25 +143,94 @@ func (bl BasicLoad) Generate(seed int64, settings *config.SimulationSettings) [] } } -// BasicState implements the StateGen interface. -type BasicState struct { - Stores int +// LoadedCluster implements the ClusterGen interface. +type LoadedCluster struct { + Info state.ClusterInfo +} + +// Generate returns a new simulator state, where the cluster is loaded based on +// the cluster info the loaded cluster generator is created with. There is no +// randomness in this cluster generation. +func (lc LoadedCluster) Generate(seed int64, settings *config.SimulationSettings) state.State { + return state.LoadClusterInfo(lc.Info, settings) +} + +// BasicCluster implements the ClusterGen interace. +type BasicCluster struct { + Nodes int + StoresPerNode int +} + +// Generate returns a new simulator state, where the cluster is created with all +// nodes having the same locality and with the specified number of stores/nodes +// created. The cluster is created based on the stores and stores-per-node +// values the basic cluster generator is created with. +func (lc BasicCluster) Generate(seed int64, settings *config.SimulationSettings) state.State { + info := state.ClusterInfoWithStoreCount(lc.Nodes, lc.StoresPerNode) + return state.LoadClusterInfo(info, settings) +} + +// LoadedRanges implements the RangeGen interface. +type LoadedRanges struct { + Info state.RangesInfo +} + +// Generate returns an updated simulator state, where the cluster is loaded +// with the range info that the generator was created with. There is no +// randomness in this cluster generation. +func (lr LoadedRanges) Generate( + seed int64, settings *config.SimulationSettings, s state.State, +) state.State { + state.LoadRangeInfo(s, lr.Info...) + return s +} + +// PlacementType represents a type of placement distribution. +type PlacementType int + +const ( + Uniform PlacementType = iota + Skewed +) + +// BasicRanges implements the RangeGen interface. +type BasicRanges struct { Ranges int - SkewedPlacement bool + PlacementType PlacementType KeySpace int ReplicationFactor int + Bytes int64 } -// Generate returns a new state that is created with the number of stores, -// ranges, keyspace and replication factor from the basic state fields. The -// initial assignment of replicas and leases for ranges follows either a -// uniform or powerlaw distribution depending on if SkewedPlacement is true. -func (bs BasicState) Generate(seed int64, settings *config.SimulationSettings) state.State { - var s state.State - if bs.SkewedPlacement { - s = state.NewStateSkewedDistribution(bs.Stores, bs.Ranges, bs.ReplicationFactor, bs.KeySpace, settings) - } else { - s = state.NewStateEvenDistribution(bs.Stores, bs.Ranges, bs.ReplicationFactor, bs.KeySpace, settings) +// Generate returns an updated simulator state, where the cluster is loaded +// with ranges based on the parameters of basic ranges. +func (br BasicRanges) Generate( + seed int64, settings *config.SimulationSettings, s state.State, +) state.State { + stores := len(s.Stores()) + var rangesInfo state.RangesInfo + switch br.PlacementType { + case Uniform: + rangesInfo = state.RangesInfoEvenDistribution(stores, br.Ranges, br.KeySpace, br.ReplicationFactor, br.Bytes) + case Skewed: + rangesInfo = state.RangesInfoSkewedDistribution(stores, br.Ranges, br.KeySpace, br.ReplicationFactor, br.Bytes) } + for _, rangeInfo := range rangesInfo { + rangeInfo.Size = br.Bytes + } + state.LoadRangeInfo(s, rangesInfo...) return s } + +// StaticEvents implements the EventGen interface. +// TODO(kvoli): introduce conditional events. +type StaticEvents struct { + DelayedEvents event.DelayedEventList +} + +// Generate returns a list of events, exactly the same as the events +// StaticEvents was created with. +func (se StaticEvents) Generate(seed int64) event.DelayedEventList { + sort.Sort(se.DelayedEvents) + return se.DelayedEvents +} diff --git a/pkg/kv/kvserver/asim/gossip/gossip.go b/pkg/kv/kvserver/asim/gossip/gossip.go index 28a3e6cfa2b6..36a923232863 100644 --- a/pkg/kv/kvserver/asim/gossip/gossip.go +++ b/pkg/kv/kvserver/asim/gossip/gossip.go @@ -49,6 +49,7 @@ type storeGossiper struct { lastIntervalGossip time.Time descriptorGetter func(cached bool) roachpb.StoreDescriptor pendingOutbound *roachpb.StoreDescriptor + addingStore bool } func newStoreGossiper(descriptorGetter func(cached bool) roachpb.StoreDescriptor) *storeGossiper { @@ -112,10 +113,14 @@ func NewGossip(s state.State, settings *config.SimulationSettings) *gossip { } s.RegisterCapacityChangeListener(g) s.RegisterCapacityListener(g) + s.RegisterConfigChangeListener(g) return g } func (g *gossip) addStoreToGossip(s state.State, storeID state.StoreID) { + // Add the store gossip in an "adding" state initially, this is to avoid + // recursive calls to get the store descriptor. + g.storeGossip[storeID] = &storeGossiper{addingStore: true} g.storeGossip[storeID] = newStoreGossiper(func(cached bool) roachpb.StoreDescriptor { return s.StoreDescriptors(cached, storeID)[0] }) @@ -160,7 +165,9 @@ func (g *gossip) Tick(ctx context.Context, tick time.Time, s state.State) { // for the store with ID StoreID. func (g *gossip) CapacityChangeNotify(cce kvserver.CapacityChangeEvent, storeID state.StoreID) { if sg, ok := g.storeGossip[storeID]; ok { - sg.local.MaybeGossipOnCapacityChange(context.Background(), cce) + if !sg.addingStore { + sg.local.MaybeGossipOnCapacityChange(context.Background(), cce) + } } else { panic( fmt.Sprintf("capacity change event but no found store in store gossip with ID %d", @@ -173,7 +180,9 @@ func (g *gossip) CapacityChangeNotify(cce kvserver.CapacityChangeEvent, storeID // the store with ID StoreID. func (g *gossip) NewCapacityNotify(capacity roachpb.StoreCapacity, storeID state.StoreID) { if sg, ok := g.storeGossip[storeID]; ok { - sg.local.UpdateCachedCapacity(capacity) + if !sg.addingStore { + sg.local.UpdateCachedCapacity(capacity) + } } else { panic( fmt.Sprintf("new capacity event but no found store in store gossip with ID %d", @@ -182,6 +191,11 @@ func (g *gossip) NewCapacityNotify(capacity roachpb.StoreCapacity, storeID state } } +// StoreAddNotify notifies that a new store has been added with ID storeID. +func (g *gossip) StoreAddNotify(storeID state.StoreID, s state.State) { + g.addStoreToGossip(s, storeID) +} + func (g *gossip) maybeUpdateState(tick time.Time, s state.State) { // NB: The updates function gives back all store descriptors which have // completed exchange. We apply the update to every stores state uniformly, diff --git a/pkg/kv/kvserver/asim/metrics/series.go b/pkg/kv/kvserver/asim/metrics/series.go index 9530b7e07c84..d99ea7570f11 100644 --- a/pkg/kv/kvserver/asim/metrics/series.go +++ b/pkg/kv/kvserver/asim/metrics/series.go @@ -37,6 +37,7 @@ func MakeTS(metrics [][]StoreMetrics) map[string][][]float64 { ret["replica_b_rcvd"] = make([][]float64, stores) ret["replica_b_sent"] = make([][]float64, stores) ret["range_splits"] = make([][]float64, stores) + ret["disk_fraction_used"] = make([][]float64, stores) for _, sms := range metrics { for i, sm := range sms { @@ -52,6 +53,7 @@ func MakeTS(metrics [][]StoreMetrics) map[string][][]float64 { ret["replica_b_rcvd"][i] = append(ret["replica_b_rcvd"][i], float64(sm.RebalanceRcvdBytes)) ret["replica_b_sent"][i] = append(ret["replica_b_sent"][i], float64(sm.RebalanceSentBytes)) ret["range_splits"][i] = append(ret["range_splits"][i], float64(sm.RangeSplits)) + ret["disk_fraction_used"][i] = append(ret["disk_fraction_used"][i], sm.DiskFractionUsed) } } return ret diff --git a/pkg/kv/kvserver/asim/metrics/tracker.go b/pkg/kv/kvserver/asim/metrics/tracker.go index 8543cff3d42e..1e5572a71713 100644 --- a/pkg/kv/kvserver/asim/metrics/tracker.go +++ b/pkg/kv/kvserver/asim/metrics/tracker.go @@ -41,6 +41,7 @@ type StoreMetrics struct { RebalanceSentBytes int64 RebalanceRcvdBytes int64 RangeSplits int64 + DiskFractionUsed float64 } // the MetricsTracker to report new store metrics for a tick. @@ -118,6 +119,7 @@ func (mt *Tracker) Tick(ctx context.Context, tick time.Time, s state.State) { RebalanceSentBytes: u.RebalanceSentBytes, RebalanceRcvdBytes: u.RebalanceRcvdBytes, RangeSplits: u.RangeSplits, + DiskFractionUsed: desc.Capacity.FractionUsed(), } sms = append(sms, sm) } diff --git a/pkg/kv/kvserver/asim/state/BUILD.bazel b/pkg/kv/kvserver/asim/state/BUILD.bazel index 165e777857ea..7c88db6bd23b 100644 --- a/pkg/kv/kvserver/asim/state/BUILD.bazel +++ b/pkg/kv/kvserver/asim/state/BUILD.bazel @@ -30,6 +30,9 @@ go_library( "//pkg/kv/kvserver/split", "//pkg/roachpb", "//pkg/settings/cluster", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigreporter", + "//pkg/util/admission/admissionpb", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/kv/kvserver/asim/state/impl.go b/pkg/kv/kvserver/asim/state/impl.go index f42fa7bee831..3209d1411470 100644 --- a/pkg/kv/kvserver/asim/state/impl.go +++ b/pkg/kv/kvserver/asim/state/impl.go @@ -12,6 +12,7 @@ package state import ( "bytes" + "context" "fmt" "math" "sort" @@ -28,6 +29,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreporter" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/google/btree" "go.etcd.io/raft/v3" @@ -42,6 +45,8 @@ type state struct { quickLivenessMap map[NodeID]livenesspb.NodeLivenessStatus capacityChangeListeners []CapacityChangeListener newCapacityListeners []NewCapacityListener + configChangeListeners []ConfigChangeListener + capacityOverrides map[StoreID]CapacityOverride ranges *rmap clusterinfo ClusterInfo usageInfo *ClusterUsageInfo @@ -63,14 +68,15 @@ func NewState(settings *config.SimulationSettings) State { func newState(settings *config.SimulationSettings) *state { s := &state{ - nodes: make(map[NodeID]*node), - stores: make(map[StoreID]*store), - loadsplits: make(map[StoreID]LoadSplitter), - quickLivenessMap: make(map[NodeID]livenesspb.NodeLivenessStatus), - clock: &ManualSimClock{nanos: settings.StartTime.UnixNano()}, - ranges: newRMap(), - usageInfo: newClusterUsageInfo(), - settings: settings, + nodes: make(map[NodeID]*node), + stores: make(map[StoreID]*store), + loadsplits: make(map[StoreID]LoadSplitter), + quickLivenessMap: make(map[NodeID]livenesspb.NodeLivenessStatus), + capacityOverrides: make(map[StoreID]CapacityOverride), + clock: &ManualSimClock{nanos: settings.StartTime.UnixNano()}, + ranges: newRMap(), + usageInfo: newClusterUsageInfo(), + settings: settings, } s.load = map[RangeID]ReplicaLoad{FirstRangeID: NewReplicaLoadCounter(s.clock)} return s @@ -197,12 +203,61 @@ func (s *state) StoreDescriptors(cached bool, storeIDs ...StoreID) []roachpb.Sto func (s *state) updateStoreCapacity(storeID StoreID) { if store, ok := s.stores[storeID]; ok { - capacity := Capacity(s, storeID) + capacity := s.capacity(storeID) + if override, ok := s.capacityOverrides[storeID]; ok { + capacity = mergeOverride(capacity, override) + } store.desc.Capacity = capacity s.publishNewCapacityEvent(capacity, storeID) } } +func (s *state) capacity(storeID StoreID) roachpb.StoreCapacity { + // TODO(kvoli,lidorcarmel): Store capacity will need to be populated with + // the following missing fields: l0sublevels, bytesperreplica, writesperreplica. + store, ok := s.stores[storeID] + if !ok { + panic(fmt.Sprintf("programming error: store (%d) doesn't exist", storeID)) + } + + // We re-use the existing store capacity and selectively zero out the fields + // we intend to change. + capacity := store.desc.Capacity + capacity.QueriesPerSecond = 0 + capacity.WritesPerSecond = 0 + capacity.LogicalBytes = 0 + capacity.LeaseCount = 0 + capacity.RangeCount = 0 + capacity.Used = 0 + capacity.Available = 0 + + for _, repl := range s.Replicas(storeID) { + rangeID := repl.Range() + replicaID := repl.ReplicaID() + rng, _ := s.Range(rangeID) + if rng.Leaseholder() == replicaID { + // TODO(kvoli): We currently only consider load on the leaseholder + // replica for a range. The other replicas have an estimate that is + // calculated within the allocation algorithm. Adapt this to + // support follower reads, when added to the workload generator. + usage := s.RangeUsageInfo(rng.RangeID(), storeID) + capacity.QueriesPerSecond += usage.QueriesPerSecond + capacity.WritesPerSecond += usage.WritesPerSecond + capacity.LogicalBytes += usage.LogicalBytes + capacity.LeaseCount++ + } + capacity.RangeCount++ + } + + // TODO(kvoli): parameterize the logical to actual used storage bytes. At the + // moment we use 1.25 as a rough estimate. + used := int64(float64(capacity.LogicalBytes) * 1.25) + available := capacity.Capacity - used + capacity.Used = used + capacity.Available = available + return capacity +} + // Store returns the Store with ID StoreID. This fails if no Store exists // with ID StoreID. func (s *state) Store(storeID StoreID) (Store, bool) { @@ -429,6 +484,10 @@ func (s *state) AddStore(nodeID NodeID) (Store, bool) { // Add a usage info struct. _ = s.usageInfo.storeRef(storeID) + for _, listener := range s.configChangeListeners { + listener.StoreAddNotify(storeID, s) + } + return store, true } @@ -647,6 +706,23 @@ func (s *state) SetRangeBytes(rangeID RangeID, bytes int64) { rng.size = bytes } +// SetCapacityOverride updates the capacity for the store with ID StoreID to +// always return the overriden value given for any set fields in +// CapacityOverride. +func (s *state) SetCapacityOverride(storeID StoreID, override CapacityOverride) { + if _, ok := s.stores[storeID]; !ok { + panic(fmt.Sprintf("programming error: no store exist with ID %d", storeID)) + } + + existing, ok := s.capacityOverrides[storeID] + if !ok { + s.capacityOverrides[storeID] = override + return + } + + s.capacityOverrides[storeID] = CapacityOverride(mergeOverride(roachpb.StoreCapacity(existing), override)) +} + // SplitRange splits the Range which contains Key in [StartKey, EndKey). // The Range is partitioned into [StartKey, Key), [Key, EndKey) and // returned. The right hand side of this split, is the new Range. If any @@ -1076,6 +1152,124 @@ func (s *state) RaftStatus(rangeID RangeID, storeID StoreID) *raft.Status { return status } +func (s *state) GetIsLiveMap() livenesspb.IsLiveMap { + isLiveMap := livenesspb.IsLiveMap{} + + for nodeID, status := range s.quickLivenessMap { + nid := roachpb.NodeID(nodeID) + entry := livenesspb.IsLiveMapEntry{ + Liveness: livenesspb.Liveness{ + NodeID: nid, + Expiration: hlc.LegacyTimestamp{WallTime: math.MaxInt64}, + Draining: false, + Membership: livenesspb.MembershipStatus_ACTIVE, + }, + IsLive: true, + } + + switch status { + case livenesspb.NodeLivenessStatus_UNKNOWN: + continue + case livenesspb.NodeLivenessStatus_DEAD: + // Set liveness expiration to be greater than + // server.time_until_store_dead in the past - so that the store is + // considered dead. + entry.Liveness.Expiration.WallTime = int64(0) + entry.IsLive = false + case livenesspb.NodeLivenessStatus_UNAVAILABLE: + // Set liveness expiration to be just recently expired. This needs to be + // within now - server.time_until_store_dead and now, otherwise the store + // will be considered dead, not unavailable. + entry.Liveness.Expiration.WallTime = s.clock.Now().Add(-time.Second).UnixNano() + entry.IsLive = false + case livenesspb.NodeLivenessStatus_LIVE: + case livenesspb.NodeLivenessStatus_DECOMMISSIONING: + entry.Membership = livenesspb.MembershipStatus_DECOMMISSIONING + case livenesspb.NodeLivenessStatus_DECOMMISSIONED: + // Similar to DEAD, set the expiration in the past. A decommissioned + // store should not have a valid liveness expiration. + entry.Membership = livenesspb.MembershipStatus_DECOMMISSIONED + entry.Liveness.Expiration.WallTime = int64(0) + entry.IsLive = false + case livenesspb.NodeLivenessStatus_DRAINING: + entry.Draining = true + } + isLiveMap[nid] = entry + } + + return isLiveMap +} + +func (s *state) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) { + if descs := s.StoreDescriptors(false, StoreID(storeID)); len(descs) == 0 { + return roachpb.StoreDescriptor{}, false + } else { + return descs[0], true + } +} + +// NeedsSplit is added for the spanconfig.StoreReader interface, required for +// SpanConfigConformanceReport. +func (s *state) NeedsSplit(ctx context.Context, start, end roachpb.RKey) (bool, error) { + // We don't need to implement this method for conformance reports. + panic("not implemented") +} + +// ComputeSplitKey is added for the spanconfig.StoreReader interface, required for +// SpanConfigConformanceReport. +func (s *state) ComputeSplitKey( + ctx context.Context, start, end roachpb.RKey, +) (roachpb.RKey, error) { + // We don't need to implement this method for conformance reports. + panic("not implemented") +} + +// GetSpanConfigForKey is added for the spanconfig.StoreReader interface, required for +// SpanConfigConformanceReport. +func (s *state) GetSpanConfigForKey( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { + rng := s.rangeFor(ToKey(key.AsRawKey())) + if rng == nil { + panic(fmt.Sprintf("programming error: range for key %s doesn't exist", key)) + } + return rng.config, nil +} + +// Scan is added for the rangedesc.Scanner interface, required for +// SpanConfigConformanceReport. We ignore the span passed in and return every +// descriptor available. +func (s *state) Scan( + ctx context.Context, + pageSize int, + init func(), + span roachpb.Span, + fn func(descriptors ...roachpb.RangeDescriptor) error, +) error { + // NB: we ignore the span passed in, we pass the fn every range descriptor + // available. + rngs := s.Ranges() + descriptors := make([]roachpb.RangeDescriptor, len(rngs)) + for i, rng := range rngs { + descriptors[i] = *rng.Descriptor() + } + return fn(descriptors...) +} + +// Report returns the span config conformance report for every range in the +// simulated cluster. This may be used to assert on the current conformance +// state of ranges. +func (s *state) Report() roachpb.SpanConfigConformanceReport { + reporter := spanconfigreporter.New( + s, s, s, s, + cluster.MakeClusterSettings(), &spanconfig.TestingKnobs{}) + report, err := reporter.SpanConfigConformance(context.Background(), []roachpb.Span{{}}) + if err != nil { + panic(fmt.Sprintf("programming error: error getting span config report %s", err.Error())) + } + return report +} + // RegisterCapacityChangeListener registers a listener which will be called // on events where there is a capacity change (lease or replica) in the // cluster state. @@ -1102,6 +1296,13 @@ func (s *state) publishNewCapacityEvent(capacity roachpb.StoreCapacity, storeID } } +// RegisterCapacityListener registers a listener which will be called when +// a new store capacity has been generated from scratch, for a specific +// store. +func (s *state) RegisterConfigChangeListener(listener ConfigChangeListener) { + s.configChangeListeners = append(s.configChangeListeners, listener) +} + // node is an implementation of the Node interface. type node struct { nodeID NodeID diff --git a/pkg/kv/kvserver/asim/state/load.go b/pkg/kv/kvserver/asim/state/load.go index ef659b98e52d..77413a7860d7 100644 --- a/pkg/kv/kvserver/asim/state/load.go +++ b/pkg/kv/kvserver/asim/state/load.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -115,33 +116,82 @@ func (rl *ReplicaLoadCounter) Split() ReplicaLoad { } } -// Capacity returns the store capacity for the store with id storeID. It -// aggregates the load from each replica within the store. -func Capacity(state State, storeID StoreID) roachpb.StoreCapacity { - // TODO(kvoli,lidorcarmel): Store capacity will need to be populated with - // the following missing fields: capacity, available, used, l0sublevels, - // bytesperreplica, writesperreplica. - capacity := roachpb.StoreCapacity{} - - for _, repl := range state.Replicas(storeID) { - rangeID := repl.Range() - replicaID := repl.ReplicaID() - rng, _ := state.Range(rangeID) - if rng.Leaseholder() == replicaID { - // TODO(kvoli): We currently only consider load on the leaseholder - // replica for a range. The other replicas have an estimate that is - // calculated within the allocation algorithm. Adapt this to - // support follower reads, when added to the workload generator. - usage := state.RangeUsageInfo(rng.RangeID(), storeID) - capacity.QueriesPerSecond += usage.QueriesPerSecond - capacity.WritesPerSecond += usage.WritesPerSecond - capacity.LogicalBytes += usage.LogicalBytes - capacity.LeaseCount++ - } +// capacityOverrideSentinel is used to signal that the override value has not +// been set for a field. +const capacityOverrideSentinel = -1 + +// CapacityOverride is used to override some field(s) of a store's capacity. +type CapacityOverride roachpb.StoreCapacity + +// NewCapacityOverride returns a capacity override where no overrides are set. +func NewCapacityOverride() CapacityOverride { + return CapacityOverride{ + Capacity: capacityOverrideSentinel, + Available: capacityOverrideSentinel, + Used: capacityOverrideSentinel, + LogicalBytes: capacityOverrideSentinel, + RangeCount: capacityOverrideSentinel, + LeaseCount: capacityOverrideSentinel, + QueriesPerSecond: capacityOverrideSentinel, + WritesPerSecond: capacityOverrideSentinel, + CPUPerSecond: capacityOverrideSentinel, + L0Sublevels: capacityOverrideSentinel, + IOThreshold: admissionpb.IOThreshold{ + L0NumSubLevels: capacityOverrideSentinel, + L0NumSubLevelsThreshold: capacityOverrideSentinel, + L0NumFiles: capacityOverrideSentinel, + L0NumFilesThreshold: capacityOverrideSentinel, + }, + } +} - capacity.RangeCount++ +func mergeOverride( + capacity roachpb.StoreCapacity, override CapacityOverride, +) roachpb.StoreCapacity { + ret := capacity + if override.Capacity != capacityOverrideSentinel { + ret.Capacity = override.Capacity + } + if override.Available != capacityOverrideSentinel { + ret.Available = override.Available + } + if override.Used != capacityOverrideSentinel { + ret.Used = override.Used + } + if override.LogicalBytes != capacityOverrideSentinel { + ret.LogicalBytes = override.LogicalBytes + } + if override.RangeCount != capacityOverrideSentinel { + ret.RangeCount = override.RangeCount + } + if override.LeaseCount != capacityOverrideSentinel { + ret.LeaseCount = override.LeaseCount + } + if override.QueriesPerSecond != capacityOverrideSentinel { + ret.QueriesPerSecond = override.QueriesPerSecond + } + if override.WritesPerSecond != capacityOverrideSentinel { + ret.WritesPerSecond = override.WritesPerSecond + } + if override.CPUPerSecond != capacityOverrideSentinel { + ret.CPUPerSecond = override.CPUPerSecond + } + if override.L0Sublevels != capacityOverrideSentinel { + ret.L0Sublevels = override.L0Sublevels + } + if override.IOThreshold.L0NumFiles != capacityOverrideSentinel { + ret.IOThreshold.L0NumFiles = override.IOThreshold.L0NumFiles + } + if override.IOThreshold.L0NumFilesThreshold != capacityOverrideSentinel { + ret.IOThreshold.L0NumFilesThreshold = override.IOThreshold.L0NumFilesThreshold + } + if override.IOThreshold.L0NumSubLevels != capacityOverrideSentinel { + ret.IOThreshold.L0NumSubLevels = override.IOThreshold.L0NumSubLevels + } + if override.IOThreshold.L0NumSubLevelsThreshold != capacityOverrideSentinel { + ret.IOThreshold.L0NumSubLevelsThreshold = override.IOThreshold.L0NumSubLevelsThreshold } - return capacity + return ret } // StoreUsageInfo contains the load on a single store. diff --git a/pkg/kv/kvserver/asim/state/new_state.go b/pkg/kv/kvserver/asim/state/new_state.go index c2b700b9559e..cee7959759b1 100644 --- a/pkg/kv/kvserver/asim/state/new_state.go +++ b/pkg/kv/kvserver/asim/state/new_state.go @@ -153,14 +153,14 @@ func RangesInfoWithDistribution( // their weights, a best effort apporach is taken so that the total number of // aggregate matches numNodes. func ClusterInfoWithDistribution( - numNodes int, storesPerNode int, regions []string, regionNodeWeights []float64, + nodeCount int, storesPerNode int, regions []string, regionNodeWeights []float64, ) ClusterInfo { ret := ClusterInfo{} ret.Regions = make([]Region, len(regions)) - availableNodes := numNodes + availableNodes := nodeCount for i, name := range regions { - allocatedNodes := int(float64(numNodes) * (regionNodeWeights[i])) + allocatedNodes := int(float64(nodeCount) * (regionNodeWeights[i])) if allocatedNodes > availableNodes { allocatedNodes = availableNodes } @@ -174,11 +174,11 @@ func ClusterInfoWithDistribution( return ret } -// ClusterInfoWithStores returns a new ClusterInfo with the specified number of -// stores. There will be only one store per node and a single region and zone. -func ClusterInfoWithStoreCount(stores int, storesPerNode int) ClusterInfo { +// ClusterInfoWithStoreCount returns a new ClusterInfo with the specified number of +// stores. There will be storesPerNode stores per node and a single region and zone. +func ClusterInfoWithStoreCount(nodeCount int, storesPerNode int) ClusterInfo { return ClusterInfoWithDistribution( - stores, + nodeCount, storesPerNode, []string{"AU_EAST"}, /* regions */ []float64{1}, /* regionNodeWeights */ diff --git a/pkg/kv/kvserver/asim/state/state.go b/pkg/kv/kvserver/asim/state/state.go index 599186932a3d..9dae2c6e0a6e 100644 --- a/pkg/kv/kvserver/asim/state/state.go +++ b/pkg/kv/kvserver/asim/state/state.go @@ -127,6 +127,10 @@ type State interface { // SetRangeBytes sets the size of the range with ID RangeID to be equal to // the bytes given. SetRangeBytes(RangeID, int64) + // SetCapacityOverride updates the capacity for the store with ID StoreID to + // always return the overriden value given for any set fields in + // CapacityOverride. + SetCapacityOverride(StoreID, CapacityOverride) // ValidTransfer returns whether transferring the lease for the Range with ID // RangeID, to the Store with ID StoreID is valid. ValidTransfer(RangeID, StoreID) bool @@ -180,6 +184,8 @@ type State interface { // RaftStatus returns the current raft status for the replica of the Range // with ID RangeID, on the store with ID StoreID. RaftStatus(RangeID, StoreID) *raft.Status + // Report returns the span config conformance report for every range. + Report() roachpb.SpanConfigConformanceReport // RegisterCapacityChangeListener registers a listener which will be // notified on events where there is a lease or replica addition or // removal, for a specific store. @@ -188,6 +194,9 @@ type State interface { // a new store capacity has been generated from scratch, for a specific // store. RegisterCapacityListener(NewCapacityListener) + // RegisterConfigChangeListener registers a listener which will be called + // when a cluster configuration change occurs such as a store being added. + RegisterConfigChangeListener(ConfigChangeListener) } // Node is a container for stores and is part of a cluster. diff --git a/pkg/kv/kvserver/asim/state/state_listener.go b/pkg/kv/kvserver/asim/state/state_listener.go index d1de123b4fae..dfdc31a4989b 100644 --- a/pkg/kv/kvserver/asim/state/state_listener.go +++ b/pkg/kv/kvserver/asim/state/state_listener.go @@ -28,3 +28,10 @@ type NewCapacityListener interface { // the store with ID StoreID. NewCapacityNotify(roachpb.StoreCapacity, StoreID) } + +// ConfigChangeListener listens for notification of configuration changes such +// as stores being added. +type ConfigChangeListener interface { + // StoreAddNotify notifies that a new store has been added with ID storeID. + StoreAddNotify(StoreID, State) +} diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index a0fea2b6911c..0f8208f2938d 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -13,6 +13,7 @@ package state import ( "math/rand" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload" @@ -686,3 +687,33 @@ US_West `, complexTopology.String()) } + +func TestCapacityOverride(t *testing.T) { + settings := config.DefaultSimulationSettings() + tick := settings.StartTime + s := LoadClusterInfo(ClusterInfoWithStoreCount(1, 1), settings) + storeID, rangeID := StoreID(1), RangeID(1) + _, ok := s.AddReplica(rangeID, storeID, roachpb.VOTER_FULL) + require.True(t, ok) + + override := NewCapacityOverride() + override.QueriesPerSecond = 42 + + // Overwrite the QPS store capacity field. + s.SetCapacityOverride(storeID, override) + + // Record 100 QPS of load, this should not change the store capacity QPS as + // we set it above, however it should change the written keys field. + s.ApplyLoad(workload.LoadBatch{workload.LoadEvent{ + Key: 1, + Writes: 500, + }}) + s.TickClock(tick.Add(5 * time.Second)) + + capacity := s.StoreDescriptors(false /* cached */, storeID)[0].Capacity + require.Equal(t, 42.0, capacity.QueriesPerSecond) + // NB: Writes per second isn't used and is currently returned as the sum of + // writes to the store - we expect it to be 500 instead of 100 for that + // reason. + require.Equal(t, 500.0, capacity.WritesPerSecond) +} diff --git a/pkg/kv/kvserver/asim/tests/BUILD.bazel b/pkg/kv/kvserver/asim/tests/BUILD.bazel index 4a40b14662cd..c3b00577b998 100644 --- a/pkg/kv/kvserver/asim/tests/BUILD.bazel +++ b/pkg/kv/kvserver/asim/tests/BUILD.bazel @@ -7,11 +7,18 @@ go_test( data = glob(["testdata/**"]), embed = [":tests"], deps = [ + "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/asim", "//pkg/kv/kvserver/asim/config", + "//pkg/kv/kvserver/asim/event", "//pkg/kv/kvserver/asim/gen", "//pkg/kv/kvserver/asim/metrics", + "//pkg/kv/kvserver/asim/state", + "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/roachpb", + "//pkg/spanconfig/spanconfigtestutils", "//pkg/testutils/datapathutils", + "//pkg/util/log", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_guptarohit_asciigraph//:asciigraph", "@com_github_stretchr_testify//require", @@ -26,6 +33,8 @@ go_library( deps = [ "//pkg/kv/kvserver/asim", "//pkg/kv/kvserver/asim/metrics", + "//pkg/roachpb", + "//pkg/spanconfig/spanconfigtestutils", "//pkg/util/log", "@com_github_montanaflynn_stats//:stats", ], diff --git a/pkg/kv/kvserver/asim/tests/assert.go b/pkg/kv/kvserver/asim/tests/assert.go index a17d86092dec..85775775c3d9 100644 --- a/pkg/kv/kvserver/asim/tests/assert.go +++ b/pkg/kv/kvserver/asim/tests/assert.go @@ -18,6 +18,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/montanaflynn/stats" ) @@ -181,3 +183,168 @@ func (ba balanceAssertion) String() string { "balance stat=%s threshold=%.2f ticks=%d", ba.stat, ba.threshold, ba.ticks) } + +type storeStatAssertion struct { + ticks int + stat string + stores []int + acceptedValue float64 +} + +// Assert looks at a simulation run history and returns true if the +// assertion holds and false if not. When the assertion does not hold, the +// reason is also returned. +func (sa storeStatAssertion) Assert( + ctx context.Context, h asim.History, +) (holds bool, reason string) { + m := h.Recorded + ticks := len(m) + if sa.ticks > ticks { + log.VInfof(ctx, 2, + "The history to run assertions against (%d) is shorter than "+ + "the assertion duration (%d)", ticks, sa.ticks) + return true, "" + } + + ts := metrics.MakeTS(m) + statTs := ts[sa.stat] + holds = true + // Set holds to be true initially, holds is set to false if the steady + // state assertion doesn't hold on any store. + holds = true + buf := strings.Builder{} + + for _, store := range sa.stores { + trimmedStoreStats := statTs[store-1][ticks-sa.ticks-1:] + for _, stat := range trimmedStoreStats { + if stat != sa.acceptedValue { + if holds { + holds = false + fmt.Fprintf(&buf, " %s\n", sa) + } + fmt.Fprintf(&buf, + "\tstore=%d stat=%.2f\n", + store, stat) + } + } + } + return holds, buf.String() +} + +// String returns the string representation of the assertion. +func (sa storeStatAssertion) String() string { + return fmt.Sprintf("stat=%s value=%.2f ticks=%d", + sa.stat, sa.acceptedValue, sa.ticks) +} + +type conformanceAssertion struct { + underreplicated int + overreplicated int + violating int + unavailable int +} + +// conformanceAssertionSentinel declares a sentinel value which when any of the +// conformanceAssertion parameters are set to, we ignore the conformance +// reports value for that type of conformance. +const conformanceAssertionSentinel = -1 + +// Assert looks at a simulation run history and returns true if the +// assertion holds and false if not. When the assertion does not hold, the +// reason is also returned. +func (ca conformanceAssertion) Assert( + ctx context.Context, h asim.History, +) (holds bool, reason string) { + report := h.S.Report() + buf := strings.Builder{} + holds = true + + unavailable, under, over, violating := len(report.Unavailable), len(report.UnderReplicated), len(report.OverReplicated), len(report.ViolatingConstraints) + + maybeInitHolds := func() { + if holds { + holds = false + fmt.Fprintf(&buf, " %s\n", ca) + fmt.Fprintf(&buf, " actual unavailable=%d under=%d, over=%d violating=%d\n", + unavailable, under, over, violating, + ) + } + } + + if ca.unavailable != conformanceAssertionSentinel && + ca.unavailable != unavailable { + maybeInitHolds() + buf.WriteString(PrintSpanConfigConformanceList( + "unavailable", report.Unavailable)) + } + if ca.underreplicated != conformanceAssertionSentinel && + ca.underreplicated != under { + maybeInitHolds() + buf.WriteString(PrintSpanConfigConformanceList( + "under replicated", report.UnderReplicated)) + } + if ca.overreplicated != conformanceAssertionSentinel && + ca.overreplicated != over { + maybeInitHolds() + buf.WriteString(PrintSpanConfigConformanceList( + "over replicated", report.OverReplicated)) + } + if ca.violating != conformanceAssertionSentinel && + ca.violating != violating { + maybeInitHolds() + buf.WriteString(PrintSpanConfigConformanceList( + "violating constraints", report.ViolatingConstraints)) + } + + return holds, buf.String() +} + +// String returns the string representation of the assertion. +func (ca conformanceAssertion) String() string { + buf := strings.Builder{} + fmt.Fprintf(&buf, "conformance ") + if ca.unavailable != conformanceAssertionSentinel { + fmt.Fprintf(&buf, "unavailable=%d ", ca.unavailable) + } + if ca.underreplicated != conformanceAssertionSentinel { + fmt.Fprintf(&buf, "under=%d ", ca.underreplicated) + } + if ca.overreplicated != conformanceAssertionSentinel { + fmt.Fprintf(&buf, "over=%d ", ca.overreplicated) + } + if ca.violating != conformanceAssertionSentinel { + fmt.Fprintf(&buf, "violating=%d ", ca.violating) + } + return buf.String() +} + +func printRangeDesc(r roachpb.RangeDescriptor) string { + var buf strings.Builder + buf.WriteString(fmt.Sprintf("r%d:", r.RangeID)) + buf.WriteString(r.RSpan().String()) + buf.WriteString(" [") + if allReplicas := r.Replicas().Descriptors(); len(allReplicas) > 0 { + for i, rep := range allReplicas { + if i > 0 { + buf.WriteString(", ") + } + buf.WriteString(rep.String()) + } + } else { + buf.WriteString("") + } + buf.WriteString("]") + return buf.String() +} + +func PrintSpanConfigConformanceList(tag string, ranges []roachpb.ConformanceReportedRange) string { + var buf strings.Builder + for i, r := range ranges { + if i == 0 { + buf.WriteString(fmt.Sprintf("%s:\n", tag)) + } + buf.WriteString(fmt.Sprintf(" %s applying %s\n", printRangeDesc(r.RangeDescriptor), + spanconfigtestutils.PrintSpanConfigDiffedAgainstDefaults(r.Config))) + } + return buf.String() +} diff --git a/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go b/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go index 2989358081ce..4087de818a57 100644 --- a/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go +++ b/pkg/kv/kvserver/asim/tests/datadriven_simulation_test.go @@ -19,11 +19,18 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/event" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gen" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/datadriven" "github.com/guptarohit/asciigraph" "github.com/stretchr/testify/require" @@ -42,14 +49,50 @@ import ( // simulation. The default values are: rw_ratio=0 rate=0 min_block=1 // max_block=1 min_key=1 max_key=10_000 access_skew=false. // -// - "gen_state" [stores=] [ranges=] [placement_skew=] -// [repl_factor=] [keyspace=] -// Initialize the state generator parameters. On the next call to eval, the -// state generator is called to create the initial state used in the -// simulation. The default values are: stores=3 ranges=1 repl_factor=3 +// - "ken_cluster" [nodes=] [stores_per_node=] +// Initialize the cluster generator parameters. On the next call to eval, +// the cluster generator is called to create the initial state used in the +// simulation. The default values are: nodes=3 stores_per_node=1. +// +// - "load_cluster": config= +// Load a defined cluster configuration to be the generated cluster in the +// simulation. The available confiurations are: single_region: 15 nodes in +// region=US, 5 in each zone US_1/US_2/US_3. single_region_multi_store: 3 +// nodes, 5 stores per node with the same zone/region configuration as +// above. multi_region: 36 nodes, 12 in each region and 4 in each zone, +// regions having 3 zones. complex: 28 nodes, 3 regions with a skewed +// number of nodes per region. +// +// - "gen_ranges" [ranges=] [placement_skew=] [repl_factor=] +// [keyspace=] [range_bytes=] +// Initialize the range generator parameters. On the next call to eval, the +// range generator is called to assign an ranges and their replica +// placement. The default values are ranges=1 repl_factor=3 // placement_skew=false keyspace=10000. // -// - "assertion" type= stat= ticks= threshold= +// - set_liveness node= [delay=] +// status=(dead|decommisssioning|draining|unavailable) +// Set the liveness status of the node with ID NodeID. This applies at the +// start of the simulation or with some delay after the simulation starts, +// if specified. +// +// - add_node: [stores=] [locality=] [delay=] +// Add a node to the cluster after initial generation with some delay, +// locality and number of stores on the node. The default values are +// stores=0 locality=none delay=0. +// +// - set_span_config [delay=] +// [startKey, endKey): Provide a new line separated list +// of spans and span configurations e.g. +// [0,100): num_replicas=5 num_voters=3 constraints={'+region=US_East'} +// [100, 500): num_replicas=3 +// ... +// This will update the span config for the span [0,100) to specify 3 +// voting replicas and 2 non-voting replicas, with a constraint that all +// replicas are in the region US_East. +// +// - "assertion" type= [stat=] [ticks=] [threshold=] +// [store=] [(under|over|unavailable|violating)=] // Add an assertion to the list of assertions that run against each // sample on subsequent calls to eval. When every assertion holds during eval, // OK is printed, otherwise the reason the assertion(s) failed is printed. @@ -68,6 +111,16 @@ import ( // threshold (e.g. threshold=0.05) % of the mean, the assertion fails. This // assertion applies per-store, over 'ticks' duration. // +// For type=stat assertions, if the stat (e.g. stat=replicas) value of the +// last ticks (e.g. ticks=5) duration is not exactly equal to threshold, +// the assertion fails. This applies for a specified store which must be +// provided with store=storeID. +// +// For type=conformance assertions, you may assert on the number of +// replicas that you expect to be underreplicated (under), +// overreplicated(over), unavailable(unavailable) and violating +// constraints(violating) at the end of the evaluation. +// // - "setting" [rebalance_mode=] [rebalance_interval=] // [rebalance_qps_threshold=] [split_qps_threshold=] // [rebalance_range_threshold=] [gossip_delay=] @@ -89,14 +142,31 @@ import ( // is the simulated time and the y axis is the stat value. A series is // rendered per-store, so if there are 10 stores, 10 series will be // rendered. +// +// - "topology" [sample=] +// Print the cluster locality topology of the sample given (default=last). +// e.g. for the load_cluster config=single_region +// US +// ..US_1 +// ....└── [1 2 3 4 5] +// ..US_2 +// ....└── [6 7 8 9 10] +// ..US_3 +// ....└── [11 12 13 14 15] func TestDataDriven(t *testing.T) { ctx := context.Background() dir := datapathutils.TestDataPath(t, ".") datadriven.Walk(t, dir, func(t *testing.T, path string) { const defaultKeyspace = 10000 loadGen := gen.BasicLoad{} - stateGen := gen.BasicState{} + var clusterGen gen.ClusterGen + var rangeGen gen.RangeGen = gen.BasicRanges{ + Ranges: 1, + ReplicationFactor: 1, + KeySpace: defaultKeyspace, + } settingsGen := gen.StaticSettings{Settings: config.DefaultSimulationSettings()} + eventGen := gen.StaticEvents{DelayedEvents: event.DelayedEventList{}} assertions := []SimulationAssertion{} runs := []asim.History{} datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { @@ -123,21 +193,177 @@ func TestDataDriven(t *testing.T) { loadGen.MaxBlockSize = maxBlock loadGen.MinBlockSize = minBlock return "" - case "gen_state": - var stores, ranges, replFactor, keyspace = 3, 1, 3, defaultKeyspace + case "gen_ranges": + var ranges, replFactor, keyspace = 1, 3, defaultKeyspace + var bytes int64 = 0 var placementSkew bool - scanIfExists(t, d, "stores", &stores) scanIfExists(t, d, "ranges", &ranges) scanIfExists(t, d, "repl_factor", &replFactor) scanIfExists(t, d, "placement_skew", &placementSkew) scanIfExists(t, d, "keyspace", &keyspace) + scanIfExists(t, d, "bytes", &bytes) + + var placementType gen.PlacementType + if placementSkew { + placementType = gen.Skewed + } else { + placementType = gen.Uniform + } + rangeGen = gen.BasicRanges{ + Ranges: ranges, + PlacementType: placementType, + KeySpace: keyspace, + ReplicationFactor: replFactor, + Bytes: bytes, + } + return "" + case "topology": + var sample = len(runs) + scanIfExists(t, d, "sample", &sample) + top := runs[sample-1].S.Topology() + return (&top).String() + case "gen_cluster": + var nodes = 3 + var storesPerNode = 1 + scanIfExists(t, d, "nodes", &nodes) + scanIfExists(t, d, "stores_per_node", &storesPerNode) + clusterGen = gen.BasicCluster{ + Nodes: nodes, + StoresPerNode: storesPerNode, + } + return "" + case "load_cluster": + var config string + var clusterInfo state.ClusterInfo + scanArg(t, d, "config", &config) + + switch config { + case "single_region": + clusterInfo = state.SingleRegionConfig + case "single_region_multi_store": + clusterInfo = state.SingleRegionMultiStoreConfig + case "multi_region": + clusterInfo = state.MultiRegionConfig + case "complex": + clusterInfo = state.ComplexConfig + default: + panic(fmt.Sprintf("unknown cluster config %s", config)) + } + + clusterGen = gen.LoadedCluster{ + Info: clusterInfo, + } + return "" + case "add_node": + var delay time.Duration + var numStores = 1 + var localityString string + scanIfExists(t, d, "delay", &delay) + scanIfExists(t, d, "stores", &numStores) + scanIfExists(t, d, "locality", &localityString) + + addEvent := event.DelayedEvent{ + EventFn: func(ctx context.Context, tick time.Time, s state.State) { + node := s.AddNode() + if localityString != "" { + var locality roachpb.Locality + if err := locality.Set(localityString); err != nil { + panic(fmt.Sprintf("unable to set node locality %s", err.Error())) + } + s.SetNodeLocality(node.NodeID(), locality) + } + for i := 0; i < numStores; i++ { + if _, ok := s.AddStore(node.NodeID()); !ok { + panic(fmt.Sprintf("adding store to node=%d failed", node)) + } + } + }, + At: settingsGen.Settings.StartTime.Add(delay), + } + eventGen.DelayedEvents = append(eventGen.DelayedEvents, addEvent) + return "" + case "set_span_config": + var delay time.Duration + scanIfExists(t, d, "delay", &delay) + for _, line := range strings.Split(d.Input, "\n") { + line = strings.TrimSpace(line) + if line == "" { + continue + } + tag, data, found := strings.Cut(line, ":") + require.True(t, found) + tag, data = strings.TrimSpace(tag), strings.TrimSpace(data) + span := spanconfigtestutils.ParseSpan(t, tag) + conf := spanconfigtestutils.ParseZoneConfig(t, data).AsSpanConfig() + eventGen.DelayedEvents = append(eventGen.DelayedEvents, event.DelayedEvent{ + EventFn: func(ctx context.Context, tick time.Time, s state.State) { + s.SetSpanConfig(span, conf) + }, + At: settingsGen.Settings.StartTime.Add(delay), + }) + } + return "" + case "set_liveness": + var nodeID int + var liveness string + var delay time.Duration + livenessStatus := 3 + scanArg(t, d, "node", &nodeID) + scanArg(t, d, "liveness", &liveness) + scanIfExists(t, d, "delay", &delay) + switch liveness { + case "unknown": + livenessStatus = 0 + case "dead": + livenessStatus = 1 + case "unavailable": + livenessStatus = 2 + case "live": + livenessStatus = 3 + case "decommissioning": + livenessStatus = 4 + case "draining": + livenessStatus = 5 + panic(fmt.Sprintf("unkown liveness status: %s", liveness)) + } + eventGen.DelayedEvents = append(eventGen.DelayedEvents, event.DelayedEvent{ + EventFn: func(ctx context.Context, tick time.Time, s state.State) { + s.SetNodeLiveness( + state.NodeID(nodeID), + livenesspb.NodeLivenessStatus(livenessStatus), + ) + }, + At: settingsGen.Settings.StartTime.Add(delay), + }) + return "" + case "set_capacity": + var store int + var ioThreshold float64 = -1 + var capacity, available int64 = -1, -1 + var delay time.Duration + + scanArg(t, d, "store", &store) + scanIfExists(t, d, "io_threshold", &ioThreshold) + scanIfExists(t, d, "capacity", &capacity) + scanIfExists(t, d, "available", &available) + scanIfExists(t, d, "delay", &delay) + + capacityOverride := state.NewCapacityOverride() + capacityOverride.Capacity = capacity + capacityOverride.Available = available + if ioThreshold != -1 { + capacityOverride.IOThreshold = allocatorimpl.TestingIOThresholdWithScore(ioThreshold) + } + + eventGen.DelayedEvents = append(eventGen.DelayedEvents, event.DelayedEvent{ + EventFn: func(ctx context.Context, tick time.Time, s state.State) { + log.Infof(ctx, "setting capacity override %+v", capacityOverride) + s.SetCapacityOverride(state.StoreID(store), capacityOverride) + }, + At: settingsGen.Settings.StartTime.Add(delay), + }) - stateGen.Stores = stores - stateGen.ReplicationFactor = replFactor - stateGen.KeySpace = keyspace - stateGen.Ranges = ranges - stateGen.SkewedPlacement = placementSkew return "" case "eval": samples := 1 @@ -158,7 +384,8 @@ func TestDataDriven(t *testing.T) { for sample := 0; sample < samples; sample++ { assertionFailures := []string{} simulator := gen.GenerateSimulation( - duration, stateGen, loadGen, settingsGen, seedGen.Int63(), + duration, clusterGen, rangeGen, loadGen, + settingsGen, eventGen, seedGen.Int63(), ) simulator.RunSim(ctx) history := simulator.History() @@ -195,23 +422,55 @@ func TestDataDriven(t *testing.T) { var threshold float64 scanArg(t, d, "type", &typ) - scanArg(t, d, "stat", &stat) - scanArg(t, d, "ticks", &ticks) - scanArg(t, d, "threshold", &threshold) switch typ { case "balance": + scanArg(t, d, "stat", &stat) + scanArg(t, d, "ticks", &ticks) + scanArg(t, d, "threshold", &threshold) assertions = append(assertions, balanceAssertion{ ticks: ticks, stat: stat, threshold: threshold, }) case "steady": + scanArg(t, d, "stat", &stat) + scanArg(t, d, "ticks", &ticks) + scanArg(t, d, "threshold", &threshold) assertions = append(assertions, steadyStateAssertion{ ticks: ticks, stat: stat, threshold: threshold, }) + case "stat": + var store int + scanArg(t, d, "stat", &stat) + scanArg(t, d, "ticks", &ticks) + scanArg(t, d, "threshold", &threshold) + scanArg(t, d, "store", &store) + assertions = append(assertions, storeStatAssertion{ + ticks: ticks, + stat: stat, + acceptedValue: threshold, + // TODO(kvoli): support setting multiple stores. + stores: []int{store}, + }) + case "conformance": + var under, over, unavailable, violating int + under = conformanceAssertionSentinel + over = conformanceAssertionSentinel + unavailable = conformanceAssertionSentinel + violating = conformanceAssertionSentinel + scanIfExists(t, d, "under", &under) + scanIfExists(t, d, "over", &over) + scanIfExists(t, d, "unavailable", &unavailable) + scanIfExists(t, d, "violating", &violating) + assertions = append(assertions, conformanceAssertion{ + underreplicated: under, + overreplicated: over, + violating: violating, + unavailable: unavailable, + }) } return "" case "setting": @@ -221,6 +480,7 @@ func TestDataDriven(t *testing.T) { scanIfExists(t, d, "split_qps_threshold", &settingsGen.Settings.SplitQPSThreshold) scanIfExists(t, d, "rebalance_range_threshold", &settingsGen.Settings.RangeRebalanceThreshold) scanIfExists(t, d, "gossip_delay", &settingsGen.Settings.StateExchangeDelay) + scanIfExists(t, d, "range_size_split_threshold", &settingsGen.Settings.RangeSizeSplitThreshold) return "" case "plot": var stat string @@ -228,7 +488,7 @@ func TestDataDriven(t *testing.T) { var buf strings.Builder scanArg(t, d, "stat", &stat) - scanArg(t, d, "sample", &sample) + scanIfExists(t, d, "sample", &sample) scanIfExists(t, d, "height", &height) scanIfExists(t, d, "width", &width) diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_add_node b/pkg/kv/kvserver/asim/tests/testdata/example_add_node new file mode 100644 index 000000000000..83f828b79b41 --- /dev/null +++ b/pkg/kv/kvserver/asim/tests/testdata/example_add_node @@ -0,0 +1,60 @@ +# This test simulates the behavior of the roachtest replicate/1to3. Where +# initially there is one store, two new stores are added and the the test +# asserts the replica counts between the 3 stores eventually balances. +gen_cluster nodes=1 +---- + +# Generate 300 ranges, where each range is 100mb (logical). +gen_ranges ranges=300 range_bytes=100000000 repl_factor=1 +---- + +# Add the two new nodes that won't be in the initial cluster, however added as +# soon as the simulation evaluation begins i.e. with delay=0. +add_node +---- + +add_node +---- + +# Assert that the replica counts balance within 5% of each other among stores. +assertion type=balance stat=replicas ticks=6 threshold=1.05 +---- + +# Update the replication factor for the keyspace to be 3, instead of the +# initial replication factor of 1 set during generation. +set_span_config +[0,10000): num_replicas=3 num_voters=3 +---- + +eval duration=20m samples=1 seed=42 +---- +OK + +# Plot the replica count from the evaluation. Since there are 300 replicas on +# s1 and the default RF=3, we expect the other stores to be up-replicated to +# 300 replicas as well. +plot stat=replicas sample=1 +---- +---- + + 301 ┼──────────────────────────────────────╭──────────────────────────────────────── + 281 ┤ ╭╭─╯ + 261 ┤ ╭╭──╯ + 241 ┤ ╭╭─╯ + 221 ┤ ╭───╯ + 201 ┤ ╭╭─╯ + 181 ┤ ╭──╯ + 161 ┤ ╭──╯ + 140 ┤ ╭──╯╯ + 120 ┤ ╭─╯╯ + 100 ┤ ╭──╯ + 80 ┤ ╭─╯╯ + 60 ┤ ╭──╯ + 40 ┤ ╭─╯ + 20 ┤ ╭──╯ + 0 ┼─╯ + replicas +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_fulldisk b/pkg/kv/kvserver/asim/tests/testdata/example_fulldisk new file mode 100644 index 000000000000..6654796c7829 --- /dev/null +++ b/pkg/kv/kvserver/asim/tests/testdata/example_fulldisk @@ -0,0 +1,68 @@ +gen_cluster nodes=5 +---- + +gen_ranges ranges=500 bytes=300000000 +---- + +gen_load rate=500 max_block=128000 min_block=128000 +---- + +set_capacity store=5 capacity=45000000000 +---- + +eval duration=30m seed=42 +---- +OK + +# Plot the replicas over time per store. With a steady state of writes, we will +# repeatedly hit the disk fullness threshold which causes shedding replicas on +# store 5. This is shown below as it sheds replicas. +plot stat=replicas +---- +---- + + 336 ┤ ╭╮ ╭╭╮╭╮─╭╮╭╭╮ ╭╮╭──╮╮╭╭─ + 325 ┤ ╭╮╭────╮╭────────╯╰╯╰─╯╰─╯╰──────────────╮╭╯─╯╰──╯╯ + 314 ┤ ╭╭╭╮╭─╭──╯╰╯╯╰╯╰╰╯ ╰╯ ╰╯ ╰╯╰╯╰╯ ╰╯ + 302 ┼───────────────╮─────────╯ + 291 ┤ ╰───╮ + 280 ┤ ╰╮ ╭╮ + 269 ┤ ╰─╯╰╮ + 258 ┤ ╰╮ + 246 ┤ ╰──╮ + 235 ┤ ╰╮ + 224 ┤ ╰╮ + 213 ┤ ╰╮╭────╮ ╭╮ + 202 ┤ ╰╯ ╰╮ ╭──────────╯╰───╮ + 190 ┤ ╰─────╮ ╭───╮ ╭╯ │ + 179 ┤ ╰───╯ ╰─╯ ╰─╮ ╭──╮ + 168 ┤ ╰─╯ ╰ + replicas +---- +---- + +# Plot the % of disk storage capacity used. We should see s5 hovering right +# around 92.5-95% (the storage capacity threshold value). +plot stat=disk_fraction_used +---- +---- + + 0.98 ┤ ╭─╮ ╭╮ ╭╮╭─╮╭──╮ ╭──────╮╭─╮ ╭───╮ ╭╮ ╭╮╭─╮ ╭───╮ ╭─╮ + 0.91 ┤ ╭───────╯ ╰─╯╰──╯╰╯ ╰╯ ╰──╯ ╰╯ ╰────╯ ╰────╯╰──╯╰╯ ╰──╯ ╰───╯ ╰ + 0.85 ┼──────╯ + 0.78 ┤ + 0.72 ┤ + 0.65 ┤ + 0.59 ┤ + 0.52 ┤ + 0.46 ┤ + 0.39 ┤ + 0.33 ┤ + 0.26 ┤ + 0.20 ┤ + 0.13 ┤ + 0.07 ┤ + 0.00 ┼─────────────────────────────────────────────────────────────────────────────── + disk_fraction_used +---- +---- diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_io_overload b/pkg/kv/kvserver/asim/tests/testdata/example_io_overload new file mode 100644 index 000000000000..0e1adcb16822 --- /dev/null +++ b/pkg/kv/kvserver/asim/tests/testdata/example_io_overload @@ -0,0 +1,41 @@ +gen_cluster nodes=5 +---- + +gen_ranges ranges=500 placement_skew=true +---- + +set_capacity store=5 io_threshold=1 +---- + +assertion type=stat stat=replicas store=5 threshold=0 ticks=5 +---- + +eval duration=10m seed=42 +---- +OK + +# Expect s5 to get no replicas due to IO overload. The plot below should show a +# solid line at 0, which will be s5's replica count. +plot stat=replicas +---- +---- + + 500 ┼────────╮╮ + 467 ┤ ╰──╰───────────╮╮ + 433 ┤ ╰╰────────────╮╮ + 400 ┤ ╰╰───────────╮─╮ ╭╮ ╭╮ ╭╮╭──╮ + 367 ┤ ╰╭─────╯╰───╯╰──╯╰╯──╰──────────── + 333 ┤ ╭────╯ + 300 ┤ ╭───╯ + 267 ┤ ╭───╯ + 233 ┤ ╭────╯ + 200 ┤ ╭───╯ + 167 ┤ ╭────╯ + 133 ┤ ╭───╯ + 100 ┤ ╭───╯ + 67 ┤ ╭────╯ + 33 ┤ ╭───╯ + 0 ┼─────────────────────────────────────────────────────────────────────────────── + replicas +---- +---- diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_liveness b/pkg/kv/kvserver/asim/tests/testdata/example_liveness new file mode 100644 index 000000000000..9a4e5d970027 --- /dev/null +++ b/pkg/kv/kvserver/asim/tests/testdata/example_liveness @@ -0,0 +1,87 @@ +# This example sets n7 to dead initially and n5 to decommissioning after 2 +# minutes. The output of replicas per store is then plotted. +# +# Create 7 stores, with 700 ranges (RF=3). Each store should have approx 300 +# replicas and 100 leases. +gen_cluster nodes=7 +---- + +gen_ranges ranges=700 +---- + +# n7 is dead and remains dead forever. It will still have its initial (3000) +# replicas. +set_liveness node=7 liveness=dead +---- + +# n6 becomes decommissioning after 3 minutes and remains decommissioning +# thereafter. +set_liveness node=6 liveness=decommissioning delay=3m +---- + +# The number of replicas on the dead store should be 0, assert this. +assertion type=stat stat=replicas ticks=6 threshold=0 store=7 +---- + +# The number of replicas on the decommissioning store should be 0, assert this. +assertion type=stat stat=replicas ticks=6 threshold=0 store=6 +---- + +eval duration=12m seed=42 +---- +OK + +# We expect one node(store) (n7) to immediately start losing replicas, whilst +# other stores gain replicas evenly. After 3 minutes, we expect another +# node(store) (n6) to begin losing replicas in a similar manner. +plot stat=replicas +---- +---- + + 432 ┤ ╭────╭─────────────────── + 403 ┤ ╭──────╭───╭──────────────────────────────── + 374 ┤ ╭─╭──╭───────────────╯╯ + 346 ┤ ╭─╭╭──────────╯ + 317 ┤╭╭╭─────────────────────╮ + 288 ┼──╮ ╰───╮ + 259 ┤ ╰──╮ ╰────╮ + 230 ┤ ╰─╮ ╰──╮ + 202 ┤ ╰──╮ ╰──╮ + 173 ┤ ╰───╮ ╰────╮ + 144 ┤ ╰──╮ ╰──╮ + 115 ┤ ╰─╮ ╰──╮ + 86 ┤ ╰───╮ ╰──╮ + 58 ┤ ╰──╮ ╰────╮ + 29 ┤ ╰───╮ ╰───────╮ + 0 ┤ ╰──────────────────────────────────────────────── + replicas +---- +---- + +# Both nodes should begin losing leases immediately after their liveness status +# is changed to dead or decommissioning (5 minutes later). +plot stat=leases +---- +---- + + 148 ┤ ╭─────────────────────── + 138 ┤ ╭───╭─────╭─────────────────────── + 128 ┤ ╭────╭───────────╯╯──╯ + 118 ┤ ╭╮╭─────────────────╮────────╯────╯ + 109 ┤ ╭──────────╯───────────────╯ ╰─╮ + 99 ┼──╮──╯────────╯ ╰─╮ + 89 ┤ ╰───╮ ╰──╮ + 79 ┤ ╰─╮ ╰─╮ + 69 ┤ ╰──╮ ╰─╮ + 59 ┤ ╰───╮ ╰╮ + 49 ┤ ╰─╮ ╰──╮ + 39 ┤ ╰───╮ ╰─╮ + 30 ┤ ╰──╮ ╰─╮ + 20 ┤ ╰───╮ ╰──╮ + 10 ┤ ╰──╮ ╰──╮ + 0 ┤ ╰─────────────────────────────────────────────── + leases +---- +---- + +# vim:ft=sh diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_load_cluster b/pkg/kv/kvserver/asim/tests/testdata/example_load_cluster new file mode 100644 index 000000000000..99a2fc6c38f7 --- /dev/null +++ b/pkg/kv/kvserver/asim/tests/testdata/example_load_cluster @@ -0,0 +1,49 @@ +# This test shows how configurations may be loaded from the existing catalog. +# This test also demonstrates how to use conformance assertions to check +# replication meet expectations. +load_cluster config=complex +---- + +# Load just a single range into state, with a RF=5. +gen_ranges ranges=1 repl_factor=5 +---- + +# Set the span config so that there are only voters, with 3 voters in US_East +# and 1 voter each in US_West and EU. +set_span_config +[0,10000): num_replicas=5 num_voters=5 constraints={'+region=US_East':3,'+region=US_West':1,'+region=EU':1} voter_constraints={'+region=US_East':3,'+region=US_West':1,'+region=EU':1} +---- + +# This assertion will fail if there are more than 0 unavailable, under +# replicated, over replicated or constraint violating ranges, once the +# simulation evaluation ends. +assertion type=conformance unavailable=0 under=0 over=0 violating=0 +---- + +eval duration=2m samples=1 seed=42 +---- +OK + +topology +---- +EU + EU_1 + │ └── [19 20 21] + EU_2 + │ └── [22 23 24] + EU_3 + │ └── [25 26 27 28] +US_East + US_East_1 + │ └── [1] + US_East_2 + │ └── [2 3] + US_East_3 + │ └── [4 5 6 7 8 9 10 11 12 13 14 15 16] +US_West + US_West_1 + └── [17 18] + + + +# vim:ft=sh diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_multi_store b/pkg/kv/kvserver/asim/tests/testdata/example_multi_store new file mode 100644 index 000000000000..93a0f1cd9012 --- /dev/null +++ b/pkg/kv/kvserver/asim/tests/testdata/example_multi_store @@ -0,0 +1,46 @@ +# This test simulates identical parameters as the rebalance_load multi-store +# test. The number of leases per store should be equal to 1. We assert on this +# with a balance threshold of 1 (i.e. identical number of leases) and a steady +# state threshold of 0 (i.e. doesn't change). +gen_cluster nodes=7 stores_per_node=2 +---- + +gen_ranges ranges=14 placement_skew=true +---- + +gen_load rate=7000 +---- + +assertion stat=leases type=balance ticks=6 threshold=1 +---- + +assertion stat=leases type=steady ticks=6 threshold=0 +---- + +eval duration=5m seed=42 +---- +OK + +plot stat=leases +---- +---- + + 14.00 ┼╮ + 13.07 ┤╰╮ + 12.13 ┤ ╰╮ + 11.20 ┤ │ + 10.27 ┤ │ + 9.33 ┤ │ + 8.40 ┤ ╰╮ + 7.47 ┤ ╰╮ + 6.53 ┤ │ + 5.60 ┤ │ + 4.67 ┤ │ + 3.73 ┤ ╰───────────╮ + 2.80 ┤ │ + 1.87 ┤ ╭───────────╮──────────────╮ + 0.93 ┤╭╭╭──────────────────────────────────────────────────────────────────────────── + 0.00 ┼──╯─────────────╯──────────────╯ + leases +---- +---- diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing b/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing index d26af0187a23..99b155512bed 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing +++ b/pkg/kv/kvserver/asim/tests/testdata/example_rebalancing @@ -2,7 +2,10 @@ # where there are 7 stores, 7 ranges and initially the replicas are placed # following a skewed distribution (where s1 has the most replicas, s2 has half # as many as s1...). -gen_state stores=7 ranges=7 placement_skew=true +gen_cluster nodes=7 +---- + +gen_ranges ranges=7 placement_skew=true ---- # Create a load generator, where there are 7k ops/s and the access follows a diff --git a/pkg/kv/kvserver/asim/tests/testdata/example_splitting b/pkg/kv/kvserver/asim/tests/testdata/example_splitting index 4f644d434dd6..51e5d1b1ceb2 100644 --- a/pkg/kv/kvserver/asim/tests/testdata/example_splitting +++ b/pkg/kv/kvserver/asim/tests/testdata/example_splitting @@ -1,6 +1,9 @@ # Explore how load based and sized based splitting occur in isolation. In this # example, there is only one store so no rebalancing activity should occur. -gen_state stores=1 ranges=1 repl_factor=1 +gen_cluster nodes=1 +---- + +gen_ranges ranges=1 repl_factor=1 ---- # Create a load generator, where there is higher ops/s than the qps split