Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] various worker / request queue improvements #995

Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
codeberg.org/gruf/go-store/v2 v2.0.5
github.com/buckket/go-blurhash v1.1.0
github.com/coreos/go-oidc/v3 v3.4.0
github.com/cornelk/hashmap v1.0.8
github.com/disintegration/imaging v1.6.2
github.com/gin-contrib/cors v1.4.0
github.com/gin-contrib/gzip v0.0.6
Expand Down Expand Up @@ -67,7 +68,6 @@ require (
codeberg.org/gruf/go-pools v1.1.0 // indirect
codeberg.org/gruf/go-sched v1.1.1 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/cornelk/hashmap v1.0.8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dsoprea/go-exif/v3 v3.0.0-20210625224831-a6301f85c82b // indirect
github.com/dsoprea/go-iptc v0.0.0-20200610044640-bc9ca208b413 // indirect
Expand Down
42 changes: 29 additions & 13 deletions internal/concurrency/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"
"runtime"

"codeberg.org/gruf/go-kv"
"codeberg.org/gruf/go-runners"
"github.com/superseriousbusiness/gotosocial/internal/log"
)
Expand All @@ -35,7 +36,7 @@ type WorkerPool[MsgType any] struct {
workers runners.WorkerPool
process func(context.Context, MsgType) error
nw, nq int
prefix string // contains type prefix for logging
wtype string // contains worker type for logging
}

// New returns a new WorkerPool[MsgType] with given number of workers and queue ratio,
Expand All @@ -61,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType
process: nil,
nw: workers,
nq: workers * queueRatio,
prefix: fmt.Sprintf("worker.Worker[%s]", msgType),
wtype: fmt.Sprintf("worker.Worker[%s]", msgType),
}

// Log new worker creation with type prefix
// Log new worker creation with worker type prefix
log.Infof("%s created with workers=%d queue=%d",
w.prefix,
w.wtype,
workers,
workers*queueRatio,
)
Expand All @@ -76,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType

