Skip to content

Commit

Permalink
feature(eventbus/kafka): retry eventhandling until success
Browse files Browse the repository at this point in the history
Previously the eventbus was lossy in case of eventhandlers returning error.
  • Loading branch information
Hans van den Bogert authored and Felix Svensson committed Nov 18, 2022
1 parent 909c111 commit 07947aa
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,23 +319,33 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
continue
}

if err := handler(b.cctx, msg); err != nil {
for {
select {
case b.errCh <- err:
case <-b.cctx.Done():
return
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}

continue
}
if err := handler(b.cctx, msg); err != nil {
select {
case b.errCh <- err:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}

time.Sleep(time.Second)
} else {
// Use a new context to always finish the commit.
if err := r.CommitMessages(context.Background(), msg); err != nil {
err = fmt.Errorf("could not commit message: %w", err)
select {
case b.errCh <- &eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
}

// Use a new context to always finish the commit.
if err := r.CommitMessages(context.Background(), msg); err != nil {
err = fmt.Errorf("could not commit message: %w", err)
select {
case b.errCh <- &eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
break
}
}
}
Expand Down

0 comments on commit 07947aa

Please sign in to comment.