From f6c67bd1d10219fe2a9604de61660e6dca0e49db Mon Sep 17 00:00:00 2001 From: Wayback Archiver <66856220+waybackarchiver@users.noreply.github.com> Date: Sun, 10 Jul 2022 14:13:51 +0000 Subject: [PATCH] Turns the pooling bucket into a non-pointer --- pooling/pooling.go | 17 +++++++++-------- pooling/pooling_test.go | 17 ++++++++++------- service/discord/discord.go | 2 +- service/httpd/httpd.go | 2 +- service/mastodon/mastodon.go | 2 +- service/matrix/matrix.go | 2 +- service/relaychat/relaychat.go | 2 +- service/slack/slack.go | 2 +- service/telegram/telegram.go | 2 +- service/twitter/twitter.go | 2 +- 10 files changed, 27 insertions(+), 23 deletions(-) diff --git a/pooling/pooling.go b/pooling/pooling.go index 430efa22..f980f3cd 100644 --- a/pooling/pooling.go +++ b/pooling/pooling.go @@ -55,7 +55,7 @@ type Bucket struct { elapsed uint64 // An object that will perform exactly one action. - once sync.Once + once *sync.Once } func newResource(id int) *resource { @@ -119,7 +119,7 @@ func (p *Pool) Roll() { continue } - if b := p.bucket(); b != nil { + if b, has := p.bucket(); has { go b.once.Do(func() { p.do(b) }) @@ -128,7 +128,7 @@ func (p *Pool) Roll() { } // Pub puts wayback requests to the resource pool -func (p *Pool) Put(b *Bucket) { +func (p *Pool) Put(b Bucket) { // Inserts a new bucket at the front of queue. p.mutex.Lock() p.staging.PushFront(b) @@ -166,7 +166,7 @@ func (p *Pool) push(r *resource) error { return nil } -func (p *Pool) do(b *Bucket) error { +func (p *Pool) do(b Bucket) error { atomic.AddInt32(&p.processing, 1) defer func() { atomic.AddInt32(&p.waiting, -1) @@ -223,13 +223,14 @@ func (p *Pool) do(b *Bucket) error { return nil } -func (p *Pool) bucket() *Bucket { +func (p *Pool) bucket() (b Bucket, ok bool) { p.mutex.Lock() defer p.mutex.Unlock() - if b, ok := p.staging.PopBack().(*Bucket); ok { - return b + if b, ok = p.staging.PopBack().(Bucket); ok { + b.once = new(sync.Once) + return b, ok } - return nil + return } diff --git a/pooling/pooling_test.go b/pooling/pooling_test.go index 8e20e4c3..9bef3a0e 100644 --- a/pooling/pooling_test.go +++ b/pooling/pooling_test.go @@ -7,6 +7,7 @@ package pooling // import "github.com/wabarc/wayback/pooling" import ( "context" "errors" + "sync/atomic" "testing" "time" @@ -39,7 +40,7 @@ func TestRoll(t *testing.T) { for i < capacity { ch := make(chan struct{}, 1) go func(i int) { - bucket := &Bucket{ + bucket := Bucket{ Request: func(_ context.Context) error { time.Sleep(time.Millisecond) return nil @@ -87,7 +88,7 @@ func TestTimeout(t *testing.T) { for i < capacity { ch := make(chan struct{}, 1) go func(i int) { - bucket := &Bucket{ + bucket := Bucket{ Request: func(_ context.Context) error { time.Sleep(10 * time.Millisecond) return nil @@ -122,8 +123,10 @@ func TestMaxRetries(t *testing.T) { } logger.SetLogLevel(logger.LevelFatal) - bucket := &Bucket{ + var elapsed uint64 + bucket := Bucket{ Request: func(_ context.Context) error { + atomic.AddUint64(&elapsed, 1) return errors.New("process request failed") }, Fallback: func(_ context.Context) error { @@ -133,13 +136,13 @@ func TestMaxRetries(t *testing.T) { maxRetries := uint64(3) p := New(context.Background(), 1) - p.timeout = time.Microsecond + p.timeout = time.Second p.maxRetries = maxRetries go p.Roll() p.Put(bucket) p.Close() - if bucket.elapsed != maxRetries { - t.Fatalf("Unexpected max retries got %d instead of %d", bucket.elapsed, maxRetries) + if elapsed != maxRetries { + t.Fatalf("Unexpected max retries got %d instead of %d", elapsed, maxRetries) } } @@ -155,7 +158,7 @@ func TestFallback(t *testing.T) { want := "foo" fall := "" - bucket := &Bucket{ + bucket := Bucket{ Request: func(_ context.Context) error { return errors.New("some error") }, diff --git a/service/discord/discord.go b/service/discord/discord.go index c27696a3..e1a739a5 100644 --- a/service/discord/discord.go +++ b/service/discord/discord.go @@ -253,7 +253,7 @@ func (d *Discord) process(m *discord.MessageCreate) (err error) { logger.Error("reply queue failed: %v", err) return } - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { logger.Debug("content: %v", urls) if err := d.wayback(ctx, m, urls); err != nil { diff --git a/service/httpd/httpd.go b/service/httpd/httpd.go index 5a634c56..58ce273f 100644 --- a/service/httpd/httpd.go +++ b/service/httpd/httpd.go @@ -57,7 +57,7 @@ func (web *web) handle(pool *pooling.Pool) http.Handler { web.router.HandleFunc("/offline.html", web.showOfflinePage).Methods(http.MethodGet) web.router.HandleFunc("/wayback", func(w http.ResponseWriter, r *http.Request) { - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { if err := web.process(ctx, w, r); err != nil { logger.Error("httpd: process retrying: %v", err) diff --git a/service/mastodon/mastodon.go b/service/mastodon/mastodon.go index 570a07c1..d36ceb9d 100644 --- a/service/mastodon/mastodon.go +++ b/service/mastodon/mastodon.go @@ -136,7 +136,7 @@ func (m *Mastodon) Serve() error { m.archiving[n.Status.ID] = true m.Unlock() metrics.IncrementWayback(metrics.ServiceMastodon, metrics.StatusRequest) - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { if err := m.process(ctx, n.ID, n.Status); err != nil { logger.Error("process failure, notification: %#v, error: %v", n, err) diff --git a/service/matrix/matrix.go b/service/matrix/matrix.go index c22f9fd4..58d301cd 100644 --- a/service/matrix/matrix.go +++ b/service/matrix/matrix.go @@ -106,7 +106,7 @@ func (m *Matrix) Serve() error { return } metrics.IncrementWayback(metrics.ServiceMatrix, metrics.StatusRequest) - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { if err := m.process(ctx, ev); err != nil { logger.Error("process request failure, error: %v", err) diff --git a/service/relaychat/relaychat.go b/service/relaychat/relaychat.go index 8ae4b7eb..dbf2ab69 100644 --- a/service/relaychat/relaychat.go +++ b/service/relaychat/relaychat.go @@ -82,7 +82,7 @@ func (i *IRC) Serve() error { i.conn.AddCallback("PRIVMSG", func(ev *irc.Event) { go func(ev *irc.Event) { metrics.IncrementWayback(metrics.ServiceIRC, metrics.StatusRequest) - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { if err := i.process(ctx, ev); err != nil { logger.Error("process failure, message: %s, error: %v", ev.Message(), err) diff --git a/service/slack/slack.go b/service/slack/slack.go index 3d3e0981..1616244e 100644 --- a/service/slack/slack.go +++ b/service/slack/slack.go @@ -270,7 +270,7 @@ func (s *Slack) process(ev *event) (err error) { logger.Error("reply queue failed: %v", err) return } - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { if err := s.wayback(ctx, ev, urls); err != nil { logger.Error("archives failed: %v", err) diff --git a/service/telegram/telegram.go b/service/telegram/telegram.go index 125f240e..1491a58f 100644 --- a/service/telegram/telegram.go +++ b/service/telegram/telegram.go @@ -227,7 +227,7 @@ func (t *Telegram) process(message *telegram.Message) (err error) { if err != nil { return errors.Wrap(err, "reply message failed") } - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { _, err := t.bot.Edit(request, "Archiving...") if err != nil && err != telegram.ErrSameMessageContent { diff --git a/service/twitter/twitter.go b/service/twitter/twitter.go index 09088f36..e6a4ec17 100644 --- a/service/twitter/twitter.go +++ b/service/twitter/twitter.go @@ -104,7 +104,7 @@ func (t *Twitter) Serve() error { } go func(event twitter.DirectMessageEvent) { metrics.IncrementWayback(metrics.ServiceTwitter, metrics.StatusRequest) - bucket := &pooling.Bucket{ + bucket := pooling.Bucket{ Request: func(ctx context.Context) error { if err := t.process(ctx, event); err != nil { logger.Error("process failure, message: %#v, error: %v", event.Message, err)