Skip to content

Commit

Permalink
feat(runtime): Add init messages to new scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Nov 10, 2021
1 parent 34180b6 commit aca71c9
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 20 deletions.
55 changes: 39 additions & 16 deletions src/executor-event-loop/executor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package executorEL

import (
// "encoding/json"
"encoding/json"
"fmt"
// "io/ioutil"
// "strconv"
Expand Down Expand Up @@ -41,22 +41,45 @@ func (el Executor) processEnvelope(env Envelope) Message {

msg := env.Message

// let's assume we only have client-request/internal messages
var sev lib.ScheduledEvent
bytesToDeserialise := msg.Message
fmt.Printf("About to deserialise message\n%s\n", string(bytesToDeserialise))
if err := lib.UnmarshalScheduledEvent(el.Marshaler, bytesToDeserialise, &sev); err != nil {
panic(err)
}
var returnMessage json.RawMessage
switch msg.Kind {
case "receive":
var sev lib.ScheduledEvent
bytesToDeserialise := msg.Message
fmt.Printf("About to deserialise message\n%s\n", string(bytesToDeserialise))
if err := lib.UnmarshalScheduledEvent(el.Marshaler, bytesToDeserialise, &sev); err != nil {
panic(err)
}

reactorName := sev.To // should be from env
reactor := el.Topology.Reactor(reactorName)
heapBefore := dumpHeapJson(reactor)
oevs := reactor.Receive(sev.At, sev.From, sev.Event)
heapAfter := dumpHeapJson(reactor)
/* heapDiff := */ jsonDiff(heapBefore, heapAfter)
// si := el.Update(reactorName)

reactorName := sev.To // should be from env
reactor := el.Topology.Reactor(reactorName)
heapBefore := dumpHeapJson(reactor)
oevs := reactor.Receive(sev.At, sev.From, sev.Event)
heapAfter := dumpHeapJson(reactor)
/* heapDiff := */ jsonDiff(heapBefore, heapAfter)
// si := el.Update(reactorName)
returnMessage = lib.MarshalUnscheduledEvents(reactorName, int(env.CorrelationId), oevs)
case "init":
var inits = make([]lib.Event, 0)

returnMessage := lib.MarshalUnscheduledEvents(reactorName, int(env.CorrelationId), oevs)
reactors := el.Topology.Reactors()
for _, reactor := range reactors {
inits = append(inits,
lib.OutEventsToEvents(reactor, el.Topology.Reactor(reactor).Init())...)
}

bs, err := json.Marshal(struct {
Events []lib.Event `json:"events"`
CorrelationId CorrelationId `json:"corrId"`
}{inits, env.CorrelationId})
if err != nil {
panic(err)
}
returnMessage = bs
default:
fmt.Printf("Unknown message type: %#v\n", msg.Kind)
panic("Unknown message type")
}
return Message{"what", returnMessage}
}
15 changes: 14 additions & 1 deletion src/runtime-prototype/src/Scheduler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,24 @@ fakeScheduler executorRef (ClientRequest' "Start" [] cid) =
-}
in
Actor $ do
step
firstStep step
return (InternalMessage "ok")
where
prettyEvent :: SchedulerEvent -> String
prettyEvent = LBS.unpack . encode

firstStep step = do
p <- send executorRef (InternalMessage "INIT")
-- currentLogicalTime <- Time.currentLogicalClock timeC
-- emitEvent traceC clientC testId runId dropped currentLogicalTime ae
on p $ \(InternalMessageR (InternalMessage' "Events" args)) ->
let
Just evs = sequence (map (fromSDatatype zeroTime) args)
evs' = filter (\e -> kind e /= "ok") (concat evs)
agenda' = Agenda.fromList (map (\e -> (at e, e)) evs')
in do
modify $ \s -> s { agenda = agenda s `Agenda.union` agenda' }
step
fakeScheduler _ msg = error (show msg)

-- XXX: Avoid going to string, not sure if we should use bytestring or text though?
Expand Down
10 changes: 7 additions & 3 deletions src/runtime-prototype/src/Scheduler/Executor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,13 @@ toExecutorEnvelope e = ExecutorEnvelope
{ executorEnvelopeKind = envelopeKind e
, executorEnvelopeSender = envelopeSender e
-- this is silly.. going back and forth between json..
, executorEnvelopeMessage = ExecutorEnvelopeMessage "receive" $ case eitherDecode . LBS.pack . getMessage $ envelopeMessage e of
Left err -> error err
Right x -> x
, executorEnvelopeMessage =
let msg = getMessage $ envelopeMessage e in
case msg of
"INIT" -> ExecutorEnvelopeMessage "init" ""
_ -> ExecutorEnvelopeMessage "receive" $ case eitherDecode . LBS.pack $ msg of
Left err -> error err
Right x -> x
, executorEnvelopeReceiver = envelopeReceiver e
, executorEnvelopeCorrelationId = envelopeCorrelationId e
, executorEnvelopeLogicalTime = let LogicalTime _ i = envelopeLogicalTime e in i
Expand Down
3 changes: 3 additions & 0 deletions src/runtime-prototype/src/StuntDouble/Time.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ newtype Time = Time Time.UTCTime
instance FromJSON Time
instance ToJSON Time

zeroTime :: Time
zeroTime = Time $ Time.UTCTime (Time.fromOrdinalDate 1970 0) 0

addTime :: Time -> Time.NominalDiffTime -> Time
addTime (Time t) dt = Time (Time.addUTCTime dt t)

Expand Down

0 comments on commit aca71c9

Please sign in to comment.