// Start will attempt to start the underlying worker pool, or return error.
func (w *WorkerPool[MsgType]) Start() error {
log.Infof("%s starting", w.prefix)
log.Infof("%s starting", w.wtype)

// Check processor was set
if w.process == nil {
Expand All @@ -93,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error {

// Stop will attempt to stop the underlying worker pool, or return error.
func (w *WorkerPool[MsgType]) Stop() error {
log.Infof("%s stopping", w.prefix)
log.Infof("%s stopping", w.wtype)

// Attempt to stop pool
if !w.workers.Stop() {
Expand All @@ -106,19 +107,34 @@ func (w *WorkerPool[MsgType]) Stop() error {
// SetProcessor assigns the processor function that workers invoke for
// each queued message. It must be called exactly once before Start;
// attempting to replace an already-set processor is a programmer error
// and panics via the logger.
func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) {
	if w.process != nil {
		// Refuse to silently overwrite an existing processor.
		log.Panicf("%s Worker.process is already set", w.wtype)
	}
	w.process = fn
}

// Queue will queue provided message to be processed with there's a free worker.
func (w *WorkerPool[MsgType]) Queue(msg MsgType) {
log.Tracef("%s queueing message (queue=%d): %+v",
w.prefix, w.workers.Queue(), msg,
)
w.workers.Enqueue(func(ctx context.Context) {
log.Tracef("%s queueing message: %+v", w.wtype, msg)

// Create new process function for msg
process := func(ctx context.Context) {
if err := w.process(ctx, msg); err != nil {
log.Errorf("%s %v", w.prefix, err)
log.WithFields(kv.Fields{
kv.Field{K: "type", V: w.wtype},
kv.Field{K: "error", V: err},
}...).Error("message processing error")
}
})
}

// Attempt a fast-enqueue of process
if !w.workers.EnqueueNow(process) {
// No spot acquired, log warning
log.WithFields(kv.Fields{
kv.Field{K: "type", V: w.wtype},
kv.Field{K: "queue", V: w.workers.Queue()},
}...).Warn("full worker queue")

// Block on enqueuing process func
w.workers.Enqueue(process)
}
}
78 changes: 58 additions & 20 deletions internal/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"net/netip"
"runtime"
"time"

"codeberg.org/gruf/go-bytesize"
"codeberg.org/gruf/go-kv"
"github.com/cornelk/hashmap"
"github.com/superseriousbusiness/gotosocial/internal/log"
)

// ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed.
Expand All @@ -42,8 +47,8 @@ var ErrBodyTooLarge = errors.New("body size too large")
// configuration values passed to initialized http.Transport{}
// and http.Client{}, along with httpclient.Client{} specific.
type Config struct {
// MaxOpenConns limits the max number of concurrent open connections.
MaxOpenConns int
// MaxOpenConnsPerHost limits the max number of open connections to a host.
MaxOpenConnsPerHost int

// MaxIdleConns: see http.Transport{}.MaxIdleConns.
MaxIdleConns int
Expand Down Expand Up @@ -80,8 +85,9 @@ type Config struct {
// is available (context channels still respected)
type Client struct {
client http.Client
rc *requestQueue
bmax int64
queue *hashmap.Map[string, chan struct{}]
bmax int64 // max response body size
cmax int // max open conns per host
}

// New returns a new instance of Client initialized using configuration.
Expand All @@ -94,20 +100,20 @@ func New(cfg Config) *Client {
Resolver: &net.Resolver{},
}

if cfg.MaxOpenConns <= 0 {
if cfg.MaxOpenConnsPerHost <= 0 {
// By default base this value on GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0)
cfg.MaxOpenConns = maxprocs * 10
cfg.MaxOpenConnsPerHost = maxprocs * 20
}

if cfg.MaxIdleConns <= 0 {
// By default base this value on MaxOpenConns
cfg.MaxIdleConns = cfg.MaxOpenConns * 10
cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10
}

if cfg.MaxBodySize <= 0 {
// By default set this to a reasonable 40MB
cfg.MaxBodySize = 40 * 1024 * 1024
cfg.MaxBodySize = int64(40 * bytesize.MiB)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

}

// Protect dialer with IP range sanitizer
Expand All @@ -117,11 +123,10 @@ func New(cfg Config) *Client {
}).Sanitize

// Prepare client fields
c.bmax = cfg.MaxBodySize
c.rc = &requestQueue{
maxOpenConns: cfg.MaxOpenConns,
}
c.client.Timeout = cfg.Timeout
c.cmax = cfg.MaxOpenConnsPerHost
c.bmax = cfg.MaxBodySize
c.queue = hashmap.New[string, chan struct{}]()

// Set underlying HTTP client roundtripper
c.client.Transport = &http.Transport{
Expand All @@ -145,17 +150,16 @@ func New(cfg Config) *Client {
// as the standard http.Client{}.Do() implementation except that response body will
// be wrapped by an io.LimitReader() to limit response body sizes.
func (c *Client) Do(req *http.Request) (*http.Response, error) {
// request a spot in the wait queue...
wait, release := c.rc.getWaitSpot(req.Host, req.Method)
// Get host's wait queue
wait := c.wait(req.Host)

var ok bool

// ... and wait our turn
select {
case <-req.Context().Done():
// the request was canceled before we
// got to our turn: no need to release
return nil, req.Context().Err()
// Quickly try grab a spot
case wait <- struct{}{}:
// it's our turn!
ok = true

// NOTE:
// Ideally here we would set the slot release to happen either
Expand All @@ -167,7 +171,27 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
// that connections may not be closed until response body is closed.
// The current implementation will reduce the viability of denial of
// service attacks, but if there are future issues heed this advice :]
defer release()
defer func() { <-wait }()
default:
}

if !ok {
// No spot acquired, log warning
log.WithFields(kv.Fields{
{K: "queue", V: len(wait)},
{K: "method", V: req.Method},
{K: "host", V: req.Host},
{K: "uri", V: req.URL.RequestURI()},
}...).Warn("full request queue")

select {
case <-req.Context().Done():
// the request was canceled before we
// got to our turn: no need to release
return nil, req.Context().Err()
case wait <- struct{}{}:
defer func() { <-wait }()
}
}

// Firstly, ensure this is a valid request
Expand Down Expand Up @@ -208,3 +232,17 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {

return rsp, nil
}

// wait acquires the 'wait' queue for the given host string, or allocates new.
func (c *Client) wait(host string) chan struct{} {
// Look for an existing queue
queue, ok := c.queue.Get(host)
if ok {
return queue
}

// Allocate a new host queue (or return a sneaky existing one).
queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah concurrency


return queue
}
68 changes: 0 additions & 68 deletions internal/httpclient/queue.go

This file was deleted.

Loading