Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(batch): Async Processing of Records for for SQS Fifo #3160

Merged
merged 22 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e0c1a1f
feat: `SqsFifo` mixin class
arnabrahman Oct 6, 2024
c4c7acf
refactor: use `SqsFifo` mixin inside `SqsFifoPartialProcessor`
arnabrahman Oct 6, 2024
c3b2bb4
fix: put back `SqsFifoPartialProcessor` constructor
arnabrahman Oct 6, 2024
85b7b00
feat: `SqsFifoPartialProcessorAsync` for asynchronous FIFO record pro…
arnabrahman Oct 6, 2024
632b8f9
refactor: `BatchProcessingOptions` & `processPartialResponse` for `Sq…
arnabrahman Oct 6, 2024
c298f8b
tests: tests for `SqsFifoPartialProcessorAsync`, similar to `SqsFifoP…
arnabrahman Oct 6, 2024
9cf9cd3
refactor: extract `shouldShortCircuit` & `shouldSkipCurrentGroup` cal…
arnabrahman Oct 6, 2024
1453b0d
style: spacing in the doc
arnabrahman Oct 6, 2024
8c407bc
Merge branch 'main' into 3140-async-fifo-processor
arnabrahman Nov 3, 2024
97a0682
feat: revert to original implementation for `SqsFifoPartialProcessor`
arnabrahman Nov 3, 2024
bf95cd2
refactor: remove `Mixin` from `SqsFifoPartialProcessorAsync`
arnabrahman Nov 3, 2024
a425e81
fix: sonarlint issue for `failedGroupIds`
arnabrahman Nov 3, 2024
f561c1c
doc: async sqs fifo message processing
arnabrahman Nov 5, 2024
ae5c3ce
test: refactor `SqsFifoPartialProcessor` & `SqsFifoPartialProcessorAs…
arnabrahman Nov 5, 2024
f9beef9
refactor: use `SqsFifoProcessingUtils` inside `SqsFifoPartialProcesso…
arnabrahman Nov 7, 2024
b27f651
doc: update doc comments for `failureHandler`
arnabrahman Nov 7, 2024
d23b178
Merge branch 'main' into 3140-async-fifo-processor
am29d Nov 7, 2024
5d9d203
Update packages/batch/src/processPartialResponse.ts
arnabrahman Nov 7, 2024
fe25477
refactor: `SqsFifoProcessor` for functionalities used in fifo process…
arnabrahman Nov 8, 2024
dc345c5
Merge branch '3140-async-fifo-processor' of github.com:arnabrahman/aw…
arnabrahman Nov 8, 2024
8c7a8f1
Merge branch 'main' into 3140-async-fifo-processor
am29d Nov 8, 2024
42e74a2
Merge branch 'main' into 3140-async-fifo-processor
am29d Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,21 +149,27 @@ Enable the `skipGroupOnError` option for seamless processing of messages from va

=== "Recommended"

