Skip to content

Commit

Permalink
Adding a context to the remote for shutdown. (#76)
Browse files Browse the repository at this point in the history
* one major change. engine.WithRemote now takes a context, which is passed to the remote. It will shut down the remote when the context is cancelled.

One minor change. The engine now has a GetLogger() which responds with the logger, so the remote can set up logging on its own.

* make it easy to get a flood of logs when debugging tests.

* wip; remote.Stop() returns sync.Waitgroup

* fix the tests so they're green again.

* track state so we don't stop or start twice.

* forgot to set the state to running. tests are green again.

* adjust the benchmark to changes in the API.

* actor.NewEngine() now returns (*Engine, error)

* return error from NewEngine if one or more of the options fail.
  • Loading branch information
perbu authored Dec 4, 2023
1 parent e82ae31 commit 1142617
Show file tree
Hide file tree
Showing 27 changed files with 300 additions and 89 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ build:
go build -o bin/restarts examples/restarts/main.go
go build -o bin/eventstream examples/eventstream/main.go
go build -o bin/tcpserver examples/tcpserver/main.go
go build -o bin/metrics examples/metrics/main.go
go build -o bin/metrics examples/metrics/main.go
go build -o bin/chatserver examples/chat/server/main.go
go build -o bin/chatclient examples/chat/client/main.go

bench:
go run _bench/main.go
Expand Down
20 changes: 12 additions & 8 deletions actor/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (

func TestContextSendRepeat(t *testing.T) {
var (
e = NewEngine()
wg = &sync.WaitGroup{}
mu sync.Mutex
sr SendRepeater
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)

e.SpawnFunc(func(c *Context) {
Expand All @@ -38,12 +39,12 @@ func TestContextSendRepeat(t *testing.T) {
func TestSpawnChildPID(t *testing.T) {
pidSeparator = ">"
var (
e = NewEngine()
wg = sync.WaitGroup{}
childfn = func(c *Context) {}
expectedPID = NewPID(LocalLookupAddr, "parent", "child")
)

e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
Expand All @@ -61,9 +62,10 @@ func TestSpawnChildPID(t *testing.T) {

func TestChild(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)
e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
Expand All @@ -81,10 +83,11 @@ func TestChild(t *testing.T) {

func TestParent(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
parent = NewPID(LocalLookupAddr, "foo", "bar", "baz")
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)

childfn := func(c *Context) {
Expand All @@ -107,7 +110,8 @@ func TestParent(t *testing.T) {
}

func TestGetPID(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
e.SpawnFunc(func(c *Context) {
Expand All @@ -123,10 +127,10 @@ func TestGetPID(t *testing.T) {

func TestSpawnChild(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
)

e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)
childFunc := func(c *Context) {
switch c.Message().(type) {
Expand Down
6 changes: 4 additions & 2 deletions actor/deadletter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
func TestDeadLetterDefault(t *testing.T) {
logBuffer := SafeBuffer{}
lh := log.NewHandler(&logBuffer, log.TextFormat, slog.LevelDebug)
e := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh)))
e, err := NewEngine(EngineOptLogger(log.NewLogger("[engine]", lh)))
assert.NoError(t, err)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
Expand All @@ -39,9 +40,10 @@ func TestDeadLetterDefault(t *testing.T) {
// It is using the custom deadletter receiver below.
func TestDeadLetterCustom(t *testing.T) {
lh := log.NewHandler(os.Stdout, log.TextFormat, slog.LevelDebug)
e := NewEngine(
e, err := NewEngine(
EngineOptLogger(log.NewLogger("[engine]", lh)),
EngineOptDeadletter(newCustomDeadLetter))
assert.NoError(t, err)
a1 := e.Spawn(newTestActor, "a1")
assert.NotNil(t, a1)
dl := e.Registry.getByID("deadletter")
Expand Down
30 changes: 23 additions & 7 deletions actor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type Remoter interface {
Address() string
Send(*PID, any, *PID)
Start(*Engine)
Start(*Engine, log.Logger) error
}

// Producer is any function that can return a Receiver
Expand All @@ -32,17 +32,23 @@ type Engine struct {
deadLetter *PID
eventStream *PID
logger log.Logger
initErrors []error
}

// NewEngine returns a new actor Engine.
// You can pass an optional logger through
func NewEngine(opts ...func(*Engine)) *Engine {
// You can pass configuration functions through the various functions starting with "EngineOpt"
// These run after the engine is configured
func NewEngine(opts ...func(*Engine)) (*Engine, error) {
e := &Engine{}
e.Registry = newRegistry(e) // need to init the registry in case we want a custom deadletter
e.address = LocalLookupAddr
e.initErrors = make([]error, 0)
for _, o := range opts {
o(e)
}
if len(e.initErrors) > 0 {
return nil, ErrInitFailed{Errors: e.initErrors}
}
if e.remote != nil {
e.address = e.remote.Address()
}
Expand All @@ -53,10 +59,10 @@ func NewEngine(opts ...func(*Engine)) *Engine {
e.logger.Debugw("no deadletter receiver set, registering default")
e.deadLetter = e.Spawn(newDeadLetter, "deadletter")
}
return e
return e, nil
}

// TODO: Doc
// EngineOptLogger configured the engine with a logger from the internal log package
func EngineOptLogger(logger log.Logger) func(*Engine) {
return func(e *Engine) {
e.logger = logger
Expand All @@ -69,25 +75,35 @@ func EngineOptRemote(r Remoter) func(*Engine) {
e.remote = r
e.address = r.Address()
// TODO: potential error not handled here
r.Start(e)
r.Start(e, e.logger)
}
}

// TODO: Doc
// Todo: make the pid separator a struct variable
func EngineOptPidSeparator(sep string) func(*Engine) {
// This looks weird because the separator is a global variable.
return func(e *Engine) {
pidSeparator = sep
}
}

// TODO: Doc
// EngineOptDeadletter takes an actor and configures the engine to use it for dead letter handling
// This allows you to customize how deadletters are handled.
func EngineOptDeadletter(d Producer) func(*Engine) {
return func(e *Engine) {
e.deadLetter = e.Spawn(d, "deadletter")
}
}

// WithRemote returns a new actor Engine with the given Remoter,
// and will call its Start function
func (e *Engine) WithRemote(r Remoter) {
e.remote = r
e.address = r.Address()
r.Start(e, e.logger)
}

// Spawn spawns a process that will producer by the given Producer and
// can be configured with the given opts.
func (e *Engine) Spawn(p Producer, name string, opts ...OptFunc) *PID {
Expand Down
46 changes: 30 additions & 16 deletions actor/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ func newTickReceiver(wg *sync.WaitGroup) Producer {

func TestSendRepeat(t *testing.T) {
var (
e = NewEngine()
wg = &sync.WaitGroup{}
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)
pid := e.Spawn(newTickReceiver(wg), "test")
repeater := e.SendRepeat(pid, tick{}, time.Millisecond*2)
Expand All @@ -49,7 +50,8 @@ func TestSendRepeat(t *testing.T) {
}

func TestRestartsMaxRestarts(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
require.NoError(t, err)
restarts := 2
type payload struct {
data int
Expand All @@ -62,7 +64,7 @@ func TestRestartsMaxRestarts(t *testing.T) {
if msg.data != 10 {
panic("I failed to process this message")
} else {
fmt.Println("finally processed all my messsages after borking.", msg.data)
fmt.Println("finally processed all my messages after borking.", msg.data)
}
}
}, "foo", WithMaxRestarts(restarts))
Expand All @@ -74,10 +76,11 @@ func TestRestartsMaxRestarts(t *testing.T) {

func TestProcessInitStartOrder(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
started, init bool
)
e, err := NewEngine()
require.NoError(t, err)
pid := e.SpawnFunc(func(c *Context) {
switch c.Message().(type) {
case Initialized:
Expand All @@ -99,7 +102,8 @@ func TestProcessInitStartOrder(t *testing.T) {
}

func TestRestarts(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
require.NoError(t, err)
wg := sync.WaitGroup{}
type payload struct {
data int
Expand Down Expand Up @@ -129,10 +133,11 @@ func TestRestarts(t *testing.T) {

func TestSendWithSender(t *testing.T) {
var (
e = NewEngine()
sender = NewPID("local", "sender")
wg = sync.WaitGroup{}
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)

pid := e.SpawnFunc(func(c *Context) {
Expand All @@ -147,7 +152,8 @@ func TestSendWithSender(t *testing.T) {
}

func TestSendMsgRaceCon(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
require.NoError(t, err)
wg := sync.WaitGroup{}

pid := e.SpawnFunc(func(c *Context) {
Expand All @@ -166,7 +172,8 @@ func TestSendMsgRaceCon(t *testing.T) {
}

func TestSpawn(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
require.NoError(t, err)
wg := sync.WaitGroup{}

for i := 0; i < 10; i++ {
Expand All @@ -184,10 +191,11 @@ func TestSpawn(t *testing.T) {

func TestStopWaitGroup(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
x = int32(0)
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)

pid := e.SpawnFunc(func(c *Context) {
Expand All @@ -208,9 +216,10 @@ func TestStopWaitGroup(t *testing.T) {

func TestStop(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
)
e, err := NewEngine()
require.NoError(t, err)
for i := 0; i < 4; i++ {
wg.Add(1)
tag := strconv.Itoa(i)
Expand All @@ -234,10 +243,11 @@ func TestStop(t *testing.T) {

func TestPoisonWaitGroup(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
x = int32(0)
)
e, err := NewEngine()
require.NoError(t, err)
wg.Add(1)

pid := e.SpawnFunc(func(c *Context) {
Expand All @@ -258,9 +268,10 @@ func TestPoisonWaitGroup(t *testing.T) {

func TestPoison(t *testing.T) {
var (
e = NewEngine()
wg = sync.WaitGroup{}
)
e, err := NewEngine()
require.NoError(t, err)
for i := 0; i < 4; i++ {
wg.Add(1)
tag := strconv.Itoa(i)
Expand All @@ -284,7 +295,8 @@ func TestPoison(t *testing.T) {
}

func TestRequestResponse(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
require.NoError(t, err)
pid := e.Spawn(NewTestProducer(t, func(t *testing.T, ctx *Context) {
if msg, ok := ctx.Message().(string); ok {
assert.Equal(t, "foo", msg)
Expand All @@ -303,7 +315,8 @@ func TestRequestResponse(t *testing.T) {

// 56 ns/op
func BenchmarkSendMessageLocal(b *testing.B) {
e := NewEngine()
e, err := NewEngine()
require.NoError(b, err)
pid := e.SpawnFunc(func(_ *Context) {}, "bench", WithInboxSize(128))

b.ResetTimer()
Expand All @@ -315,10 +328,11 @@ func BenchmarkSendMessageLocal(b *testing.B) {
}

func BenchmarkSendWithSenderMessageLocal(b *testing.B) {
e := NewEngine()
e, err := NewEngine()
require.NoError(b, err)
p := NewTestProducer(nil, func(_ *testing.T, _ *Context) {})
pid := e.Spawn(p, "bench", WithInboxSize(1024*8))

b.ResetTimer()
for i := 0; i < b.N; i++ {
e.SendWithSender(pid, pid, pid)
}
Expand Down
12 changes: 12 additions & 0 deletions actor/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package actor

import "fmt"

type ErrInitFailed struct {
Errors []error
}

func (e ErrInitFailed) Error() string {
// Todo: make the error pretty
return fmt.Sprintf("failed to initialize engine: %v", e.Errors)
}
4 changes: 3 additions & 1 deletion actor/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actor

import (
fmt "fmt"
"github.com/stretchr/testify/assert"
"sync"
"testing"
)
Expand All @@ -11,7 +12,8 @@ type CustomEvent struct {
}

func TestEventStreamLocal(t *testing.T) {
e := NewEngine()
e, err := NewEngine()
assert.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(2)
e.SpawnFunc(func(c *Context) {
Expand Down
Loading

0 comments on commit 1142617

Please sign in to comment.