diff --git a/src/executor-event-loop/executor.go b/src/executor-event-loop/executor.go index ba7a6599..c9896639 100644 --- a/src/executor-event-loop/executor.go +++ b/src/executor-event-loop/executor.go @@ -24,23 +24,33 @@ type StepInfo struct { type ComponentUpdate = func(component string) StepInfo type Executor struct { - Topology lib.Topology - BuildTopology func() lib.Topology - Marshaler lib.Marshaler + Reactors []string + Topology lib.Topology + BuildReactor func(name string) lib.Reactor + Marshaler lib.Marshaler //Update ComponentUpdate } -func NewExecutor(buildTopology func() lib.Topology, m lib.Marshaler) *Executor { +func buildTopology(constructor func(name string) lib.Reactor, reactors []string) lib.Topology { + items := make([]lib.Item, 0, len(reactors)) + for _, n := range reactors { + items = append(items, lib.Item{n, constructor(n)}) + } + return lib.NewTopology(items...) +} + +func NewExecutor(reactors []string, buildReactor func(name string) lib.Reactor, m lib.Marshaler) *Executor { return &Executor{ - Topology: buildTopology(), - BuildTopology: buildTopology, - Marshaler: m, + Reactors: reactors, + Topology: buildTopology(buildReactor, reactors), + BuildReactor: buildReactor, + Marshaler: m, //Update: cu, } } func (ex Executor) Reset() { - ex.Topology = ex.BuildTopology() + ex.Topology = buildTopology(ex.BuildReactor, ex.Reactors) } func (el Executor) processEnvelope(env Envelope) Message { @@ -98,6 +108,31 @@ func (el Executor) processEnvelope(env Envelope) Message { reactor := el.Topology.Reactor(req.Reactor) oevs := reactor.Timer(req.At) returnMessage = lib.MarshalUnscheduledEvents(req.Reactor, int(env.CorrelationId), oevs) + case "fault": + type FaultRequest struct { + Reactor string `json:"to"` + Event string `json:"event"` + } + var req FaultRequest + if err := json.Unmarshal(msg.Message, &req); err != nil { + panic(err) + } + switch req.Event { + case "restart": + el.Topology.Insert(req.Reactor, el.BuildReactor(req.Reactor)) + default: + fmt.Printf("Unhandled fault type %s\n", req.Event) + panic("Unhandled fault type") + } + bs, err := json.Marshal(struct { + Events []lib.Event `json:"events"` + CorrelationId CorrelationId `json:"corrId"` + }{[]lib.Event{}, env.CorrelationId}) + if err != nil { + panic(err) + } + returnMessage = bs + default: fmt.Printf("Unknown message type: %#v\n", msg.Kind) panic("Unknown message type") diff --git a/src/runtime-prototype/src/Scheduler.hs b/src/runtime-prototype/src/Scheduler.hs index 6d6fd380..7e3b2533 100644 --- a/src/runtime-prototype/src/Scheduler.hs +++ b/src/runtime-prototype/src/Scheduler.hs @@ -175,9 +175,11 @@ fakeScheduler executorRef (ClientRequest' "LoadTest" [SInt tid, SInt rid] cid) = on f (\(IOResultR (IORows rs)) -> case parseRows rs of Nothing -> clientResponse cid (InternalMessage "parse error") Just [fs@Faults{}] -> do + let (fState, fAgenda) = newFaultState fs modify $ \s -> - s { faultState = newFaultState fs + s { faultState = fState , runId = Just rid + , agenda = agenda s <> fAgenda }) -- clientResponse cid (InternalMessage (show fs))) -- hmm should we just do one response? return (InternalMessage "ok") diff --git a/src/runtime-prototype/src/Scheduler/Agenda.hs b/src/runtime-prototype/src/Scheduler/Agenda.hs index c381a19d..e1a74553 100644 --- a/src/runtime-prototype/src/Scheduler/Agenda.hs +++ b/src/runtime-prototype/src/Scheduler/Agenda.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} module Scheduler.Agenda where import Data.Heap (Entry(Entry), Heap) @@ -10,6 +12,7 @@ import StuntDouble.Time ------------------------------------------------------------------------ newtype Agenda = Agenda (Heap (Entry Time SchedulerEvent)) + deriving newtype (Semigroup, Monoid) empty :: Agenda empty = Agenda Heap.empty diff --git a/src/runtime-prototype/src/Scheduler/Fault.hs b/src/runtime-prototype/src/Scheduler/Fault.hs index bf4fb283..011cb283 100644 --- a/src/runtime-prototype/src/Scheduler/Fault.hs +++ b/src/runtime-prototype/src/Scheduler/Fault.hs @@ -2,6 +2,7 @@ {-# LANGUAGE GeneralizedNewtypeDeriving #-} module Scheduler.Fault where +import qualified Data.Aeson as Aeson import Data.List (foldl') import Data.Map (Map) import qualified Data.Map as Map @@ -10,6 +11,8 @@ import qualified Data.Set as Set import Data.Maybe (fromMaybe) import qualified Data.Time as Time +import qualified Scheduler.Agenda as Agenda +import Scheduler.Agenda (Agenda) import Scheduler.Event import qualified Scheduler.Faults as Faults import StuntDouble.LogicalTime @@ -72,15 +75,17 @@ instance Semigroup FaultState where instance Monoid FaultState where mempty = FaultState mempty -newFaultState :: Faults.Faults -> FaultState +newFaultState :: Faults.Faults -> (FaultState, Agenda) newFaultState = foldMap mkFaultState . Faults.faults where - mkFaultState :: Faults.Fault -> FaultState - mkFaultState f = FaultState $ uncurry Map.singleton (translate f) + mkFaultState :: Faults.Fault -> (FaultState, Agenda) + mkFaultState f = (FaultState . fromMaybe mempty $ uncurry Map.singleton <$> (translate f) + , agendaItems f) nodeName = NodeName "scheduler" - (!->) = (,) - translate :: Faults.Fault -> (ActorName, FaultStateForActor) + k !-> v = Just (k,v) + + translate :: Faults.Fault -> Maybe (ActorName, FaultStateForActor) translate (Faults.Omission _f t a) = t !-> mempty { fsOmissions = Set.singleton a} translate (Faults.Crash f a) = f !-> mempty { fsPermanentCrash = Just $ LogicalTime nodeName{-?-} a} translate (Faults.Pause n f t) = n !-> mempty { fsPause = singleton (TimeInterval f t)} @@ -88,6 +93,18 @@ newFaultState = foldMap mkFaultState . Faults.faults where ti = singleton $ TimeInterval f t translate (Faults.ClockSkewBump n d f t) = n !-> mempty { fsClockSkew = ClockSkew [(TimeInterval f t, CSABump d)]} translate (Faults.ClockSkewStrobe n d p f t) = n !-> mempty { fsClockSkew = ClockSkew [(TimeInterval f t, CSAStrobe d p)]} + translate Faults.RestartReactor{} = Nothing + + agendaItems :: Faults.Fault -> Agenda + agendaItems Faults.Omission{} = mempty + agendaItems Faults.Crash{} = mempty + agendaItems Faults.Pause{} = mempty + agendaItems Faults.Partition{} = mempty + agendaItems Faults.ClockSkewBump{} = mempty + agendaItems Faults.ClockSkewStrobe{} = mempty + agendaItems (Faults.RestartReactor n t) = Agenda.push (t, ev) mempty + where + ev = SchedulerEvent {kind = "fault", event = "restart", args = Aeson.object [], from = "god", to = n, at = t, meta = Nothing} ------------------------------------------------------------------------ afterLogicalTime :: LogicalTime -> LogicalTime -> Bool diff --git a/src/runtime-prototype/src/Scheduler/Faults.hs b/src/runtime-prototype/src/Scheduler/Faults.hs index 5af55201..45f51270 100644 --- a/src/runtime-prototype/src/Scheduler/Faults.hs +++ b/src/runtime-prototype/src/Scheduler/Faults.hs @@ -50,6 +50,10 @@ data Fault , fromTime :: Time , toTime :: Time } + | RestartReactor + { node :: String + , fromTime :: Time + } deriving (Generic, Show) customOptions :: Options diff --git a/src/sut/register/executor/executorcmd.go b/src/sut/register/executor/executorcmd.go index 7b4de717..154139a0 100644 --- a/src/sut/register/executor/executorcmd.go +++ b/src/sut/register/executor/executorcmd.go @@ -8,21 +8,23 @@ import ( "github.com/symbiont-io/detsys-testkit/src/sut/register" ) -func main() { +func constructor(name string) lib.Reactor { + switch name { + case "frontend": + return sut.NewFrontEnd4() + default: + return sut.NewRegister() + } +} +func main() { fmt.Printf("Starting up executor\n") adminI := executorEL.NewAdmin("/tmp/executor-admin.sock") fmt.Printf("Created admin\n") commandT := executorEL.NewCommandTransport("/tmp/executor.sock") fmt.Printf("Created command transport\n") - topology := func() lib.Topology { - return lib.NewTopology( - lib.Item{"frontend", sut.NewFrontEnd4()}, - lib.Item{"register1", sut.NewRegister()}, - lib.Item{"register2", sut.NewRegister()}, - ) - } - exe := executorEL.NewExecutor(topology, sut.NewMarshaler()) + peers := []string{"frontend", "register1", "register2"} + exe := executorEL.NewExecutor(peers, constructor, sut.NewMarshaler()) el := executorEL.NewEventLoop(adminI, commandT, exe) el.Run()