Fixed slow resuming
dave committed Oct 29, 2017
1 parent 1a330e8 commit 448fded
Showing 5 changed files with 70 additions and 53 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -28,4 +28,5 @@ _testmain.go
.DS_Store
.idea/
coverage.out
-out.log
+out.log
+data-long.csv
4 changes: 4 additions & 0 deletions config.go
@@ -168,5 +168,9 @@ func (b *Blaster) loadConfigViper() error {
b.softTimeout = time.Duration(b.config.Timeout) * time.Millisecond
b.hardTimeout = time.Duration(b.config.Timeout+500) * time.Millisecond

+if b.config.Resume && b.config.Repeat {
+panic("Can't use repeat and resume at the same time!")
+}
+
return nil
}
4 changes: 2 additions & 2 deletions dummyworker/dummyworker.go
@@ -59,9 +59,9 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri

// Dummy worker - return an error sometimes
errorrand := r.Float64()
-if errorrand > 0.95 {
+if errorrand > 0.99 {
return map[string]interface{}{"status": 500}, errors.New("Error 500")
-} else if errorrand > 0.7 {
+} else if errorrand > 0.96 {
return map[string]interface{}{"status": 404}, errors.New("Error 404")
} else {
return map[string]interface{}{"status": 200}, nil
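With the new thresholds, the dummy worker returns a 500 only when r.Float64() > 0.99 (about 1% of requests) and a 404 for values in (0.96, 0.99] (about 3%), so roughly 96% of simulated requests now succeed; the old 0.95 / 0.7 cutoffs gave about 5% / 25% errors and only 70% successes.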
50 changes: 48 additions & 2 deletions loop-main.go
@@ -5,6 +5,11 @@ import (
"fmt"
"io"

+"encoding/json"
+
+"sync/atomic"
+
+"github.com/leemcloughlin/gofarmhash"
"github.com/pkg/errors"
)

@@ -41,7 +46,47 @@ func (b *Blaster) startMainLoop(ctx context.Context) {
b.error(errors.WithStack(err))
return
}
-b.workerChannel <- workDef{Record: record}

+skipped := true
+
+for _, payloadVariantData := range b.config.PayloadVariants {
+
+// Build the full data map that will be passed to the worker
+data := map[string]string{}
+for i, k := range b.dataHeaders {
+// Add data from the CSV data source
+data[k] = record[i]
+}
+for k, v := range payloadVariantData {
+// Add data from the payload-variants config
+data[k] = v
+}
+
+// Calculate the hash of the incoming data
+j, err := json.Marshal(data)
+if err != nil {
+b.error(errors.WithStack(err))
+return
+}
+hash := farmhash.Hash128(j)
+
+// In resume mode, check to see if the hash occurred in a previous run
+// (skip only contains successful requests from previous runs).
+if b.config.Resume {
+if _, skip := b.skip[hash]; skip {
+atomic.AddUint64(&b.stats.requestsSkipped, 1)
+continue
+}
+}
+
+skipped = false
+
+b.workerChannel <- workDef{Data: data, Hash: hash}
+}
+if skipped {
+// if we've skipped all variants, continue with the next item immediately
+continue
+}
+break
}
}
@@ -50,5 +95,6 @@ func (b *Blaster) startMainLoop(ctx context.Context) {
}

type workDef struct {
-Record []string
+Data map[string]string
+Hash farmhash.Uint128
}
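The block added to loop-main.go above moves payload-variant expansion, hashing, and the resume-mode skip check in front of the worker channel: variants that already succeeded in a previous run are dropped before they ever reach a worker, and when every variant of a record is skipped the loop continues straight to the next record, which appears to be what makes resuming fast. Below is a condensed, self-contained sketch of that dispatch pattern, not the real blast code: readRecord, headers, variants, skip and out are stand-ins for the corresponding Blaster fields, and the standard library's hash/fnv stands in for the farmhash package used in the commit.

// Sketch only: the identifiers below are stand-ins, not the blast API.
package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io"
)

type workDef struct {
	Data map[string]string
	Hash uint64 // the real code uses farmhash.Uint128
}

// dispatchNext mirrors the added loop: it keeps reading records until at least
// one un-skipped payload variant has been placed on the out channel.
func dispatchNext(readRecord func() ([]string, error), headers []string,
	variants []map[string]string, skip map[uint64]bool, out chan<- workDef) error {
	for {
		record, err := readRecord()
		if err != nil {
			return err // io.EOF once the data source is exhausted
		}
		skipped := true
		for _, variant := range variants {
			// Build the full data map: CSV fields first, then variant fields.
			data := map[string]string{}
			for i, k := range headers {
				data[k] = record[i]
			}
			for k, v := range variant {
				data[k] = v
			}
			// Hash the JSON form of the data; map keys marshal in sorted
			// order, so identical data always produces the same hash.
			j, err := json.Marshal(data)
			if err != nil {
				return err
			}
			h := fnv.New64a()
			h.Write(j)
			hash := h.Sum64()
			// Items that succeeded in a previous run are in the skip set
			// and never reach the worker channel.
			if skip[hash] {
				continue
			}
			skipped = false
			out <- workDef{Data: data, Hash: hash}
		}
		if skipped {
			continue // every variant was skipped: read the next record immediately
		}
		break // at least one variant was dispatched
	}
	return nil
}

func main() {
	records := [][]string{{"alice"}, {"bob"}}
	i := 0
	readRecord := func() ([]string, error) {
		if i >= len(records) {
			return nil, io.EOF
		}
		i++
		return records[i-1], nil
	}
	variants := []map[string]string{{"method": "GET"}, {"method": "POST"}}
	skip := map[uint64]bool{} // in resume mode this would be loaded from the previous run's log
	out := make(chan workDef, 8)
	for {
		if err := dispatchNext(readRecord, []string{"user"}, variants, skip, out); err != nil {
			break // io.EOF
		}
	}
	close(out)
	for w := range out {
		fmt.Printf("%016x %v\n", w.Hash, w.Data)
	}
}
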
62 changes: 14 additions & 48 deletions loop-worker.go
@@ -12,18 +12,17 @@ import (

"encoding/json"

-"github.com/leemcloughlin/gofarmhash"
"github.com/pkg/errors"
)

func (b *Blaster) startWorkers(ctx context.Context) {
for i := 0; i < b.config.Workers; i++ {

// assign rotated vars from config
-workerVariationData := map[string]string{}
+workerVariantData := map[string]string{}
if b.config.WorkerVariants != nil {
for k, v := range b.config.WorkerVariants[i%len(b.config.WorkerVariants)] {
-workerVariationData[k] = v
+workerVariantData[k] = v
}
}

@@ -35,7 +34,7 @@ func (b *Blaster) startWorkers(ctx context.Context) {
w := workerFunc()

if s, ok := w.(Starter); ok {
-workerSetup := replaceMap(b.config.WorkerTemplate, workerVariationData)
+workerSetup := replaceMap(b.config.WorkerTemplate, workerVariantData)
if err := s.Start(ctx, workerSetup); err != nil {
b.error(errors.WithStack(err))
return
@@ -47,7 +46,7 @@ func (b *Blaster) startWorkers(ctx context.Context) {
defer b.workerWait.Done()
defer func() {
if s, ok := w.(Stopper); ok {
-workerSetup := replaceMap(b.config.WorkerTemplate, workerVariationData)
+workerSetup := replaceMap(b.config.WorkerTemplate, workerVariantData)
if err := s.Stop(ctx, workerSetup); err != nil {
b.error(errors.WithStack(err))
return
@@ -63,50 +62,17 @@ func (b *Blaster) startWorkers(ctx context.Context) {
// exit gracefully
return
case work := <-b.workerChannel:
-for _, payloadVariationData := range b.config.PayloadVariants {
-atomic.AddInt64(&b.stats.workersBusy, 1)
-b.send(ctx, w, workerVariationData, work, payloadVariationData)
-atomic.AddInt64(&b.stats.workersBusy, -1)
-}
+b.send(ctx, w, work)
}
}
}(i)
}
}

-func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[string]string, work workDef, payloadVariantData map[string]string) {
-
-// Build the full data map that will be passed to the worker
-data := map[string]string{}
-for k, v := range workerVariantData {
-// Add data from worker-variants config
-data[k] = v
-}
-for i, k := range b.dataHeaders {
-// Add data from the CSV data source
-data[k] = work.Record[i]
-}
-for k, v := range payloadVariantData {
-// Add data from the payload-variants config
-data[k] = v
-}
-
-// Calculate the hash of th incoming data
-j, err := json.Marshal(data)
-if err != nil {
-b.error(errors.WithStack(err))
-return
-}
-hash := farmhash.Hash128(j)
+func (b *Blaster) send(ctx context.Context, w Worker, work workDef) {

-// In resume mode, check to see if the hash occurred in a previous run (skip only contains
-// successful requests from previous runs).
-if b.config.Resume {
-if _, skip := b.skip[hash]; skip {
-atomic.AddUint64(&b.stats.requestsSkipped, 1)
-return
-}
-}
+atomic.AddInt64(&b.stats.workersBusy, 1)
+defer atomic.AddInt64(&b.stats.workersBusy, -1)

// Count the started request
atomic.AddUint64(&b.stats.requestsStarted, 1)
@@ -115,17 +81,17 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri
start := time.Now()

// Render the payload template with the data generated above
-renderedTemplate := replaceMap(b.config.PayloadTemplate, data)
-
-success := true
-var out map[string]interface{}
+renderedTemplate := replaceMap(b.config.PayloadTemplate, work.Data)

// Create a child context with the selected timeout
child, cancel := context.WithTimeout(ctx, b.softTimeout)
defer cancel()

finished := make(chan struct{})

+success := true
+var out map[string]interface{}
+var err error
go func() {
out, err = w.Send(child, renderedTemplate)
if err != nil {
@@ -186,7 +152,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri
var fields []string
for _, key := range b.config.LogData {
var val string
-if v, ok := data[key]; ok {
+if v, ok := work.Data[key]; ok {
val = v
}
fields = append(fields, val)
@@ -202,7 +168,7 @@ func (b *Blaster) send(ctx context.Context, w Worker, workerVariantData map[stri
}

lr := logRecord{
-Hash: hash,
+Hash: work.Hash,
Result: success,
Fields: fields,
}
