From 4dd30d4aaf7f98dc81995445c20cee9712b4d43b Mon Sep 17 00:00:00 2001 From: Fernando Bandeira Date: Thu, 5 Dec 2024 16:07:54 -0300 Subject: [PATCH] add flag to delete dlx before creation --- internal/events/rabbitmq/config.go | 13 +++++++++++++ internal/events/rabbitmq/consumer/declares.go | 12 ++++++++++++ 2 files changed, 25 insertions(+) diff --git a/internal/events/rabbitmq/config.go b/internal/events/rabbitmq/config.go index f8b248c..7339128 100644 --- a/internal/events/rabbitmq/config.go +++ b/internal/events/rabbitmq/config.go @@ -20,6 +20,8 @@ const ( DefaultMaxInterval = backoff.DefaultMaxInterval // DefaultMaxRetries is the default max retries for the backoff DefaultMaxRetries = 5 + // DefaultDeleteDLX is the default value for deleting the DLX before creating it + DefaultDeleteDLX = false ) type Config struct { @@ -35,6 +37,8 @@ type Config struct { RandomizationFactor float64 Multiplier float64 MaxInterval time.Duration + + DeleteDLX bool } type RabbitmqConfigOption func(*Config) @@ -50,6 +54,7 @@ func LoadConfig(log *zerolog.Logger, opts ...RabbitmqConfigOption) Config { RandomizationFactor: DefaultRandomizationFactor, Multiplier: DefaultMultiplier, MaxInterval: DefaultMaxInterval, + DeleteDLX: DefaultDeleteDLX, } if c.ExchangeName == "" { @@ -136,6 +141,14 @@ func LoadConfig(log *zerolog.Logger, opts ...RabbitmqConfigOption) Config { } } + deleteDLX := os.Getenv("RABBIT_DELETE_DLX") + if deleteDLX != "" { + parsedDeleteDLX, err := strconv.ParseBool(deleteDLX) + if err == nil { + c.DeleteDLX = parsedDeleteDLX + } + } + for _, opt := range opts { opt(&c) } diff --git a/internal/events/rabbitmq/consumer/declares.go b/internal/events/rabbitmq/consumer/declares.go index b5a4d00..8f0fd92 100644 --- a/internal/events/rabbitmq/consumer/declares.go +++ b/internal/events/rabbitmq/consumer/declares.go @@ -98,6 +98,18 @@ func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error { return eris.Wrap(err, "failed to declare exchange") } + if r.config.DeleteDLX { + _, err = r.chManager.Channel.QueueDelete( + dlxName, + false, + false, + false, + ) + if err != nil { + return eris.Wrap(err, "failed to delete queue") + } + } + _, err = r.chManager.Channel.QueueDeclare( dlxName, true,