Skip to content

Commit

Permalink
fix: memory issues in load test
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Feb 5, 2025
1 parent e65e537 commit da444fb
Showing 1 changed file with 50 additions and 17 deletions.
67 changes: 50 additions & 17 deletions examples/loadtest/cli/do.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
"time"
)

type avgResult struct {
count int64
avg time.Duration
}

func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait time.Duration, concurrency int, workerDelay time.Duration, slots int, failureRate float32, payloadSize string) error {
l.Info().Msgf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", duration, eventsPerSecond, delay, wait, concurrency)

Expand All @@ -21,45 +26,73 @@ func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait t
}()

ch := make(chan int64, 2)
durations := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*3)
durations := make(chan time.Duration, eventsPerSecond)

// Compute running average for executed durations using a rolling average.
durationsResult := make(chan avgResult)
go func() {
if workerDelay.Seconds() > 0 {
var count int64
var avg time.Duration
for d := range durations {
count++
if count == 1 {
avg = d
} else {
avg = avg + (d-avg)/time.Duration(count)
}
}
durationsResult <- avgResult{count: count, avg: avg}
}()

go func() {
if workerDelay > 0 {
l.Info().Msgf("wait %s before starting the worker", workerDelay)
time.Sleep(workerDelay)
}
l.Info().Msg("starting worker now")
count, uniques := run(ctx, delay, durations, concurrency, slots, failureRate)
close(durations)
ch <- count
ch <- uniques
}()

time.Sleep(after)

scheduled := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*2)
scheduled := make(chan time.Duration, eventsPerSecond)

// Compute running average for scheduled times using a rolling average.
scheduledResult := make(chan avgResult)
go func() {
var count int64
var avg time.Duration
for d := range scheduled {
count++
if count == 1 {
avg = d
} else {
avg = avg + (d-avg)/time.Duration(count)
}
}
scheduledResult <- avgResult{count: count, avg: avg}
}()

emitted := emit(ctx, eventsPerSecond, duration, scheduled, payloadSize)
close(scheduled)

executed := <-ch
uniques := <-ch

finalDurationResult := <-durationsResult
finalScheduledResult := <-scheduledResult

log.Printf("ℹ️ emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, eventsPerSecond)

if executed == 0 {
return fmt.Errorf("❌ no events executed")
}

var totalDurationExecuted time.Duration
for i := 0; i < int(executed); i++ {
totalDurationExecuted += <-durations
}
durationPerEventExecuted := totalDurationExecuted / time.Duration(executed)
log.Printf("ℹ️ average duration per executed event: %s", durationPerEventExecuted)

var totalDurationScheduled time.Duration
for i := 0; i < int(emitted); i++ {
totalDurationScheduled += <-scheduled
}
scheduleTimePerEvent := totalDurationScheduled / time.Duration(emitted)

log.Printf("ℹ️ average scheduling time per event: %s", scheduleTimePerEvent)
log.Printf("ℹ️ final average duration per executed event: %s", finalDurationResult.avg)
log.Printf("ℹ️ final average scheduling time per event: %s", finalScheduledResult.avg)

if emitted != executed {
log.Printf("⚠️ warning: emitted and executed counts do not match: %d != %d", emitted, executed)
Expand Down

0 comments on commit da444fb

Please sign in to comment.