Skip to content

Commit

Permalink
improves testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Mar 25, 2024
1 parent 9733b30 commit e948772
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
github.com/google/go-cmp v0.5.9
github.com/makasim/amqpextra v0.16.4
github.com/prometheus/client_golang v1.11.1
github.com/rs/zerolog v1.26.0
Expand Down
4 changes: 0 additions & 4 deletions kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ type ConsumerConfig struct {

func (c ConsumerConfig) toConfigMap() kafka.ConfigMap {

if c.RouteGroup == "" {
panic("route group cannot be empty")
}

kafkaConfMap := kafka.ConfigMap{
"bootstrap.servers": c.BootstrapServers,
"group.id": c.GroupID,
Expand Down
5 changes: 5 additions & 0 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ func (cg *ConsumerGroup) Consume(ctx context.Context, handler ziggurat.Handler)
}

cm := cg.GroupConfig.toConfigMap()

confCons := cg.consumerMakeFunc(&cm, cg.GroupConfig.Topics)

if cg.GroupConfig.RouteGroup == "" {
cg.GroupConfig.RouteGroup = cg.GroupConfig.GroupID
}
cg.c = confCons
for i := 0; i < consConf.ConsumerCount; i++ {
workerID := fmt.Sprintf("%s_%d", groupID, i)
Expand Down
43 changes: 33 additions & 10 deletions kafka/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,51 @@ import (
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/gojekfarm/ziggurat/v2"
"github.com/gojekfarm/ziggurat/v2/logger"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/mock"
"sync/atomic"
"testing"
"time"
)

type mockHandler struct {
mock.Mock
}

func (mh *mockHandler) Handle(ctx context.Context, event *ziggurat.Event) error {
return mh.Called(ctx, event).Error(0)
}

func TestWorker(t *testing.T) {

t.Run("worker processes messages successfully", func(t *testing.T) {
var msgCount int32

wantEvent := ziggurat.Event{
RoutingPath: "foo-group/foo/1",
EventType: "kafka",
Value: make([]byte, 0),
Key: make([]byte, 0),
Metadata: map[string]any{"kafka-partition": 1, "kafka-topic": "foo"},
}

eventMatcher := mock.MatchedBy(func(e *ziggurat.Event) bool {
diff := cmp.Diff(&wantEvent, e, cmpopts.IgnoreFields(ziggurat.Event{}, "ReceivedTimestamp"))
if diff != "" {
t.Logf("(-Want +Got)%s\n", diff)
return false
}
return true

})
mh := mockHandler{}
mh.On("Handle", mock.Anything, eventMatcher).Return(nil)

mc := MockConsumer{}
w := worker{
handler: ziggurat.HandlerFunc(func(ctx context.Context, event *ziggurat.Event) error {
atomic.AddInt32(&msgCount, 1)
return nil
}),
handler: &mh,
logger: logger.NOOP,
consumer: &mc,
routeGroup: "foo",
routeGroup: "foo-group",
pollTimeout: 100,
killSig: make(chan struct{}),
id: "foo-worker",
Expand All @@ -44,9 +70,6 @@ func TestWorker(t *testing.T) {
t.Errorf("expected error to be nil got:%v", w.err)
return
}
if msgCount < 1 {
t.Error("handler was never invoked")
}

})

Expand Down

0 comments on commit e948772

Please sign in to comment.