Skip to content

Commit

Permalink
wip; architecture in place. some weird loop is happening,
Browse files Browse the repository at this point in the history
  • Loading branch information
perbu committed Dec 6, 2023
1 parent ce25461 commit 1993ba3
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 122 deletions.
26 changes: 15 additions & 11 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,40 @@ import (
"reflect"
)

//
// DeadLetterEvent is delivered to the deadletter actor when a message can't be delivered to it's recipient
type DeadLetterEvent struct {
Target *PID
Message any
Sender *PID
}

type deadLetter struct {
pid *PID
}

func newDeadLetter() Receiver {
pid := NewPID(LocalLookupAddr, "deadLetter")
pid := NewPID(LocalLookupAddr, "deadletter")
return &deadLetter{
pid: pid,
}
}

// Receive implements the Receiver interface, handling the deadletter messages.
// It will log the deadletter message if a logger is set. If not, it will silently
// ignore the message. Any production system should either have a logger set or provide a custom
// deadletter actor.
// Any production system should provide a custom deadletter handler.
func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case Started:
// intialize logger on deadletter startup. is this a sane approach? I'm not sure how the get to the logger otherwise.
slog.Debug("default deadletter actor started")
// Subscribe to deadletters
ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid})
case Stopped:
slog.Debug("default deadletter actor stopped")
ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid})
case Initialized:
slog.Debug("default deadletter actor initialized")
case *DeadLetterEvent:
slog.Debug("default deadletter actor initializing")
case DeadLetterEvent:
slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg),
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
default:
slog.Error("unknown message arrived at deadletter", "msg", msg)
slog.Error("unknown message arrived at deadletter", "msg", msg,
"msg-type", reflect.TypeOf(msg))
}
}
71 changes: 48 additions & 23 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"fmt"
"log/slog"
"os"
"reflect"
"sync"
"testing"
"time"
Expand All @@ -16,21 +18,23 @@ import (
// received the message.
func TestDeadLetterDefault(t *testing.T) {
logBuffer := SafeBuffer{}
lh := slog.NewTextHandler(&logBuffer, nil)
logger := slog.New(lh)
logger := slog.New(slog.NewTextHandler(&logBuffer, &slog.HandlerOptions{Level: slog.LevelDebug}))
slog.SetDefault(logger)
e, err := NewEngine()
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond) // a flush would be nice here
assert.NotNil(t, dl) // should be registered by default
e.Poison(a1).Wait() // poison the a1 actor
e.Send(a1, testMessage{"bar"}) // should end up the deadletter queue
time.Sleep(time.Millisecond * 100) // a flush would be nice here

// check the log buffer for the deadletter
assert.Contains(t, logBuffer.String(), "deadletter arrived")
logStr := logBuffer.String()
fmt.Println(logStr)
assert.Contains(t, logStr, "deadletter arrived")

}

