Skip to content

Commit

Permalink
docs: clean up (re)actor api
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Aug 5, 2021
1 parent 5842803 commit 99c9b58
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
66 changes: 37 additions & 29 deletions src/runtime-prototype/event-loop.org
Original file line number Diff line number Diff line change
Expand Up @@ -258,34 +258,44 @@ queue | __ __ __ | -.
#+begin_src go
func CreateEventLoop(cfg Config) EventLoop

func Spawn(r Reactor, e EventLoop) LocalRef
func Spawn(r Reactor, e EventLoop) LocalRef // `LocalRef` is basically a pointer.

func Invoke(r *Reactor, lref LocalRef, msg Message) Message
func Send(r *Reactor, rref Remoteref, msg Message) Promise
func Invoke(r *Reactor, lref LocalRef, msg Message) Message {
r.actions.append(InvokedAction{lref})
return *lref.SyncReceive(msg)
}
func Send(r *Reactor, rref Remoteref, msg Message, onSuccess Message, onFailure Message)
func LocalToRemoteRef(lref LocalRef, addr RemoteAddress) RemoteRef
func AsyncIO(r *Reactor, task IO) Promise
func SetTimer(r *Reactor, millis uint64, msg Message)
func RegisterCallback(r *Reactor, p Promise, ...) { // TODO: defunctionalised cb or not?
r.actions.append(RegCbAction{p, ..})
func AsyncIO(r *Reactor, task IO, onSuccess Message, onFailure Message)
func SetTimer(r *Reactor, millis uint64, msg Message) {
r.actions.append(SetTimerAction{millis, msg})
}

// all above functions don't actually DO anything, they merely append
// actions to the reactor like above in `RegisterCallback`.
// actions to the reactor like above in `SetTimer`.

func (r *Reactor) Receive(msg Message) {
switch msg.type {
case Foo:
reply := r.Invoke(r.refToOtherStateMachine, Message(...)) // non-blocking
p := r.Send(reply.f, ...)
r.SetTimer(3000, FooSendTimeout(p, ...))
r.RegisterCallback(p, ...) // TODO: register both a success and failure cb?!
r.Send(reply.f, r.remoteSM)
.onSuccess(FooSendSuccess{...}) // builder pattern could be nice here.
.onFailure(FooSendFailure{...})
.after(3000, FooSendTimeout(p, ...)) // to avoid a separate SetTimer call,
// still need timers unrelated to sends though.

// Success, failure and timers should perhaps have their own separate method
// instead of being part of `Receive`.
case FooSendSuccess:
// ...
case FoodSendFailure:
// ...
case FooSendTimeout:
if (retries == 3) {
r.AsyncIO(Log("gave up"))
} else {
r.retries++
p := r.Send(...)
r.SetTimer(3000, FooSendTimeout(p, ...))
r.Send(...).(...)
}
}
}
Expand All @@ -301,6 +311,12 @@ queue | __ __ __ | -.
case RemoteSend:
r := ls.lookupReactor(e.to)
r.Receive(e.message)
// If an action fails, call the failure callback associated with that promise.
// If the failure callback fails, or doesn't exist, then crash the reactor that
// created the action.

// When `InvokedAction{lref}` is handled we should recursively handle the actions
// of `lref`.
handleActions(ls, r.actions)
r.actions = nil
case ...
Expand All @@ -321,10 +337,8 @@ queue | __ __ __ | -.
// wait for signal to stop event loop
}
}

#+end_src


* Event loop threading
** Single-threaded polling
*** run each non-blocking handler in turn
Expand Down Expand Up @@ -409,20 +423,14 @@ queue | __ __ __ | -.
case ClientRequest:
// ...
case InternalMessage: // remote event loop sent message to reactor on this event loop
cb := lookupCallback(e.promise)
if (cb != nil) {
// the incoming message is a response for which we have a callback.
cb(e.message)
} else {
// the incoming message is a request
r := lookupReactor(e.receiver)
logState(r)
actions := r.receive(e.sender, e.message)
logState(r)
// actions are: send message to remote event loop, async disk I/O,
// set timer, or register callback for when any of the previous actions complete.
handleActions(ls, actions)
}
// the incoming message is a request
r := lookupReactor(e.receiver)
logState(r)
actions := r.receive(e.sender, e.message)
logState(r)
// actions are: send message to remote event loop, async disk I/O,
// or set timers.
handleActions(ls, actions)
case IOFinished:
// ...
case Timeout:
Expand Down
3 changes: 3 additions & 0 deletions src/runtime-prototype/src/StuntDouble/ActorMap.hs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ act ls as = mapM_ go as

go :: Action -> IO ()
go (SendAction from msg to p@(Promise i)) = do
-- XXX: What do we do if `transportSend` fails here? We should probably
-- call the failure handler/continuation for this promise, if it exists.
-- If it doesn't exist we probably want to crash the sender, i.e. `from`.
transportSend (lsTransport ls)
(Envelope RequestKind (localToRemoteRef (lsName ls) from) msg to (CorrelationId i))
t <- getCurrentTime (lsTime ls)
Expand Down

0 comments on commit 99c9b58

Please sign in to comment.