diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index e6f4ae2c5f..9f8d8852c5 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -141,14 +141,25 @@ Processing batches from SQS works in three stages: #### FIFO queues -When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`. -This helps preserve the ordering of messages in your queue. +When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queues.html){target="_blank"}, a batch may include messages from different group IDs. -```typescript hl_lines="1-4 8 20-22" ---8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" -``` +By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID. + +Enable the `skipGroupOnError` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID. -1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) +=== "Recommended" + + ```typescript hl_lines="1-4 8" + --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" + ``` + + 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) + +=== "Enabling skipGroupOnError flag" + + ```typescript hl_lines="1-4 13 30" + --8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts" + ``` !!! Note Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`. @@ -283,7 +294,7 @@ sequenceDiagram > Read more about [Batch Failure Reporting feature in AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank"}. -Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues. +Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues without `skipGroupOnError` flag.
```mermaid @@ -307,6 +318,31 @@ sequenceDiagram SQS FIFO mechanism with Batch Item Failures
+Sequence diagram to explain how [`SqsFifoPartialProcessor` works](#fifo-queues) with SQS FIFO queues with `skipGroupOnError` flag. + +
+```mermaid +sequenceDiagram + autonumber + participant SQS queue + participant Lambda service + participant Lambda function + Lambda service->>SQS queue: Poll + Lambda service->>Lambda function: Invoke (batch event) + activate Lambda function + Lambda function-->Lambda function: Process 2 out of 10 batch items + Lambda function--xLambda function: Fail on 3rd batch item + Lambda function-->Lambda function: Process messages from another MessageGroupID + Lambda function->>Lambda service: Report 3rd batch item and all messages within the same MessageGroupID as failure + deactivate Lambda function + activate SQS queue + Lambda service->>SQS queue: Delete successful messages processed + SQS queue-->>SQS queue: Failed messages return + deactivate SQS queue +``` +SQS FIFO mechanism with Batch Item Failures +
+ #### Kinesis and DynamoDB Streams > Read more about [Batch Failure Reporting feature](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-batchfailurereporting){target="_blank"}. diff --git a/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts new file mode 100644 index 0000000000..3124e7105c --- /dev/null +++ b/examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts @@ -0,0 +1,32 @@ +import { + SqsFifoPartialProcessor, + processPartialResponseSync, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { + SQSEvent, + SQSRecord, + Context, + SQSBatchResponse, +} from 'aws-lambda'; + +const processor = new SqsFifoPartialProcessor(); +const logger = new Logger(); + +const recordHandler = (record: SQSRecord): void => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler = async ( + event: SQSEvent, + context: Context +): Promise => { + return processPartialResponseSync(event, recordHandler, processor, { + context, + skipGroupOnError: true, + }); +}; diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index bac4939d8b..26354f3715 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,7 +1,17 @@ +import { SQSRecord } from 'aws-lambda'; import { BatchProcessorSync } from './BatchProcessorSync.js'; import { EventType } from './constants.js'; -import { SqsFifoShortCircuitError } from './errors.js'; -import type { FailureResponse, SuccessResponse } from './types.js'; +import { + BatchProcessingError, + SqsFifoMessageGroupShortCircuitError, + SqsFifoShortCircuitError, +} from './errors.js'; +import type { + BaseRecord, + EventSourceDataClassTypes, + FailureResponse, + SuccessResponse, +} from './types.js'; /** * Batch processor for SQS FIFO queues @@ -35,8 +45,36 @@ import type { FailureResponse, SuccessResponse } from './types.js'; * ``` */ class SqsFifoPartialProcessor extends BatchProcessorSync { + /** + * The ID of the current message group being processed. + */ + #currentGroupId?: string; + /** + * A set of group IDs that have already encountered failures. + */ + #failedGroupIds: Set; + public constructor() { super(EventType.SQS); + this.#failedGroupIds = new Set(); + } + + /** + * Handles a failure for a given record. + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. + * @param record - The record that failed. + * @param exception - The error that occurred. + * @returns The failure response. + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + if (this.options?.skipGroupOnError && this.#currentGroupId) { + this.#addToFailedGroup(this.#currentGroupId); + } + + return super.failureHandler(record, exception); } /** @@ -48,8 +86,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { * The method calls the prepare hook to initialize the processor and then * iterates over each record in the batch, processing them one by one. * - * If one of them fails, the method short circuits the processing and fails - * the remaining records in the batch. + * If one of them fails and `skipGroupOnError` is not true, the method short circuits + * the processing and fails the remaining records in the batch. + * + * If one of them fails and `skipGroupOnError` is true, then the method fails the current record + * if the message group has any previous failure, otherwise keeps processing. * * Then, it calls the clean hook to clean up the processor and returns the * processed records. @@ -60,13 +101,31 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - // If we have any failed messages, it means the last message failed - // We should then short circuit the process and fail remaining messages - if (this.failureMessages.length != 0) { + this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + + // If we have any failed messages, we should then short circuit the process and + // fail remaining messages unless `skipGroupOnError` is true + const shouldShortCircuit = + !this.options?.skipGroupOnError && this.failureMessages.length !== 0; + if (shouldShortCircuit) { return this.shortCircuitProcessing(currentIndex, processedRecords); } - processedRecords.push(this.processRecordSync(record)); + // If `skipGroupOnError` is true and the current group has previously failed, + // then we should skip processing the current group. + const shouldSkipCurrentGroup = + this.options?.skipGroupOnError && + this.#currentGroupId && + this.#failedGroupIds.has(this.#currentGroupId); + + const result = shouldSkipCurrentGroup + ? this.#processFailRecord( + record, + new SqsFifoMessageGroupShortCircuitError() + ) + : this.processRecordSync(record); + + processedRecords.push(result); currentIndex++; } @@ -94,16 +153,46 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - const data = this.toBatchType(record, this.eventType); - processedRecords.push( - this.failureHandler(data, new SqsFifoShortCircuitError()) - ); + this.#processFailRecord(record, new SqsFifoShortCircuitError()); } this.clean(); return processedRecords; } + + /** + * Adds the specified group ID to the set of failed group IDs. + * + * @param group - The group ID to be added to the set of failed group IDs. + */ + #addToFailedGroup(group: string): void { + this.#failedGroupIds.add(group); + } + + /** + * Processes a fail record. + * + * @param record - The record that failed. + * @param exception - The error that occurred. + */ + #processFailRecord( + record: BaseRecord, + exception: BatchProcessingError + ): FailureResponse { + const data = this.toBatchType(record, this.eventType); + + return this.failureHandler(data, exception); + } + + /** + * Sets the current group ID for the message being processed. + * + * @param group - The group ID of the current message being processed. + */ + #setCurrentGroup(group?: string): void { + this.#currentGroupId = group; + } } export { SqsFifoPartialProcessor }; diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index df128e7aa2..9467e5e0d2 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -37,6 +37,17 @@ class SqsFifoShortCircuitError extends BatchProcessingError { } } +/** + * Error thrown by the Batch Processing utility when a previous record from + * SQS FIFO queue message group fails processing. + */ +class SqsFifoMessageGroupShortCircuitError extends BatchProcessingError { + public constructor() { + super('A previous record from this message group failed processing'); + this.name = 'SqsFifoMessageGroupShortCircuitError'; + } +} + /** * Error thrown by the Batch Processing utility when a partial processor receives an unexpected * batch type. @@ -56,5 +67,6 @@ export { BatchProcessingError, FullBatchFailureError, SqsFifoShortCircuitError, + SqsFifoMessageGroupShortCircuitError, UnexpectedBatchTypeError, }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 6613712b7e..499202f722 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -3,6 +3,7 @@ export { BatchProcessingError, FullBatchFailureError, SqsFifoShortCircuitError, + SqsFifoMessageGroupShortCircuitError, UnexpectedBatchTypeError, } from './errors.js'; export { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; diff --git a/packages/batch/src/processPartialResponseSync.ts b/packages/batch/src/processPartialResponseSync.ts index 526d355a89..cdf349dd84 100644 --- a/packages/batch/src/processPartialResponseSync.ts +++ b/packages/batch/src/processPartialResponseSync.ts @@ -42,16 +42,42 @@ import type { * }); * ``` * + * When working with SQS FIFO queues, we will stop processing at the first failure + * and mark unprocessed messages as failed to preserve ordering. However, if you want to + * continue processing messages from different group IDs, you can enable the `skipGroupOnError` + * option for seamless processing of messages from various group IDs. + * + * @example + * ```typescript + * import { + * SqsFifoPartialProcessor, + * processPartialResponseSync, + * } from '@aws-lambda-powertools/batch'; + * import type { SQSRecord, SQSHandler } from 'aws-lambda'; + * + * const processor = new SqsFifoPartialProcessor(); + * + * const recordHandler = async (record: SQSRecord): Promise => { + * const payload = JSON.parse(record.body); + * }; + * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponseSync(event, recordHandler, processor, { + * context, + * skipGroupOnError: true + * }); + * ``` + * * @param event The event object containing the batch of records * @param recordHandler Sync function to process each record from the batch * @param processor Batch processor instance to handle the batch processing - * @param options Batch processing options + * @param options Batch processing options, which can vary with chosen batch processor implementation */ -const processPartialResponseSync = ( +const processPartialResponseSync = ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, - processor: BasePartialBatchProcessor, - options?: BatchProcessingOptions + processor: T, + options?: BatchProcessingOptions ): PartialItemFailureResponse => { if (!event.Records || !Array.isArray(event.Records)) { throw new UnexpectedBatchTypeError(); diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index b06391df79..51f9b78e34 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -4,18 +4,27 @@ import type { KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; +import { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; +import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; /** * Options for batch processing * + * @template T The type of the batch processor, defaults to BasePartialBatchProcessor * @property context The context object provided by the AWS Lambda runtime + * @property skipGroupOnError The option to group on error during processing */ -type BatchProcessingOptions = { +type BatchProcessingOptions = { /** * The context object provided by the AWS Lambda runtime. When provided, * it's made available to the handler function you specify */ - context: Context; + context?: Context; + /** + * This option is only available for SqsFifoPartialProcessor. + * If true skip the group on error during processing. + */ + skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; }; /** diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index 7df6110742..ce51736788 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -5,7 +5,7 @@ import type { } from 'aws-lambda'; import { randomInt, randomUUID } from 'node:crypto'; -const sqsRecordFactory = (body: string): SQSRecord => { +const sqsRecordFactory = (body: string, messageGroupId?: string): SQSRecord => { return { messageId: randomUUID(), receiptHandle: 'AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a', @@ -15,6 +15,7 @@ const sqsRecordFactory = (body: string): SQSRecord => { SentTimestamp: '1545082649183', SenderId: 'AIDAIENQZJOLO23YVJ4VO', ApproximateFirstReceiveTimestamp: '1545082649185', + ...(messageGroupId ? { MessageGroupId: messageGroupId } : {}), }, messageAttributes: {}, md5OfBody: 'e4e68fb7bd0e697a0ae8f1bb342846b3', diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 61d12183fc..54f463caa8 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -7,6 +7,7 @@ import { SqsFifoPartialProcessor, processPartialResponseSync, SqsFifoShortCircuitError, + SqsFifoMessageGroupShortCircuitError, } from '../../src/index.js'; import { sqsRecordFactory } from '../helpers/factories.js'; import { sqsRecordHandler } from '../helpers/handlers.js'; @@ -68,5 +69,186 @@ describe('Class: SqsFifoBatchProcessor', () => { ); expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); }); + + test('When `skipGroupOnError` is true, SQS FIFO processor is set to continue processing even after first failure', () => { + // Prepare + const firstRecord = sqsRecordFactory('fail', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('success', '3'); + const event = { + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: true, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(4); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + firstRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + secondRecord.messageId + ); + expect(result['batchItemFailures'][2]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + expect(result['batchItemFailures'][3]['itemIdentifier']).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(4); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + expect(processor.errors[3]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + test('When `skipGroupOnError` is true, SQS FIFO processor is set to continue processing even after encountering errors in specific MessageGroupID', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('success', '3'); + const event = { + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: true, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(2); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(2); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + test('When `skipGroupOnError` is true, SQS FIFO Batch processor processes everything with no failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('success', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: true, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(0); + expect(processor.errors.length).toBe(0); + }); + + test('When `skipGroupOnError` is false, SQS FIFO Batch processor processes everything with no failures', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('success', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: false, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(0); + expect(processor.errors.length).toBe(0); + }); + + test('When `skipGroupOnError` is false, SQS FIFO Batch processor short circuits the process on first failure', () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('fail', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new SqsFifoPartialProcessor(); + + // Act + const result = processPartialResponseSync( + event, + sqsRecordHandler, + processor, + { + skipGroupOnError: false, + } + ); + + // Assess + expect(result['batchItemFailures'].length).toBe(3); + expect(result['batchItemFailures'][0]['itemIdentifier']).toBe( + secondRecord.messageId + ); + expect(result['batchItemFailures'][1]['itemIdentifier']).toBe( + thirdRecord.messageId + ); + expect(result['batchItemFailures'][2]['itemIdentifier']).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(3); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); }); });