Skip to content

Commit

Permalink
WIP - Graceful handling all messages when receiving poison pill (#69)
Browse files Browse the repository at this point in the history
* batched invoke, hence big performance increase

* WIP processing all messages that are still in the queue

* added Stop API - a non graceful poisonPill

* Added Stop tests
  • Loading branch information
anthdm authored Nov 30, 2023
1 parent 57f3f28 commit 5913ef6
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 15 deletions.
23 changes: 19 additions & 4 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,21 @@ func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepea
return sr
}

// Poison will send a poisonPill to the process that is associated with the given PID.
// The process will shut down once it processed all its messages before the poisonPill
// was received. If given a WaitGroup, you can wait till the process is completely shutdown.
// Stop will send a non-graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down immediately, once it has processed the poisonPill messsage.
// If given a WaitGroup, it blocks till the process is completely shutdown.
func (e *Engine) Stop(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, false, wg...)
}

// Poison will send a graceful poisonPill message to the process that is associated with the given PID.
// The process will shut down gracefully once it has processed all the messages in the inbox.
// If given a WaitGroup, it blocks till the process is completely shutdown.
func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
return e.sendPoisonPill(pid, true, wg...)
}

func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup) *sync.WaitGroup {
var _wg *sync.WaitGroup
if len(wg) > 0 {
_wg = wg[0]
Expand All @@ -194,8 +205,12 @@ func (e *Engine) Poison(pid *PID, wg ...*sync.WaitGroup) *sync.WaitGroup {
}
_wg.Add(1)
proc := e.Registry.get(pid)
pill := poisonPill{
wg: _wg,
graceful: graceful,
}
if proc != nil {
e.SendLocal(pid, poisonPill{_wg}, nil)
e.SendLocal(pid, pill, nil)
}
return _wg
}
Expand Down
50 changes: 50 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,56 @@ func TestSpawn(t *testing.T) {
wg.Wait()
}

func TestStopWaitGroup(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
x = int32(0)
)
wg.Add(1)

pid := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
wg.Done()
case Stopped:
atomic.AddInt32(&x, 1)
}
}, "foo")
wg.Wait()

pwg := &sync.WaitGroup{}
e.Stop(pid, pwg)
pwg.Wait()
assert.Equal(t, int32(1), atomic.LoadInt32(&x))
}

func TestStop(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
)
for i := 0; i < 4; i++ {
wg.Add(1)
tag := strconv.Itoa(i)
pid := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
wg.Done()
case Stopped:
}
}, "foo", WithTags(tag))

wg.Wait()
stopwg := &sync.WaitGroup{}
e.Stop(pid, stopwg)
stopwg.Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get the dead letter process here.
assert.Equal(t, e.deadLetter, e.Registry.get(pid))
}
}

func TestPoisonWaitGroup(t *testing.T) {
var (
e = NewEngine()
Expand Down
5 changes: 3 additions & 2 deletions actor/inbox.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actor

import (
"math"
"runtime"
"sync/atomic"

Expand Down Expand Up @@ -79,8 +80,8 @@ func (in *Inbox) run() {
}
i++

if msg, ok := in.rb.Pop(); ok {
in.proc.Invoke([]Envelope{msg})
if msgs, ok := in.rb.PopN(math.MaxInt); ok && len(msgs) > 0 {
in.proc.Invoke(msgs)
} else {
return
}
Expand Down
33 changes: 25 additions & 8 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (p *process) Invoke(msgs []Envelope) {
nmsg = len(msgs)
// numbers of msgs that are processed.
nproc = 0
// FIXME: We could use nrpoc here, but for some reason placing nproc++ on the
// bottom of the function it freezes some tests. Hence, I created a new counter
// for bookkeeping.
processed = 0
)
defer func() {
// If we recovered, we buffer up all the messages that we could not process
Expand All @@ -82,17 +86,30 @@ func (p *process) Invoke(msgs []Envelope) {
nproc++
msg := msgs[i]
if pill, ok := msg.Msg.(poisonPill); ok {
// If we need to gracefuly stop, we process all the messages
// from the inbox, otherwise we ignore and cleanup.
if pill.graceful {
msgsToProcess := msgs[processed:]
for _, m := range msgsToProcess {
p.invokeMsg(m)
}
}
p.cleanup(pill.wg)
return
}
p.context.message = msg.Msg
p.context.sender = msg.Sender
recv := p.context.receiver
if len(p.Opts.Middleware) > 0 {
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
} else {
recv.Receive(p.context)
}
p.invokeMsg(msg)
processed++
}
}

func (p *process) invokeMsg(msg Envelope) {
p.context.message = msg.Msg
p.context.sender = msg.Sender
recv := p.context.receiver
if len(p.Opts.Middleware) > 0 {
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
} else {
recv.Receive(p.context)
}
}

Expand Down
3 changes: 2 additions & 1 deletion actor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type InternalError struct {
}

type poisonPill struct {
wg *sync.WaitGroup
wg *sync.WaitGroup
graceful bool
}
type Initialized struct{}
type Started struct{}
Expand Down

0 comments on commit 5913ef6

Please sign in to comment.