Skip to content

Commit

Permalink
Introduce for SQS.SendMessageBatch (#101)
Browse files Browse the repository at this point in the history
* Introduce for SQS.SendMessageBatch

- Introduces SQS.SendMessageBatch function
- Refactors SQS service to use json instead of xml requests.
- Had to bump local stack image because of localstack/localstack#9610

* Format

* Format

* Update build

* Update build
  • Loading branch information
lolleko authored Jul 15, 2024
1 parent ff8e6b6 commit 2119312
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 93 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: '3.3'
services:
localstack:
container_name: 'k6-jslib-aws-localstack'
image: 'localstack/localstack:2.0.2'
image: 'localstack/localstack:3.4.0'
ports:
- '127.0.0.1:4566:4566' # LocalStack Gateway
- '127.0.0.1:4510-4559:4510-4559' # external services port range
Expand Down
250 changes: 163 additions & 87 deletions src/internal/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import { AWSClient } from './client'
import { AWSConfig } from './config'
import { InvalidSignatureError, SignatureV4 } from './signature'
import { HTTPHeaders, SignedHTTPRequest } from './http'
import { HTTPHeaders } from './http'
import http, { RefinedResponse, ResponseType } from 'k6/http'
import { toFormUrlEncoded } from './utils'
import { AWSError } from './error'

const API_VERSION = '2012-11-05'
import { AMZ_TARGET_HEADER } from './constants'
import { JSONObject } from './json'

export class SQSClient extends AWSClient {
private readonly signature: SignatureV4
private readonly commonHeaders: HTTPHeaders

private readonly serviceVersion: string

constructor(awsConfig: AWSConfig) {
super(awsConfig, 'sqs')

this.serviceVersion = 'AmazonSQS'

this.signature = new SignatureV4({
service: this.serviceName,
region: this.awsConfig.region,
Expand All @@ -28,7 +31,7 @@ export class SQSClient extends AWSClient {
})

this.commonHeaders = {
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Type': 'application/x-amz-json-1.0',
}
}

Expand All @@ -40,77 +43,72 @@ export class SQSClient extends AWSClient {
* @param {Object} options - Options for the request
* @param {string} [options.messageDeduplicationId] - The message deduplication id.
* @param {string} [options.messageGroupId] - The message group ID for FIFO queues
* @returns {Message} - The message that was sent.
* @returns {MessageResponse} - The message that was sent.
*/
async sendMessage(
queueUrl: string,
messageBody: string,
options: SendMessageOptions = {}
): Promise<Message> {
const method = 'POST'
): Promise<MessageResponse> {
const action = 'SendMessage'

let body: object = {
Action: 'SendMessage',
Version: API_VERSION,
const body = {
QueueUrl: queueUrl,
MessageBody: messageBody,
...this._combineQueueMessageBodyAndOptions(messageBody, options),
}

if (typeof options.messageDeduplicationId !== 'undefined') {
body = { ...body, MessageDeduplicationId: options.messageDeduplicationId }
}
const res = await this._sendRequest(action, body)

if (typeof options.messageGroupId !== 'undefined') {
body = { ...body, MessageGroupId: options.messageGroupId }
}
const parsed = res.json() as JSONObject
return new MessageResponse(
parsed['MessageId'] as string,
parsed['MD5OfMessageBody'] as string
)
}

if (typeof options.messageAttributes !== 'undefined') {
/*
* A single message attribute is represented as 3 separate parameters: name, value, and type.
* The name of the value parameter varies based on the data type.
* See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html#SQS-SendMessage-request-MessageAttributes
* for more information.
*/
const attributeParameters = Object.entries(options.messageAttributes).reduce(
(params, [name, attribute], i) => {
const valueParameterSuffix =
attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue'
return Object.assign(params, {
[`MessageAttribute.${i + 1}.Name`]: name,
[`MessageAttribute.${i + 1}.Value.${valueParameterSuffix}`]:
attribute.value,
[`MessageAttribute.${i + 1}.Value.DataType`]: attribute.type,
})
},
{} as Record<string, string>
/**
* Delivers up to ten messages to the specified queue.
*
* @param {string} queueUrl - The URL of the Amazon SQS queue to which a message is sent. Queue URLs and names are case-sensitive.
* @param {SendMessageBatchEntry[]} entries - A list of up to ten messages to send.
* @returns {MessageBatchResponse} - The messages that were sent.
*/
async sendMessageBatch(
queueUrl: string,
entries: SendMessageBatchEntry[]
): Promise<MessageBatchResponse> {
const action = 'SendMessageBatch'

const requestMessageEntries = entries.map((entry) => {
let requestMessageEntry = this._combineQueueMessageBodyAndOptions(
entry.messageBody,
entry.messageOptions
)
body = { ...body, ...attributeParameters }
}
requestMessageEntry = { ...requestMessageEntry, Id: entry.messageId }
return requestMessageEntry
})

if (typeof options.delaySeconds !== 'undefined') {
body = { ...body, DelaySeconds: options.delaySeconds }
}
const body = { QueueUrl: queueUrl, Entries: requestMessageEntries }

const signedRequest: SignedHTTPRequest = this.signature.sign(
{
method: 'POST',
endpoint: this.endpoint,
path: '/',
headers: {
...this.commonHeaders,
},
body: toFormUrlEncoded(body),
},
{}
)
const res = await this._sendRequest(action, body)

const res = await http.asyncRequest(method, signedRequest.url, signedRequest.body || '', {
headers: signedRequest.headers,
})
this._handleError('SendMessage', res)
const parsed = res.json() as JSONObject
const successful: JSONObject[] = (parsed['Successful'] as JSONObject[]) || []
const failed: JSONObject[] = (parsed['Failed'] as JSONObject[]) || []

const parsed = res.html('SendMessageResponse > SendMessageResult')
return new Message(parsed.find('MessageId').text(), parsed.find('MD5OfMessageBody').text())
return {
successful: successful.map(
(entry) =>
new MessageResponse(
entry['MessageId'] as string,
entry['MD5OfMessageBody'] as string
)
),
failed: failed.map(
(entry) =>
new SQSServiceError(entry['Message'] as string, entry['Code'] as string, action)
),
}
}

/**
Expand All @@ -125,12 +123,9 @@ export class SQSClient extends AWSClient {
* @returns {string} [Object.nextToken] - In the future, you can use NextToken to request the next set of results.
*/
async listQueues(parameters: ListQueuesRequestParameters = {}): Promise<ListQueuesResponse> {
const method = 'POST'
const action = 'ListQueues'

let body: object = {
Action: 'ListQueues',
Version: API_VERSION,
}
let body: object = {}

if (typeof parameters?.maxResults !== 'undefined') {
body = { ...body, MaxResults: parameters.maxResults }
Expand All @@ -144,60 +139,109 @@ export class SQSClient extends AWSClient {
body = { ...body, QueueNamePrefix: parameters.queueNamePrefix }
}

const signedRequest: SignedHTTPRequest = this.signature.sign(
const res = await this._sendRequest(action, body)

const parsed = res.json() as JSONObject
return {
urls: parsed['QueueUrls'] as string[],
nextToken: parsed?.NextToken as string,
}
}

private _combineQueueMessageBodyAndOptions(
messageBody: string,
options?: SendMessageOptions
): object {
let body: object = { MessageBody: messageBody }

if (options === undefined) {
return body
}

if (typeof options.messageDeduplicationId !== 'undefined') {
body = { ...body, MessageDeduplicationId: options.messageDeduplicationId }
}

if (typeof options.messageGroupId !== 'undefined') {
body = { ...body, MessageGroupId: options.messageGroupId }
}

if (typeof options.messageAttributes !== 'undefined') {
const messageAttributes: Record<string, Record<string, string>> = {}

for (const [name, attribute] of Object.entries(options.messageAttributes)) {
const valueParameterSuffix =
attribute.type === 'Binary' ? 'BinaryValue' : 'StringValue'
messageAttributes[name] = {
DataType: attribute.type,
}
messageAttributes[name][valueParameterSuffix] = attribute.value
}

body = { ...body, MessageAttributes: messageAttributes }
}

if (typeof options.delaySeconds !== 'undefined') {
body = { ...body, DelaySeconds: options.delaySeconds }
}

return body
}

private async _sendRequest(
action: SQSOperation,
body: object
): Promise<RefinedResponse<ResponseType>> {
const signedRequest = this.signature.sign(
{
method: 'POST',
endpoint: this.endpoint,
path: '/',
headers: {
...this.commonHeaders,
Host: this.endpoint.host,
[AMZ_TARGET_HEADER]: `${this.serviceVersion}.${action}`,
},
body: toFormUrlEncoded(body),
body: JSON.stringify(body),
},
{}
)

const res = await http.asyncRequest(method, signedRequest.url, signedRequest.body || '', {
const res = await http.asyncRequest('POST', signedRequest.url, signedRequest.body, {
headers: signedRequest.headers,
})
this._handleError('ListQueues', res)

const parsed = res.html()
return {
urls: parsed
.find('QueueUrl')
.toArray()
.map((e) => e.text()),
nextToken: parsed.find('NextToken').text() || undefined,
}
this._handleError(action, res)
return res
}

private _handleError(
operation: SQSOperation,
response: RefinedResponse<ResponseType | undefined>
) {
const errorCode: number = response.error_code
const errorMessage: string = response.error

if (errorMessage == '' && errorCode === 0) {
if (errorCode === 0) {
return
}

const awsError = AWSError.parseXML(response.body as string)
switch (awsError.code) {
case 'AuthorizationHeaderMalformed':
throw new InvalidSignatureError(awsError.message, awsError.code)
const error = response.json() as JSONObject

const errorMessage: string =
(error.Message as string) || (error.message as string) || (error.__type as string)

switch (error.__type) {
case 'InvalidSignatureException':
throw new InvalidSignatureError(errorMessage, error.__type)
default:
throw new SQSServiceError(awsError.message, awsError.code || 'unknown', operation)
throw new SQSServiceError(errorMessage, error.__type as string, operation)
}
}
}

/**
* An Amazon SQS message.
*/
export class Message {
export class MessageResponse {
/**
* A unique identifier for the message.
* A MessageIdis considered unique across all AWS accounts for an extended period of time.
Expand All @@ -221,6 +265,32 @@ export class Message {
}
}

/**
* An Amazon SQS message Batch Response.
*/
export class MessageBatchResponse {
/**
* A list of successful messages.
*/
successful: MessageResponse[]

/**
* A list of failed messages.
*/
failed: SQSServiceError[]

/**
* Instantiates a new MessageBatchResponse object.
*
* @param successful
* @param failed
*/
constructor(successful: MessageResponse[], failed: SQSServiceError[]) {
this.successful = successful
this.failed = failed
}
}

/**
* SQSServiceError indicates an error occurred while interacting with the SQS API.
*/
Expand All @@ -237,7 +307,7 @@ export class SQSServiceError extends AWSError {
/**
* SQSOperation describes possible SQS operations.
*/
type SQSOperation = 'ListQueues' | 'SendMessage'
type SQSOperation = 'ListQueues' | 'SendMessage' | 'SendMessageBatch'

export interface SendMessageOptions {
/**
Expand All @@ -263,6 +333,12 @@ export interface SendMessageOptions {
delaySeconds?: number
}

export interface SendMessageBatchEntry {
messageId: string
messageBody: string
messageOptions?: SendMessageOptions
}

export interface ListQueuesRequestParameters {
/**
* Maximum number of results to include in the response. Value range is 1 to 1000.
Expand Down
Loading

0 comments on commit 2119312

Please sign in to comment.