Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement message move limit #9

Merged
merged 1 commit into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ to move deadletter queue messages back into the original queue.

## Features

* Reliable delivery. Messages are only deleted from the source queue if they
were successfully enqueued to the destination.
* Reliable delivery. Messages are only deleted from the source queue if they were enqueued to the destination.
* Messages are sent and received in batches for faster processing.
* Progress indicator.
* Friendly output and error message.
* Queue name resolution. For ease of use, you only need to provide a queue name and not the full `arn` address.
* Message Attributes are copied over.
* Support for FIFO queues. MessageGroupId and MessageDeduplicationId are copied over to the destination messages.
* Optional flag to limit the number of messages to move.

## Installation

Expand Down Expand Up @@ -98,12 +98,14 @@ sqsmover --help
usage: sqsmover.exe --source=SOURCE --destination=DESTINATION [<flags>]

Flags:
--help Show context-sensitive help (also try
-h, --help Show context-sensitive help (also try
--help-long and --help-man).
-s, --source=SOURCE Source queue to move messages from
-d, --destination=DESTINATION Destination queue to move messages to
-r, --region="us-west-2" AWS Region for source and destination queues
-p, --profile="" Use a specific profile from your credential file.
-s, --source=SOURCE Source queue name to move messages from.
-d, --destination=DESTINATION Destination queue name to move messages to.
-r, --region="us-west-2" AWS region for source and destination queues.
-p, --profile="" Use a specific profile from AWS credentials file.
-l, --limit=0 Limits number of messages moved. No limit is set by default.
-v, --version Show application version.
```

Examples:
Expand All @@ -124,3 +126,8 @@ Profile will default to `Default`, you can also override it with `--profile` fla
sqsmover --source=my_source_queue_name --destination=my_destination_queuename --profile=user
```

Limit number of moved messages to 10
```
sqsmover -s my_source_queue_name -d my_destination_queuename -l 10
```

58 changes: 34 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var (
)

var (
sourceQueue = kingpin.Flag("source", "Source queue to move messages from").Short('s').Required().String()
destinationQueue = kingpin.Flag("destination", "Destination queue to move messages to").Short('d').Required().String()
region = kingpin.Flag("region", "AWS Region for source and destination queues").Short('r').Default("us-west-2").String()
profile = kingpin.Flag("profile", "Use a specific profile from your credential file.").Short('p').Default("").String()
sourceQueue = kingpin.Flag("source", "Source queue name to move messages from.").Short('s').Required().String()
destinationQueue = kingpin.Flag("destination", "Destination queue name to move messages to.").Short('d').Required().String()
region = kingpin.Flag("region", "AWS region for source and destination queues.").Short('r').Default("us-west-2").String()
profile = kingpin.Flag("profile", "Use a specific profile from AWS credentials file.").Short('p').Default("").String()
limit = kingpin.Flag("limit", "Limits number of messages moved. No limit is set by default.").Short('l').Default("0").Int()
)


func main() {
log.SetHandler(cli.Default)

Expand Down Expand Up @@ -60,23 +60,23 @@ func main() {

svc := sqs.New(sess)

err, sourceQueueUrl := resolveQueueUrl(svc, *sourceQueue)
sourceQueueUrl, err := resolveQueueUrl(svc, *sourceQueue)

if err != nil {
logAwsError("Failed to resolve source queue", err)
return
}

log.Info(color.New(color.FgCyan).Sprintf("Source queue url: %s", sourceQueueUrl))
log.Info(color.New(color.FgCyan).Sprintf("Source queue URL: %s", sourceQueueUrl))

err, destinationQueueUrl := resolveQueueUrl(svc, *destinationQueue)
destinationQueueUrl, err := resolveQueueUrl(svc, *destinationQueue)

if err != nil {
logAwsError("Failed to resolve destination queue", err)
return
}

log.Info(color.New(color.FgCyan).Sprintf("Destination queue url: %s", destinationQueueUrl))
log.Info(color.New(color.FgCyan).Sprintf("Destination queue URL: %s", destinationQueueUrl))

queueAttributes, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{
QueueUrl: aws.String(sourceQueueUrl),
Expand All @@ -85,29 +85,33 @@ func main() {

numberOfMessages, _ := strconv.Atoi(*queueAttributes.Attributes["ApproximateNumberOfMessages"])

log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %s",
*queueAttributes.Attributes["ApproximateNumberOfMessages"]))
log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %d", numberOfMessages))

if numberOfMessages == 0 {
log.Info("Looks like nothing to move. Done.")
return
}

if *limit > 0 && numberOfMessages > *limit {
numberOfMessages = *limit
log.Info(color.New(color.FgCyan).Sprintf("Limit is set, will only move %d messages", numberOfMessages))
}

moveMessages(sourceQueueUrl, destinationQueueUrl, svc, numberOfMessages)

}

func resolveQueueUrl(svc *sqs.SQS, queueName string) (error, string) {
func resolveQueueUrl(svc *sqs.SQS, queueName string) (string, error) {
params := &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
}
resp, err := svc.GetQueueUrl(params)

if err != nil {
return err, ""
return "", err
}

return nil, *resp.QueueUrl
return *resp.QueueUrl, nil
}

func logAwsError(message string, err error) {
Expand Down Expand Up @@ -153,7 +157,7 @@ func convertSuccessfulMessageToBatchRequestEntry(messages []*sqs.Message) []*sqs
return result
}

func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, numberOfMessages int) {
func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, totalMessages int) {
params := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sourceQueueUrl),
VisibilityTimeout: aws.Int64(2),
Expand All @@ -171,7 +175,7 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
term.HideCursor()
defer term.ShowCursor()

b := progress.NewInt(numberOfMessages)
b := progress.NewInt(totalMessages)
b.Width = 40
b.StartDelimiter = color.New(color.FgCyan).Sprint("|")
b.EndDelimiter = color.New(color.FgCyan).Sprint("|")
Expand All @@ -186,9 +190,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
for {
resp, err := svc.ReceiveMessage(params)

if len(resp.Messages) == 0 {
if len(resp.Messages) == 0 || messagesProcessed == totalMessages {
fmt.Println()
log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(numberOfMessages)))
log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(totalMessages)))
return
}

