Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix #188: tester for emitter #190

Merged
merged 1 commit into from
Jun 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ type Tester interface {
StorageBuilder() storage.Builder
ConsumerBuilder() kafka.ConsumerBuilder
ProducerBuilder() kafka.ProducerBuilder
EmitterProducerBuilder() kafka.ProducerBuilder
TopicManagerBuilder() kafka.TopicManagerBuilder
RegisterGroupGraph(*GroupGraph)
RegisterEmitter(Stream, Codec)
Expand Down Expand Up @@ -412,12 +413,11 @@ func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption {

func WithEmitterTester(t Tester) EmitterOption {
return func(o *eoptions, topic Stream, codec Codec) {
o.builders.producer = t.ProducerBuilder()
o.builders.producer = t.EmitterProducerBuilder()
o.builders.topicmgr = t.TopicManagerBuilder()
t.RegisterEmitter(topic, codec)
}
}

func (opt *eoptions) applyOptions(topic Stream, codec Codec, opts ...EmitterOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
33 changes: 33 additions & 0 deletions tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,19 @@ func (km *Tester) ProducerBuilder() kafka.ProducerBuilder {
}
}

// EmitterProducerBuilder creates a producer builder used for Emitters.
// Emitters need to flush when emitting messages.
func (km *Tester) EmitterProducerBuilder() kafka.ProducerBuilder {
builder := km.ProducerBuilder()
return func(b []string, cid string, hasher func() hash.Hash32) (kafka.Producer, error) {
prod, err := builder(b, cid, hasher)
return &flushingProducer{
tester: km,
producer: prod,
}, err
}
}

// StorageBuilder returns the storage builder when this tester is used as an option
// to a processor
func (km *Tester) StorageBuilder() storage.Builder {
Expand Down Expand Up @@ -268,6 +281,7 @@ func (km *Tester) waitStartup() {
// Consume a message using the topic's configured codec
func (km *Tester) Consume(topic string, key string, msg interface{}) {
km.waitStartup()
log.Printf("startup")

// if the user wants to send a nil for some reason,
// just let her. Goka should handle it accordingly :)
Expand Down Expand Up @@ -426,3 +440,22 @@ func (p *producerMock) Close() error {
logger.Printf("Closing producer mock")
return nil
}

// flushingProducer wraps the producer and
// waits for all consumers after the Emit.
type flushingProducer struct {
tester *Tester
producer kafka.Producer
}

// Emit using the underlying producer
func (e *flushingProducer) Emit(topic string, key string, value []byte) *kafka.Promise {
prom := e.producer.Emit(topic, key, value)
e.tester.waitForConsumers()
return prom
}

// Close using the underlying producer
func (e *flushingProducer) Close() error {
return e.producer.Close()
}
65 changes: 65 additions & 0 deletions tester/tester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tester
import (
"context"
"fmt"
"log"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -498,3 +499,67 @@ func Test_ManyConsume(t *testing.T) {
t.Fatalf("did not receive all messages")
}
}

func TestEmitterStandalone(t *testing.T) {
gkt := New(t)

em, _ := goka.NewEmitter(nil, "test", new(codec.String), goka.WithEmitterTester(gkt))
est := gkt.NewQueueTracker("test")

em.Emit("key", "value")

_, _, ok := est.Next()
if !ok {
t.Errorf("No message emitted")
}
}

// Tests an emitter used inside a processor.
// For this the user has to start the emitter with
// a separate tester, otherwise it will deadlock.
func TestEmitterInProcessor(t *testing.T) {

gktProcessor := New(t)

// create a new emitter mocked with an extra tester
gktEmitter := New(t)
em, _ := goka.NewEmitter(nil, "output", new(codec.String), goka.WithEmitterTester(gktEmitter))

// create a new processor that uses the emitter internally
proc, err := goka.NewProcessor(nil, goka.DefineGroup("test-group",
goka.Input("input", new(codec.String), func(ctx goka.Context, msg interface{}) {
log.Printf("sending")

prom, err := em.Emit(ctx.Key(), msg)
log.Printf("sending done")
if err != nil {
t.Errorf("Error emitting in processor: %v", err)
}
prom.Then(func(err error) {
if err != nil {
t.Errorf("Error emitting in processor (in promise): %v", err)
}
})
log.Printf("done")
}),
),
goka.WithTester(gktProcessor),
)

if err != nil {
log.Fatalf("Error creating processor: %v", err)
}
runProcOrFail(proc)

// create a queue tracker from the extra tester
est := gktEmitter.NewQueueTracker("output")

// consume a message
gktProcessor.Consume("input", "key", "value")

// ensure the message is there
_, _, ok := est.Next()
if !ok {
t.Errorf("No message emitted")
}
}