Skip to content

Commit

Permalink
simplify dead letter handling even more.
Browse files Browse the repository at this point in the history
  • Loading branch information
perbu committed Dec 6, 2023
1 parent c87a2d1 commit 7f1b362
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 22 deletions.
4 changes: 2 additions & 2 deletions actor/deadletter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ func newDeadLetter() Receiver {
func (d *deadLetter) Receive(ctx *Context) {
switch msg := ctx.Message().(type) {
case Started:
ctx.engine.BroadcastEvent(DeadletterSub{pid: d.pid})
ctx.engine.BroadcastEvent(EventSub{pid: d.pid})
case Stopped:
ctx.engine.BroadcastEvent(DeadletterUnSub{pid: d.pid})
ctx.engine.BroadcastEvent(EventUnsub{pid: d.pid})
case Initialized:
case DeadLetterEvent:
slog.Warn("deadletter arrived", "msg-type", reflect.TypeOf(msg),
Expand Down
2 changes: 1 addition & 1 deletion actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDeadLetterCustom(t *testing.T) {
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Initialized:
c.engine.BroadcastEvent(DeadletterSub{c.pid})
c.engine.BroadcastEvent(EventSub{c.pid})
case DeadLetterEvent:
wg.Done()
}
Expand Down
16 changes: 5 additions & 11 deletions actor/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ import (
)

type EventStream struct {
subs map[*PID]bool
dlsubs map[*PID]bool
subs map[*PID]bool
}

func NewEventStream() Producer {
return func() Receiver {
return &EventStream{
subs: make(map[*PID]bool),
dlsubs: make(map[*PID]bool),
subs: make(map[*PID]bool),
}
}
}
Expand All @@ -31,23 +29,19 @@ func (e *EventStream) Receive(c *Context) {
case EventUnsub:
delete(e.subs, msg.pid)
fmt.Println("EventStream.Receive: EventUnsub")
case DeadletterSub:
e.dlsubs[msg.pid] = true
case DeadletterUnSub:
delete(e.subs, msg.pid)
case DeadLetterEvent:
// to avoid a loop, check that the message isn't a deadletter.
_, ok := msg.Message.(DeadLetterEvent)
if ok {
c.engine.BroadcastEvent(DeadLetterLoopEvent{})
break
}
if len(e.dlsubs) == 0 {
slog.Warn("deadletter arrived, but no subscribers",
if len(e.subs) == 0 {
slog.Warn("deadletter arrived, but no subscribers to event stream",
"sender", msg.Sender, "target", msg.Target, "msg", msg.Message)
break
}
for sub := range e.dlsubs {
for sub := range e.subs {
c.Forward(sub)
}
default:
Expand Down
8 changes: 0 additions & 8 deletions actor/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,6 @@ type eventLog interface {
log() (slog.Level, string, []any)
}

type DeadletterSub struct {
pid *PID
}

type DeadletterUnSub struct {
pid *PID
}

// EventSub is the message that will be send to subscribe to the event stream.
type EventSub struct {
pid *PID
Expand Down

0 comments on commit 7f1b362

Please sign in to comment.