Skip to content

Commit

Permalink
Added SendRepeat functionality to Engine and Context (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm authored Apr 17, 2023
1 parent 7b48e18 commit fa30610
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 0 deletions.
15 changes: 15 additions & 0 deletions actor/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ func (c *Context) Send(pid *PID, msg any) {
c.engine.SendWithSender(pid, msg, c.pid)
}

// SendRepeat will send the given message to the given PID each given interval.
// It will return a SendRepeater struct that can stop the repeating message by calling Stop().
func (c *Context) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater {
sr := SendRepeater{
engine: c.engine,
self: c.pid,
target: pid.CloneVT(),
interval: interval,
msg: msg,
cancelch: make(chan struct{}, 1),
}
sr.start()
return sr
}

// Forward will forward the current received message to the given PID.
// This will also set the "forwarder" as the sender of the message.
func (c *Context) Forward(pid *PID) {
Expand Down
27 changes: 27 additions & 0 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,38 @@ package actor
import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestContextSendRepeat(t *testing.T) {
var (
e = NewEngine()
wg = &sync.WaitGroup{}
mu sync.Mutex
sr SendRepeater
)
wg.Add(1)

e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Started:
mu.Lock()
sr = c.SendRepeat(c.PID(), "foo", time.Millisecond*10)
mu.Unlock()
case string:
mu.Lock()
sr.Stop()
mu.Unlock()
assert.Equal(t, c.Sender(), c.PID())
wg.Done()
}
}, "test")
wg.Wait()
}

func TestSpawnChildPID(t *testing.T) {
pidSeparator = ">"
var (
Expand Down
43 changes: 43 additions & 0 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,49 @@ func (e *Engine) send(pid *PID, msg any, sender *PID) {
e.remote.Send(pid, msg, sender)
}

type SendRepeater struct {
engine *Engine
self *PID
target *PID
msg any
interval time.Duration
cancelch chan struct{}
}

func (sr SendRepeater) start() {
ticker := time.NewTicker(sr.interval)
go func() {
for {
select {
case <-ticker.C:
sr.engine.SendWithSender(sr.target, sr.msg, sr.self)
case <-sr.cancelch:
return
}
}
}()
}

func (sr SendRepeater) Stop() {
close(sr.cancelch)
}

// SendRepeat will send the given message to the given PID each given interval.
// It will return a SendRepeater struct that can stop the repeating message by calling Stop().
func (e *Engine) SendRepeat(pid *PID, msg any, interval time.Duration) SendRepeater {
clonedPID := *pid.CloneVT()
sr := SendRepeater{
engine: e,
self: nil,
target: &clonedPID,
interval: interval,
msg: msg,
cancelch: make(chan struct{}, 1),
}
sr.start()
return sr
}

// Poison will send a poisonPill to the process that is associated with the given PID.
// The process will shut down once it processed all its messages before the poisonPill
// was received. If given a WaitGroup, you can wait till the process is completely shutdown.
Expand Down
36 changes: 36 additions & 0 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,42 @@ import (
"github.com/stretchr/testify/require"
)

type tick struct{}
type tickReceiver struct {
ticks int
wg *sync.WaitGroup
}

func (r *tickReceiver) Receive(c *Context) {
switch c.Message().(type) {
case tick:
r.ticks++
if r.ticks == 10 {
r.wg.Done()
}
}
}

func newTickReceiver(wg *sync.WaitGroup) Producer {
return func() Receiver {
return &tickReceiver{
wg: wg,
}
}
}

func TestSendRepeat(t *testing.T) {
var (
e = NewEngine()
wg = &sync.WaitGroup{}
)
wg.Add(1)
pid := e.Spawn(newTickReceiver(wg), "test")
repeater := e.SendRepeat(pid, tick{}, time.Millisecond*2)
wg.Wait()
repeater.Stop()
}

func TestRestarts(t *testing.T) {
e := NewEngine()
wg := sync.WaitGroup{}
Expand Down

0 comments on commit fa30610

Please sign in to comment.