Skip to content

Commit

Permalink
Deadletter as an actor (#64)
Browse files Browse the repository at this point in the history
* randomize listen port. makes the tests pass on macos.

* wip

* make deadletter an actor.

* initialization code ok. tests in place.

* delete some commented out code.

* we stop storing the deadletters. just log them if there is a logger. the user can supply their own dead letter handling if they need to.

* since I've change the semantics of the internal registry/get method these two tests needed to change.

* registry/get now returns nil instead of deadletter.

* a tiny tweak that is more readable.

* make a wrapper around bytes.Buffer to suppress the race detector in the tests.

* just some docs.
  • Loading branch information
perbu authored Nov 30, 2023
1 parent 5913ef6 commit af84252
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 55 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ After configuring the Engine with a custom PID Separator the string representati
// 127.0.0.1:3000->foo->bar->baz->1
```

You can provide your own actor to do deadletter handling. This is useful if you want to forward deadletters to a
monitoring service or log them somewhere. The default deadletter handler will, if you have enabled logging,
log the deadletter to the logs, using WARN as the log level. For details on how to set up a custom deadletter handler,
please see the `actor/deadletter_test.go` file, where a custom deadletter handler is set up for testing purposes.

Note that you can also provide a custom logger to the engine. See the Logging section for more details.

## Custom middleware
Expand Down
4 changes: 2 additions & 2 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@ func TestSpawnChild(t *testing.T) {
e.Poison(pid, stopwg)
stopwg.Wait()

assert.Equal(t, e.deadLetter, e.Registry.get(NewPID("local", "child")))
assert.Equal(t, e.deadLetter, e.Registry.get(pid))
assert.Nil(t, e.Registry.get(NewPID("local", "child")))
assert.Nil(t, e.Registry.get(pid))
}
55 changes: 27 additions & 28 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,42 @@
package actor

import (
"reflect"
"sync"

"github.com/anthdm/hollywood/log"
"reflect"
)

// TODO: The deadLetter is implemented as a plain Processer, but
// can actually be implemented as a Receiver. This is a good first issue.
//

type deadLetter struct {
eventStream *EventStream
pid *PID
logger log.Logger
logger log.Logger
pid *PID
}

func newDeadLetter(eventStream *EventStream) *deadLetter {
func newDeadLetter() Receiver {
pid := NewPID(LocalLookupAddr, "deadLetter")
return &deadLetter{
eventStream: eventStream,
pid: NewPID(LocalLookupAddr, "deadLetter"),
logger: eventStream.logger.SubLogger("[deadLetter]"),
pid: pid,
}
}

func (d *deadLetter) Send(dest *PID, msg any, sender *PID) {
d.logger.Warnw("Send",
"dest", dest,
"msg", reflect.TypeOf(msg),
"sender", sender,
)
d.eventStream.Publish(&DeadLetterEvent{
Target: dest,
Message: msg,
Sender: sender,
})
// Receive implements the Receiver interface, handling the deadletter messages.
// It will log the deadletter message if a logger is set. If not, it will silently
// ignore the message. Any production system should either have a logger set or provide a custom
// deadletter actor.
func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case Started:
// intialize logger on deadletter startup. is this a sane approach? I'm not sure how the get to the logger otherwise.
d.logger = ctx.Engine().logger.SubLogger("[deadletter]")
d.logger.Debugw("default deadletter actor started")
case Stopped:
d.logger.Debugw("default deadletter actor stopped")
case Initialized:
d.logger.Debugw("default deadletter actor initialized")
case *DeadLetterEvent:
d.logger.Warnw("deadletter arrived", "msg-type", reflect.TypeOf(msg),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
default:
d.logger.Errorw("unknown message arrived", "msg", msg)
}
}

func (d *deadLetter) PID() *PID { return d.pid }
func (d *deadLetter) Shutdown(_ *sync.WaitGroup) {}
func (d *deadLetter) Start() {}
func (d *deadLetter) Invoke([]Envelope) {}
127 changes: 127 additions & 0 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package actor

import (
"bytes"
"fmt"
"github.com/anthdm/hollywood/log"
"github.com/stretchr/testify/assert"
"log/slog"
"os"
"sync"
"testing"
"time"
)

// TestDeadLetterDefault tests the default deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
func TestDeadLetterDefault(t *testing.T) {
logBuffer := SafeBuffer{}
lh := log.NewHandler(&logBuffer, log.TextFormat, slog.LevelDebug)
e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh)))
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond) // a flush would be nice here

// check the log buffer for the deadletter
assert.Contains(t, logBuffer.String(), "deadletter arrived")

}

// TestDeadLetterCustom tests the custom deadletter handling.
// It will spawn a new actor, kill it, send a message to it and then check if the deadletter
// received the message.
// It is using the custom deadletter receiver below.
func TestDeadLetterCustom(t *testing.T) {
lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug)
e := NewEngine(
EngineOptLogger(log.NewLogger("[engine]", lh)),
EngineOptDeadletter(newCustomDeadLetter))
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
// kill a1 actor.
e.Poison(a1).Wait() // poison the a1 actor
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err) // no error from the request
assert.NotNil(t, resp) // we should get a response to our request
respDeadLetters, ok := resp.([]*DeadLetterEvent)
assert.True(t, ok) // got a slice of deadletter events
assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event
ev, ok := respDeadLetters[0].Message.(testMessage)
assert.True(t, ok) // should be our test message
assert.Equal(t, "bar", ev.data)
}

type testActor struct{}
type testMessage struct {
data string
}

func newTestActor() Receiver {
return testActor{}
}
func (t testActor) Receive(_ *Context) {
// do nothing
}

type customDeadLetterFetch struct{ flush bool }

// customDeadLetter is a custom deadletter actor / receiver
type customDeadLetter struct {
deadLetters []*DeadLetterEvent
}

func newCustomDeadLetter() Receiver {
return &customDeadLetter{
deadLetters: make([]*DeadLetterEvent, 0),
}
}

// Receive implements the Receiver interface. This is a OK example of an actor that
// that deals with deadletters. It will store the deadletters in a slice.
func (c *customDeadLetter) Receive(ctx *Context) {
switch ctx.Message().(type) {
case *customDeadLetterFetch:
ctx.Respond(c.deadLetters)
if ctx.Message().(*customDeadLetterFetch).flush {
c.deadLetters = make([]*DeadLetterEvent, 0)
}
case *DeadLetterEvent:
slog.Warn("received deadletter event")
msg, ok := ctx.Message().(*DeadLetterEvent)
if !ok {
slog.Error("failed to cast deadletter event")
return
}
c.deadLetters = append(c.deadLetters, msg)
}
}

type SafeBuffer struct {
buf bytes.Buffer
mu sync.Mutex
}

func (sb *SafeBuffer) Write(p []byte) (n int, err error) {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.buf.Write(p)
}

func (sb *SafeBuffer) String() string {
sb.mu.Lock()
defer sb.mu.Unlock()
return sb.buf.String()
}

// Usage in goroutines...
48 changes: 40 additions & 8 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,35 @@ type Engine struct {

address string
remote Remoter
deadLetter Processer
deadLetter *PID
logger log.Logger
}

// NewEngine returns a new actor Engine.
// You can pass an optional logger through
func NewEngine(opts ...func(*Engine)) *Engine {
e := &Engine{}
e.address = LocalLookupAddr
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.EventStream = NewEventStream() //
for _, o := range opts {
o(e)
}
e.EventStream = NewEventStream(e.logger)
e.address = LocalLookupAddr
e.Registry = newRegistry(e)
e.deadLetter = newDeadLetter(e.EventStream)
e.Registry.add(e.deadLetter)

// if no deadletter is registered, we will register the default deadletter from deadletter.go
if e.deadLetter == nil {
e.logger.Debugw("no deadletter receiver set, registering default")
e.deadLetter = e.Spawn(newDeadLetter, "deadletter")
}
return e
}

func EngineOptLogger(logger log.Logger) func(*Engine) {
return func(e *Engine) {
e.logger = logger
// This is a bit hacky, but we need to set the logger for the eventstream
// which cannot be set in the constructor since the logger is not set yet.
e.EventStream.logger = logger.SubLogger("[eventStream]")
}
}

Expand All @@ -60,6 +67,12 @@ func EngineOptPidSeparator(sep string) func(*Engine) {
}
}

func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = e.Spawn(d, "deadletter")
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
Expand Down Expand Up @@ -205,6 +218,15 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
}
_wg.Add(1)
proc := e.Registry.get(pid)
// deadletter - if we didn't find a process, we will send a deadletter message
if proc == nil {
e.Send(e.deadLetter, &DeadLetterEvent{
Target: pid,
Message: poisonPill{_wg, graceful},
Sender: nil,
})
return _wg
}
pill := poisonPill{
wg: _wg,
graceful: graceful,
Expand All @@ -215,11 +237,21 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
return _wg
}

// SendLocal will send the given message to the given PID. If the recipient is not found in the
// registry, the message will be sent to the DeadLetter process instead. If there is no deadletter
// process registered, the function will panic.
func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc := e.Registry.get(pid)
if proc != nil {
proc.Send(pid, msg, sender)
if proc == nil {
// send a deadletter message
e.Send(e.deadLetter, &DeadLetterEvent{
Target: pid,
Message: msg,
Sender: sender,
})
return
}
proc.Send(pid, msg, sender)
}

func (e *Engine) isLocalMessage(pid *PID) bool {
Expand Down
15 changes: 8 additions & 7 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func TestStop(t *testing.T) {
e.Stop(pid, stopwg)
stopwg.Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get the dead letter process here.
assert.Equal(t, e.deadLetter, e.Registry.get(pid))
// Hence, we should get nil when looking it up in the registry.
assert.Nil(t, e.Registry.get(pid))
}
}

Expand Down Expand Up @@ -277,8 +277,9 @@ func TestPoison(t *testing.T) {
e.Poison(pid, stopwg)
stopwg.Wait()
// When a process is poisoned it should be removed from the registry.
// Hence, we should get the dead letter process here.
assert.Equal(t, e.deadLetter, e.Registry.get(pid))
// Hence, we should get NIL when we try to get it.
assert.Nil(t, e.Registry.get(pid))

}
}

Expand All @@ -294,10 +295,10 @@ func TestRequestResponse(t *testing.T) {
res, err := resp.Result()
assert.Nil(t, err)
assert.Equal(t, "bar", res)
// Response PID should be the dead letter PID. This is because
// Response PID should be nil here. This is because
// the actual response process that will handle this RPC
// is deregistered. Test that its actually cleaned up.
assert.Equal(t, e.deadLetter, e.Registry.get(resp.pid))
// is deregistered. Test that it is actually cleaned up.
assert.Nil(t, e.Registry.get(resp.pid))
}

// 56 ns/op
Expand Down
5 changes: 2 additions & 3 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ type EventStream struct {
logger log.Logger
}

func NewEventStream(l log.Logger) *EventStream {
func NewEventStream() *EventStream {
return &EventStream{
subs: make(map[*EventSub]EventStreamFunc),
logger: l.SubLogger("[eventStream]"),
subs: make(map[*EventSub]EventStreamFunc),
}
}

Expand Down
4 changes: 3 additions & 1 deletion actor/opts.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package actor

import "time"
import (
"time"
)

const (
defaultInboxSize = 1024
Expand Down
2 changes: 1 addition & 1 deletion actor/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (pid *PID) Child(id string, tags ...string) *PID {
}

func (pid *PID) HasTag(tag string) bool {
panic("TODO")
return strings.Contains(pid.ID, pidSeparator+tag+pidSeparator)
}

func (pid *PID) LookupKey() uint64 {
Expand Down
2 changes: 1 addition & 1 deletion actor/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (p *process) Start() {
p.context.message = Started{}
applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context)
p.context.engine.EventStream.Publish(&ActivationEvent{PID: p.pid})
p.logger.Debugw("started", "pid", p.pid)
p.logger.Debugw("actor started", "pid", p.pid)
// If we have messages in our buffer, invoke them.
if len(p.mbuffer) > 0 {
p.Invoke(p.mbuffer)
Expand Down
Loading

0 comments on commit af84252

Please sign in to comment.