Skip to content
This repository has been archived by the owner on Jun 27, 2024. It is now read-only.

fix: buffered channel consumed too much memory #106

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/roadrunner-server/sdk/v4

go 1.21

toolchain go1.22rc1
toolchain go1.21.5

require (
github.com/goccy/go-json v0.10.2
Expand Down
47 changes: 16 additions & 31 deletions pool/static_pool/workers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,16 @@ begin:
}
}

// create channel for the stream (only if there are no errors)
// we need to create a buffered channel to prevent blocking
resp := make(chan *PExec, 100000000)
// send the initial frame
resp <- newPExec(rsp, nil)

switch {
case rsp.Flags&frame.STREAM != 0:
sp.log.Debug("stream mode", zap.Int64("pid", w.Pid()))
// create channel for the stream (only if there are no errors)
// we need to create a buffered channel to prevent blocking
// stream buffer size should be bigger than regular, to have some payloads ready (optimization)
resp := make(chan *PExec, 5)
// send the initial frame
resp <- newPExec(rsp, nil)

// in case of stream we should not return worker back immediately
go func() {
// would be called on Goexit
Expand Down Expand Up @@ -281,24 +282,15 @@ begin:
cancelT()
if errI != nil {
sp.log.Warn("stream error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))
}

resp <- newPExec(nil, errI)

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)
}
resp <- newPExec(pld, nil)

if !next {
w.State().Transition(fsm.StateReady)
Expand All @@ -311,24 +303,14 @@ begin:
if errI != nil {
sp.log.Warn("stream iter error", zap.Error(err))
// send error response
select {
case resp <- newPExec(nil, errI):
default:
sp.log.Error("failed to send error", zap.Error(errI))
}
resp <- newPExec(nil, errI)

// move worker to the invalid state to restart
w.State().Transition(fsm.StateInvalid)
runtime.Goexit()
}

select {
case resp <- newPExec(pld, nil):
default:
sp.log.Error("failed to send payload chunk, stream is corrupted")
// we need to restart the worker since it can be in the incorrect state
w.State().Transition(fsm.StateErrored)
}
resp <- newPExec(pld, nil)

if !next {
w.State().Transition(fsm.StateReady)
Expand All @@ -342,6 +324,9 @@ begin:

return resp, nil
default:
resp := make(chan *PExec, 1)
// send the initial frame
resp <- newPExec(rsp, nil)
sp.log.Debug("req-resp mode", zap.Int64("pid", w.Pid()))
if w.State().Compare(fsm.StateWorking) {
w.State().Transition(fsm.StateReady)
Expand Down
Loading