Skip to content

Commit

Permalink
cleanup the deadletter tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
perbu committed Dec 6, 2023
1 parent c5433f4 commit c87a2d1
Showing 1 changed file with 23 additions and 108 deletions.
131 changes: 23 additions & 108 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ package actor

import (
"bytes"
"fmt"
"github.com/stretchr/testify/assert"
"log/slog"
"os"
"reflect"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestDeadLetterDefault tests the default deadletter handling.
Expand All @@ -22,118 +18,32 @@ func TestDeadLetterDefault(t *testing.T) {
slog.SetDefault(logger)
e, err := NewEngine()
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond * 100) // a flush would be nice here

e.Send(invalidPid(), "bar") // should end up the deadletter queue
time.Sleep(time.Millisecond) // wait for the deadletter to be processed
// check the log buffer for the deadletter
logStr := logBuffer.String()
fmt.Println(logStr)
assert.Contains(t, logStr, "deadletter arrived")

assert.Contains(t, logBuffer.String(), "deadletter arrived")
}

// TestDeadLetterCustom tests the custom deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
// It is using the custom deadletter receiver below.
// It is using the custom deadletter receiver defined inline.
func TestDeadLetterCustom(t *testing.T) {
debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
slog.SetDefault(debuglogger)
e, err := NewEngine()
assert.NoError(t, err)
dl := e.Spawn(newCustomDeadLetter, "deadletter")
assert.NotNil(t, dl)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
es := e.Registry.getByID("eventstream")
assert.NotNil(t, es)

// kill a1 actor.
e.Poison(a1).Wait() // poison the a1 actor
fmt.Println("==== sending message via a1 to deadletter ====")
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond * 100) // a flush would be nice here :-)
resp, err := e.Request(dl, customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err) // no error from the request
assert.NotNil(t, resp) // we should get a response to our request
respDeadLetters, ok := resp.([]DeadLetterEvent)
assert.True(t, ok) // got a slice of deadletter events
// stop the tests if we don't have any deadletters
if len(respDeadLetters) != 1 {
t.Fatal("expected 1 deadletters, got", len(respDeadLetters))
}
ev, ok := respDeadLetters[0].Message.(testMessage)
assert.True(t, ok) // should be our test message
assert.Equal(t, "bar", ev.data)
}

type testActor struct{}
type testMessage struct {
data string
}

func newTestActor() Receiver {
return testActor{}
}
func (t testActor) Receive(_ *Context) {
// do nothing
}

type customDeadLetterFetch struct{ flush bool }

// customDeadLetter is a custom deadletter actor / receiver
type customDeadLetter struct {
deadLetters []DeadLetterEvent
}

func newCustomDeadLetter() Receiver {
return &customDeadLetter{
deadLetters: make([]DeadLetterEvent, 0),
}
}

// Receive implements the Receiver interface. This is a OK example of an actor that
// that deals with deadletters. It will store the deadletters in a slice.
func (c *customDeadLetter) Receive(ctx *Context) {
es := ctx.engine.Registry.getByID("eventstream")
if es == nil {
fmt.Println("custom deadletter; no eventstream found")
}
switch ctx.Message().(type) {
case Started:
slog.Debug("custom deadletter starting", "action", "subscribing")
ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid})
time.Sleep(time.Millisecond * 10)
case Stopped:
slog.Debug("custom deadletter stopping", "action", "unsubscribing")
ctx.engine.BroadcastEvent(DeadletterUnSub{pid: ctx.pid})
case customDeadLetterFetch:
flush := ctx.Message().(customDeadLetterFetch).flush
slog.Debug("custom deadletter; received fetch request",
"flush", flush,
"messages", len(c.deadLetters))
ctx.Respond(c.deadLetters)
if ctx.Message().(customDeadLetterFetch).flush {
c.deadLetters = c.deadLetters[:0]
}
case DeadLetterEvent:
slog.Warn("custom deadletter; received deadletter event")
msg, ok := ctx.Message().(DeadLetterEvent)
if !ok {
slog.Error("should never happen. brain damaged.")
return
wg := &sync.WaitGroup{}
wg.Add(1)
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Initialized:
c.engine.BroadcastEvent(DeadletterSub{c.pid})
case DeadLetterEvent:
wg.Done()
}
c.deadLetters = append(c.deadLetters, msg)
default:
slog.Error("custom deadletter; received unknown message",
"msg", ctx.Message(), "msg-type", reflect.TypeOf(ctx.Message()))
}
}, "deadletter")
e.SendLocal(invalidPid(), "bar", nil)
wg.Wait()
}

// SafeBuffer is a threadsafe buffer, used for testing the that the deadletters are logged.
type SafeBuffer struct {
buf bytes.Buffer
mu sync.Mutex
Expand All @@ -151,4 +61,9 @@ func (sb *SafeBuffer) String() string {
return sb.buf.String()
}

// Usage in goroutines...
func invalidPid() *PID {
return &PID{
Address: "local",
ID: "squirrel",
}
}

0 comments on commit c87a2d1

Please sign in to comment.