From f6dea40f96c0d64166959ecc31d116491361d1a1 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 10 Jan 2024 10:36:40 +0100 Subject: [PATCH] fix: buffered channel consumed too much memory Signed-off-by: Valery Piashchynski --- go.mod | 2 +- pool/static_pool/workers_pool.go | 47 +++++++++++--------------------- 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/go.mod b/go.mod index 179facc..65a514f 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/roadrunner-server/sdk/v4 go 1.21 -toolchain go1.22rc1 +toolchain go1.21.5 require ( github.com/goccy/go-json v0.10.2 diff --git a/pool/static_pool/workers_pool.go b/pool/static_pool/workers_pool.go index 82ab891..c13e78c 100644 --- a/pool/static_pool/workers_pool.go +++ b/pool/static_pool/workers_pool.go @@ -235,15 +235,16 @@ begin: } } - // create channel for the stream (only if there are no errors) - // we need to create a buffered channel to prevent blocking - resp := make(chan *PExec, 100000000) - // send the initial frame - resp <- newPExec(rsp, nil) - switch { case rsp.Flags&frame.STREAM != 0: sp.log.Debug("stream mode", zap.Int64("pid", w.Pid())) + // create channel for the stream (only if there are no errors) + // we need to create a buffered channel to prevent blocking + // stream buffer size should be bigger than regular, to have some payloads ready (optimization) + resp := make(chan *PExec, 5) + // send the initial frame + resp <- newPExec(rsp, nil) + // in case of stream we should not return worker back immediately go func() { // would be called on Goexit @@ -281,24 +282,15 @@ begin: cancelT() if errI != nil { sp.log.Warn("stream error", zap.Error(err)) - // send error response - select { - case resp <- newPExec(nil, errI): - default: - sp.log.Error("failed to send error", zap.Error(errI)) - } + + resp <- newPExec(nil, errI) + // move worker to the invalid state to restart w.State().Transition(fsm.StateInvalid) runtime.Goexit() } - select { - case resp <- newPExec(pld, nil): - default: - sp.log.Error("failed to send payload chunk, stream is corrupted") - // we need to restart the worker since it can be in the incorrect state - w.State().Transition(fsm.StateErrored) - } + resp <- newPExec(pld, nil) if !next { w.State().Transition(fsm.StateReady) @@ -311,24 +303,14 @@ begin: if errI != nil { sp.log.Warn("stream iter error", zap.Error(err)) // send error response - select { - case resp <- newPExec(nil, errI): - default: - sp.log.Error("failed to send error", zap.Error(errI)) - } + resp <- newPExec(nil, errI) // move worker to the invalid state to restart w.State().Transition(fsm.StateInvalid) runtime.Goexit() } - select { - case resp <- newPExec(pld, nil): - default: - sp.log.Error("failed to send payload chunk, stream is corrupted") - // we need to restart the worker since it can be in the incorrect state - w.State().Transition(fsm.StateErrored) - } + resp <- newPExec(pld, nil) if !next { w.State().Transition(fsm.StateReady) @@ -342,6 +324,9 @@ begin: return resp, nil default: + resp := make(chan *PExec, 1) + // send the initial frame + resp <- newPExec(rsp, nil) sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid())) if w.State().Compare(fsm.StateWorking) { w.State().Transition(fsm.StateReady)