This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

[#95]: feature: dynamic workers scaling
rustatian authored Sep 23, 2023
2 parents 4824d1b + fe59f30 commit 363d967
Showing 10 changed files with 360 additions and 229 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
@@ -71,7 +71,7 @@ jobs:
- name: Run golang tests with coverage
run: make test_coverage

- uses: codecov/codecov-action@v4 # Docs: <https://github.com/codecov/codecov-action>
- uses: codecov/codecov-action@v3 # Docs: <https://github.com/codecov/codecov-action>
with:
file: ./coverage-ci/summary.txt
fail_ci_if_error: false
17 changes: 15 additions & 2 deletions fsm/fsm.go
@@ -4,17 +4,20 @@ import (
"sync/atomic"

"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

// NewFSM returns a new FSM implementation based on the initial state
func NewFSM(initialState int64) *Fsm {
func NewFSM(initialState int64, log *zap.Logger) *Fsm {
return &Fsm{
log: log,
currentState: &initialState,
}
}

// Fsm is a general finite-state machine (https://en.wikipedia.org/wiki/Finite-state_machine) used to transition between worker states
type Fsm struct {
log *zap.Logger
numExecs uint64
// to be lightweight, use UnixNano
lastUsed uint64
@@ -36,6 +39,7 @@ Transition moves worker from one state to another
func (s *Fsm) Transition(to int64) {
err := s.recognizer(to)
if err != nil {
s.log.Debug("fsm transition error", zap.Error(err))
return
}

@@ -131,7 +135,16 @@ func (s *Fsm) recognizer(to int64) error {

return errors.E(op, errors.Errorf("can't transition from state: %s", s.String()))
// to
case StateInvalid, StateStopping, StateStopped, StateMaxJobsReached, StateErrored, StateIdleTTLReached, StateTTLReached, StateMaxMemoryReached, StateExecTTLReached:
case
StateInvalid,
StateStopping,
StateStopped,
StateMaxJobsReached,
StateErrored,
StateIdleTTLReached,
StateTTLReached,
StateMaxMemoryReached,
StateExecTTLReached:
// from
if atomic.LoadInt64(s.currentState) == StateDestroyed {
return errors.E(op, errors.Errorf("can't transition from state: %s", s.String()))
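
With this change, NewFSM requires a *zap.Logger, and a rejected transition is logged at debug level instead of being dropped silently. A minimal usage sketch, assuming this repository's fsm package import path (hypothetical) and a zap development logger:

```go
package main

import (
	"fmt"

	"github.com/roadrunner-server/sdk/v4/fsm" // assumed import path for this repository's fsm package
	"go.uber.org/zap"
)

func main() {
	log, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}

	// the logger argument is now mandatory
	f := fsm.NewFSM(fsm.StateInactive, log)

	// transitions are validated by the recognizer; on a rejected
	// transition the FSM now logs the error at debug level and
	// keeps its current state instead of failing silently
	f.Transition(fsm.StateReady)
	fmt.Println(f.String()) // "ready" if the transition was accepted
}
```
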
27 changes: 16 additions & 11 deletions fsm/state_test.go
@@ -4,24 +4,29 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func Test_NewState(t *testing.T) {
st := NewFSM(StateErrored)
log, err := zap.NewDevelopment()
assert.NoError(t, err)
st := NewFSM(StateErrored, log)

assert.Equal(t, "errored", st.String())

assert.Equal(t, "inactive", NewFSM(StateInactive).String())
assert.Equal(t, "ready", NewFSM(StateReady).String())
assert.Equal(t, "working", NewFSM(StateWorking).String())
assert.Equal(t, "stopped", NewFSM(StateStopped).String())
assert.Equal(t, "undefined", NewFSM(1000).String())
assert.Equal(t, "inactive", NewFSM(StateInactive, log).String())
assert.Equal(t, "ready", NewFSM(StateReady, log).String())
assert.Equal(t, "working", NewFSM(StateWorking, log).String())
assert.Equal(t, "stopped", NewFSM(StateStopped, log).String())
assert.Equal(t, "undefined", NewFSM(1000, log).String())
}

func Test_IsActive(t *testing.T) {
assert.False(t, NewFSM(StateInactive).IsActive())
assert.True(t, NewFSM(StateReady).IsActive())
assert.True(t, NewFSM(StateWorking).IsActive())
assert.False(t, NewFSM(StateStopped).IsActive())
assert.False(t, NewFSM(StateErrored).IsActive())
log, err := zap.NewDevelopment()
assert.NoError(t, err)
assert.False(t, NewFSM(StateInactive, log).IsActive())
assert.True(t, NewFSM(StateReady, log).IsActive())
assert.True(t, NewFSM(StateWorking, log).IsActive())
assert.False(t, NewFSM(StateStopped, log).IsActive())
assert.False(t, NewFSM(StateErrored, log).IsActive())
}
9 changes: 6 additions & 3 deletions pool/static_pool/supervisor.go
@@ -12,8 +12,8 @@ import (
const (
MB = 1024 * 1024

// NSEC_IN_SEC nanoseconds in second
NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
// NsecInSec nanoseconds in second
NsecInSec int64 = 1000000000
)

func (sp *Pool) Start() {
@@ -71,6 +71,9 @@ func (sp *Pool) control() {
_ = workers[i].Stop()
}

// call cleanup callback
workers[i].Callback()

continue
}

@@ -150,7 +153,7 @@ func (sp *Pool) control() {

// convert lastUsed to UnixNano and subtract time.Now to get the difference in seconds
// the result is negative, because lu is always in the past, except for the `back to the future` :)
res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1

// maxWorkerIdle more than diff between now and last used
// for example:
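
The renamed NsecInSec constant is what scales the UnixNano difference in the idle-TTL check above. A self-contained sketch of the same arithmetic, using a hypothetical worker that was last used 90 seconds ago:

```go
package main

import (
	"fmt"
	"time"
)

// NsecInSec mirrors the constant from supervisor.go
const NsecInSec int64 = 1000000000

func main() {
	now := time.Now()
	// hypothetical: the worker's lastUsed timestamp, stored as UnixNano in a uint64
	lu := uint64(now.Add(-90 * time.Second).UnixNano())

	// same expression as in control(): the difference is negative
	// (lu is in the past), so multiply by -1 to get positive idle seconds
	res := ((int64(lu) - now.UnixNano()) / NsecInSec) * -1
	fmt.Println(res) // ~90, compared against the configured idle TTL
}
```
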
68 changes: 66 additions & 2 deletions pool/static_pool/supervisor_test.go
@@ -61,6 +61,39 @@ func Test_SupervisedPool_Exec(t *testing.T) {
cancel()
}

func Test_SupervisedPool_AddRemoveWorkers(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(log()),
cfgSupervised,
log(),
)

require.NoError(t, err)
require.NotNil(t, p)

time.Sleep(time.Second)

pidBefore := p.Workers()[0].Pid()

for i := 0; i < 10; i++ {
time.Sleep(time.Second)
_, err = p.Exec(ctx, &payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
}, make(chan struct{}))
require.NoError(t, err)
}

require.NotEqual(t, pidBefore, p.Workers()[0].Pid())

ctxNew, cancel := context.WithTimeout(ctx, time.Second)
p.Destroy(ctxNew)
cancel()
}

func Test_SupervisedPool_ImmediateDestroy(t *testing.T) {
ctx := context.Background()

@@ -118,6 +151,31 @@ func Test_SupervisedPool_NilConfig(t *testing.T) {
assert.Nil(t, p)
}

func Test_SupervisedPool_RemoveNoWorkers(t *testing.T) {
ctx := context.Background()

p, err := NewPool(
ctx,
func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(log()),
cfgSupervised,
log(),
)
assert.NoError(t, err)
assert.NotNil(t, p)

_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.NoError(t, err)

wrks := p.Workers()
for i := 0; i < len(wrks); i++ {
assert.NoError(t, p.RemoveWorker(ctx))
}

assert.Len(t, p.Workers(), 0)
p.Destroy(ctx)
}

func Test_SupervisedPool_RemoveWorker(t *testing.T) {
ctx := context.Background()

@@ -136,13 +194,19 @@ func Test_SupervisedPool_RemoveWorker(t *testing.T) {

wrks := p.Workers()
for i := 0; i < len(wrks); i++ {
assert.NoError(t, p.RemoveWorker(wrks[i]))
assert.NoError(t, p.RemoveWorker(ctx))
}

_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.Error(t, err)

err = p.AddWorker()
assert.NoError(t, err)

assert.Len(t, p.Workers(), 0)
_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.NoError(t, err)

assert.Len(t, p.Workers(), 1)

p.Destroy(ctx)
}
21 changes: 15 additions & 6 deletions pool/static_pool/workers_pool.go
Expand Up @@ -118,10 +118,19 @@ func (sp *Pool) Workers() (workers []*worker.Process) {
return sp.ww.List()
}

// RemoveWorker function should not be used outside the `Wait` function
func (sp *Pool) RemoveWorker(wb *worker.Process) error {
sp.ww.Remove(wb)
return nil
func (sp *Pool) RemoveWorker(ctx context.Context) error {
var cancel context.CancelFunc
_, ok := ctx.Deadline()
if !ok {
ctx, cancel = context.WithTimeout(ctx, sp.cfg.DestroyTimeout)
defer cancel()
}

return sp.ww.RemoveWorker(ctx)
}

func (sp *Pool) AddWorker() error {
return sp.ww.AddWorker()
}

// Exec executes provided payload on the worker
@@ -298,9 +307,9 @@ func (sp *Pool) Reset(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, sp.cfg.ResetTimeout)
defer cancel()
// reset all workers
sp.ww.Reset(ctx)
numToAllocate := sp.ww.Reset(ctx)
// re-allocate all workers
workers, err := pool.AllocateParallel(sp.cfg.NumWorkers, sp.allocator)
workers, err := pool.AllocateParallel(numToAllocate, sp.allocator)
if err != nil {
return err
}
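
These are the two halves of the dynamic scaling API: AddWorker allocates and watches one extra worker, while RemoveWorker now takes a context instead of a *worker.Process, falling back to the pool's DestroyTimeout when the context carries no deadline; Reset likewise re-allocates however many workers the pool actually held rather than the configured NumWorkers. A minimal caller-side sketch, assuming the static_pool import path (an assumption) and a pool built with NewPool as in the tests:

```go
package scaling // hypothetical package, not part of the SDK

import (
	"context"

	"github.com/roadrunner-server/sdk/v4/pool/static_pool" // assumed import path
)

// scaleUpDown grows the pool by one worker and then shrinks it back,
// using the API introduced in this commit
func scaleUpDown(ctx context.Context, p *static_pool.Pool) error {
	if err := p.AddWorker(); err != nil {
		return err
	}

	// with no deadline on ctx, RemoveWorker internally applies
	// the pool's configured DestroyTimeout
	return p.RemoveWorker(ctx)
}
```
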
61 changes: 57 additions & 4 deletions pool/static_pool/workers_pool_test.go
@@ -54,6 +54,47 @@ func Test_NewPool(t *testing.T) {
p.Destroy(ctx)
}

func Test_NewPoolAddRemoveWorkers(t *testing.T) {
var testCfg2 = &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 500,
DestroyTimeout: time.Second * 500,
}

ctx := context.Background()
p, err := NewPool(
ctx,
func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(log()),
testCfg2,
log(),
)
assert.NoError(t, err)
assert.NotNil(t, p)

r, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
resp := <-r

assert.Equal(t, []byte("hello"), resp.Body())
assert.NoError(t, err)

for i := 0; i < 100; i++ {
err = p.AddWorker()
assert.NoError(t, err)
}

err = p.AddWorker()
assert.NoError(t, err)

err = p.RemoveWorker(ctx)
assert.NoError(t, err)

err = p.RemoveWorker(ctx)
assert.NoError(t, err)

p.Destroy(ctx)
}

func Test_StaticPool_NilFactory(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
@@ -104,11 +145,17 @@ func Test_StaticPool_ImmediateDestroy(t *testing.T) {
func Test_StaticPool_RemoveWorker(t *testing.T) {
ctx := context.Background()

var testCfg2 = &pool.Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
}

p, err := NewPool(
ctx,
func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(log()),
testCfg,
testCfg2,
log(),
)
assert.NoError(t, err)
@@ -119,18 +166,24 @@ func Test_StaticPool_RemoveWorker(t *testing.T) {

wrks := p.Workers()
for i := 0; i < len(wrks); i++ {
assert.NoError(t, p.RemoveWorker(wrks[i]))
assert.NoError(t, p.RemoveWorker(ctx))
}

_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.Error(t, err)

err = p.AddWorker()
assert.NoError(t, err)

assert.Len(t, p.Workers(), 0)
_, err = p.Exec(ctx, &payload.Payload{Body: []byte("hello"), Context: nil}, make(chan struct{}))
assert.NoError(t, err)

assert.Len(t, p.Workers(), 1)

p.Destroy(ctx)
}

func Test_Poll_Reallocate(t *testing.T) {
func Test_Pool_Reallocate(t *testing.T) {
var testCfg2 = &pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 500,
14 changes: 13 additions & 1 deletion worker/worker.go
Expand Up @@ -31,6 +31,7 @@ type Process struct {
created time.Time
log *zap.Logger

callback func()
// fsm holds information about the current Process state,
// the number of Process executions, and the last status change time.
// Publicly this object is receive-only and protected using a Mutex
@@ -98,7 +99,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
w.log = z
}

w.fsm = fsm.NewFSM(fsm.StateInactive)
w.fsm = fsm.NewFSM(fsm.StateInactive, w.log)

// set self as stderr implementation (Writer interface)
rc, err := cmd.StderrPipe()
@@ -130,6 +131,17 @@ func (w *Process) Pid() int64 {
return int64(w.pid)
}

func (w *Process) AddCallback(cb func()) {
w.callback = cb
}

func (w *Process) Callback() {
if w.callback == nil {
return
}
w.callback()
}

// Created returns the time the worker was created at.
func (w *Process) Created() time.Time {
return w.created
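
AddCallback and Callback form the per-worker cleanup hook used by the supervisor: control() stops a worker that exceeded one of its limits and then fires the worker's callback (see supervisor.go above). A minimal sketch of registering a hook, assuming the worker import path (an assumption) and a hypothetical logging body:

```go
package hooks // hypothetical package, not part of the SDK

import (
	"github.com/roadrunner-server/sdk/v4/worker" // assumed import path
	"go.uber.org/zap"
)

// registerCleanup attaches a hook to a worker; the supervisor fires it
// via w.Callback() after stopping the worker. Callback() is a no-op
// until AddCallback has registered a function.
func registerCleanup(w *worker.Process, log *zap.Logger) {
	w.AddCallback(func() {
		log.Info("worker stopped by supervisor", zap.Int64("pid", w.Pid()))
	})
}
```
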
