diff --git a/.gitignore b/.gitignore index 07d0ff6..64c9713 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,5 @@ _testmain.go .DS_Store .idea/ coverage.out -out.log \ No newline at end of file +out.log +data-long.csv \ No newline at end of file diff --git a/config.go b/config.go index 13959d9..4d0fbdd 100644 --- a/config.go +++ b/config.go @@ -168,5 +168,9 @@ func (b *Blaster) loadConfigViper() error { b.softTimeout = time.Duration(b.config.Timeout) * time.Millisecond b.hardTimeout = time.Duration(b.config.Timeout+500) * time.Millisecond + if b.config.Resume && b.config.Repeat { + panic("Can't use repeat and resume at the same time!") + } + return nil } diff --git a/dummyworker/dummyworker.go b/dummyworker/dummyworker.go index 7e519f5..e35753d 100644 --- a/dummyworker/dummyworker.go +++ b/dummyworker/dummyworker.go @@ -59,9 +59,9 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri // Dummy worker - return an error sometimes errorrand := r.Float64() - if errorrand > 0.95 { + if errorrand > 0.99 { return map[string]interface{}{"status": 500}, errors.New("Error 500") - } else if errorrand > 0.7 { + } else if errorrand > 0.96 { return map[string]interface{}{"status": 404}, errors.New("Error 404") } else { return map[string]interface{}{"status": 200}, nil diff --git a/loop-main.go b/loop-main.go index 650efcd..4b438f1 100644 --- a/loop-main.go +++ b/loop-main.go @@ -5,6 +5,11 @@ import ( "fmt" "io" + "encoding/json" + + "sync/atomic" + + "github.com/leemcloughlin/gofarmhash" "github.com/pkg/errors" ) @@ -41,7 +46,47 @@ func (b *Blaster) startMainLoop(ctx context.Context) { b.error(errors.WithStack(err)) return } - b.workerChannel <- workDef{Record: record} + + skipped := true + + for _, payloadVariantData := range b.config.PayloadVariants { + + // Build the full data map that will be passed to the worker + data := map[string]string{} + for i, k := range b.dataHeaders { + // Add data from the CSV data source + data[k] = record[i] + } + for k, v := range payloadVariantData { + // Add data from the payload-variants config + data[k] = v + } + + // Calculate the hash of the incoming data + j, err := json.Marshal(data) + if err != nil { + b.error(errors.WithStack(err)) + return + } + hash := farmhash.Hash128(j) + + // In resume mode, check to see if the hash occurred in a previous run + // (skip only contains successful requests from previous runs). + if b.config.Resume { + if _, skip := b.skip[hash]; skip { + atomic.AddUint64(&b.stats.requestsSkipped, 1) + continue + } + } + + skipped = false + + b.workerChannel <- workDef{Data: data, Hash: hash} + } + if skipped { + // if we've skipped all variants, continue with the next item immediately + continue + } break } } @@ -50,5 +95,6 @@ func (b *Blaster) startMainLoop(ctx context.Context) { } type workDef struct { - Record []string + Data map[string]string + Hash farmhash.Uint128 } diff --git a/loop-worker.go b/loop-worker.go index 7c5667f..e5baa18 100644 --- a/loop-worker.go +++ b/loop-worker.go @@ -12,7 +12,6 @@ import ( "encoding/json" - "github.com/leemcloughlin/gofarmhash" "github.com/pkg/errors" ) @@ -20,10 +19,10 @@ func (b *Blaster) startWorkers(ctx context.Context) { for i := 0; i < b.config.Workers; i++ { // assign rotated vars from config - workerVariationData := map[string]string{} + workerVariantData := map[string]string{} if b.config.WorkerVariants != nil { for k, v := range b.config.WorkerVariants[i%len(b.config.WorkerVariants)] { - workerVariationData[k] = v + workerVariantData[k] = v } } @@ -35,7 +34,7 @@ func (b *Blaster) startWorkers(ctx context.Context) { w := workerFunc() if s, ok := w.(Starter); ok { - workerSetup := replaceMap(b.config.WorkerTemplate, workerVariationData) + workerSetup := replaceMap(b.config.WorkerTemplate, workerVariantData) if err := s.Start(ctx, workerSetup); err != nil { b.error(errors.WithStack(err)) return @@ -47,7 +46,7 @@ func (b *Blaster) startWorkers(ctx context.Context) { defer b.workerWait.Done() defer func() { if s, ok := w.(Stopper); ok { - workerSetup := replaceMap(b.config.WorkerTemplate, workerVariationData) + workerSetup := replaceMap(b.config.WorkerTemplate, workerVariantData) if err := s.Stop(ctx, workerSetup); err != nil { b.error(errors.WithStack(err)) return @@ -63,50 +62,17 @@ func (b *Blaster) startWorkers(ctx context.Context) { // exit gracefully return case work := <-b.workerChannel: - for _, payloadVariationData := range b.config.PayloadVariants { - atomic.AddInt64(&b.stats.workersBusy, 1) - b.send(ctx, w, workerVariationData, work, payloadVariationData) - atomic.AddInt64(&b.stats.workersBusy, -1) - } + b.send(ctx, w, work) } } }(i) } } -func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[string]string, work workDef, payloadVariantData map[string]string) { - - // Build the full data map that will be passed to the worker - data := map[string]string{} - for k, v := range workerVariantData { - // Add data from worker-variants config - data[k] = v - } - for i, k := range b.dataHeaders { - // Add data from the CSV data source - data[k] = work.Record[i] - } - for k, v := range payloadVariantData { - // Add data from the payload-variants config - data[k] = v - } - - // Calculate the hash of th incoming data - j, err := json.Marshal(data) - if err != nil { - b.error(errors.WithStack(err)) - return - } - hash := farmhash.Hash128(j) +func (b *Blaster) send(ctx context.Context, w Worker, work workDef) { - // In resume mode, check to see if the hash occurred in a previous run (skip only contains - // successful requests from previous runs). - if b.config.Resume { - if _, skip := b.skip[hash]; skip { - atomic.AddUint64(&b.stats.requestsSkipped, 1) - return - } - } + atomic.AddInt64(&b.stats.workersBusy, 1) + defer atomic.AddInt64(&b.stats.workersBusy, -1) // Count the started request atomic.AddUint64(&b.stats.requestsStarted, 1) @@ -115,10 +81,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri start := time.Now() // Render the payload template with the data generated above - renderedTemplate := replaceMap(b.config.PayloadTemplate, data) - - success := true - var out map[string]interface{} + renderedTemplate := replaceMap(b.config.PayloadTemplate, work.Data) // Create a child context with the selected timeout child, cancel := context.WithTimeout(ctx, b.softTimeout) @@ -126,6 +89,9 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri finished := make(chan struct{}) + success := true + var out map[string]interface{} + var err error go func() { out, err = w.Send(child, renderedTemplate) if err != nil { @@ -186,7 +152,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri var fields []string for _, key := range b.config.LogData { var val string - if v, ok := data[key]; ok { + if v, ok := work.Data[key]; ok { val = v } fields = append(fields, val) @@ -202,7 +168,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri } lr := logRecord{ - Hash: hash, + Hash: work.Hash, Result: success, Fields: fields, }