Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: [AUEML-2412] remove fail on startup if rabbitmq is offline #97

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/events/rabbitmq/outboxPublisher/failedPublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package outboxpublisher

import (
"github.com/TheRafaBonin/roxy"
"github.com/ThreeDotsLabs/watermill/message"
)

// Dummy publisher that always returns error
type FailedPublisher struct {}

func (f FailedPublisher) Publish(topic string, messages ...*message.Message) error {
return roxy.New("failed to connect to broker on startup, can't publish messages")
}


func (f FailedPublisher) Close() error {
return roxy.New("failed to connect to broker on startup, can't close publisher")
}
13 changes: 12 additions & 1 deletion internal/events/rabbitmq/outboxPublisher/outPublisher.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package outboxpublisher

import (
"strings"

"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/alexdrl/zerowater"
Expand Down Expand Up @@ -69,6 +71,15 @@ func newWatermillConfig(logger *zerolog.Logger) amqp.Config {
// newRabbitMQOutPublisher creates a new rabbitmq publisher that publishes messages to the rabbitmq broker
// It uses the watermill library to publish messages
// It is used by the forwarder to publish messages from the outbox table to the rabbitmq broker
// If we can't connect to the broker, returns a dummy publisher that always returns error
func newRabbitMQOutPublisher(logger *zerolog.Logger) (message.Publisher, error) {
return amqp.NewPublisher(newWatermillConfig(logger), zerowater.NewZerologLoggerAdapter(logger.With().Logger()))
publisher, err := amqp.NewPublisher(newWatermillConfig(logger), zerowater.NewZerologLoggerAdapter(logger.With().Logger()))
if err != nil {
if strings.Contains(err.Error(), "create new connection") {
logger.Error().Msg("failed to connect to publisher! Using dummy publisher that always fails")
return FailedPublisher{}, nil
}
return nil, err
}
return publisher, nil
}
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (

// Graceful shutdown of the publisher.
func (r *rabbitmqPublisher) Close(ctx context.Context) error {
if r.chManager == nil {
return eris.New("r.chManager is nil! Invalid publisher")
}
r.logger.Info().Msg("closing publisher")

// Wait till all events are published.
Expand Down
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const (
)

func (r *rabbitmqPublisher) healthCheckLoop() {
if r.chManager == nil {
return
}
logger := r.logger.With().Str("component", "publisher_health_check").Logger()

ticker := time.NewTicker(timeCheckTicker)
Expand Down
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type message struct {
// The message is published asynchronously
// The message will be republished if the connection is lost
func (r *rabbitmqPublisher) Publish(ctx context.Context, topic string, payload interface{}) error {
if r.chManager == nil {
return eris.New("r.chManager is nil! Invalid publisher")
}
ctx, span := otel.Tracer(scope).Start(ctx, "rabbitmqPublisher.Publish",
trace.WithSpanKind(trace.SpanKindInternal),
trace.WithAttributes(
Expand Down
3 changes: 3 additions & 0 deletions internal/events/rabbitmq/publisher/startPublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ func (r *rabbitmqPublisher) StartPublisher(ctx context.Context) error {
go r.healthCheckLoop()

for {
if r.chManager == nil {
return eris.New("r.chManager is nil! Invalid publisher")
}
err := r.chManager.Channel.Confirm(false)
if err != nil {
return eris.Wrap(err, "failed to enable publisher confirms")
Expand Down
18 changes: 2 additions & 16 deletions pkg/events/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ func registerNamedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Log
consumer, err := NewRabbitMQConsumer(logger, WithQueueNamePosfix(namedHandler.QueuePosfix()))
if err != nil {
logger.Error().Err(err).Msg("failed to create consumer")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
return
}

registerProvidedConsumer(lc, s, logger, namedHandler, consumer)
Expand All @@ -39,10 +36,7 @@ func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger,
consumer, err := NewRabbitMQConsumer(logger)
if err != nil {
logger.Error().Err(err).Msg("failed to create consumer")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
return
}

lc.Append(
Expand All @@ -52,10 +46,6 @@ func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger,
err := consumer.Subscribe(ctx, handler)
if err != nil {
logger.Error().Err(err).Msg("failed to subscribe to topics")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}
}()

Expand Down Expand Up @@ -85,10 +75,6 @@ func registerProvidedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.
err := consumer.Subscribe(ctx, handler)
if err != nil {
logger.Error().Err(err).Msg("failed to subscribe to topics")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}
}()

Expand Down
8 changes: 0 additions & 8 deletions pkg/events/rabbitmq/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ func provideRabbitMQPublisher(logger *zerolog.Logger, s fx.Shutdowner) events.Ev
publisher, err := NewRabbitMQPublisher(logger)
if err != nil {
logger.Error().Err(err).Msg("failed to create publisher")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}

return publisher
Expand Down Expand Up @@ -55,10 +51,6 @@ func startPublisher(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, pu
err := publisher.StartPublisher(context.Background())
if err != nil {
logger.Error().Err(err).Msg("failed to start publisher")
err = s.Shutdown()
if err != nil {
logger.Error().Err(err).Msg("failed to shutdown")
}
}
}()

Expand Down
Loading