From 41c5318ee039de284dc8d44f70018334b3b92b3f Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Fri, 17 Jun 2022 14:30:11 -0400
Subject: [PATCH] proof of concept for injecting faults into `FSM.Apply`

This changeset is a proof of concept for a fault injection interface
into the `FSM.Apply` function. It allows us to introduce timeouts or
errors in unit tests by registering a LogApplier implementation in a
map of `interceptionAppliers`, similar to how we currently register
LogAppliers for the enterprise FSM functions. Most interception
appliers are expected to then call the normal applier directly.

This was developed initially for #13407 but can't be used to reproduce
that particular bug. I'm opening this PR anyway for discussion about
whether this is a worthwhile tool to have for testing.
---
 nomad/fsm.go                | 28 +++++++++++++++++++---------
 nomad/plan_endpoint_test.go | 14 ++++++++++++++
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/nomad/fsm.go b/nomad/fsm.go
index 4bac03bef9e..4e370ae455b 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -92,6 +92,10 @@ type nomadFSM struct {
 	// enterpriseRestorers holds the set of enterprise only snapshot restorers
 	enterpriseRestorers SnapshotRestorers
 
+	// interceptionAppliers holds a set of test-only LogAppliers used to
+	// intercept raft messages so that tests can inject faults
+	interceptionAppliers LogAppliers
+
 	// stateLock is only used to protect outside callers to State() from
 	// racing with Restore(), which is called by Raft (it puts in a totally
 	// new state store). Everything internal here is synchronized by the
@@ -153,15 +157,16 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {
 	}
 
 	fsm := &nomadFSM{
-		evalBroker:          config.EvalBroker,
-		periodicDispatcher:  config.Periodic,
-		blockedEvals:        config.Blocked,
-		logger:              config.Logger.Named("fsm"),
-		config:              config,
-		state:               state,
-		timetable:           NewTimeTable(timeTableGranularity, timeTableLimit),
-		enterpriseAppliers:  make(map[structs.MessageType]LogApplier, 8),
-		enterpriseRestorers: make(map[SnapshotType]SnapshotRestorer, 8),
+		evalBroker:           config.EvalBroker,
+		periodicDispatcher:   config.Periodic,
+		blockedEvals:         config.Blocked,
+		logger:               config.Logger.Named("fsm"),
+		config:               config,
+		state:                state,
+		timetable:            NewTimeTable(timeTableGranularity, timeTableLimit),
+		enterpriseAppliers:   make(map[structs.MessageType]LogApplier, 8),
+		enterpriseRestorers:  make(map[SnapshotType]SnapshotRestorer, 8),
+		interceptionAppliers: make(map[structs.MessageType]LogApplier, 8),
 	}
 
 	// Register all the log applier functions
@@ -207,6 +212,11 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
 		ignoreUnknown = true
 	}
 
+	// Check for a test-only interception applier for this message type.
+	if applier, ok := n.interceptionAppliers[msgType]; ok {
+		return applier(buf[1:], log.Index)
+	}
+
 	switch msgType {
 	case structs.NodeRegisterRequestType:
 		return n.applyUpsertNode(msgType, buf[1:], log.Index)
diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go
index c36cb4f1989..b53406fcc4b 100644
--- a/nomad/plan_endpoint_test.go
+++ b/nomad/plan_endpoint_test.go
@@ -1,6 +1,7 @@
 package nomad
 
 import (
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -140,6 +141,19 @@ func TestPlanEndpoint_ApplyConcurrent(t *testing.T) {
 	defer cleanupS1()
 	testutil.WaitForLeader(t, s1.RPC)
 
+	// Inject a delay into the plan applier at raft index 8, then
+	// delegate to the normal applier.
+	planApplyFn := func(buf []byte, index uint64) interface{} {
+		if index == 8 {
+			fmt.Println("introducing delay")
+			time.Sleep(6 * time.Second)
+		}
+		return s1.fsm.applyPlanResults(structs.MsgTypeTestSetup, buf, index)
+	}
+
+	s1.fsm.interceptionAppliers = map[structs.MessageType]LogApplier{
+		structs.ApplyPlanResultsRequestType: planApplyFn,
+	}
+
 	plans := []*structs.Plan{}
 	for i := 0; i < 5; i++ {
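
A note on the error case mentioned in the commit message: the test change
above exercises the timeout side of the interface. An interception applier
can also return an error instead of delegating to the normal applier. A
minimal sketch under the same test setup (the helper name
injectJobRegisterFault and the choice of message type are illustrative,
not part of this patch):

	// injectJobRegisterFault is a hypothetical test helper: it registers
	// an interception applier that fails every job registration at the
	// FSM layer by returning an injected error rather than calling the
	// normal applier. Assumes the standard "errors" package is imported
	// in the test file.
	func injectJobRegisterFault(s *Server) {
		s.fsm.interceptionAppliers = map[structs.MessageType]LogApplier{
			structs.JobRegisterRequestType: func(buf []byte, index uint64) interface{} {
				return errors.New("injected fault")
			},
		}
	}

Because `FSM.Apply` returns the interceptor's result directly, the injected
error surfaces to the RPC caller the same way a real applier error would.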