From 07947aa7b200d0913cda15ed917e91854da75cad Mon Sep 17 00:00:00 2001 From: Hans van den Bogert Date: Tue, 15 Feb 2022 14:46:59 +0100 Subject: [PATCH] feature(eventbus/kafka): retry eventhandling until success Previously the eventbus was lossy in case of eventhandlers returning error. --- eventbus/kafka/eventbus.go | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/eventbus/kafka/eventbus.go b/eventbus/kafka/eventbus.go index 1b81f233..06ae2393 100644 --- a/eventbus/kafka/eventbus.go +++ b/eventbus/kafka/eventbus.go @@ -319,23 +319,33 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) continue } - if err := handler(b.cctx, msg); err != nil { + for { select { - case b.errCh <- err: + case <-b.cctx.Done(): + return default: - log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) } - continue - } + if err := handler(b.cctx, msg); err != nil { + select { + case b.errCh <- err: + default: + log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) + } + + time.Sleep(time.Second) + } else { + // Use a new context to always finish the commit. + if err := r.CommitMessages(context.Background(), msg); err != nil { + err = fmt.Errorf("could not commit message: %w", err) + select { + case b.errCh <- &eh.EventBusError{Err: err}: + default: + log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) + } + } - // Use a new context to always finish the commit. - if err := r.CommitMessages(context.Background(), msg); err != nil { - err = fmt.Errorf("could not commit message: %w", err) - select { - case b.errCh <- &eh.EventBusError{Err: err}: - default: - log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) + break } } }