Skip to content

Commit

Permalink
Merge pull request #378 from symbiont-io/feat/runtime/restart-fault
Browse files Browse the repository at this point in the history
Feat/runtime/restart fault
  • Loading branch information
symbiont-daniel-gustafsson authored Nov 16, 2021
2 parents 33559e6 + d15621c commit 8fd3dfc
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 23 deletions.
51 changes: 43 additions & 8 deletions src/executor-event-loop/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,33 @@ type StepInfo struct {
type ComponentUpdate = func(component string) StepInfo

type Executor struct {
Topology lib.Topology
BuildTopology func() lib.Topology
Marshaler lib.Marshaler
Reactors []string
Topology lib.Topology
BuildReactor func(name string) lib.Reactor
Marshaler lib.Marshaler
//Update ComponentUpdate
}

func NewExecutor(buildTopology func() lib.Topology, m lib.Marshaler) *Executor {
func buildTopology(constructor func(name string) lib.Reactor, reactors []string) lib.Topology {
items := make([]lib.Item, 0, len(reactors))
for _, n := range reactors {
items = append(items, lib.Item{n, constructor(n)})
}
return lib.NewTopology(items...)
}

func NewExecutor(reactors []string, buildReactor func(name string) lib.Reactor, m lib.Marshaler) *Executor {
return &Executor{
Topology: buildTopology(),
BuildTopology: buildTopology,
Marshaler: m,
Reactors: reactors,
Topology: buildTopology(buildReactor, reactors),
BuildReactor: buildReactor,
Marshaler: m,
//Update: cu,
}
}

func (ex Executor) Reset() {
ex.Topology = ex.BuildTopology()
ex.Topology = buildTopology(ex.BuildReactor, ex.Reactors)
}

func (el Executor) processEnvelope(env Envelope) Message {
Expand Down Expand Up @@ -98,6 +108,31 @@ func (el Executor) processEnvelope(env Envelope) Message {
reactor := el.Topology.Reactor(req.Reactor)
oevs := reactor.Timer(req.At)
returnMessage = lib.MarshalUnscheduledEvents(req.Reactor, int(env.CorrelationId), oevs)
case "fault":
type FaultRequest struct {
Reactor string `json:"to"`
Event string `json:"event"`
}
var req FaultRequest
if err := json.Unmarshal(msg.Message, &req); err != nil {
panic(err)
}
switch req.Event {
case "restart":
el.Topology.Insert(req.Reactor, el.BuildReactor(req.Reactor))
default:
fmt.Printf("Unhandled fault type %s\n", req.Event)
panic("Unhandled fault type")
}
bs, err := json.Marshal(struct {
Events []lib.Event `json:"events"`
CorrelationId CorrelationId `json:"corrId"`
}{[]lib.Event{}, env.CorrelationId})
if err != nil {
panic(err)
}
returnMessage = bs

default:
fmt.Printf("Unknown message type: %#v\n", msg.Kind)
panic("Unknown message type")
Expand Down
4 changes: 3 additions & 1 deletion src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ fakeScheduler executorRef (ClientRequest' "LoadTest" [SInt tid, SInt rid] cid) =
on f (\(IOResultR (IORows rs)) -> case parseRows rs of
Nothing -> clientResponse cid (InternalMessage "parse error")
Just [fs@Faults{}] -> do
let (fState, fAgenda) = newFaultState fs
modify $ \s ->
s { faultState = newFaultState fs
s { faultState = fState
, runId = Just rid
, agenda = agenda s <> fAgenda
})
-- clientResponse cid (InternalMessage (show fs))) -- hmm should we just do one response?
return (InternalMessage "ok")
Expand Down
3 changes: 3 additions & 0 deletions src/runtime-prototype/src/Scheduler/Agenda.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Scheduler.Agenda where

import Data.Heap (Entry(Entry), Heap)
Expand All @@ -10,6 +12,7 @@ import StuntDouble.Time
------------------------------------------------------------------------

newtype Agenda = Agenda (Heap (Entry Time SchedulerEvent))
deriving newtype (Semigroup, Monoid)

empty :: Agenda
empty = Agenda Heap.empty
Expand Down
27 changes: 22 additions & 5 deletions src/runtime-prototype/src/Scheduler/Fault.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Scheduler.Fault where

import qualified Data.Aeson as Aeson
import Data.List (foldl')
import Data.Map (Map)
import qualified Data.Map as Map
Expand All @@ -10,6 +11,8 @@ import qualified Data.Set as Set
import Data.Maybe (fromMaybe)
import qualified Data.Time as Time

import qualified Scheduler.Agenda as Agenda
import Scheduler.Agenda (Agenda)
import Scheduler.Event
import qualified Scheduler.Faults as Faults
import StuntDouble.LogicalTime
Expand Down Expand Up @@ -72,22 +75,36 @@ instance Semigroup FaultState where
instance Monoid FaultState where
mempty = FaultState mempty

newFaultState :: Faults.Faults -> FaultState
newFaultState :: Faults.Faults -> (FaultState, Agenda)
newFaultState = foldMap mkFaultState . Faults.faults
where
mkFaultState :: Faults.Fault -> FaultState
mkFaultState f = FaultState $ uncurry Map.singleton (translate f)
mkFaultState :: Faults.Fault -> (FaultState, Agenda)
mkFaultState f = (FaultState . fromMaybe mempty $ uncurry Map.singleton <$> (translate f)
, agendaItems f)

nodeName = NodeName "scheduler"
(!->) = (,)
translate :: Faults.Fault -> (ActorName, FaultStateForActor)
k !-> v = Just (k,v)

translate :: Faults.Fault -> Maybe (ActorName, FaultStateForActor)
translate (Faults.Omission _f t a) = t !-> mempty { fsOmissions = Set.singleton a}
translate (Faults.Crash f a) = f !-> mempty { fsPermanentCrash = Just $ LogicalTime nodeName{-?-} a}
translate (Faults.Pause n f t) = n !-> mempty { fsPause = singleton (TimeInterval f t)}
translate (Faults.Partition n c f t) = n !-> mempty { fsPartition = Map.fromList $ zip c (repeat ti) }
where ti = singleton $ TimeInterval f t
translate (Faults.ClockSkewBump n d f t) = n !-> mempty { fsClockSkew = ClockSkew [(TimeInterval f t, CSABump d)]}
translate (Faults.ClockSkewStrobe n d p f t) = n !-> mempty { fsClockSkew = ClockSkew [(TimeInterval f t, CSAStrobe d p)]}
translate Faults.RestartReactor{} = Nothing

agendaItems :: Faults.Fault -> Agenda
agendaItems Faults.Omission{} = mempty
agendaItems Faults.Crash{} = mempty
agendaItems Faults.Pause{} = mempty
agendaItems Faults.Partition{} = mempty
agendaItems Faults.ClockSkewBump{} = mempty
agendaItems Faults.ClockSkewStrobe{} = mempty
agendaItems (Faults.RestartReactor n t) = Agenda.push (t, ev) mempty
where
ev = SchedulerEvent {kind = "fault", event = "restart", args = Aeson.object [], from = "god", to = n, at = t, meta = Nothing}

------------------------------------------------------------------------
afterLogicalTime :: LogicalTime -> LogicalTime -> Bool
Expand Down
4 changes: 4 additions & 0 deletions src/runtime-prototype/src/Scheduler/Faults.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ data Fault
, fromTime :: Time
, toTime :: Time
}
| RestartReactor
{ node :: String
, fromTime :: Time
}
deriving (Generic, Show)

customOptions :: Options
Expand Down
20 changes: 11 additions & 9 deletions src/sut/register/executor/executorcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ import (
"github.com/symbiont-io/detsys-testkit/src/sut/register"
)

func main() {
func constructor(name string) lib.Reactor {
switch name {
case "frontend":
return sut.NewFrontEnd4()
default:
return sut.NewRegister()
}
}

func main() {
fmt.Printf("Starting up executor\n")
adminI := executorEL.NewAdmin("/tmp/executor-admin.sock")
fmt.Printf("Created admin\n")
commandT := executorEL.NewCommandTransport("/tmp/executor.sock")
fmt.Printf("Created command transport\n")
topology := func() lib.Topology {
return lib.NewTopology(
lib.Item{"frontend", sut.NewFrontEnd4()},
lib.Item{"register1", sut.NewRegister()},
lib.Item{"register2", sut.NewRegister()},
)
}
exe := executorEL.NewExecutor(topology, sut.NewMarshaler())
peers := []string{"frontend", "register1", "register2"}
exe := executorEL.NewExecutor(peers, constructor, sut.NewMarshaler())
el := executorEL.NewEventLoop(adminI, commandT, exe)

el.Run()
Expand Down

0 comments on commit 8fd3dfc

Please sign in to comment.