diff --git a/actor/engine_test.go b/actor/engine_test.go index 4a08a26..3f61629 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -463,3 +463,21 @@ func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer { func (r *TestReceiver) Receive(ctx *Context) { r.OnReceive(r.t, ctx) } + +func TestMultipleStops(t *testing.T) { + e, err := NewEngine(NewEngineConfig()) + require.NoError(t, err) + for i := 0; i < 1000; i++ { + done := make(chan struct{}) + pid := e.SpawnFunc(func(ctx *Context) { + switch ctx.Message().(type) { + case Stopped: + close(done) + } + }, "test") + for j := 0; j < 10; j++ { + e.Stop(pid) + } + <-done + } +} diff --git a/actor/inbox.go b/actor/inbox.go index 5adb607..726c905 100644 --- a/actor/inbox.go +++ b/actor/inbox.go @@ -72,7 +72,7 @@ func (in *Inbox) schedule() { func (in *Inbox) process() { in.run() - atomic.StoreInt32(&in.procStatus, idle) + atomic.CompareAndSwapInt32(&in.procStatus, running, idle) } func (in *Inbox) run() { diff --git a/actor/inbox_test.go b/actor/inbox_test.go index 680d5b5..6a5a347 100644 --- a/actor/inbox_test.go +++ b/actor/inbox_test.go @@ -2,8 +2,11 @@ package actor import ( "sync" + "sync/atomic" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestInboxSendAndProcess(t *testing.T) { @@ -41,3 +44,18 @@ func (m MockProcesser) Invoke(envelopes []Envelope) { m.processFunc(envelopes) } func (m MockProcesser) Shutdown(_ *sync.WaitGroup) {} + +func TestInboxStop(t *testing.T) { + inbox := NewInbox(10) + done := make(chan struct{}) + mockProc := MockProcesser{ + processFunc: func(envelopes []Envelope) { + inbox.Stop() + done <- struct{}{} + }, + } + inbox.Start(mockProc) + inbox.Send(Envelope{}) + <-done + require.True(t, atomic.LoadInt32(&inbox.procStatus) == stopped) +}