Skip to content

Commit

Permalink
feat: use functional options pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
wilsonjord committed Nov 20, 2023
1 parent 733144a commit 077c916
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 64 deletions.
233 changes: 169 additions & 64 deletions aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,122 @@ import (
)

type Raw struct {
Body string
ReceiptHandle string
Attributes map[string]string
Body string
ReceiptHandle string
Attributes map[string]string
MessageAttributes map[string]string
}

type messageInput struct {
// generic
ctx context.Context

// send
msgAttributes map[string]string
delay *int32
group string
dedupe string

// receive
visibilityTimeout *int32
waitTime *int32
queueAttributes []types.QueueAttributeName
}

type MessageResponse struct {
MessageId string
}

type messageInputOptionFunc func(*messageInput)

func (s *SQS) SendMessage(queue string, body string, options ...messageInputOptionFunc) (MessageResponse, error) {
msg := &messageInput{}
for _, option := range options {
option(msg)
}

awsMsg := sqs.SendMessageInput{
QueueUrl: aws.String(queue),
MessageBody: aws.String(body),
}

// delayed message
if msg.delay != nil {
awsMsg.DelaySeconds = *msg.delay
}

// message with attributes
if len(msg.msgAttributes) > 0 {
awsMsg.MessageAttributes = map[string]types.MessageAttributeValue{}
for k, v := range msg.msgAttributes {
awsMsg.MessageAttributes[k] = types.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(v),
}
}
}

// message for FIFO queue
if msg.group != "" {
awsMsg.MessageGroupId = aws.String(msg.group)
awsMsg.MessageDeduplicationId = aws.String(msg.dedupe)
}

// context
ctx := msg.ctx
if ctx == nil {
ctx = context.TODO()
}

response, err := s.client.SendMessage(ctx, &awsMsg)

return MessageResponse{MessageId: *response.MessageId}, err
}

func WithDelay(delay int32) messageInputOptionFunc {
return func(m *messageInput) {
m.delay = new(int32)
*m.delay = delay
}
}

func WithWait(waitTimeSeconds int32) messageInputOptionFunc {
return func(m *messageInput) {
m.waitTime = new(int32)
*m.waitTime = waitTimeSeconds
}
}

func WithTimeout(visibilityTimeout int32) messageInputOptionFunc {
return func(m *messageInput) {
m.visibilityTimeout = new(int32)
*m.visibilityTimeout = visibilityTimeout
}
}

func WithMessageAttributes(attributes map[string]string) messageInputOptionFunc {
return func(m *messageInput) {
m.msgAttributes = attributes
}
}

func WithContext(ctx context.Context) messageInputOptionFunc {
return func(m *messageInput) {
m.ctx = ctx
}
}

func WithQueueAttributes(attributes []types.QueueAttributeName) messageInputOptionFunc {
return func(m *messageInput) {
m.queueAttributes = attributes
}
}

func WithFifo(group, dedupe string) messageInputOptionFunc {
return func(m *messageInput) {
m.group = group
m.dedupe = dedupe
}
}

