Skip to content

Commit

Permalink
fix(eventbus/kafka): Break loop on canceled ctx
Browse files Browse the repository at this point in the history
Breaking a select case isn't enough.
  • Loading branch information
Hans van den Bogert authored and Felix Svensson committed Nov 18, 2022
1 parent f8fe1ad commit 909c111
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,18 @@ func (b *EventBus) Close() error {
// Handles all events coming in on the channel.
func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) {
defer b.wg.Done()
defer func() {
if err := r.Close(); err != nil {
log.Printf("eventhorizon: failed to close Kafka reader: %s", err)
}
}()

handler := b.handler(m, h, r)

for {
select {
case <-b.cctx.Done():
break
return
default:
}

Expand Down Expand Up @@ -335,9 +340,6 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
}
}

if err := r.Close(); err != nil {
log.Printf("eventhorizon: failed to close Kafka reader: %s", err)
}
}

func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) func(ctx context.Context, msg kafka.Message) *eh.EventBusError {
Expand Down

0 comments on commit 909c111

Please sign in to comment.