Skip to content

Commit

Permalink
TestWorkerVariants
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Brophy committed Dec 3, 2017
1 parent 749d504 commit 567bec7
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 24 deletions.
22 changes: 7 additions & 15 deletions blaster/blaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,26 +764,13 @@ func (l *LoggingWriter) Debug() {

func (l *LoggingWriter) mustLen(t *testing.T, expected int) {
t.Helper()
var log [][]string
reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes()))
for {
r, err := reader.Read()
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println(err.Error())
}
}
log = append(log, r)
}
log := l.All()
if expected != len(log) {
t.Fatalf("Log is not length %d:\n%v", expected, log)
}
}

func (l *LoggingWriter) must(t *testing.T, index int, expected []string) {
t.Helper()
func (l *LoggingWriter) All() [][]string {
var log [][]string
reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes()))
for {
Expand All @@ -797,7 +784,12 @@ func (l *LoggingWriter) must(t *testing.T, index int, expected []string) {
}
log = append(log, r)
}
return log
}

func (l *LoggingWriter) must(t *testing.T, index int, expected []string) {
t.Helper()
log := l.All()
record := log[index]
if len(record) == len(expected) {
found := true
Expand Down
4 changes: 2 additions & 2 deletions blaster/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func ExampleBlaster_Start_batchJob() {
defer b.Exit()
b.SetWorker(func() blaster.Worker {
return &blaster.ExampleWorker{
SendFunc: func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) {
SendFunc: func(ctx context.Context, self *blaster.ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) {
return map[string]interface{}{"status": 200}, nil
},
}
Expand All @@ -45,7 +45,7 @@ func ExampleBlaster_Start_loadTest() {
defer b.Exit()
b.SetWorker(func() blaster.Worker {
return &blaster.ExampleWorker{
SendFunc: func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) {
SendFunc: func(ctx context.Context, self *blaster.ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) {
return map[string]interface{}{"status": 200}, nil
},
}
Expand Down
13 changes: 7 additions & 6 deletions blaster/loop-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,17 @@ func stringify(v interface{}) string {

// ExampleWorker facilitates code examples by satisfying the Worker, Starter and Stopper interfaces with provided functions.
type ExampleWorker struct {
SendFunc func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error)
StartFunc func(ctx context.Context, payload map[string]interface{}) error
StopFunc func(ctx context.Context, payload map[string]interface{}) error
SendFunc func(ctx context.Context, self *ExampleWorker, in map[string]interface{}) (map[string]interface{}, error)
StartFunc func(ctx context.Context, self *ExampleWorker, payload map[string]interface{}) error
StopFunc func(ctx context.Context, self *ExampleWorker, payload map[string]interface{}) error
Local map[string]interface{}
}

// Send satisfies the Worker interface.
func (e *ExampleWorker) Send(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) {
// notest
if e.SendFunc != nil {
return e.SendFunc(ctx, in)
return e.SendFunc(ctx, e, in)
}
return nil, nil
}
Expand All @@ -226,7 +227,7 @@ func (e *ExampleWorker) Send(ctx context.Context, in map[string]interface{}) (ma
func (e *ExampleWorker) Start(ctx context.Context, payload map[string]interface{}) error {
// notest
if e.StartFunc != nil {
return e.StartFunc(ctx, payload)
return e.StartFunc(ctx, e, payload)
}
return nil
}
Expand All @@ -235,7 +236,7 @@ func (e *ExampleWorker) Start(ctx context.Context, payload map[string]interface{
func (e *ExampleWorker) Stop(ctx context.Context, payload map[string]interface{}) error {
// notest
if e.StopFunc != nil {
return e.StopFunc(ctx, payload)
return e.StopFunc(ctx, e, payload)
}
return nil
}
73 changes: 72 additions & 1 deletion blaster/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,77 @@ import (
"testing"
)

func TestWorkerVariants(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
b := New(ctx, cancel)
b.Rate = 0 // set rate to 0 so we can inject items synthetically
b.itemFinishedChannel = make(chan struct{})
b.Workers = 9

b.WorkerVariants = []map[string]string{
{"index": "1"},
{"index": "2"},
{"index": "3"},
}
b.SetWorkerTemplate(map[string]interface{}{
"index": "{{ .index }}",
})

b.SetWorker(func() Worker {
return &ExampleWorker{
StartFunc: func(ctx context.Context, self *ExampleWorker, payload map[string]interface{}) error {
if self.Local == nil {
self.Local = map[string]interface{}{}
}
self.Local["index"] = payload["index"]
return nil
},
SendFunc: func(ctx context.Context, self *ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) {
return map[string]interface{}{"index": self.Local["index"]}, nil
},
}
})

log := &LoggingWriter{buf: new(bytes.Buffer)}
b.SetLog(log)

b.LogOutput = []string{"index"}

finished := make(chan error, 1)
go func() {
finished <- b.start(ctx)
}()

// synthetically call the main channel, which is what the ticker would do
for i := 0; i < 1000; i++ {
b.mainChannel <- 0
<-b.itemFinishedChannel
}

// another tick and the data will reach EOF, and gracefully exit
close(b.dataFinishedChannel)

// wait for the start method to finish
must(t, <-finished)

b.Exit()

all := map[interface{}]int{}
for _, value := range log.All() {
all[value[2]]++
}
if len(all) != 3 {
t.Fatal("Unexpected worker variants summary counts:", all)
}
if all["1"] > 500 || all["1"] < 200 ||
all["2"] > 500 || all["2"] < 200 ||
all["3"] > 500 || all["3"] < 200 {
t.Fatal("Unexpected worker variants summary counts:", all)
}

}

func TestDataLog(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -17,7 +88,7 @@ func TestDataLog(t *testing.T) {

b.SetWorker(func() Worker {
return &ExampleWorker{
SendFunc: func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) {
SendFunc: func(ctx context.Context, self *ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) {
return map[string]interface{}{
"d": fmt.Sprintf("d%s", in["index"]),
"e": fmt.Sprintf("e%s", in["index"]),
Expand Down

0 comments on commit 567bec7

Please sign in to comment.