```typescript hl_lines="1-4 8"
```typescript hl_lines="1-4 8 20"
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
```

1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)

=== "Async processing"

```typescript hl_lines="1-4 8 20"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts"
```

=== "Enabling skipGroupOnError flag"

```typescript hl_lines="1-4 13 30"
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
```

!!! Note
Note that SqsFifoPartialProcessor is synchronous using `processPartialResponseSync`.
This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details.
Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`.
If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`.

### Processing messages from Kinesis

Expand Down
22 changes: 22 additions & 0 deletions examples/snippets/batch/gettingStartedSQSFifoAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import {
SqsFifoPartialProcessorAsync,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new SqsFifoPartialProcessorAsync();
const logger = new Logger();

const recordHandler = async (record: SQSRecord): Promise<void> => {
const payload = record.body;
if (payload) {
const item = JSON.parse(payload);
logger.info('Processed item', { item });
}
};

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
});
165 changes: 165 additions & 0 deletions packages/batch/src/SqsFifoPartialProcessorAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import type { SQSRecord } from 'aws-lambda';
import { BatchProcessor } from './BatchProcessor.js';
import { SqsFifoProcessingUtils } from './SqsFifoProcessingUtils.js';
import { EventType } from './constants.js';
import {
type BatchProcessingError,
SqsFifoMessageGroupShortCircuitError,
SqsFifoShortCircuitError,
} from './errors.js';
import type {
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
SuccessResponse,
} from './types.js';

/**
* Batch processor for SQS FIFO queues
*
* This class extends the {@link BatchProcessor} class and provides
* a mechanism to process records from SQS FIFO queues asynchronously.
*
* By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering.
*
* However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
*
* @example
* ```typescript
* import {
* BatchProcessor,
* SqsFifoPartialProcessorAsync,
* processPartialResponse,
* } from '@aws-lambda-powertools/batch';
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
*
* const processor = new SqsFifoPartialProcessorAsync();
*
* const recordHandler = async (record: SQSRecord): Promise<void> => {
* const payload = JSON.parse(record.body);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponse(event, recordHandler, processor, {
* context,
* });
* ```
*/
class SqsFifoPartialProcessorAsync extends BatchProcessor {
/**
* Utility class for processing SQS FIFO queues
*/
readonly #utils: SqsFifoProcessingUtils;

public constructor() {
super(EventType.SQS);
this.#utils = new SqsFifoProcessingUtils();
}

/**
* Handles a failure for a given record.
*
* @param record - The record that failed.
* @param exception - The error that occurred.
*/
public failureHandler(
record: EventSourceDataClassTypes,
exception: Error
): FailureResponse {
this.#utils.processFailureForCurrentGroup(this.options);

return super.failureHandler(record, exception);
}

/**
* Process a record with a asynchronous handler
*
* This method orchestrates the processing of a batch of records asynchronously
* for SQS FIFO queues.
*
* The method calls the prepare hook to initialize the processor and then
* iterates over each record in the batch, processing them one by one.
*
* If one of them fails and `skipGroupOnError` is not true, the method short circuits
* the processing and fails the remaining records in the batch.
*
* If one of them fails and `skipGroupOnError` is true, then the method fails the current record
* if the message group has any previous failure, otherwise keeps processing.
*
* Then, it calls the clean hook to clean up the processor and returns the
* processed records.
*/
public async process(): Promise<(SuccessResponse | FailureResponse)[]> {
this.prepare();

const processedRecords: (SuccessResponse | FailureResponse)[] = [];
let currentIndex = 0;
for (const record of this.records) {
this.#utils.setCurrentGroup(
(record as SQSRecord).attributes?.MessageGroupId
);

if (this.#utils.shouldShortCircuit(this.failureMessages, this.options)) {
return this.shortCircuitProcessing(currentIndex, processedRecords);
}

const result = this.#utils.shouldSkipCurrentGroup(this.options)
? this.#processFailRecord(
record,
new SqsFifoMessageGroupShortCircuitError()
)
: await this.processRecord(record);

processedRecords.push(result);
currentIndex++;
}

this.clean();

return processedRecords;
}

/**
* Starting from the first failure index, fail all remaining messages regardless
* of their group ID.
*
* This short circuit mechanism is used when we detect a failed message in the batch.
*
* Since messages in a FIFO queue are processed in order, we must stop processing any
* remaining messages in the batch to prevent out-of-order processing.
*
* @param firstFailureIndex Index of first message that failed
* @param processedRecords Array of response items that have been processed both successfully and unsuccessfully
*/
protected shortCircuitProcessing(
firstFailureIndex: number,
processedRecords: (SuccessResponse | FailureResponse)[]
): (SuccessResponse | FailureResponse)[] {
const remainingRecords = this.records.slice(firstFailureIndex);

for (const record of remainingRecords) {
this.#processFailRecord(record, new SqsFifoShortCircuitError());
}

this.clean();

return processedRecords;
}

/**
* Processes a fail record.
*
* @param record - The record that failed.
* @param exception - The error that occurred.
*/
#processFailRecord(
record: BaseRecord,
exception: BatchProcessingError
): FailureResponse {
const data = this.toBatchType(record, this.eventType);

return this.failureHandler(data, exception);
}
}

export { SqsFifoPartialProcessorAsync };
87 changes: 87 additions & 0 deletions packages/batch/src/SqsFifoProcessingUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type {
BatchProcessingOptions,
EventSourceDataClassTypes,
} from './types.js';

/**
* Utility class to handle processing of SQS FIFO messages.
*/
class SqsFifoProcessingUtils {
/**
* The ID of the current message group being processed.
*/
#currentGroupId?: string;

/**
* A set of group IDs that have already encountered failures.
*/
readonly #failedGroupIds: Set<string>;

public constructor() {
this.#failedGroupIds = new Set<string>();
}

/**
* Adds the specified group ID to the set of failed group IDs.
*
* @param group - The group ID to be added to the set of failed group IDs.
*/
public addToFailedGroup(group: string): void {
this.#failedGroupIds.add(group);
}

/**
* Sets the current group ID for the message being processed.
*
* @param group - The group ID of the current message being processed.
*/
public setCurrentGroup(group?: string): void {
this.#currentGroupId = group;
}

/**
* Determines whether the current group should be short-circuited.
*
* If we have any failed messages, we should then short circuit the process and
* fail remaining messages unless `skipGroupOnError` is true
*
* @param failureMessages - The list of failure messages.
* @param options - The options for the batch processing.
*/
public shouldShortCircuit(
failureMessages: EventSourceDataClassTypes[],
options?: BatchProcessingOptions
): boolean {
return !options?.skipGroupOnError && failureMessages.length !== 0;
}

/**
* Determines whether the current group should be skipped.
*
* If `skipGroupOnError` is true and the current group has previously failed,
* then we should skip processing the current group.
*
* @param options - The options for the batch processing.
*/
public shouldSkipCurrentGroup(options?: BatchProcessingOptions): boolean {
return (
(options?.skipGroupOnError ?? false) &&
this.#currentGroupId &&
this.#failedGroupIds.has(this.#currentGroupId)
);
}

/**
* Handles failure for current group
* Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true.
*
* @param options - The options for the batch processing.
*/
public processFailureForCurrentGroup(options?: BatchProcessingOptions) {
if (options?.skipGroupOnError && this.#currentGroupId) {
this.addToFailedGroup(this.#currentGroupId);
}
}
}

export { SqsFifoProcessingUtils };
1 change: 1 addition & 0 deletions packages/batch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ export { BatchProcessor } from './BatchProcessor.js';
export { processPartialResponseSync } from './processPartialResponseSync.js';
export { processPartialResponse } from './processPartialResponse.js';
export { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
export { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';
8 changes: 4 additions & 4 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ import type {
* @param event The event object containing the batch of records
* @param recordHandler Async function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
* @param options Batch processing options
* @param options Batch processing options, which can vary with chosen batch processor implementation
*/
const processPartialResponse = async (
const processPartialResponse = async <T extends BasePartialBatchProcessor>(
event: { Records: BaseRecord[] },
recordHandler: CallableFunction,
processor: BasePartialBatchProcessor,
options?: BatchProcessingOptions
processor: T,
options?: BatchProcessingOptions<T>
): Promise<PartialItemFailureResponse> => {
if (!event.Records || !Array.isArray(event.Records)) {
throw new UnexpectedBatchTypeError();
Expand Down
15 changes: 12 additions & 3 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type {
} from 'aws-lambda';
import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';

/**
* Options for batch processing
Expand All @@ -23,10 +24,14 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
*/
context?: Context;
/**
* This option is only available for SqsFifoPartialProcessor.
* This option is only available for SqsFifoPartialProcessor & SqsFifoPartialProcessorAsync.
* If true skip the group on error during processing.
*/
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never;
skipGroupOnError?: T extends
| SqsFifoPartialProcessor
| SqsFifoPartialProcessorAsync
? boolean
: never;
/**
* Set this to false to prevent throwing an error if the entire batch fails.
*/
Expand All @@ -36,7 +41,11 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
* When set to `true`, the records will be processed in parallel using `Promise.all`.
* When set to `false`, the records will be processed sequentially.
*/
processInParallel?: T extends SqsFifoPartialProcessor ? never : boolean;
processInParallel?: T extends
| SqsFifoPartialProcessor
| SqsFifoPartialProcessorAsync
? never
: boolean;
};

/**
Expand Down
Loading