type SQS struct {
Expand Down Expand Up @@ -97,13 +210,13 @@ func (s *SQS) Ready() bool {
// Applications should be able to handle duplicate or out of order messages,
// and should back off on Receive error.
func (s *SQS) Receive(queueURL string, visibilityTimeout int32) (Raw, error) {
return s.ReceiveWithContext(context.TODO(), queueURL, visibilityTimeout)
return s.ReceiveMessage(queueURL, WithTimeout(visibilityTimeout))
}

// ReceiveWithAttributes is the same as Receive except that Queue Attributes can be requested
// to be received with the message.
func (s *SQS) ReceiveWithAttributes(queueURL string, visibilityTimeout int32, attrs []types.QueueAttributeName) (Raw, error) {
return s.ReceiveWithContextAttributes(context.TODO(), queueURL, visibilityTimeout, attrs)
return s.ReceiveMessage(queueURL, WithTimeout(visibilityTimeout), WithQueueAttributes(attrs))
}

// ReceiveWithContextAttributes by context and Queue Attributes,
Expand All @@ -112,37 +225,64 @@ func (s *SQS) ReceiveWithAttributes(queueURL string, visibilityTimeout int32, at
// when system stop signal is received, an error with message '... context canceled' will be returned
// which can be used to safely stop the system
func (s *SQS) ReceiveWithContextAttributes(ctx context.Context, queueURL string, visibilityTimeout int32, attrs []types.QueueAttributeName) (Raw, error) {
input := sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
VisibilityTimeout: visibilityTimeout,
WaitTimeSeconds: 20,
AttributeNames: attrs,
}
return s.receiveMessage(ctx, &input)
return s.ReceiveMessage(queueURL, WithContext(ctx), WithTimeout(visibilityTimeout), WithWait(20), WithQueueAttributes(attrs))
}

// receiveMessage is the common code used internally to receive an SQS message based
// on the provided input.
func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput) (Raw, error) {
func (s *SQS) ReceiveMessage(queue string, options ...messageInputOptionFunc) (Raw, error) {
msg := &messageInput{}
for _, option := range options {
option(msg)
}

for {
r, err := s.client.ReceiveMessage(ctx, input)
if err != nil {
return Raw{}, err
}
awsMsg := sqs.ReceiveMessageInput{
QueueUrl: aws.String(queue),
MessageAttributeNames: []string{"All"},
AttributeNames: msg.queueAttributes,
}

// visibility
if msg.visibilityTimeout != nil {
awsMsg.VisibilityTimeout = *msg.visibilityTimeout
}

// wait time
if msg.waitTime != nil {
awsMsg.WaitTimeSeconds = *msg.waitTime
}

// context
ctx := msg.ctx
if ctx == nil {
ctx = context.TODO()
}

r, err := s.client.ReceiveMessage(ctx, &awsMsg)
if err != nil {
return Raw{}, err
}

for {
switch {
case r == nil || len(r.Messages) == 0:
// no message received
continue
case len(r.Messages) == 1:
raw := r.Messages[0]

msgAttributes := map[string]string{}
for k, v := range raw.MessageAttributes {
if aws.ToString(v.DataType) == "Binary" {
continue
}

msgAttributes[k] = aws.ToString(v.StringValue)
}

m := Raw{
Body: aws.ToString(raw.Body),
ReceiptHandle: aws.ToString(raw.ReceiptHandle),
Attributes: raw.Attributes,
Body: aws.ToString(raw.Body),
ReceiptHandle: aws.ToString(raw.ReceiptHandle),
Attributes: raw.Attributes,
MessageAttributes: msgAttributes,
}
return m, nil
case len(r.Messages) > 1:
Expand All @@ -156,13 +296,7 @@ func (s *SQS) receiveMessage(ctx context.Context, input *sqs.ReceiveMessageInput
// when system stop signal is received, an error with message '... context canceled' will be returned
// which can be used to safely stop the system
func (s *SQS) ReceiveWithContext(ctx context.Context, queueURL string, visibilityTimeout int32) (Raw, error) {
input := sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
MaxNumberOfMessages: 1,
VisibilityTimeout: visibilityTimeout,
WaitTimeSeconds: 20,
}
return s.receiveMessage(ctx, &input)
return s.ReceiveMessage(queueURL, WithContext(ctx), WithTimeout(visibilityTimeout))
}

// Delete deletes the message referred to by receiptHandle from the queue.
Expand All @@ -179,49 +313,20 @@ func (s *SQS) Delete(queueURL, receiptHandle string) error {

// Send sends the message body to the SQS queue referred to by queueURL.
func (s *SQS) Send(queueURL string, body string) error {
params := sqs.SendMessageInput{
QueueUrl: aws.String(queueURL),
MessageBody: aws.String(body),
}

_, err := s.client.SendMessage(context.TODO(), &params)

_, err := s.SendMessage(queueURL, body)
return err
}

// SendWithDelay is the same as Send but adds a delay (in seconds) before sending.
func (s *SQS) SendWithDelay(queueURL string, body string, delay int32) error {
params := sqs.SendMessageInput{
QueueUrl: aws.String(queueURL),
MessageBody: aws.String(body),
DelaySeconds: delay,
}

_, err := s.client.SendMessage(context.TODO(), &params)

_, err := s.SendMessage(queueURL, body, WithDelay(delay))
return err
}

// SendFifoMessage puts a message onto the given AWS SQS queue.
func (s *SQS) SendFifoMessage(queue, group, dedupe string, msg []byte) (string, error) {
var id *string
if dedupe != "" {
id = aws.String(dedupe)
}
params := sqs.SendMessageInput{
MessageBody: aws.String(string(msg)),
QueueUrl: aws.String(queue),
MessageGroupId: aws.String(group),
MessageDeduplicationId: id,
}
output, err := s.client.SendMessage(context.TODO(), &params)
if err != nil {
return "", err
}
if id := output.MessageId; id != nil {
return *id, nil
}
return "", nil
r, err := s.SendMessage(queue, string(msg), WithFifo(group, dedupe))
return r.MessageId, err
}

// Leverage the sendbatch api for uploading large numbers of messages
Expand Down
Loading

0 comments on commit 077c916

Please sign in to comment.