From 567bec7b44a4780eb9915a9cbe00b11ed20522f1 Mon Sep 17 00:00:00 2001 From: Dave Brophy Date: Sun, 3 Dec 2017 15:50:11 -0800 Subject: [PATCH] TestWorkerVariants --- blaster/blaster_test.go | 22 ++++-------- blaster/examples_test.go | 4 +-- blaster/loop-worker.go | 13 +++---- blaster/worker_test.go | 73 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 88 insertions(+), 24 deletions(-) diff --git a/blaster/blaster_test.go b/blaster/blaster_test.go index 02bf36c..864f8a4 100644 --- a/blaster/blaster_test.go +++ b/blaster/blaster_test.go @@ -764,26 +764,13 @@ func (l *LoggingWriter) Debug() { func (l *LoggingWriter) mustLen(t *testing.T, expected int) { t.Helper() - var log [][]string - reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes())) - for { - r, err := reader.Read() - if err != nil { - if err == io.EOF { - break - } else { - fmt.Println(err.Error()) - } - } - log = append(log, r) - } + log := l.All() if expected != len(log) { t.Fatalf("Log is not length %d:\n%v", expected, log) } } -func (l *LoggingWriter) must(t *testing.T, index int, expected []string) { - t.Helper() +func (l *LoggingWriter) All() [][]string { var log [][]string reader := csv.NewReader(bytes.NewBuffer(l.buf.Bytes())) for { @@ -797,7 +784,12 @@ func (l *LoggingWriter) must(t *testing.T, index int, expected []string) { } log = append(log, r) } + return log +} +func (l *LoggingWriter) must(t *testing.T, index int, expected []string) { + t.Helper() + log := l.All() record := log[index] if len(record) == len(expected) { found := true diff --git a/blaster/examples_test.go b/blaster/examples_test.go index 46e7a13..33bc1a9 100644 --- a/blaster/examples_test.go +++ b/blaster/examples_test.go @@ -20,7 +20,7 @@ func ExampleBlaster_Start_batchJob() { defer b.Exit() b.SetWorker(func() blaster.Worker { return &blaster.ExampleWorker{ - SendFunc: func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) { + SendFunc: func(ctx context.Context, self *blaster.ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) { return map[string]interface{}{"status": 200}, nil }, } @@ -45,7 +45,7 @@ func ExampleBlaster_Start_loadTest() { defer b.Exit() b.SetWorker(func() blaster.Worker { return &blaster.ExampleWorker{ - SendFunc: func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) { + SendFunc: func(ctx context.Context, self *blaster.ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) { return map[string]interface{}{"status": 200}, nil }, } diff --git a/blaster/loop-worker.go b/blaster/loop-worker.go index 5b04574..6cdb139 100644 --- a/blaster/loop-worker.go +++ b/blaster/loop-worker.go @@ -208,16 +208,17 @@ func stringify(v interface{}) string { // ExampleWorker facilitates code examples by satisfying the Worker, Starter and Stopper interfaces with provided functions. type ExampleWorker struct { - SendFunc func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) - StartFunc func(ctx context.Context, payload map[string]interface{}) error - StopFunc func(ctx context.Context, payload map[string]interface{}) error + SendFunc func(ctx context.Context, self *ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) + StartFunc func(ctx context.Context, self *ExampleWorker, payload map[string]interface{}) error + StopFunc func(ctx context.Context, self *ExampleWorker, payload map[string]interface{}) error + Local map[string]interface{} } // Send satisfies the Worker interface. func (e *ExampleWorker) Send(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) { // notest if e.SendFunc != nil { - return e.SendFunc(ctx, in) + return e.SendFunc(ctx, e, in) } return nil, nil } @@ -226,7 +227,7 @@ func (e *ExampleWorker) Send(ctx context.Context, in map[string]interface{}) (ma func (e *ExampleWorker) Start(ctx context.Context, payload map[string]interface{}) error { // notest if e.StartFunc != nil { - return e.StartFunc(ctx, payload) + return e.StartFunc(ctx, e, payload) } return nil } @@ -235,7 +236,7 @@ func (e *ExampleWorker) Start(ctx context.Context, payload map[string]interface{ func (e *ExampleWorker) Stop(ctx context.Context, payload map[string]interface{}) error { // notest if e.StopFunc != nil { - return e.StopFunc(ctx, payload) + return e.StopFunc(ctx, e, payload) } return nil } diff --git a/blaster/worker_test.go b/blaster/worker_test.go index 226cf80..4c310d0 100644 --- a/blaster/worker_test.go +++ b/blaster/worker_test.go @@ -8,6 +8,77 @@ import ( "testing" ) +func TestWorkerVariants(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + b := New(ctx, cancel) + b.Rate = 0 // set rate to 0 so we can inject items synthetically + b.itemFinishedChannel = make(chan struct{}) + b.Workers = 9 + + b.WorkerVariants = []map[string]string{ + {"index": "1"}, + {"index": "2"}, + {"index": "3"}, + } + b.SetWorkerTemplate(map[string]interface{}{ + "index": "{{ .index }}", + }) + + b.SetWorker(func() Worker { + return &ExampleWorker{ + StartFunc: func(ctx context.Context, self *ExampleWorker, payload map[string]interface{}) error { + if self.Local == nil { + self.Local = map[string]interface{}{} + } + self.Local["index"] = payload["index"] + return nil + }, + SendFunc: func(ctx context.Context, self *ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) { + return map[string]interface{}{"index": self.Local["index"]}, nil + }, + } + }) + + log := &LoggingWriter{buf: new(bytes.Buffer)} + b.SetLog(log) + + b.LogOutput = []string{"index"} + + finished := make(chan error, 1) + go func() { + finished <- b.start(ctx) + }() + + // synthetically call the main channel, which is what the ticker would do + for i := 0; i < 1000; i++ { + b.mainChannel <- 0 + <-b.itemFinishedChannel + } + + // another tick and the data will reach EOF, and gracefully exit + close(b.dataFinishedChannel) + + // wait for the start method to finish + must(t, <-finished) + + b.Exit() + + all := map[interface{}]int{} + for _, value := range log.All() { + all[value[2]]++ + } + if len(all) != 3 { + t.Fatal("Unexpected worker variants summary counts:", all) + } + if all["1"] > 500 || all["1"] < 200 || + all["2"] > 500 || all["2"] < 200 || + all["3"] > 500 || all["3"] < 200 { + t.Fatal("Unexpected worker variants summary counts:", all) + } + +} + func TestDataLog(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -17,7 +88,7 @@ func TestDataLog(t *testing.T) { b.SetWorker(func() Worker { return &ExampleWorker{ - SendFunc: func(ctx context.Context, in map[string]interface{}) (map[string]interface{}, error) { + SendFunc: func(ctx context.Context, self *ExampleWorker, in map[string]interface{}) (map[string]interface{}, error) { return map[string]interface{}{ "d": fmt.Sprintf("d%s", in["index"]), "e": fmt.Sprintf("e%s", in["index"]),