Skip to content

Commit

Permalink
Inbox stop (#50)
Browse files Browse the repository at this point in the history
* added process status stopped

* try recover when panic on process start

* add condition in for loop

* Atomically check inbox stopped status + test

---------

Co-authored-by: Terri Prifti <[email protected]>
  • Loading branch information
anthdm and tprifti authored Aug 30, 2023
1 parent b68a4c4 commit bd6a765
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 16 deletions.
46 changes: 35 additions & 11 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,28 @@ func TestSendRepeat(t *testing.T) {
repeater.Stop()
}

func TestRestarts(t *testing.T) {
func TestRestartsMaxRestarts(t *testing.T) {
e := NewEngine()
wg := sync.WaitGroup{}
restarts := 2
type payload struct {
data int
}

wg.Add(1)
pid := e.SpawnFunc(func(c *Context) {
switch msg := c.Message().(type) {
case Started:
case Stopped:
fmt.Println("stopped!")
case payload:
if msg.data != 10 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messsages after borking.", msg.data)
wg.Done()
}
}
}, "foo", WithRestartDelay(time.Millisecond*10))
}, "foo", WithMaxRestarts(restarts))

e.Send(pid, payload{1})
e.Send(pid, payload{2})
e.Send(pid, payload{10})
wg.Wait()
for i := 0; i < 11; i++ {
e.Send(pid, payload{i})
}
}

func TestProcessInitStartOrder(t *testing.T) {
Expand Down Expand Up @@ -103,6 +98,35 @@ func TestProcessInitStartOrder(t *testing.T) {
wg.Wait()
}

func TestRestarts(t *testing.T) {
e := NewEngine()
wg := sync.WaitGroup{}
type payload struct {
data int
}

wg.Add(1)
pid := e.SpawnFunc(func(c *Context) {
switch msg := c.Message().(type) {
case Started:
case Stopped:
fmt.Println("stopped!")
case payload:
if msg.data != 10 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messsages after borking.", msg.data)
wg.Done()
}
}
}, "foo", WithRestartDelay(time.Millisecond*10))

e.Send(pid, payload{1})
e.Send(pid, payload{2})
e.Send(pid, payload{10})
wg.Wait()
}

func TestSendWithSender(t *testing.T) {
var (
e = NewEngine()
Expand Down
4 changes: 3 additions & 1 deletion actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const defaultThroughput = 300
const (
idle int32 = iota
running
stopped
)

type Scheduler interface {
Expand Down Expand Up @@ -71,7 +72,7 @@ func (in *Inbox) process() {

func (in *Inbox) run() {
i, t := 0, in.scheduler.Throughput()
for {
for atomic.LoadInt32(&in.procStatus) != stopped {
if i > t {
i = 0
runtime.Gosched()
Expand All @@ -91,5 +92,6 @@ func (in *Inbox) Start(proc Processer) {
}

func (in *Inbox) Stop() error {
atomic.StoreInt32(&in.procStatus, stopped)
return nil
}
14 changes: 10 additions & 4 deletions actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ func (p *process) Invoke(msgs []Envelope) {
for i := 0; i < nmsg-nproc; i++ {
p.mbuffer[i] = msgs[i+nproc]
}
if p.Opts.MaxRestarts > 0 {
p.tryRestart(v)
}
p.tryRestart(v)
}
}()
for i := 0; i < len(msgs); i++ {
Expand All @@ -99,6 +97,13 @@ func (p *process) Invoke(msgs []Envelope) {
func (p *process) Start() {
recv := p.Producer()
p.context.receiver = recv
defer func() {
if v := recover(); v != nil {
p.context.message = Stopped{}
p.context.receiver.Receive(p.context)
p.tryRestart(v)
}
}()
p.context.message = Initialized{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)

Expand All @@ -118,7 +123,6 @@ func (p *process) Start() {
}

func (p *process) tryRestart(v any) {
p.restarts++
// InternalError does not take the maximum restarts into account.
// For now, InternalError is getting triggered when we are dialing
// a remote node. By doing this, we can keep dialing until it comes
Expand All @@ -144,6 +148,8 @@ func (p *process) tryRestart(v any) {
p.cleanup(nil)
return
}

p.restarts++
// Restart the process after its restartDelay
log.Errorw("[PROCESS] actor restarting", log.M{
"n": p.restarts,
Expand Down

0 comments on commit bd6a765

Please sign in to comment.