Skip to content

Commit

Permalink
Merge #100979
Browse files Browse the repository at this point in the history
100979: asim: extend datadriven test to support recovery r=kvoli a=kvoli

Previously, only rebalancing was supported in the data driven simulation
test. This commit extends the syntax to support recovery scenarios. As
part of the extension, the state generator is split into range
generation and cluster generation.

Examples are added for each command, along with common testing scenarios
such as decommissioning, IO overload, disk fullness and adding a node
(with a store).

The newly supported commands are listed below:

```
- "load_cluster": config=<name>
  Load a defined cluster configuration to be the generated cluster in the
  simulation. The available confiurations are: single_region: 15 nodes in
  region=US, 5 in each zone US_1/US_2/US_3. single_region_multi_store: 3
  nodes, 5 stores per node with the same zone/region configuration as
  above. multi_region: 36 nodes, 12 in each region and 4 in each zone,
  regions having 3 zones. complex: 28 nodes, 3 regions with a skewed
  number of nodes per region.

- "gen_ranges" [ranges=<int>] [placement_skew=<bool>] [repl_factor=<int>]
  [keyspace=<int>] [range_bytes=<int>]
  Initialize the range generator parameters. On the next call to eval, the
  range generator is called to assign an ranges and their replica
  placement. The default values are ranges=1 repl_factor=3
  placement_skew=false keyspace=10000.

- set_liveness node=<int> [delay=<duration>]
  status=(dead|decommissioning|draining|unavailable)
  Set the liveness status of the node with ID NodeID. This applies at the
  start of the simulation or with some delay after the simulation starts,
  if specified.

- add_node: [stores=<int>] [locality=<string>] [delay=<duration>]
  Add a node to the cluster after initial generation with some delay,
  locality and number of stores on the node. The default values are
  stores=0 locality=none delay=0.

- set_span_config [delay=<duration>]
  [startKey, endKey): <span_config> Provide a new line separated list
  of spans and span configurations e.g.
  [0,100): num_replicas=5 num_voters=3 constraints={'+region=US_East'}
  [100, 500): num_replicas=3
  ...
  This will update the span config for the span [0,100) to specify 3
  voting replicas and 2 non-voting replicas, with a constraint that all
  replicas are in the region US_East.

- assertion extended to support two new assertion types:
  For type=stat assertions, if the stat (e.g. stat=replicas) value of the
  last ticks (e.g. ticks=5) duration is not exactly equal to threshold,
  the assertion fails. This applies for a specified store which must be
  provided with store=storeID.

  For type=conformance assertions, you may assert on the number of
  replicas that you expect to be under-replicated(under),
  over-replicated(over), unavailable(unavailable) and violating
  constraints(violating) at the end of the evaluation.

- "topology" [sample=<int>]
  Print the cluster locality topology of the sample given (default=last).
  e.g. for the load_cluster config=single_region
  US
  ..US_1
  ....└── [1 2 3 4 5]
  ..US_2
  ....└── [6 7 8 9 10]
  ..US_3
  ....└── [11 12 13 14 15]
```

Informs: #90137
Release note: None

Co-authored-by: Austen McClernon <[email protected]>
  • Loading branch information
craig[bot] and kvoli committed Jun 23, 2023
2 parents 5449699 + e4ed3a9 commit 1fd8581
Show file tree
Hide file tree
Showing 29 changed files with 1,464 additions and 148 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/apply:apply",
"//pkg/kv/kvserver/apply:apply_test",
"//pkg/kv/kvserver/asim/config:config",
"//pkg/kv/kvserver/asim/event:event",
"//pkg/kv/kvserver/asim/gen:gen",
"//pkg/kv/kvserver/asim/gossip:gossip",
"//pkg/kv/kvserver/asim/gossip:gossip_test",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/asim/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/event",
"//pkg/kv/kvserver/asim/gossip",
"//pkg/kv/kvserver/asim/metrics",
"//pkg/kv/kvserver/asim/op",
"//pkg/kv/kvserver/asim/queue",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/asim/storerebalancer",
"//pkg/kv/kvserver/asim/workload",
"//pkg/util/log",
],
)

