Skip to content

Commit

Permalink
NewAMQPClient
Browse files Browse the repository at this point in the history
  • Loading branch information
pieceowater committed Oct 22, 2024
1 parent 26c819d commit 00bc34b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 0 deletions.
5 changes: 5 additions & 0 deletions gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gossiper

import (
"github.com/pieceowater-dev/lotof.lib.gossiper/internal"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/infra/amqp"
t "github.com/pieceowater-dev/lotof.lib.gossiper/types"
)

Expand All @@ -21,6 +22,10 @@ var EnvVars = &internal.EnvVars
// Provides RabbitMQ or AMQP related functions and configurations.
type AMQP = internal.AMQP

func NewAMQPClient(queueName string, dsn string) (*amqp.Client, error) {
return internal.NewAMQPClient(queueName, dsn)
}

// AMQMessage is an alias for the internal.DefaultMessage.
// Represents the structure of messages that are exchanged over AMQP.
type AMQMessage = t.DefaultMessage
Expand Down
5 changes: 5 additions & 0 deletions internal/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/conf"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/env"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/infra"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/infra/amqp"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/tools"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/tools/formats/errors"
"github.com/pieceowater-dev/lotof.lib.gossiper/internal/tools/formats/filter"
Expand All @@ -29,6 +30,10 @@ var EnvVars = &env.EnvVars
// Handles AMQP messaging operations, especially for RabbitMQ, providing key functions for producers and consumers.
type AMQP = infra.AMQP

func NewAMQPClient(queueName string, dsn string) (*amqp.Client, error) {
return infra.NewAMQPClient(queueName, dsn)
}

/* CONFIGURATION */

// Config is an alias for the conf.Config type.
Expand Down
177 changes: 177 additions & 0 deletions internal/infra/amqp/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package amqp

import (
"context"
"fmt"
"log"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

// Client struct to manage AMQP connections, channels, queues, etc.
type Client struct {
QueueName string
DSN string
conn *amqp.Connection
channel *amqp.Channel
}

// New creates a new AMQP client
func New(queueName, dsn string) (*Client, error) {
client := &Client{
QueueName: queueName,
DSN: dsn,
}

if err := client.connect(); err != nil {
return nil, err
}

return client, nil
}

// connect establishes a connection to RabbitMQ
func (c *Client) connect() error {
conn, err := amqp.Dial(c.DSN)
if err != nil {
return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}

ch, err := conn.Channel()
if err != nil {
return fmt.Errorf("failed to open a channel: %w", err)
}

c.conn = conn
c.channel = ch

return nil
}

// Close closes the connection and channel
func (c *Client) Close() {
if c.channel != nil {
_ = c.channel.Close()
}
if c.conn != nil {
_ = c.conn.Close()
}
}

// SendMessage sends a message to the queue with retry logic and optional reply
func (c *Client) SendMessage(body []byte, reply bool) ([]byte, error) {
var err error
for i := 0; i < 3; i++ {
if reply {
response, err := c.sendWithReply(body)
if err == nil {
return response, nil
}
} else {
err = c.send(body)
if err == nil {
return nil, nil
}
}
log.Printf("Error sending message, retrying... Attempt %d/3", i+1)
time.Sleep(3 * time.Second)
}
return nil, err
}

// send sends a message without waiting for a reply
func (c *Client) send(body []byte) error {
return c.channel.PublishWithContext(
context.TODO(),
"", // exchange
c.QueueName,
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
}

// sendWithReply sends a message and waits for a reply
func (c *Client) sendWithReply(body []byte) ([]byte, error) {
// Declare an anonymous queue for replies
q, err := c.channel.QueueDeclare(
"", // name
false, // durable
true, // autoDelete
false, // exclusive
false, // noWait
nil, // args
)
if err != nil {
return nil, fmt.Errorf("failed to declare reply queue: %w", err)
}

//corrID := fmt.Sprintf("%d", time.Now().UnixNano())

// Set up the message publishing with a reply-to header
err = c.channel.PublishWithContext(
context.TODO(),
"", // exchange
c.QueueName,
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
ReplyTo: q.Name,
CorrelationId: fmt.Sprintf("%d", time.Now().UnixNano()), // adjust as needed
},
)
if err != nil {
return nil, fmt.Errorf("failed to publish message: %w", err)
}

// Consume the reply message
msgs, err := c.channel.Consume(
q.Name, // queue
"", // consumer
true, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // args
)
if err != nil {
return nil, fmt.Errorf("failed to consume reply: %w", err)
}

// Wait for a single response
for msg := range msgs {
return msg.Body, nil
}

return nil, fmt.Errorf("no reply received")
}

// Consume starts consuming messages from the queue
func (c *Client) Consume(handler func([]byte)) error {
msgs, err := c.channel.Consume(
c.QueueName,
"",
true, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil,
)
if err != nil {
return fmt.Errorf("failed to register a consumer: %w", err)
}

go func() {
for d := range msgs {
handler(d.Body)
}
}()

return nil
}
4 changes: 4 additions & 0 deletions internal/infra/infra.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ import (
)

type AMQP = amqp.AMQP

func NewAMQPClient(queueName string, dsn string) (*amqp.Client, error) {
return amqp.New(queueName, dsn)
}

0 comments on commit 00bc34b

Please sign in to comment.