From bd6a76531ef128a15604caa81c328e665ec85af6 Mon Sep 17 00:00:00 2001
From: Anthony De Meulemeester
Date: Wed, 30 Aug 2023 11:23:06 +0200
Subject: [PATCH] Inbox stop (#50)

* added process status stopped

* try recover when panic on process start

* add condition in for loop

* Atomically check inbox stopped status + test

---------

Co-authored-by: Terri Prifti
---
 actor/engine_test.go | 46 +++++++++++++++++++++++++++++++++-----------
 actor/inbox.go       |  4 +++-
 actor/process.go     | 14 ++++++++++----
 3 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/actor/engine_test.go b/actor/engine_test.go
index 5d27a8c..d95e8e9 100644
--- a/actor/engine_test.go
+++ b/actor/engine_test.go
@@ -48,33 +48,28 @@ func TestSendRepeat(t *testing.T) {
 	repeater.Stop()
 }
 
-func TestRestarts(t *testing.T) {
+func TestRestartsMaxRestarts(t *testing.T) {
 	e := NewEngine()
-	wg := sync.WaitGroup{}
+	restarts := 2
 	type payload struct {
 		data int
 	}
-
-	wg.Add(1)
 	pid := e.SpawnFunc(func(c *Context) {
 		switch msg := c.Message().(type) {
 		case Started:
 		case Stopped:
-			fmt.Println("stopped!")
 		case payload:
 			if msg.data != 10 {
 				panic("I failed to process this message")
 			} else {
 				fmt.Println("finally processed all my messsages after borking.", msg.data)
-				wg.Done()
 			}
 		}
-	}, "foo", WithRestartDelay(time.Millisecond*10))
+	}, "foo", WithMaxRestarts(restarts))
 
-	e.Send(pid, payload{1})
-	e.Send(pid, payload{2})
-	e.Send(pid, payload{10})
-	wg.Wait()
+	for i := 0; i < 11; i++ {
+		e.Send(pid, payload{i})
+	}
 }
 
 func TestProcessInitStartOrder(t *testing.T) {
@@ -103,6 +98,35 @@ func TestProcessInitStartOrder(t *testing.T) {
 	wg.Wait()
 }
 
+func TestRestarts(t *testing.T) {
+	e := NewEngine()
+	wg := sync.WaitGroup{}
+	type payload struct {
+		data int
+	}
+
+	wg.Add(1)
+	pid := e.SpawnFunc(func(c *Context) {
+		switch msg := c.Message().(type) {
+		case Started:
+		case Stopped:
+			fmt.Println("stopped!")
+		case payload:
+			if msg.data != 10 {
+				panic("I failed to process this message")
+			} else {
+				fmt.Println("finally processed all my messsages after borking.", msg.data)
+				wg.Done()
+			}
+		}
+	}, "foo", WithRestartDelay(time.Millisecond*10))
+
+	e.Send(pid, payload{1})
+	e.Send(pid, payload{2})
+	e.Send(pid, payload{10})
+	wg.Wait()
+}
+
 func TestSendWithSender(t *testing.T) {
 	var (
 		e = NewEngine()
diff --git a/actor/inbox.go b/actor/inbox.go
index 19f581c..5728fb2 100644
--- a/actor/inbox.go
+++ b/actor/inbox.go
@@ -12,6 +12,7 @@ const defaultThroughput = 300
 const (
 	idle int32 = iota
 	running
+	stopped
 )
 
 type Scheduler interface {
@@ -71,7 +72,7 @@ func (in *Inbox) process() {
 
 func (in *Inbox) run() {
 	i, t := 0, in.scheduler.Throughput()
-	for {
+	for atomic.LoadInt32(&in.procStatus) != stopped {
 		if i > t {
 			i = 0
 			runtime.Gosched()
@@ -91,5 +92,6 @@ func (in *Inbox) Start(proc Processer) {
 }
 
 func (in *Inbox) Stop() error {
+	atomic.StoreInt32(&in.procStatus, stopped)
 	return nil
 }
diff --git a/actor/process.go b/actor/process.go
index a8bd462..2732308 100644
--- a/actor/process.go
+++ b/actor/process.go
@@ -73,9 +73,7 @@ func (p *process) Invoke(msgs []Envelope) {
 			for i := 0; i < nmsg-nproc; i++ {
 				p.mbuffer[i] = msgs[i+nproc]
 			}
-			if p.Opts.MaxRestarts > 0 {
-				p.tryRestart(v)
-			}
+			p.tryRestart(v)
 		}
 	}()
 	for i := 0; i < len(msgs); i++ {
@@ -99,6 +97,13 @@ func (p *process) Invoke(msgs []Envelope) {
 func (p *process) Start() {
 	recv := p.Producer()
 	p.context.receiver = recv
+	defer func() {
+		if v := recover(); v != nil {
+			p.context.message = Stopped{}
+			p.context.receiver.Receive(p.context)
+			p.tryRestart(v)
+		}
+	}()
 	p.context.message = Initialized{}
 	applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
 
@@ -118,7 +123,6 @@ func (p *process) Start() {
 }
 
 func (p *process) tryRestart(v any) {
-	p.restarts++
 	// InternalError does not take the maximum restarts into account.
 	// For now, InternalError is getting triggered when we are dialing
 	// a remote node. By doing this, we can keep dialing until it comes
@@ -144,6 +148,8 @@ func (p *process) tryRestart(v any) {
 		p.cleanup(nil)
 		return
 	}
+
+	p.restarts++
 	// Restart the process after its restartDelay
 	log.Errorw("[PROCESS] actor restarting", log.M{
 		"n": p.restarts,