Expand All @@ -197,9 +201,15 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
return
}

messagesToCopy := resp.Messages

if len(resp.Messages)+messagesProcessed > totalMessages {
messagesToCopy = resp.Messages[0 : totalMessages-messagesProcessed]
}

batch := &sqs.SendMessageBatchInput{
QueueUrl: aws.String(destinationQueueUrl),
Entries: convertToEntries(resp.Messages),
Entries: convertToEntries(messagesToCopy),
}

sendResp, err := svc.SendMessageBatch(batch)
Expand All @@ -214,9 +224,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
return
}

if len(sendResp.Successful) == len(resp.Messages) {
if len(sendResp.Successful) == len(messagesToCopy) {
deleteMessageBatch := &sqs.DeleteMessageBatchInput{
Entries: convertSuccessfulMessageToBatchRequestEntry(resp.Messages),
Entries: convertSuccessfulMessageToBatchRequestEntry(messagesToCopy),
QueueUrl: aws.String(sourceQueueUrl),
}

Expand All @@ -232,11 +242,11 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
return
}

messagesProcessed += len(resp.Messages)
messagesProcessed += len(messagesToCopy)
}

// Increase the total if the approximation was under - avoids exception
if messagesProcessed > numberOfMessages {
if messagesProcessed > totalMessages {
b.Total = float64(messagesProcessed)
}

Expand All @@ -257,4 +267,4 @@ func buildVersion(version, commit, date, builtBy string) string {
result = fmt.Sprintf("%s\nbuilt by: %s", result, builtBy)
}
return result
}
}