Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

Commit

Permalink
fix: socket erros during relocation
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Feb 14, 2022
1 parent 5629950 commit 5ef135f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 114 deletions.
2 changes: 1 addition & 1 deletion ipc/pipe/pipe_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func Test_Pipe_Echo(t *testing.T) {
}

func Test_Pipe_Echo_Script(t *testing.T) {
t.Parallel()
t.Skip("not supported")
cmd := exec.Command("sh", "../../tests/pipes_test_script.sh")
ctx := context.Background()
w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
Expand Down
26 changes: 11 additions & 15 deletions ipc/socket/socket_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ type Factory struct {
// tout specifies for how long factory should serve for incoming relay connection
func NewSocketServer(ls net.Listener, tout time.Duration, log *zap.Logger) *Factory {
f := &Factory{
ls: ls,
tout: tout,
relays: sync.Map{},
log: log,
ls: ls,
tout: tout,
log: log,
}

// Be careful
Expand All @@ -50,6 +49,11 @@ func NewSocketServer(ls net.Listener, tout time.Duration, log *zap.Logger) *Fact
err := f.listen()
// there is no logger here, use fmt
if err != nil {
if opErr, ok := err.(*net.OpError); ok {
if opErr.Err.Error() == "use of closed network connection" {
return
}
}
fmt.Printf("[WARN]: socket server listen, error: %v\n", err)
}
}()
Expand Down Expand Up @@ -210,20 +214,12 @@ func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess
return nil, err
}
default:
// find first pid and attach relay to it
var r *socket.Relay
f.relays.Range(func(k, val interface{}) bool {
r = val.(*socket.Relay)
f.relays.Delete(k)
return false
})

// no relay exists
if r == nil {
rl, ok := f.relays.LoadAndDelete(w.Pid())
if !ok {
continue
}

return r, nil
return rl.(*socket.Relay), nil
}
}
}
Expand Down
1 change: 1 addition & 0 deletions ipc/socket/socket_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func Test_Tcp_Echo(t *testing.T) {
}

func Test_Tcp_Echo_Script(t *testing.T) {
t.Skip("not supported")
time.Sleep(time.Millisecond * 10) // to ensure free socket
ctx := context.Background()
ls, err := net.Listen("tcp", "127.0.0.1:9007")
Expand Down
59 changes: 59 additions & 0 deletions pool/allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pool

import (
"context"
"os/exec"
"time"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v2/events"
"github.com/roadrunner-server/sdk/v2/ipc"
"github.com/roadrunner-server/sdk/v2/worker"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd())
if err != nil {
return nil, err
}

// wrap sync worker
sw := worker.From(w)
sp.log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return sw, nil
}
}

// allocate required number of stack
func (sp *StaticPool) parallelAllocator(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("static_pool_allocate_workers")

workers := make([]worker.BaseProcess, numWorkers)
eg := new(errgroup.Group)

// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
ii := i
eg.Go(func() error {
w, err := sp.allocator()
if err != nil {
return errors.E(op, errors.WorkerAllocate, err)
}

workers[ii] = w
return nil
})
}

err := eg.Wait()
if err != nil {
return nil, err
}

return workers, nil
}
6 changes: 0 additions & 6 deletions pool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,3 @@ func WithCustomErrEncoder(errEnc ErrorEncoder) Options {
p.errEncoder = errEnc
}
}

func UseParallelAlloc() Options {
return func(p *StaticPool) {
p.parallelAlloc = true
}
}
94 changes: 2 additions & 92 deletions pool/static_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pool
import (
"context"
"os/exec"
"time"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/sdk/v2/events"
Expand All @@ -13,7 +12,6 @@ import (
"github.com/roadrunner-server/sdk/v2/worker"
workerWatcher "github.com/roadrunner-server/sdk/v2/worker_watcher"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -42,9 +40,6 @@ type StaticPool struct {
// manages worker states and TTLs
ww Watcher

// allocate workers in parallel
parallelAlloc bool

// allocate new worker
allocator worker.Allocator

Expand Down Expand Up @@ -92,7 +87,7 @@ func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *C
p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout)

// allocate requested number of workers
workers, err := p.allocateWorkers()
workers, err := p.parallelAllocator(p.cfg.NumWorkers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +172,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func (sp *StaticPool) Reset(ctx context.Context) error {
// destroy all workers
sp.ww.Reset(ctx)
workers, err := sp.allocateWorkers()
workers, err := sp.parallelAllocator(sp.cfg.NumWorkers)
if err != nil {
return err
}
Expand Down Expand Up @@ -367,91 +362,6 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
return r, err
}

func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd())
if err != nil {
return nil, err
}

// wrap sync worker
sw := worker.From(w)
sp.log.Debug("worker is allocated", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return sw, nil
}
}

// allocateWorkers will allocate configured number of workers
func (sp *StaticPool) allocateWorkers() ([]worker.BaseProcess, error) {
switch sp.parallelAlloc {
case true:
workers, err := sp.parallelAllocator(sp.cfg.NumWorkers)
if err != nil {
return nil, err
}

return workers, nil

case false:
workers, err := sp.syncAllocator(sp.cfg.NumWorkers)
if err != nil {
return nil, err
}

return workers, nil
}

return nil, errors.Str("unknown option")
}

// allocate required number of stack
func (sp *StaticPool) parallelAllocator(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("static_pool_allocate_workers")

workers := make([]worker.BaseProcess, numWorkers)
eg := new(errgroup.Group)

// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
ii := i
eg.Go(func() error {
w, err := sp.allocator()
if err != nil {
return errors.E(op, errors.WorkerAllocate, err)
}

workers[ii] = w
return nil
})
}

err := eg.Wait()
if err != nil {
return nil, err
}

return workers, nil
}

// allocate required number of stack
func (sp *StaticPool) syncAllocator(numWorkers uint64) ([]worker.BaseProcess, error) {
workers := make([]worker.BaseProcess, 0, numWorkers)

// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
return nil, errors.E(errors.WorkerAllocate, err)
}

workers = append(workers, w)
}

return workers, nil
}

/*
Difference between recursion function call vs goto:
Expand Down

0 comments on commit 5ef135f

Please sign in to comment.