From e73b575b784b7a59ca8cde02d4ca51ec97789d19 Mon Sep 17 00:00:00 2001 From: Asifur Rahman Date: Fri, 8 Nov 2024 15:59:52 +0600 Subject: [PATCH] feat(batch): Async Processing of Records for for SQS Fifo (#3160) Co-authored-by: Alexander Schueren Co-authored-by: Alexander Schueren --- docs/utilities/batch.md | 12 +- .../batch/gettingStartedSQSFifoAsync.ts | 22 ++ packages/batch/src/SqsFifoPartialProcessor.ts | 54 +--- .../batch/src/SqsFifoPartialProcessorAsync.ts | 167 ++++++++++ packages/batch/src/SqsFifoProcessor.ts | 89 ++++++ packages/batch/src/index.ts | 1 + packages/batch/src/processPartialResponse.ts | 8 +- packages/batch/src/types.ts | 15 +- .../unit/SqsFifoPartialProcessor.test.ts | 291 ++++++++++-------- 9 files changed, 478 insertions(+), 181 deletions(-) create mode 100644 examples/snippets/batch/gettingStartedSQSFifoAsync.ts create mode 100644 packages/batch/src/SqsFifoPartialProcessorAsync.ts create mode 100644 packages/batch/src/SqsFifoProcessor.ts diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index ec0e4ba108..dd019c2bf8 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -149,12 +149,18 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va === "Recommended" - ```typescript hl_lines="1-4 8" + ```typescript hl_lines="1-4 8 20" --8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts" ``` 1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics) +=== "Async processing" + + ```typescript hl_lines="1-4 8 20" + --8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts" + ``` + === "Enabling skipGroupOnError flag" ```typescript hl_lines="1-4 13 30" @@ -162,8 +168,8 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va ``` !!! Note - Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`. - This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details. + Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`. + If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`. ### Processing messages from Kinesis diff --git a/examples/snippets/batch/gettingStartedSQSFifoAsync.ts b/examples/snippets/batch/gettingStartedSQSFifoAsync.ts new file mode 100644 index 0000000000..5920480cf6 --- /dev/null +++ b/examples/snippets/batch/gettingStartedSQSFifoAsync.ts @@ -0,0 +1,22 @@ +import { + SqsFifoPartialProcessorAsync, + processPartialResponse, +} from '@aws-lambda-powertools/batch'; +import { Logger } from '@aws-lambda-powertools/logger'; +import type { SQSHandler, SQSRecord } from 'aws-lambda'; + +const processor = new SqsFifoPartialProcessorAsync(); +const logger = new Logger(); + +const recordHandler = async (record: SQSRecord): Promise => { + const payload = record.body; + if (payload) { + const item = JSON.parse(payload); + logger.info('Processed item', { item }); + } +}; + +export const handler: SQSHandler = async (event, context) => + processPartialResponse(event, recordHandler, processor, { + context, + }); diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 6a9546fcb6..5854e75e42 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -1,5 +1,6 @@ import type { SQSRecord } from 'aws-lambda'; import { BatchProcessorSync } from './BatchProcessorSync.js'; +import { SqsFifoProcessor } from './SqsFifoProcessor.js'; import { EventType } from './constants.js'; import { type BatchProcessingError, @@ -46,17 +47,13 @@ import type { */ class SqsFifoPartialProcessor extends BatchProcessorSync { /** - * The ID of the current message group being processed. + * Processor for handling SQS FIFO message */ - #currentGroupId?: string; - /** - * A set of group IDs that have already encountered failures. - */ - #failedGroupIds: Set; + readonly #processor: SqsFifoProcessor; public constructor() { super(EventType.SQS); - this.#failedGroupIds = new Set(); + this.#processor = new SqsFifoProcessor(); } /** @@ -70,9 +67,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { record: EventSourceDataClassTypes, exception: Error ): FailureResponse { - if (this.options?.skipGroupOnError && this.#currentGroupId) { - this.#addToFailedGroup(this.#currentGroupId); - } + this.#processor.processFailureForCurrentGroup(this.options); return super.failureHandler(record, exception); } @@ -101,24 +96,17 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const processedRecords: (SuccessResponse | FailureResponse)[] = []; let currentIndex = 0; for (const record of this.records) { - this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId); + this.#processor.setCurrentGroup( + (record as SQSRecord).attributes?.MessageGroupId + ); - // If we have any failed messages, we should then short circuit the process and - // fail remaining messages unless `skipGroupOnError` is true - const shouldShortCircuit = - !this.options?.skipGroupOnError && this.failureMessages.length !== 0; - if (shouldShortCircuit) { + if ( + this.#processor.shouldShortCircuit(this.failureMessages, this.options) + ) { return this.shortCircuitProcessing(currentIndex, processedRecords); } - // If `skipGroupOnError` is true and the current group has previously failed, - // then we should skip processing the current group. - const shouldSkipCurrentGroup = - this.options?.skipGroupOnError && - this.#currentGroupId && - this.#failedGroupIds.has(this.#currentGroupId); - - const result = shouldSkipCurrentGroup + const result = this.#processor.shouldSkipCurrentGroup(this.options) ? this.#processFailRecord( record, new SqsFifoMessageGroupShortCircuitError() @@ -161,15 +149,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { return processedRecords; } - /** - * Adds the specified group ID to the set of failed group IDs. - * - * @param group - The group ID to be added to the set of failed group IDs. - */ - #addToFailedGroup(group: string): void { - this.#failedGroupIds.add(group); - } - /** * Processes a fail record. * @@ -184,15 +163,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { return this.failureHandler(data, exception); } - - /** - * Sets the current group ID for the message being processed. - * - * @param group - The group ID of the current message being processed. - */ - #setCurrentGroup(group?: string): void { - this.#currentGroupId = group; - } } export { SqsFifoPartialProcessor }; diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts new file mode 100644 index 0000000000..98d47a3bb1 --- /dev/null +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -0,0 +1,167 @@ +import type { SQSRecord } from 'aws-lambda'; +import { BatchProcessor } from './BatchProcessor.js'; +import { SqsFifoProcessor } from './SqsFifoProcessor.js'; +import { EventType } from './constants.js'; +import { + type BatchProcessingError, + SqsFifoMessageGroupShortCircuitError, + SqsFifoShortCircuitError, +} from './errors.js'; +import type { + BaseRecord, + EventSourceDataClassTypes, + FailureResponse, + SuccessResponse, +} from './types.js'; + +/** + * Batch processor for SQS FIFO queues + * + * This class extends the {@link BatchProcessor} class and provides + * a mechanism to process records from SQS FIFO queues asynchronously. + * + * By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. + * + * However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID. + * + * @example + * ```typescript + * import { + * BatchProcessor, + * SqsFifoPartialProcessorAsync, + * processPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import type { SQSRecord, SQSHandler } from 'aws-lambda'; + * + * const processor = new SqsFifoPartialProcessorAsync(); + * + * const recordHandler = async (record: SQSRecord): Promise => { + * const payload = JSON.parse(record.body); + * }; + * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponse(event, recordHandler, processor, { + * context, + * }); + * ``` + */ +class SqsFifoPartialProcessorAsync extends BatchProcessor { + /** + * Processor for handling SQS FIFO message + */ + readonly #processor: SqsFifoProcessor; + + public constructor() { + super(EventType.SQS); + this.#processor = new SqsFifoProcessor(); + } + + /** + * Handles a failure for a given record. + * + * @param record - The record that failed. + * @param exception - The error that occurred. + */ + public failureHandler( + record: EventSourceDataClassTypes, + exception: Error + ): FailureResponse { + this.#processor.processFailureForCurrentGroup(this.options); + + return super.failureHandler(record, exception); + } + + /** + * Process a record with a asynchronous handler + * + * This method orchestrates the processing of a batch of records asynchronously + * for SQS FIFO queues. + * + * The method calls the prepare hook to initialize the processor and then + * iterates over each record in the batch, processing them one by one. + * + * If one of them fails and `skipGroupOnError` is not true, the method short circuits + * the processing and fails the remaining records in the batch. + * + * If one of them fails and `skipGroupOnError` is true, then the method fails the current record + * if the message group has any previous failure, otherwise keeps processing. + * + * Then, it calls the clean hook to clean up the processor and returns the + * processed records. + */ + public async process(): Promise<(SuccessResponse | FailureResponse)[]> { + this.prepare(); + + const processedRecords: (SuccessResponse | FailureResponse)[] = []; + let currentIndex = 0; + for (const record of this.records) { + this.#processor.setCurrentGroup( + (record as SQSRecord).attributes?.MessageGroupId + ); + + if ( + this.#processor.shouldShortCircuit(this.failureMessages, this.options) + ) { + return this.shortCircuitProcessing(currentIndex, processedRecords); + } + + const result = this.#processor.shouldSkipCurrentGroup(this.options) + ? this.#processFailRecord( + record, + new SqsFifoMessageGroupShortCircuitError() + ) + : await this.processRecord(record); + + processedRecords.push(result); + currentIndex++; + } + + this.clean(); + + return processedRecords; + } + + /** + * Starting from the first failure index, fail all remaining messages regardless + * of their group ID. + * + * This short circuit mechanism is used when we detect a failed message in the batch. + * + * Since messages in a FIFO queue are processed in order, we must stop processing any + * remaining messages in the batch to prevent out-of-order processing. + * + * @param firstFailureIndex Index of first message that failed + * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully + */ + protected shortCircuitProcessing( + firstFailureIndex: number, + processedRecords: (SuccessResponse | FailureResponse)[] + ): (SuccessResponse | FailureResponse)[] { + const remainingRecords = this.records.slice(firstFailureIndex); + + for (const record of remainingRecords) { + this.#processFailRecord(record, new SqsFifoShortCircuitError()); + } + + this.clean(); + + return processedRecords; + } + + /** + * Processes a fail record. + * + * @param record - The record that failed. + * @param exception - The error that occurred. + */ + #processFailRecord( + record: BaseRecord, + exception: BatchProcessingError + ): FailureResponse { + const data = this.toBatchType(record, this.eventType); + + return this.failureHandler(data, exception); + } +} + +export { SqsFifoPartialProcessorAsync }; diff --git a/packages/batch/src/SqsFifoProcessor.ts b/packages/batch/src/SqsFifoProcessor.ts new file mode 100644 index 0000000000..d24510eba1 --- /dev/null +++ b/packages/batch/src/SqsFifoProcessor.ts @@ -0,0 +1,89 @@ +import type { + BatchProcessingOptions, + EventSourceDataClassTypes, +} from './types.js'; + +/** + * Class representing a processor for SQS FIFO messages. + * This class provides utilities for handling message groups, including tracking failed groups, + * determining whether to short-circuit processing, and skipping groups based on processing options. + */ +class SqsFifoProcessor { + /** + * The ID of the current message group being processed. + */ + #currentGroupId?: string; + + /** + * A set of group IDs that have already encountered failures. + */ + readonly #failedGroupIds: Set; + + public constructor() { + this.#failedGroupIds = new Set(); + } + + /** + * Adds the specified group ID to the set of failed group IDs. + * + * @param group - The group ID to be added to the set of failed group IDs. + */ + public addToFailedGroup(group: string): void { + this.#failedGroupIds.add(group); + } + + /** + * Sets the current group ID for the message being processed. + * + * @param group - The group ID of the current message being processed. + */ + public setCurrentGroup(group?: string): void { + this.#currentGroupId = group; + } + + /** + * Determines whether the current group should be short-circuited. + * + * If we have any failed messages, we should then short circuit the process and + * fail remaining messages unless `skipGroupOnError` is true + * + * @param failureMessages - The list of failure messages. + * @param options - The options for the batch processing. + */ + public shouldShortCircuit( + failureMessages: EventSourceDataClassTypes[], + options?: BatchProcessingOptions + ): boolean { + return !options?.skipGroupOnError && failureMessages.length !== 0; + } + + /** + * Determines whether the current group should be skipped. + * + * If `skipGroupOnError` is true and the current group has previously failed, + * then we should skip processing the current group. + * + * @param options - The options for the batch processing. + */ + public shouldSkipCurrentGroup(options?: BatchProcessingOptions): boolean { + return ( + (options?.skipGroupOnError ?? false) && + this.#currentGroupId && + this.#failedGroupIds.has(this.#currentGroupId) + ); + } + + /** + * Handles failure for current group + * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true. + * + * @param options - The options for the batch processing. + */ + public processFailureForCurrentGroup(options?: BatchProcessingOptions) { + if (options?.skipGroupOnError && this.#currentGroupId) { + this.addToFailedGroup(this.#currentGroupId); + } + } +} + +export { SqsFifoProcessor }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 499202f722..ceb2b114fb 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -12,3 +12,4 @@ export { BatchProcessor } from './BatchProcessor.js'; export { processPartialResponseSync } from './processPartialResponseSync.js'; export { processPartialResponse } from './processPartialResponse.js'; export { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; +export { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js'; diff --git a/packages/batch/src/processPartialResponse.ts b/packages/batch/src/processPartialResponse.ts index daba2e862a..d50fc478df 100644 --- a/packages/batch/src/processPartialResponse.ts +++ b/packages/batch/src/processPartialResponse.ts @@ -70,13 +70,13 @@ import type { * @param event The event object containing the batch of records * @param recordHandler Async function to process each record from the batch * @param processor Batch processor instance to handle the batch processing - * @param options Batch processing options + * @param options Batch processing options, see {{@link BatchProcessingOptions}} */ -const processPartialResponse = async ( +const processPartialResponse = async ( event: { Records: BaseRecord[] }, recordHandler: CallableFunction, - processor: BasePartialBatchProcessor, - options?: BatchProcessingOptions + processor: T, + options?: BatchProcessingOptions ): Promise => { if (!event.Records || !Array.isArray(event.Records)) { throw new UnexpectedBatchTypeError(); diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 3748c03e95..e94b811f6c 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -6,6 +6,7 @@ import type { } from 'aws-lambda'; import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; +import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js'; /** * Options for batch processing @@ -23,10 +24,14 @@ type BatchProcessingOptions = { */ context?: Context; /** - * This option is only available for SqsFifoPartialProcessor. + * This option is only available for SqsFifoPartialProcessor & SqsFifoPartialProcessorAsync. * If true skip the group on error during processing. */ - skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never; + skipGroupOnError?: T extends + | SqsFifoPartialProcessor + | SqsFifoPartialProcessorAsync + ? boolean + : never; /** * Set this to false to prevent throwing an error if the entire batch fails. */ @@ -36,7 +41,11 @@ type BatchProcessingOptions = { * When set to `true`, the records will be processed in parallel using `Promise.all`. * When set to `false`, the records will be processed sequentially. */ - processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean; + processInParallel?: T extends + | SqsFifoPartialProcessor + | SqsFifoPartialProcessorAsync + ? never + : boolean; }; /** diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 7eec6164de..fc871ecf39 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -2,13 +2,42 @@ import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; import { SqsFifoMessageGroupShortCircuitError, SqsFifoPartialProcessor, + SqsFifoPartialProcessorAsync, SqsFifoShortCircuitError, + processPartialResponse, processPartialResponseSync, } from '../../src/index.js'; +import type { PartialItemFailureResponse } from '../../src/types.js'; import { sqsRecordFactory } from '../helpers/factories.js'; import { sqsRecordHandler } from '../helpers/handlers.js'; -describe('Class: SqsFifoBatchProcessor', () => { +type ProcessorConfig = { + name: string; + processorClass: + | typeof SqsFifoPartialProcessor + | typeof SqsFifoPartialProcessorAsync; + processFunction: + | typeof processPartialResponse + | typeof processPartialResponseSync; + isAsync: boolean; +}; + +const processors: ProcessorConfig[] = [ + { + name: 'Synchronous', + processorClass: SqsFifoPartialProcessor, + processFunction: processPartialResponseSync, + isAsync: false, + }, + { + name: 'Asynchronous', + processorClass: SqsFifoPartialProcessorAsync, + processFunction: processPartialResponse, + isAsync: true, + }, +]; + +describe('SQS FIFO Processors', () => { const ENVIRONMENT_VARIABLES = process.env; beforeEach(() => { @@ -20,136 +49,140 @@ describe('Class: SqsFifoBatchProcessor', () => { process.env = ENVIRONMENT_VARIABLES; }); - describe('Synchronous SQS FIFO batch processing', () => { - it('completes processing with no failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord] }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result.batchItemFailures).toStrictEqual([]); - }); + for (const { name, processorClass, processFunction, isAsync } of processors) { + describe(`${name} SQS FIFO batch processing`, () => { + it('completes processing with no failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord] }; + const processor = new processorClass(); - it('completes processing with some failures', async () => { - // Prepare - const firstRecord = sqsRecordFactory('success'); - const secondRecord = sqsRecordFactory('fail'); - const thirdRecord = sqsRecordFactory('success'); - const event = { Records: [firstRecord, secondRecord, thirdRecord] }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor - ); - - // Assess - expect(result.batchItemFailures.length).toBe(2); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); - }); + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor) + : (processFunction( + event, + sqsRecordHandler, + processor + ) as PartialItemFailureResponse); - it('continues processing and moves to the next group when `skipGroupOnError` is true', () => { - // Prepare - const firstRecord = sqsRecordFactory('fail', '1'); - const secondRecord = sqsRecordFactory('success', '1'); - const thirdRecord = sqsRecordFactory('fail', '2'); - const fourthRecord = sqsRecordFactory('success', '2'); - const fifthRecord = sqsRecordFactory('success', '3'); - const event = { - Records: [ - firstRecord, - secondRecord, - thirdRecord, - fourthRecord, - fifthRecord, - ], - }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor, - { - skipGroupOnError: true, - } - ); - - // Assess - expect(result.batchItemFailures.length).toBe(4); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - firstRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[2].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(result.batchItemFailures[3].itemIdentifier).toBe( - fourthRecord.messageId - ); - expect(processor.errors.length).toBe(4); - expect(processor.errors[1]).toBeInstanceOf( - SqsFifoMessageGroupShortCircuitError - ); - expect(processor.errors[3]).toBeInstanceOf( - SqsFifoMessageGroupShortCircuitError - ); - }); + // Assess + expect(result.batchItemFailures).toStrictEqual([]); + }); + + it('completes processing with some failures', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success'); + const secondRecord = sqsRecordFactory('fail'); + const thirdRecord = sqsRecordFactory('success'); + const event = { Records: [firstRecord, secondRecord, thirdRecord] }; + const processor = new processorClass(); + + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor) + : (processFunction( + event, + sqsRecordHandler, + processor + ) as PartialItemFailureResponse); - it('short circuits on the first failure when `skipGroupOnError` is false', () => { - // Prepare - const firstRecord = sqsRecordFactory('success', '1'); - const secondRecord = sqsRecordFactory('fail', '2'); - const thirdRecord = sqsRecordFactory('success', '3'); - const fourthRecord = sqsRecordFactory('success', '4'); - const event = { - Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], - }; - const processor = new SqsFifoPartialProcessor(); - - // Act - const result = processPartialResponseSync( - event, - sqsRecordHandler, - processor, - { - skipGroupOnError: false, - } - ); - - // Assess - expect(result.batchItemFailures.length).toBe(3); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - secondRecord.messageId - ); - expect(result.batchItemFailures[1].itemIdentifier).toBe( - thirdRecord.messageId - ); - expect(result.batchItemFailures[2].itemIdentifier).toBe( - fourthRecord.messageId - ); - expect(processor.errors.length).toBe(3); - expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + // Assess + expect(result.batchItemFailures.length).toBe(2); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); + + it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => { + // Prepare + const firstRecord = sqsRecordFactory('fail', '1'); + const secondRecord = sqsRecordFactory('success', '1'); + const thirdRecord = sqsRecordFactory('fail', '2'); + const fourthRecord = sqsRecordFactory('success', '2'); + const fifthRecord = sqsRecordFactory('success', '3'); + const event = { + Records: [ + firstRecord, + secondRecord, + thirdRecord, + fourthRecord, + fifthRecord, + ], + }; + const processor = new processorClass(); + + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: true, + }) + : (processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: true, + }) as PartialItemFailureResponse); + + // Assess + expect(result.batchItemFailures.length).toBe(4); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + firstRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[2].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(result.batchItemFailures[3].itemIdentifier).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(4); + expect(processor.errors[1]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + expect(processor.errors[3]).toBeInstanceOf( + SqsFifoMessageGroupShortCircuitError + ); + }); + + it('short circuits on the first failure when `skipGroupOnError` is false', async () => { + // Prepare + const firstRecord = sqsRecordFactory('success', '1'); + const secondRecord = sqsRecordFactory('fail', '2'); + const thirdRecord = sqsRecordFactory('success', '3'); + const fourthRecord = sqsRecordFactory('success', '4'); + const event = { + Records: [firstRecord, secondRecord, thirdRecord, fourthRecord], + }; + const processor = new processorClass(); + + // Act + const result = isAsync + ? await processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: false, + }) + : (processFunction(event, sqsRecordHandler, processor, { + skipGroupOnError: false, + }) as PartialItemFailureResponse); + + // Assess + expect(result.batchItemFailures.length).toBe(3); + expect(result.batchItemFailures[0].itemIdentifier).toBe( + secondRecord.messageId + ); + expect(result.batchItemFailures[1].itemIdentifier).toBe( + thirdRecord.messageId + ); + expect(result.batchItemFailures[2].itemIdentifier).toBe( + fourthRecord.messageId + ); + expect(processor.errors.length).toBe(3); + expect(processor.errors[1]).toBeInstanceOf(SqsFifoShortCircuitError); + }); }); - }); + } });