Skip to content

Commit

Permalink
Bugfix - Stopped message processed mutliple times on shutdown (#169)
Browse files Browse the repository at this point in the history
* initial commit

* fixed bug on startup

* finally fixed bug

* improved fix

* renamed test

---------

Co-authored-by: TheTGKing <[email protected]>
  • Loading branch information
troygilman0 and troygilman0 authored Nov 20, 2024
1 parent 7ba1326 commit 9721d9c
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
18 changes: 18 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,21 @@ func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer {
func (r *TestReceiver) Receive(ctx *Context) {
r.OnReceive(r.t, ctx)
}

func TestMultipleStops(t *testing.T) {
e, err := NewEngine(NewEngineConfig())
require.NoError(t, err)
for i := 0; i < 1000; i++ {
done := make(chan struct{})
pid := e.SpawnFunc(func(ctx *Context) {
switch ctx.Message().(type) {
case Stopped:
close(done)
}
}, "test")
for j := 0; j < 10; j++ {
e.Stop(pid)
}
<-done
}
}
2 changes: 1 addition & 1 deletion actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (in *Inbox) schedule() {

func (in *Inbox) process() {
in.run()
atomic.StoreInt32(&in.procStatus, idle)
atomic.CompareAndSwapInt32(&in.procStatus, running, idle)
}

func (in *Inbox) run() {
Expand Down
18 changes: 18 additions & 0 deletions actor/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package actor

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestInboxSendAndProcess(t *testing.T) {
Expand Down Expand Up @@ -41,3 +44,18 @@ func (m MockProcesser) Invoke(envelopes []Envelope) {
m.processFunc(envelopes)
}
func (m MockProcesser) Shutdown(_ *sync.WaitGroup) {}

func TestInboxStop(t *testing.T) {
inbox := NewInbox(10)
done := make(chan struct{})
mockProc := MockProcesser{
processFunc: func(envelopes []Envelope) {
inbox.Stop()
done <- struct{}{}
},
}
inbox.Start(mockProc)
inbox.Send(Envelope{})
<-done
require.True(t, atomic.LoadInt32(&inbox.procStatus) == stopped)
}

0 comments on commit 9721d9c

Please sign in to comment.