Skip to content

Commit

Permalink
wip: rate limited logger
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Aug 1, 2024
1 parent 1160ccd commit 0e24ee6
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 0 deletions.
96 changes: 96 additions & 0 deletions logp/ratelimited.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package logp

import (
"sync"
"sync/atomic"
"time"
)

// RateLimitedLogger is a logger that limits log messages to once within a
// specified period.
// It is intended for logging events that occur frequently, providing a summary
// with the number of occurrences within the given interval.
//
// RateLimitedLogger takes a logger function, logFn, which is called every time
// the specified period has elapsed.
type RateLimitedLogger struct {
count atomic.Uint64

period time.Duration

// logFn is called for logging, which receives the count of events and the
// duration since the last call.
logFn func(count uint64, d time.Duration)
lastLog time.Time
done chan struct{}

nowFn func() time.Time
started atomic.Bool
wg sync.WaitGroup
ticker *time.Ticker
}

// NewRateLimited returns a new RateLimitedLogger. It takes a logFn, which is
// called with // the count of events and the time elapsed since the last call,
// and a period determining how often the log function should be called.
func NewRateLimited(
logFn func(count uint64, d time.Duration), period time.Duration) *RateLimitedLogger {
return &RateLimitedLogger{
period: period,
logFn: logFn,

nowFn: time.Now,
}
}

func (r *RateLimitedLogger) Add() {
r.count.Add(1)
}

func (r *RateLimitedLogger) AddN(n uint64) {
r.count.Add(n)
}

func (r *RateLimitedLogger) Start() {
if r.started.Load() {
return
}

r.done = make(chan struct{})
r.started.Store(true)
r.lastLog = r.nowFn()
r.ticker = time.NewTicker(r.period)

r.wg.Add(1)
go func() {
defer r.wg.Done()

defer r.ticker.Stop()

for {
select {
case now := <-r.ticker.C:
r.log(now)
case <-r.done:
r.log(r.nowFn())
return
}
}
}()
}

func (r *RateLimitedLogger) Stop() {
close(r.done)
r.wg.Wait()
r.started.Store(false)
}

func (r *RateLimitedLogger) log(now time.Time) {
count := r.count.Swap(0)
if count > 0 {
elapsed := now.Sub(r.lastLog)

r.lastLog = now
r.logFn(count, elapsed)
}
}
112 changes: 112 additions & 0 deletions logp/ratelimited_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package logp

import (
"fmt"
"math"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestRateLimitedLogger(t *testing.T) {
log, buff := NewInMemory("RateLimitedLogger", ConsoleEncoderConfig())

pattern := "%d occurrences in the last %s"
logFn := func(count uint64, d time.Duration) {
log.Infof(pattern, count, d)
}

now := time.Now()

t.Run("Start", func(t *testing.T) {
r := NewRateLimited(logFn, math.MaxInt64)
defer r.Stop()
r.nowFn = func() time.Time { return now }

r.Start()
tch := make(chan time.Time)
r.ticker.C = tch

assert.True(t, r.started.Load(),
"Start() was called, thus 'started' should be true")
assert.NotEmpty(t, r.lastLog, "lastLog should have been set")
})

t.Run("Start twice", func(t *testing.T) {
r := NewRateLimited(logFn, math.MaxInt64)
defer r.Stop()

r.nowFn = func() time.Time { return now }

r.Start()
r.nowFn = func() time.Time { return now.Add(time.Minute) }
r.Start()

assert.Equal(t, now, r.lastLog, "lastLog should have been set a second time")
})

t.Run("Stop", func(t *testing.T) {
r := NewRateLimited(logFn, math.MaxInt64)
r.nowFn = func() time.Time { return now }

r.Start()
tch := make(chan time.Time)
r.ticker.C = tch

r.nowFn = func() time.Time { return now.Add(42 * time.Second) }

r.count.Add(1)
r.Stop()

got := strings.TrimSpace(buff.String())

assert.False(t, r.started.Load(),
"Stop() was called, thus 'started' should be false")
assert.Len(t, strings.Split(got, "\n"), 1)
assert.Contains(t, buff.String(), fmt.Sprintf(pattern, 1, 42*time.Second))
})

t.Run("Add", func(t *testing.T) {
r := NewRateLimited(logFn, math.MaxInt64)
defer r.Stop()
r.nowFn = func() time.Time { return now }

r.Start()
tch := make(chan time.Time)
r.ticker.C = tch

r.Add()

tch <- now.Add(10 * time.Second)

assert.Eventually(t, func() bool {
return buff.Len() != 0
}, 10*time.Millisecond, time.Millisecond)

assert.Contains(t, buff.String(), fmt.Sprintf(pattern, 1, 10*time.Second))
})

t.Run("AddN", func(t *testing.T) {
r := NewRateLimited(logFn, math.MaxInt64)
defer r.Stop()
r.nowFn = func() time.Time { return now }

n := 100
r.Start()
tch := make(chan time.Time)
r.ticker.C = tch

r.AddN(uint64(n))

tch <- now.Add(10 * time.Second)

assert.Eventually(t, func() bool {
return buff.Len() != 0
}, 10*time.Millisecond, time.Millisecond)

assert.Contains(t, buff.String(), fmt.Sprintf(pattern, n, 10*time.Second))
})

}

0 comments on commit 0e24ee6

Please sign in to comment.