Expand Down
173 changes: 112 additions & 61 deletions pkg/kv/kvserver/asim/asim.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/event"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/metrics"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/op"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/queue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/storerebalancer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/workload"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// Simulator simulates an entire cluster, and runs the allocator of each store
// in that cluster.
type Simulator struct {
log.AmbientContext
curr time.Time
end time.Time
// interval is the step between ticks for active simulaton components, such
Expand All @@ -36,6 +39,7 @@ type Simulator struct {

// The simulator can run multiple workload Generators in parallel.
generators []workload.Generator
events event.DelayedEventList

pacers map[state.StoreID]queue.ReplicaPacer

Expand All @@ -53,6 +57,8 @@ type Simulator struct {
gossip gossip.Gossip
shuffler func(n int, swap func(i, j int))

settings *config.SimulationSettings

metrics *metrics.Tracker
history History
}
Expand All @@ -62,6 +68,7 @@ type Simulator struct {
// TODO(kvoli): Add a range log like structure to the history.
type History struct {
Recorded [][]metrics.StoreMetrics
S state.State
}

// Listen implements the metrics.StoreMetricListener interface.
Expand All @@ -76,82 +83,101 @@ func NewSimulator(
initialState state.State,
settings *config.SimulationSettings,
m *metrics.Tracker,
events ...event.DelayedEvent,
) *Simulator {
pacers := make(map[state.StoreID]queue.ReplicaPacer)
rqs := make(map[state.StoreID]queue.RangeQueue)
sqs := make(map[state.StoreID]queue.RangeQueue)
srs := make(map[state.StoreID]storerebalancer.StoreRebalancer)
changer := state.NewReplicaChanger()
controllers := make(map[state.StoreID]op.Controller)

s := &Simulator{
AmbientContext: log.MakeTestingAmbientCtxWithNewTracer(),
curr: settings.StartTime,
end: settings.StartTime.Add(duration),
interval: settings.TickInterval,
generators: wgs,
state: initialState,
changer: changer,
rqs: rqs,
sqs: sqs,
controllers: controllers,
srs: srs,
pacers: pacers,
gossip: gossip.NewGossip(initialState, settings),
metrics: m,
shuffler: state.NewShuffler(settings.Seed),
// TODO(kvoli): Keeping the state around is a bit hacky, find a better
// method of reporting the ranges.
history: History{Recorded: [][]metrics.StoreMetrics{}, S: initialState},
events: events,
settings: settings,
}

for _, store := range initialState.Stores() {
storeID := store.StoreID()
allocator := initialState.MakeAllocator(storeID)
storePool := initialState.StorePool(storeID)
// TODO(kvoli): Instead of passing in individual settings to construct
// the each ticking component, pass a pointer to the simulation
// settings struct. That way, the settings may be adjusted dynamically
// during a simulation.
rqs[storeID] = queue.NewReplicateQueue(
storeID,
changer,
settings.ReplicaChangeDelayFn(),
allocator,
storePool,
settings.StartTime,
)
sqs[storeID] = queue.NewSplitQueue(
storeID,
changer,
settings.RangeSplitDelayFn(),
settings.RangeSizeSplitThreshold,
settings.StartTime,
)
pacers[storeID] = queue.NewScannerReplicaPacer(
initialState.NextReplicasFn(storeID),
settings.PacerLoopInterval,
settings.PacerMinIterInterval,
settings.PacerMaxIterIterval,
settings.Seed,
)
controllers[storeID] = op.NewController(
changer,
allocator,
storePool,
settings,
storeID,
)
srs[storeID] = storerebalancer.NewStoreRebalancer(
settings.StartTime,
storeID,
controllers[storeID],
allocator,
storePool,
settings,
storerebalancer.GetStateRaftStatusFn(initialState),
)
s.addStore(storeID, settings.StartTime)
}
s.state.RegisterConfigChangeListener(s)

s := &Simulator{
curr: settings.StartTime,
end: settings.StartTime.Add(duration),
interval: settings.TickInterval,
generators: wgs,
state: initialState,
changer: changer,
rqs: rqs,
sqs: sqs,
controllers: controllers,
srs: srs,
pacers: pacers,
gossip: gossip.NewGossip(initialState, settings),
metrics: m,
shuffler: state.NewShuffler(settings.Seed),
history: History{Recorded: [][]metrics.StoreMetrics{}},
}
m.Register(&s.history)
s.AddLogTag("asim", nil)
return s
}

// StoreAddNotify notifies that a new store has been added with ID storeID.
func (s *Simulator) StoreAddNotify(storeID state.StoreID, _ state.State) {
s.addStore(storeID, s.curr)
}

func (s *Simulator) addStore(storeID state.StoreID, tick time.Time) {
allocator := s.state.MakeAllocator(storeID)
storePool := s.state.StorePool(storeID)
// TODO(kvoli): Instead of passing in individual settings to construct
// the each ticking component, pass a pointer to the simulation
// settings struct. That way, the settings may be adjusted dynamically
// during a simulation.
s.rqs[storeID] = queue.NewReplicateQueue(
storeID,
s.changer,
s.settings.ReplicaChangeDelayFn(),
allocator,
storePool,
tick,
)
s.sqs[storeID] = queue.NewSplitQueue(
storeID,
s.changer,
s.settings.RangeSplitDelayFn(),
s.settings.RangeSizeSplitThreshold,
tick,
)
s.pacers[storeID] = queue.NewScannerReplicaPacer(
s.state.NextReplicasFn(storeID),
s.settings.PacerLoopInterval,
s.settings.PacerMinIterInterval,
s.settings.PacerMaxIterIterval,
s.settings.Seed,
)
s.controllers[storeID] = op.NewController(
s.changer,
allocator,
storePool,
s.settings,
storeID,
)
s.srs[storeID] = storerebalancer.NewStoreRebalancer(
tick,
storeID,
s.controllers[storeID],
allocator,
storePool,
s.settings,
storerebalancer.GetStateRaftStatusFn(s.state),
)
}

// GetNextTickTime returns a simulated tick time, or an indication that the
// simulation is done.
func (s *Simulator) GetNextTickTime() (done bool, tick time.Time) {
Expand Down Expand Up @@ -189,9 +215,15 @@ func (s *Simulator) RunSim(ctx context.Context) {
break
}

s.AddLogTag("tick", tick.Format(time.StampMilli))
ctx = s.AmbientContext.AnnotateCtx(ctx)

// Update the store clocks with the current tick time.
s.tickStoreClocks(tick)

// Tick any events.
s.tickEvents(ctx, tick)

// Update the state with generated load.
s.tickWorkload(ctx, tick)

Expand Down Expand Up @@ -311,3 +343,22 @@ func (s *Simulator) tickStoreRebalancers(ctx context.Context, tick time.Time, st
func (s *Simulator) tickMetrics(ctx context.Context, tick time.Time) {
s.metrics.Tick(ctx, tick, s.state)
}

// tickEvents ticks the registered simulation events.
func (s *Simulator) tickEvents(ctx context.Context, tick time.Time) {
var idx int
// Assume the events are in sorted order and the event list is never added
// to.
for i := range s.events {
if !tick.Before(s.events[i].At) {
idx = i + 1
log.Infof(ctx, "applying event (scheduled=%s tick=%s)", s.events[i].At, tick)
s.events[i].EventFn(ctx, tick, s.state)
} else {
break
}
}
if idx != 0 {
s.events = s.events[idx:]
}
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/asim/asim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ func TestAllocatorSimulatorDeterministic(t *testing.T) {
refRun = history
continue
}
require.Equal(t, refRun, history)
require.Equal(t, refRun.Recorded, history.Recorded)
}
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/asim/event/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "event",
srcs = ["delayed_event.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/event",
visibility = ["//visibility:public"],
deps = ["//pkg/kv/kvserver/asim/state"],
)
38 changes: 38 additions & 0 deletions pkg/kv/kvserver/asim/event/delayed_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package event

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state"
)

type DelayedEventList []DelayedEvent

// Len implements sort.Interface.
func (del DelayedEventList) Len() int { return len(del) }

// Less implements sort.Interface.
func (del DelayedEventList) Less(i, j int) bool {
return del[i].At.Before(del[j].At)
}

// Swap implements sort.Interface.
func (del DelayedEventList) Swap(i, j int) {
del[i], del[j] = del[j], del[i]
}

type DelayedEvent struct {
At time.Time
EventFn func(context.Context, time.Time, state.State)
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/asim/gen/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/kv/kvserver/asim",
"//pkg/kv/kvserver/asim/config",
"//pkg/kv/kvserver/asim/event",
"//pkg/kv/kvserver/asim/metrics",
"//pkg/kv/kvserver/asim/state",
"//pkg/kv/kvserver/asim/workload",
Expand Down
Loading

0 comments on commit 1fd8581

Please sign in to comment.