Expand All @@ -39,25 +43,28 @@ func TestDeadLetterDefault(t *testing.T) {
// received the message.
// It is using the custom deadletter receiver below.
func TestDeadLetterCustom(t *testing.T) {
debuglogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
slog.SetDefault(debuglogger)
e, err := NewEngine(
EngineOptDeadletter(newCustomDeadLetter))
assert.NoError(t, err)
time.Sleep(10 * time.Millisecond)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
assert.NotNil(t, dl) // should be registered by default
assert.NotNil(t, dl)
es := e.Registry.getByID("eventstream")
assert.NotNil(t, es)

// kill a1 actor.
e.Poison(a1).Wait() // poison the a1 actor
// should be in deadletter
fmt.Println("==== sending message via a1 to deadletter ====")
fmt.Println(e.Registry)
fmt.Println("ID=> ", dl.PID())
e.Send(a1, testMessage{"bar"})
time.Sleep(time.Millisecond) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), &customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
time.Sleep(time.Millisecond * 100) // a flush would be nice here :-)
resp, err := e.Request(dl.PID(), customDeadLetterFetch{flush: true}, time.Millisecond*10).Result()
assert.Nil(t, err) // no error from the request
assert.NotNil(t, resp) // we should get a response to our request
respDeadLetters, ok := resp.([]*DeadLetterEvent)
respDeadLetters, ok := resp.([]DeadLetterEvent)
assert.True(t, ok) // got a slice of deadletter events
assert.Equal(t, 1, len(respDeadLetters)) // one deadletter event
ev, ok := respDeadLetters[0].Message.(testMessage)
Expand All @@ -81,32 +88,50 @@ type customDeadLetterFetch struct{ flush bool }

// customDeadLetter is a custom deadletter actor / receiver
type customDeadLetter struct {
deadLetters []*DeadLetterEvent
deadLetters []DeadLetterEvent
}

func newCustomDeadLetter() Receiver {
return &customDeadLetter{
deadLetters: make([]*DeadLetterEvent, 0),
deadLetters: make([]DeadLetterEvent, 0),
}
}

// Receive implements the Receiver interface. This is a OK example of an actor that
// that deals with deadletters. It will store the deadletters in a slice.
func (c *customDeadLetter) Receive(ctx *Context) {
es := ctx.engine.Registry.getByID("eventstream")
if es == nil {
slog.Error("custom deadletter; no eventstream found")
}
switch ctx.Message().(type) {
case *customDeadLetterFetch:
case Started:
slog.Debug("custom deadletter starting", "action", "subscribing")
ctx.engine.BroadcastEvent(DeadletterSub{pid: ctx.pid})
time.Sleep(time.Millisecond * 10)
case Stopped:
slog.Debug("custom deadletter stopping", "action", "unsubscribing")
ctx.engine.BroadcastEvent(DeadletterUnSub{pid: ctx.pid})
case customDeadLetterFetch:
flush := ctx.Message().(customDeadLetterFetch).flush
slog.Debug("custom deadletter; received fetch request",
"flush", flush,
"messages", len(c.deadLetters))
ctx.Respond(c.deadLetters)
if ctx.Message().(*customDeadLetterFetch).flush {
c.deadLetters = make([]*DeadLetterEvent, 0)
if ctx.Message().(customDeadLetterFetch).flush {
c.deadLetters = c.deadLetters[:0]
}
case *DeadLetterEvent:
slog.Warn("received deadletter event")
msg, ok := ctx.Message().(*DeadLetterEvent)
case DeadLetterEvent:
slog.Warn("custom deadletter; received deadletter event")
msg, ok := ctx.Message().(DeadLetterEvent)
if !ok {
slog.Error("failed to cast deadletter event")
slog.Error("should never happen. brain damaged.")
return
}
c.deadLetters = append(c.deadLetters, msg)
default:
slog.Error("custom deadletter; received unknown message",
"msg", ctx.Message(), "msg-type", reflect.TypeOf(ctx.Message()))
}
}

Expand Down
17 changes: 5 additions & 12 deletions actor/engine.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package actor

import (
"log/slog"
reflect "reflect"
"sync"
"time"
)
Expand Down Expand Up @@ -164,12 +162,7 @@ func (e *Engine) send(pid *PID, msg any, sender *PID) {
return
}
if e.remote == nil {
slog.Error("failed sending messsage",
"err", "engine has no remote configured",
"to", pid,
"type", reflect.TypeOf(msg),
"msg", msg,
)
e.BroadcastEvent(EngineRemoteMissingEvent{Target: pid, Sender: sender, Message: msg})
return
}
e.remote.Send(pid, msg, sender)
Expand Down Expand Up @@ -243,9 +236,9 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
}
_wg.Add(1)
proc := e.Registry.get(pid)
// deadletter - if we didn't find a process, we will send a deadletter message
// deadletter - if we didn't find a process, we will broadcast a DeadletterEvent
if proc == nil {
e.Send(e.deadLetter, &DeadLetterEvent{
e.BroadcastEvent(DeadLetterEvent{
Target: pid,
Message: poisonPill{_wg, graceful},
Sender: nil,
Expand All @@ -268,8 +261,8 @@ func (e *Engine) sendPoisonPill(pid *PID, graceful bool, wg ...*sync.WaitGroup)
func (e *Engine) SendLocal(pid *PID, msg any, sender *PID) {
proc := e.Registry.get(pid)
if proc == nil {
// send a deadletter message
e.Send(e.deadLetter, &DeadLetterEvent{
// broadcast a deadLetter message
e.BroadcastEvent(DeadLetterEvent{
Target: pid,
Message: msg,
Sender: sender,
Expand Down
20 changes: 20 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,23 @@ func BenchmarkSendWithSenderMessageLocal(b *testing.B) {
e.SendWithSender(pid, pid, pid)
}
}

type TestReceiveFunc func(*testing.T, *Context)

type TestReceiver struct {
OnReceive TestReceiveFunc
t *testing.T
}

func NewTestProducer(t *testing.T, f TestReceiveFunc) Producer {
return func() Receiver {
return &TestReceiver{
OnReceive: f,
t: t,
}
}
}

func (r *TestReceiver) Receive(ctx *Context) {
r.OnReceive(r.t, ctx)
}
47 changes: 36 additions & 11 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,59 @@
package actor

// EventSub is the message that will be send to subscribe to the event stream.
type EventSub struct {
pid *PID
}

// EventUnSub is the message that will be send to unsubscribe from the event stream.
type EventUnsub struct {
pid *PID
}
import (
"context"
"fmt"
"log/slog"
"reflect"
)

type EventStream struct {
subs map[*PID]bool
subs map[*PID]bool
dlsubs map[*PID]bool
}

func NewEventStream() Producer {
return func() Receiver {
return &EventStream{
subs: make(map[*PID]bool),
subs: make(map[*PID]bool),
dlsubs: make(map[*PID]bool),
}
}
}

// Receive for the event stream. All system-wide events are sent here.
// Some events are specially handled, such as EventSub, EventUnSub (for subscribing to events),
// DeadletterSub, DeadletterUnSub, for subscribing to DeadLetterEvent
func (e *EventStream) Receive(c *Context) {
fmt.Printf("EventStream.Receive: %v\n", reflect.TypeOf(c.Message()))
switch msg := c.Message().(type) {
case EventSub:
e.subs[msg.pid] = true
fmt.Println("EventStream.Receive: EventSub")
case EventUnsub:
delete(e.subs, msg.pid)
fmt.Println("EventStream.Receive: EventUnsub")
case DeadletterSub:
e.dlsubs[msg.pid] = true
case DeadletterUnSub:
delete(e.subs, msg.pid)
case DeadLetterEvent:
// to avoid a loop, check that the message isn't a deadletter.
_, ok := msg.Message.(DeadLetterEvent)
if ok {
c.engine.BroadcastEvent(DeadLetterLoopEvent{})
break
}
for sub := range e.dlsubs {
c.Forward(sub)
}
default:
// check if we should log the event, if so, log it with the relevant level, message and attributes
logMsg, ok := c.Message().(eventLog)
if ok {
level, msg, attr := logMsg.log()
slog.Log(context.Background(), level, msg, attr...)
}
for sub := range e.subs {
c.Forward(sub)
}
Expand Down
Loading

0 comments on commit 1993ba3

Please sign in to comment.