Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/runtime/restart fault #378

Merged
merged 4 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions src/executor-event-loop/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,33 @@ type StepInfo struct {
type ComponentUpdate = func(component string) StepInfo

type Executor struct {
Topology lib.Topology
BuildTopology func() lib.Topology
Marshaler lib.Marshaler
Reactors []string
Topology lib.Topology
BuildReactor func(name string) lib.Reactor
Marshaler lib.Marshaler
//Update ComponentUpdate
}

func NewExecutor(buildTopology func() lib.Topology, m lib.Marshaler) *Executor {
func buildTopology(constructor func(name string) lib.Reactor, reactors []string) lib.Topology {
items := make([]lib.Item, 0, len(reactors))
for _, n := range reactors {
items = append(items, lib.Item{n, constructor(n)})
}
return lib.NewTopology(items...)
}

func NewExecutor(reactors []string, buildReactor func(name string) lib.Reactor, m lib.Marshaler) *Executor {
return &Executor{
Topology: buildTopology(),
BuildTopology: buildTopology,
Marshaler: m,
Reactors: reactors,
Topology: buildTopology(buildReactor, reactors),
BuildReactor: buildReactor,
Marshaler: m,
//Update: cu,
}
}

func (ex Executor) Reset() {
ex.Topology = ex.BuildTopology()
ex.Topology = buildTopology(ex.BuildReactor, ex.Reactors)
}

func (el Executor) processEnvelope(env Envelope) Message {
Expand Down Expand Up @@ -98,6 +108,31 @@ func (el Executor) processEnvelope(env Envelope) Message {
reactor := el.Topology.Reactor(req.Reactor)
oevs := reactor.Timer(req.At)
returnMessage = lib.MarshalUnscheduledEvents(req.Reactor, int(env.CorrelationId), oevs)
case "fault":
type FaultRequest struct {
Reactor string `json:"to"`
Event string `json:"event"`
}
var req FaultRequest
if err := json.Unmarshal(msg.Message, &req); err != nil {
panic(err)
}
switch req.Event {
case "restart":
el.Topology.Insert(req.Reactor, el.BuildReactor(req.Reactor))
default:
fmt.Printf("Unhandled fault type %s\n", req.Event)
panic("Unhandled fault type")
}
bs, err := json.Marshal(struct {
Events []lib.Event `json:"events"`
CorrelationId CorrelationId `json:"corrId"`
}{[]lib.Event{}, env.CorrelationId})
if err != nil {
panic(err)
}
returnMessage = bs

default:
fmt.Printf("Unknown message type: %#v\n", msg.Kind)
panic("Unknown message type")
Expand Down
4 changes: 3 additions & 1 deletion src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ fakeScheduler executorRef (ClientRequest' "LoadTest" [SInt tid, SInt rid] cid) =
on f (\(IOResultR (IORows rs)) -> case parseRows rs of
Nothing -> clientResponse cid (InternalMessage "parse error")
Just [fs@Faults{}] -> do
let (fState, fAgenda) = newFaultState fs
modify $ \s ->
s { faultState = newFaultState fs
s { faultState = fState
, runId = Just rid
, agenda = agenda s <> fAgenda
})
-- clientResponse cid (InternalMessage (show fs))) -- hmm should we just do one response?
return (InternalMessage "ok")
Expand Down
3 changes: 3 additions & 0 deletions src/runtime-prototype/src/Scheduler/Agenda.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Scheduler.Agenda where

import Data.Heap (Entry(Entry), Heap)
Expand All @@ -10,6 +12,7 @@ import StuntDouble.Time
------------------------------------------------------------------------

newtype Agenda = Agenda (Heap (Entry Time SchedulerEvent))
deriving newtype (Semigroup, Monoid)

empty :: Agenda
empty = Agenda Heap.empty
Expand Down
27 changes: 22 additions & 5 deletions src/runtime-prototype/src/Scheduler/Fault.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Scheduler.Fault where

import qualified Data.Aeson as Aeson
import Data.List (foldl')
import Data.Map (Map)
import qualified Data.Map as Map
Expand All @@ -10,6 +11,8 @@ import qualified Data.Set as Set
import Data.Maybe (fromMaybe)
import qualified Data.Time as Time

import qualified Scheduler.Agenda as Agenda
import Scheduler.Agenda (Agenda)
import Scheduler.Event
import qualified Scheduler.Faults as Faults
import StuntDouble.LogicalTime
Expand Down Expand Up @@ -72,22 +75,36 @@ instance Semigroup FaultState where
instance Monoid FaultState where
mempty = FaultState mempty

newFaultState :: Faults.Faults -> FaultState
newFaultState :: Faults.Faults -> (FaultState, Agenda)
newFaultState = foldMap mkFaultState . Faults.faults
where
mkFaultState :: Faults.Fault -> FaultState
mkFaultState f = FaultState $ uncurry Map.singleton (translate f)
mkFaultState :: Faults.Fault -> (FaultState, Agenda)
mkFaultState f = (FaultState . fromMaybe mempty $ uncurry Map.singleton <$> (translate f)
, agendaItems f)

nodeName = NodeName "scheduler"
(!->) = (,)
translate :: Faults.Fault -> (ActorName, FaultStateForActor)
k !-> v = Just (k,v)

translate :: Faults.Fault -> Maybe (ActorName, FaultStateForActor)
translate (Faults.Omission _f t a) = t !-> mempty { fsOmissions = Set.singleton a}
translate (Faults.Crash f a) = f !-> mempty { fsPermanentCrash = Just $ LogicalTime nodeName{-?-} a}
translate (Faults.Pause n f t) = n !-> mempty { fsPause = singleton (TimeInterval f t)}
translate (Faults.Partition n c f t) = n !-> mempty { fsPartition = Map.fromList $ zip c (repeat ti) }
where ti = singleton $ TimeInterval f t
translate (Faults.ClockSkewBump n d f t) = n !-> mempty { fsClockSkew = ClockSkew [(TimeInterval f t, CSABump d)]}
translate (Faults.ClockSkewStrobe n d p f t) = n !-> mempty { fsClockSkew = ClockSkew [(TimeInterval f t, CSAStrobe d p)]}
translate Faults.RestartReactor{} = Nothing

agendaItems :: Faults.Fault -> Agenda
agendaItems Faults.Omission{} = mempty
agendaItems Faults.Crash{} = mempty
agendaItems Faults.Pause{} = mempty
agendaItems Faults.Partition{} = mempty
agendaItems Faults.ClockSkewBump{} = mempty
agendaItems Faults.ClockSkewStrobe{} = mempty
agendaItems (Faults.RestartReactor n t) = Agenda.push (t, ev) mempty
where
ev = SchedulerEvent {kind = "fault", event = "restart", args = Aeson.object [], from = "god", to = n, at = t, meta = Nothing}

------------------------------------------------------------------------
afterLogicalTime :: LogicalTime -> LogicalTime -> Bool
Expand Down
4 changes: 4 additions & 0 deletions src/runtime-prototype/src/Scheduler/Faults.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ data Fault
, fromTime :: Time
, toTime :: Time
}
| RestartReactor
{ node :: String
, fromTime :: Time
}
deriving (Generic, Show)

customOptions :: Options
Expand Down
20 changes: 11 additions & 9 deletions src/sut/register/executor/executorcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ import (
"github.com/symbiont-io/detsys-testkit/src/sut/register"
)

func main() {
func constructor(name string) lib.Reactor {
switch name {
case "frontend":
return sut.NewFrontEnd4()
default:
return sut.NewRegister()
}
}

func main() {
fmt.Printf("Starting up executor\n")
adminI := executorEL.NewAdmin("/tmp/executor-admin.sock")
fmt.Printf("Created admin\n")
commandT := executorEL.NewCommandTransport("/tmp/executor.sock")
fmt.Printf("Created command transport\n")
topology := func() lib.Topology {
return lib.NewTopology(
lib.Item{"frontend", sut.NewFrontEnd4()},
lib.Item{"register1", sut.NewRegister()},
lib.Item{"register2", sut.NewRegister()},
)
}
exe := executorEL.NewExecutor(topology, sut.NewMarshaler())
peers := []string{"frontend", "register1", "register2"}
exe := executorEL.NewExecutor(peers, constructor, sut.NewMarshaler())
el := executorEL.NewEventLoop(adminI, commandT, exe)

el.Run()
Expand Down