From 51da706e3d421630301c4ebe46542058003c6992 Mon Sep 17 00:00:00 2001 From: kim Date: Sun, 6 Nov 2022 22:07:32 +0000 Subject: [PATCH 1/9] greatly simplify httpclient request queuing Signed-off-by: kim --- internal/httpclient/client.go | 54 +++++++++++---- internal/httpclient/queue.go | 68 ------------------- internal/httpclient/queue_test.go | 106 ------------------------------ 3 files changed, 40 insertions(+), 188 deletions(-) delete mode 100644 internal/httpclient/queue.go delete mode 100644 internal/httpclient/queue_test.go diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 7aa0cd8eab..3c5dedae35 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -26,6 +26,9 @@ import ( "net/netip" "runtime" "time" + + "codeberg.org/gruf/go-bytesize" + "codeberg.org/gruf/go-cache/v2" ) // ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed. @@ -42,8 +45,8 @@ var ErrBodyTooLarge = errors.New("body size too large") // configuration values passed to initialized http.Transport{} // and http.Client{}, along with httpclient.Client{} specific. type Config struct { - // MaxOpenConns limits the max number of concurrent open connections. - MaxOpenConns int + // MaxOpenConnsPerHost limits the max number of open connections to a host. + MaxOpenConnsPerHost int // MaxIdleConns: see http.Transport{}.MaxIdleConns. MaxIdleConns int @@ -80,8 +83,9 @@ type Config struct { // is available (context channels still respected) type Client struct { client http.Client - rc *requestQueue - bmax int64 + queue cache.Cache[string, chan struct{}] + bmax int64 // max response body size + cmax int // max open conns per host } // New returns a new instance of Client initialized using configuration. @@ -94,20 +98,20 @@ func New(cfg Config) *Client { Resolver: &net.Resolver{}, } - if cfg.MaxOpenConns <= 0 { + if cfg.MaxOpenConnsPerHost <= 0 { // By default base this value on GOMAXPROCS. maxprocs := runtime.GOMAXPROCS(0) - cfg.MaxOpenConns = maxprocs * 10 + cfg.MaxOpenConnsPerHost = maxprocs * 20 } if cfg.MaxIdleConns <= 0 { // By default base this value on MaxOpenConns - cfg.MaxIdleConns = cfg.MaxOpenConns * 10 + cfg.MaxIdleConns = cfg.MaxOpenConnsPerHost * 10 } if cfg.MaxBodySize <= 0 { // By default set this to a reasonable 40MB - cfg.MaxBodySize = 40 * 1024 * 1024 + cfg.MaxBodySize = 40 * bytesize.MiB } // Protect dialer with IP range sanitizer @@ -118,11 +122,13 @@ func New(cfg Config) *Client { // Prepare client fields c.bmax = cfg.MaxBodySize - c.rc = &requestQueue{ - maxOpenConns: cfg.MaxOpenConns, - } + c.queue = cache.New[string, chan struct{}]() c.client.Timeout = cfg.Timeout + // Start cache sweep routines + c.queue.SetTTL(time.Minute*10, true) + _ = c.queue.Start(time.Second * 30) + // Set underlying HTTP client roundtripper c.client.Transport = &http.Transport{ Proxy: http.ProxyFromEnvironment, @@ -145,8 +151,8 @@ func New(cfg Config) *Client { // as the standard http.Client{}.Do() implementation except that response body will // be wrapped by an io.LimitReader() to limit response body sizes. func (c *Client) Do(req *http.Request) (*http.Response, error) { - // request a spot in the wait queue... - wait, release := c.rc.getWaitSpot(req.Host, req.Method) + // Get host's wait queue + wait := c.wait(req.Host) // ... and wait our turn select { @@ -167,7 +173,7 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // that connections may not be closed until response body is closed. // The current implementation will reduce the viability of denial of // service attacks, but if there are future issues heed this advice :] - defer release() + defer func() { <-wait }() } // Firstly, ensure this is a valid request @@ -208,3 +214,23 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { return rsp, nil } + +// wait acquires the 'wait' queue for the given host string, or allocates new. +func (c *Client) wait(host string) chan struct{} { + // Look for an existing queue for host + queue, ok := c.queue.Get(host) + if ok { + return queue + } + + // Allocate a new host queue + queue = make(chan struct{}, c.cmax) + + // Attempt to cache this queue by host + if !c.queue.Put(host, queue) { + // Someone beat us to the punch... + queue, _ = c.queue.Get(host) + } + + return queue +} diff --git a/internal/httpclient/queue.go b/internal/httpclient/queue.go deleted file mode 100644 index 8cb1274be5..0000000000 --- a/internal/httpclient/queue.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package httpclient - -import ( - "strings" - "sync" - - "github.com/superseriousbusiness/gotosocial/internal/log" -) - -type requestQueue struct { - hostQueues sync.Map // map of `hostQueue` - maxOpenConns int // max open conns per host per request method -} - -type hostQueue struct { - slotsByMethod sync.Map -} - -// getWaitSpot returns a wait channel and release function for http clients -// that want to do requests politely: that is, wait for their turn. -// -// To wait, a caller should do a select on an attempted insert into the -// returned wait channel. Once the insert succeeds, then the caller should -// proceed with the http request that pertains to the given host + method. -// It doesn't matter what's put into the wait channel, just any interface{}. -// -// When the caller is finished with their http request, they should free up the -// slot they were occupying in the wait queue, by calling the release function. -// -// The reason for the caller needing to provide host and method, is that each -// remote host has a separate wait queue, and there's a separate wait queue -// per method for that host as well. This ensures that outgoing requests can still -// proceed for others hosts and methods while other requests are undergoing, -// while also preventing one host from being spammed with, for example, a -// shitload of GET requests all at once. -func (rc *requestQueue) getWaitSpot(host string, method string) (wait chan<- interface{}, release func()) { - hostQueueI, _ := rc.hostQueues.LoadOrStore(host, new(hostQueue)) - hostQueue, ok := hostQueueI.(*hostQueue) - if !ok { - log.Panic("hostQueueI was not a *hostQueue") - } - - waitSlotI, _ := hostQueue.slotsByMethod.LoadOrStore(strings.ToUpper(method), make(chan interface{}, rc.maxOpenConns)) - methodQueue, ok := waitSlotI.(chan interface{}) - if !ok { - log.Panic("waitSlotI was not a chan interface{}") - } - - return methodQueue, func() { <-methodQueue } -} diff --git a/internal/httpclient/queue_test.go b/internal/httpclient/queue_test.go deleted file mode 100644 index c6d6ad324f..0000000000 --- a/internal/httpclient/queue_test.go +++ /dev/null @@ -1,106 +0,0 @@ -/* - GoToSocial - Copyright (C) 2021-2022 GoToSocial Authors admin@gotosocial.org - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -package httpclient - -import ( - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/suite" -) - -type QueueTestSuite struct { - suite.Suite -} - -func (suite *QueueTestSuite) TestQueue() { - maxOpenConns := 5 - waitTimeout := 1 * time.Second - - rc := &requestQueue{ - maxOpenConns: maxOpenConns, - } - - // fill all the open connections - var release func() - for i, n := range make([]interface{}, maxOpenConns) { - w, r := rc.getWaitSpot("example.org", http.MethodPost) - w <- n - if i == maxOpenConns-1 { - // save the last release function - release = r - } - } - - // try to wait again for the same host/method combo, it should timeout - waitAgain, _ := rc.getWaitSpot("example.org", "post") - - select { - case waitAgain <- struct{}{}: - suite.FailNow("first wait did not time out") - case <-time.After(waitTimeout): - break - } - - // now close the final release that we derived earlier - release() - - // try waiting again, it should work this time - select { - case waitAgain <- struct{}{}: - break - case <-time.After(waitTimeout): - suite.FailNow("second wait timed out") - } - - // the POST queue is now sitting on full - suite.Len(waitAgain, maxOpenConns) - - // we should still be able to make a GET for the same host though - getWait, getRelease := rc.getWaitSpot("example.org", http.MethodGet) - select { - case getWait <- struct{}{}: - break - case <-time.After(waitTimeout): - suite.FailNow("get wait timed out") - } - - // the GET queue has one request waiting - suite.Len(getWait, 1) - // clear it... - getRelease() - suite.Empty(getWait) - - // even though the POST queue for example.org is full, we - // should still be able to make a POST request to another host :) - waitForAnotherHost, _ := rc.getWaitSpot("somewhere.else", http.MethodPost) - select { - case waitForAnotherHost <- struct{}{}: - break - case <-time.After(waitTimeout): - suite.FailNow("get wait timed out") - } - - suite.Len(waitForAnotherHost, 1) -} - -func TestQueueTestSuite(t *testing.T) { - suite.Run(t, &QueueTestSuite{}) -} From 8acf1fba417b728ae0c7b02652a5657139a1c1ca Mon Sep 17 00:00:00 2001 From: kim Date: Wed, 14 Sep 2022 21:31:10 +0100 Subject: [PATCH 2/9] improved request queue mutex logic Signed-off-by: kim --- internal/httpclient/client.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 3c5dedae35..44c6661def 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -83,7 +83,7 @@ type Config struct { // is available (context channels still respected) type Client struct { client http.Client - queue cache.Cache[string, chan struct{}] + queue cache.TTLCache[string, chan struct{}] bmax int64 // max response body size cmax int // max open conns per host } @@ -122,8 +122,8 @@ func New(cfg Config) *Client { // Prepare client fields c.bmax = cfg.MaxBodySize - c.queue = cache.New[string, chan struct{}]() c.client.Timeout = cfg.Timeout + c.queue.Init() // Start cache sweep routines c.queue.SetTTL(time.Minute*10, true) @@ -217,20 +217,19 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // wait acquires the 'wait' queue for the given host string, or allocates new. func (c *Client) wait(host string) chan struct{} { + // Acquire cache lock + c.queue.Lock() + defer c.queue.Unlock() + // Look for an existing queue for host - queue, ok := c.queue.Get(host) + queue, ok := c.queue.GetUnsafe(host) if ok { return queue } // Allocate a new host queue queue = make(chan struct{}, c.cmax) - - // Attempt to cache this queue by host - if !c.queue.Put(host, queue) { - // Someone beat us to the punch... - queue, _ = c.queue.Get(host) - } + c.queue.Set(host, queue) return queue } From dc1b639a4bcb03f09200ff0e2edcf3d401483300 Mon Sep 17 00:00:00 2001 From: kim Date: Sun, 6 Nov 2022 22:08:18 +0000 Subject: [PATCH 3/9] use improved hashmap library Signed-off-by: kim --- go.mod | 2 +- internal/httpclient/client.go | 28 ++++++++++------------------ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index a677b517d1..396882d380 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( codeberg.org/gruf/go-store/v2 v2.0.5 github.com/buckket/go-blurhash v1.1.0 github.com/coreos/go-oidc/v3 v3.4.0 + github.com/cornelk/hashmap v1.0.8 github.com/disintegration/imaging v1.6.2 github.com/gin-contrib/cors v1.4.0 github.com/gin-contrib/gzip v0.0.6 @@ -67,7 +68,6 @@ require ( codeberg.org/gruf/go-pools v1.1.0 // indirect codeberg.org/gruf/go-sched v1.1.1 // indirect github.com/aymerick/douceur v0.2.0 // indirect - github.com/cornelk/hashmap v1.0.8 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dsoprea/go-exif/v3 v3.0.0-20210625224831-a6301f85c82b // indirect github.com/dsoprea/go-iptc v0.0.0-20200610044640-bc9ca208b413 // indirect diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 44c6661def..04739bdf2b 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -28,7 +28,7 @@ import ( "time" "codeberg.org/gruf/go-bytesize" - "codeberg.org/gruf/go-cache/v2" + "github.com/cornelk/hashmap" ) // ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed. @@ -83,7 +83,7 @@ type Config struct { // is available (context channels still respected) type Client struct { client http.Client - queue cache.TTLCache[string, chan struct{}] + queue *hashmap.Map[string, chan struct{}] bmax int64 // max response body size cmax int // max open conns per host } @@ -111,7 +111,7 @@ func New(cfg Config) *Client { if cfg.MaxBodySize <= 0 { // By default set this to a reasonable 40MB - cfg.MaxBodySize = 40 * bytesize.MiB + cfg.MaxBodySize = int64(40 * bytesize.MiB) } // Protect dialer with IP range sanitizer @@ -121,13 +121,10 @@ func New(cfg Config) *Client { }).Sanitize // Prepare client fields - c.bmax = cfg.MaxBodySize c.client.Timeout = cfg.Timeout - c.queue.Init() - - // Start cache sweep routines - c.queue.SetTTL(time.Minute*10, true) - _ = c.queue.Start(time.Second * 30) + c.cmax = cfg.MaxOpenConnsPerHost + c.bmax = cfg.MaxBodySize + c.queue = hashmap.New[string, chan struct{}]() // Set underlying HTTP client roundtripper c.client.Transport = &http.Transport{ @@ -217,19 +214,14 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // wait acquires the 'wait' queue for the given host string, or allocates new. func (c *Client) wait(host string) chan struct{} { - // Acquire cache lock - c.queue.Lock() - defer c.queue.Unlock() - - // Look for an existing queue for host - queue, ok := c.queue.GetUnsafe(host) + // Look for an existing queue + queue, ok := c.queue.Get(host) if ok { return queue } - // Allocate a new host queue - queue = make(chan struct{}, c.cmax) - c.queue.Set(host, queue) + // Allocate a new host queue (or return a sneaky existing one). + queue, _ = c.queue.GetOrInsert(host, make(chan struct{}, c.cmax)) return queue } From e921ac52212d954e471b91e954eabcd302eea7b9 Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 7 Nov 2022 20:00:51 +0000 Subject: [PATCH 4/9] add warn logging when request queues are full Signed-off-by: kim --- internal/concurrency/workers.go | 24 +++++++++++++++++++----- internal/httpclient/client.go | 31 ++++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index 279a0c3c1c..544e6cb17f 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -26,6 +26,7 @@ import ( "reflect" "runtime" + "codeberg.org/gruf/go-kv" "codeberg.org/gruf/go-runners" "github.com/superseriousbusiness/gotosocial/internal/log" ) @@ -113,12 +114,25 @@ func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) err // Queue will queue provided message to be processed with there's a free worker. func (w *WorkerPool[MsgType]) Queue(msg MsgType) { - log.Tracef("%s queueing message (queue=%d): %+v", - w.prefix, w.workers.Queue(), msg, - ) - w.workers.Enqueue(func(ctx context.Context) { + log.Tracef("%s queueing message: %+v", w.prefix, msg) + + // Create new process function for msg + process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { log.Errorf("%s %v", w.prefix, err) } - }) + } + + // Attempt a fast-enqueue of process + if !w.workers.EnqueueNow(process) { + // No spot acquired, log warning + log.WithFields(kv.Fields{ + kv.Field{K: "type", V: w.prefix}, + kv.Field{K: "queue", V: w.workers.Queue()}, + kv.Field{K: "msg", V: msg}, + }...).Warn("full worker queue") + + // Block on enqueuing process func + w.workers.Enqueue(process) + } } diff --git a/internal/httpclient/client.go b/internal/httpclient/client.go index 04739bdf2b..8792e5b822 100644 --- a/internal/httpclient/client.go +++ b/internal/httpclient/client.go @@ -28,7 +28,9 @@ import ( "time" "codeberg.org/gruf/go-bytesize" + "codeberg.org/gruf/go-kv" "github.com/cornelk/hashmap" + "github.com/superseriousbusiness/gotosocial/internal/log" ) // ErrInvalidRequest is returned if a given HTTP request is invalid and cannot be performed. @@ -151,14 +153,13 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // Get host's wait queue wait := c.wait(req.Host) - // ... and wait our turn + var ok bool + select { - case <-req.Context().Done(): - // the request was canceled before we - // got to our turn: no need to release - return nil, req.Context().Err() + // Quickly try grab a spot case wait <- struct{}{}: // it's our turn! + ok = true // NOTE: // Ideally here we would set the slot release to happen either @@ -171,6 +172,26 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) { // The current implementation will reduce the viability of denial of // service attacks, but if there are future issues heed this advice :] defer func() { <-wait }() + default: + } + + if !ok { + // No spot acquired, log warning + log.WithFields(kv.Fields{ + {K: "queue", V: len(wait)}, + {K: "method", V: req.Method}, + {K: "host", V: req.Host}, + {K: "uri", V: req.URL.RequestURI()}, + }...).Warn("full request queue") + + select { + case <-req.Context().Done(): + // the request was canceled before we + // got to our turn: no need to release + return nil, req.Context().Err() + case wait <- struct{}{}: + defer func() { <-wait }() + } } // Firstly, ensure this is a valid request From 511ad6edbe63316884243afecdf49771999d3887 Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 7 Nov 2022 20:03:29 +0000 Subject: [PATCH 5/9] improve worker pool prefix var naming Signed-off-by: kim --- internal/concurrency/workers.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index 544e6cb17f..a717654d74 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -36,7 +36,7 @@ type WorkerPool[MsgType any] struct { workers runners.WorkerPool process func(context.Context, MsgType) error nw, nq int - prefix string // contains type prefix for logging + wtype string // contains worker type for logging } // New returns a new WorkerPool[MsgType] with given number of workers and queue ratio, @@ -62,12 +62,12 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType process: nil, nw: workers, nq: workers * queueRatio, - prefix: fmt.Sprintf("worker.Worker[%s]", msgType), + wtype: fmt.Sprintf("worker.Worker[%s]", msgType), } - // Log new worker creation with type prefix + // Log new worker creation with worker type prefix log.Infof("%s created with workers=%d queue=%d", - w.prefix, + w.wtype, workers, workers*queueRatio, ) @@ -77,7 +77,7 @@ func NewWorkerPool[MsgType any](workers int, queueRatio int) *WorkerPool[MsgType // Start will attempt to start the underlying worker pool, or return error. func (w *WorkerPool[MsgType]) Start() error { - log.Infof("%s starting", w.prefix) + log.Infof("%s starting", w.wtype) // Check processor was set if w.process == nil { @@ -94,7 +94,7 @@ func (w *WorkerPool[MsgType]) Start() error { // Stop will attempt to stop the underlying worker pool, or return error. func (w *WorkerPool[MsgType]) Stop() error { - log.Infof("%s stopping", w.prefix) + log.Infof("%s stopping", w.wtype) // Attempt to stop pool if !w.workers.Stop() { @@ -107,19 +107,19 @@ func (w *WorkerPool[MsgType]) Stop() error { // SetProcessor will set the Worker's processor function, which is called for each queued message. func (w *WorkerPool[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { if w.process != nil { - log.Fatalf("%s Worker.process is already set", w.prefix) + log.Panicf("%s Worker.process is already set", w.wtype) } w.process = fn } // Queue will queue provided message to be processed with there's a free worker. func (w *WorkerPool[MsgType]) Queue(msg MsgType) { - log.Tracef("%s queueing message: %+v", w.prefix, msg) + log.Tracef("%s queueing message: %+v", w.wtype, msg) // Create new process function for msg process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { - log.Errorf("%s %v", w.prefix, err) + log.Errorf("%s %v", w.wtype, err) } } @@ -127,7 +127,7 @@ func (w *WorkerPool[MsgType]) Queue(msg MsgType) { if !w.workers.EnqueueNow(process) { // No spot acquired, log warning log.WithFields(kv.Fields{ - kv.Field{K: "type", V: w.prefix}, + kv.Field{K: "type", V: w.wtype}, kv.Field{K: "queue", V: w.workers.Queue()}, kv.Field{K: "msg", V: msg}, }...).Warn("full worker queue") From f53605bfbdaae2fb9db61ebf17acf9cf18ac6f0e Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 7 Nov 2022 20:05:33 +0000 Subject: [PATCH 6/9] improved worker pool error logging Signed-off-by: kim --- internal/concurrency/workers.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index a717654d74..bac7ea41d7 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -119,7 +119,10 @@ func (w *WorkerPool[MsgType]) Queue(msg MsgType) { // Create new process function for msg process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { - log.Errorf("%s %v", w.wtype, err) + log.WithFields(kv.Fields{ + kv.Field{K: "type", V: w.wtype}, + kv.Field{K: "msg", V: msg}, + }...).Errorf("message processing error: %v", err) } } From 6eb404e6280e891db4c76a0f069f208a747eafd4 Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 7 Nov 2022 20:05:50 +0000 Subject: [PATCH 7/9] move error message into separate field Signed-off-by: kim --- internal/concurrency/workers.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index bac7ea41d7..b6f0b83def 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -119,10 +119,13 @@ func (w *WorkerPool[MsgType]) Queue(msg MsgType) { // Create new process function for msg process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { + log.Errorf("%s %v", w.wtype, err) + log.WithFields(kv.Fields{ kv.Field{K: "type", V: w.wtype}, + kv.Field{K: "error", V: err}, kv.Field{K: "msg", V: msg}, - }...).Errorf("message processing error: %v", err) + }...).Error("message processing error") } } From 7fee9e0b9b0b997c853dd5ddb1b09a2614794154 Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 7 Nov 2022 20:06:09 +0000 Subject: [PATCH 8/9] remove old log statement Signed-off-by: kim --- internal/concurrency/workers.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index b6f0b83def..21e9f71800 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -119,8 +119,6 @@ func (w *WorkerPool[MsgType]) Queue(msg MsgType) { // Create new process function for msg process := func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { - log.Errorf("%s %v", w.wtype, err) - log.WithFields(kv.Fields{ kv.Field{K: "type", V: w.wtype}, kv.Field{K: "error", V: err}, From ee001f13858dd95b707d612918ada13a76d296e6 Mon Sep 17 00:00:00 2001 From: kim Date: Mon, 7 Nov 2022 21:15:19 +0000 Subject: [PATCH 9/9] don't export worker message, it gets very spammy :') Signed-off-by: kim --- internal/concurrency/workers.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/concurrency/workers.go b/internal/concurrency/workers.go index 21e9f71800..377b9e0419 100644 --- a/internal/concurrency/workers.go +++ b/internal/concurrency/workers.go @@ -122,7 +122,6 @@ func (w *WorkerPool[MsgType]) Queue(msg MsgType) { log.WithFields(kv.Fields{ kv.Field{K: "type", V: w.wtype}, kv.Field{K: "error", V: err}, - kv.Field{K: "msg", V: msg}, }...).Error("message processing error") } } @@ -133,7 +132,6 @@ func (w *WorkerPool[MsgType]) Queue(msg MsgType) { log.WithFields(kv.Fields{ kv.Field{K: "type", V: w.wtype}, kv.Field{K: "queue", V: w.workers.Queue()}, - kv.Field{K: "msg", V: msg}, }...).Warn("full worker queue") // Block on enqueuing process func