Skip to content

Commit

Permalink
feat(batch): add option to not throw FullBatchFailureError when the…
Browse files Browse the repository at this point in the history
… entire batch fails (#2711)

Co-authored-by: Andrea Amorosi <[email protected]>
Co-authored-by: Alexander Schueren <[email protected]>
  • Loading branch information
3 people authored Jul 9, 2024
1 parent 649662a commit 74198ef
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 5 deletions.
14 changes: 13 additions & 1 deletion docs/utilities/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ All records in the batch will be passed to this handler for processing, even if

* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions raised when processing
* **All records failed to be processed**. We will throw a `FullBatchFailureError` error with a list of all the errors thrown while processing unless `throwOnFullBatchFailure` is disabled.

The following sequence diagrams explain how each Batch processor behaves under different scenarios.

Expand Down Expand Up @@ -450,6 +450,18 @@ We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lam
--8<-- "examples/snippets/batch/accessLambdaContext.ts"
```

### Working with full batch failures

By default, the `BatchProcessor` will throw a `FullBatchFailureError` if all records in the batch fail to process, we do this to reflect the failure in your operational metrics.

When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the [Lambda service will scale down the concurrency of your function](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-backoff-strategy){target="_blank"}, potentially impacting performance.

For these scenarios, you can set the `throwOnFullBatchFailure` option to `false` when calling.

```typescript hl_lines="17"
--8<-- "examples/snippets/batch/noThrowOnFullBatchFailure.ts"
```

### Extending BatchProcessor

You might want to bring custom logic to the existing `BatchProcessor` to slightly override how we handle successes and failures.
Expand Down
18 changes: 18 additions & 0 deletions examples/snippets/batch/noThrowOnFullBatchFailure.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
// Process the record
};

export const handler: SQSHandler = async (event, context) =>
processPartialResponse(event, recordHandler, processor, {
context,
throwOnFullBatchFailure: false,
});
10 changes: 6 additions & 4 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
/**
* Clean up logic to be run after processing a batch
*
* If the entire batch failed, and the utility is not configured otherwise,
* this method will throw a `FullBatchFailureError` with the list of errors
* that occurred during processing.
* If the entire batch failed this method will throw a {@link FullBatchFailureError | `FullBatchFailureError`} with the list of
* errors that occurred during processing, unless the `throwOnFullBatchFailure` option is set to `false`.
*
* Otherwise, it will build the partial failure response based on the event type.
*/
Expand All @@ -74,7 +73,10 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
return;
}

if (this.entireBatchFailed()) {
if (
this.options?.throwOnFullBatchFailure !== false &&
this.entireBatchFailed()
) {
throw new FullBatchFailureError(this.errors);
}

Expand Down
25 changes: 25 additions & 0 deletions packages/batch/src/processPartialResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ import type {
* });
* ```
*
* By default, if the entire batch fails, the function will throw an error.
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false`
*
* @example
* ```typescript
* import {
* BatchProcessor,
* EventType,
* processPartialResponse,
* } from '@aws-lambda-powertools/batch';
* import type { KinesisStreamHandler, KinesisStreamRecord } from 'aws-lambda';
*
* const processor = new BatchProcessor(EventType.KinesisDataStreams);
*
* const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
* const payload = JSON.parse(record.kinesis.data);
* };
*
* export const handler: KinesisStreamHandler = async (event, context) =>
* processPartialResponse(event, recordHandler, processor, {
* context,
* throwOnFullBatchFailure: false
* });
* ```
*
* @param event The event object containing the batch of records
* @param recordHandler Async function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
Expand Down
24 changes: 24 additions & 0 deletions packages/batch/src/processPartialResponseSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ import type {
* });
* ```
*
* By default, if the entire batch fails, the function will throw an error.
* If you want to prevent this behavior, you can set the `throwOnFullBatchFailure` to `false`
*
* @example
* ```typescript
* import {
* SqsFifoPartialProcessor,
* processPartialResponseSync,
* } from '@aws-lambda-powertools/batch';
* import type { SQSRecord, SQSHandler } from 'aws-lambda';
*
* const processor = new SqsFifoPartialProcessor();
*
* const recordHandler = async (record: SQSRecord): Promise<void> => {
* const payload = JSON.parse(record.body);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponseSync(event, recordHandler, processor, {
* context,
* throwOnFullBatchFailure: false
* });
* ```
*
* @param event The event object containing the batch of records
* @param recordHandler Sync function to process each record from the batch
* @param processor Batch processor instance to handle the batch processing
Expand Down
5 changes: 5 additions & 0 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
* @template T The type of the batch processor, defaults to BasePartialBatchProcessor
* @property context The context object provided by the AWS Lambda runtime
* @property skipGroupOnError The option to group on error during processing
* @property throwOnFullBatchFailure The option to throw an error if the entire batch fails
*/
type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
/**
Expand All @@ -25,6 +26,10 @@ type BatchProcessingOptions<T = BasePartialBatchProcessor> = {
* If true skip the group on error during processing.
*/
skipGroupOnError?: T extends SqsFifoPartialProcessor ? boolean : never;
/**
* Set this to false to prevent throwing an error if the entire batch fails.
*/
throwOnFullBatchFailure?: boolean;
};

/**
Expand Down
123 changes: 123 additions & 0 deletions packages/batch/tests/unit/processPartialResponse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
processPartialResponse,
EventType,
UnexpectedBatchTypeError,
FullBatchFailureError,
} from '../../src/index.js';
import type {
BatchProcessingOptions,
Expand Down Expand Up @@ -90,6 +91,59 @@ describe('Function: processPartialResponse()', () => {
// Assess
expect(ret).toStrictEqual({ batchItemFailures: [] });
});

test('Process partial response function call with asynchronous handler for full batch failure', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act & Assess
await expect(
processPartialResponse(batch, asyncSqsRecordHandler, processor)
).rejects.toThrow(FullBatchFailureError);
});

test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act & Assess
await expect(
processPartialResponse(batch, asyncSqsRecordHandler, processor, {
...options,
throwOnFullBatchFailure: true,
})
).rejects.toThrow(FullBatchFailureError);
});

test('Process partial response function call with asynchronous handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const batch = { Records: records };
const processor = new BatchProcessor(EventType.SQS);

// Act
const response = await processPartialResponse(
batch,
asyncSqsRecordHandler,
processor,
{
...options,
throwOnFullBatchFailure: false,
}
);

// Assess
expect(response).toStrictEqual({
batchItemFailures: [
{ itemIdentifier: records[0].messageId },
{ itemIdentifier: records[1].messageId },
],
});
});
});

describe('Process partial response function call through handler', () => {
Expand Down Expand Up @@ -228,5 +282,74 @@ describe('Function: processPartialResponse()', () => {
// Assess
expect(result).toStrictEqual({ batchItemFailures: [] });
});

test('Process partial response through handler for full batch failure', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
return processPartialResponse(event, asyncSqsRecordHandler, processor);
};

// Act & Assess
await expect(handler(event, context)).rejects.toThrow(
FullBatchFailureError
);
});

test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `true`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
return processPartialResponse(event, asyncSqsRecordHandler, processor, {
...options,
throwOnFullBatchFailure: true,
});
};

// Act & Assess
await expect(handler(event, context)).rejects.toThrow(
FullBatchFailureError
);
});

test('Process partial response through handler for full batch failure when `throwOnFullBatchFailure` is `false`', async () => {
// Prepare
const records = [sqsRecordFactory('fail'), sqsRecordFactory('fail')];
const processor = new BatchProcessor(EventType.SQS);
const event: SQSEvent = { Records: records };

const handler = async (
event: SQSEvent,
_context: Context
): Promise<PartialItemFailureResponse> => {
return processPartialResponse(event, asyncSqsRecordHandler, processor, {
...options,
throwOnFullBatchFailure: false,
});
};

// Act
const response = await handler(event, context);

// Assess
expect(response).toStrictEqual({
batchItemFailures: [
{ itemIdentifier: records[0].messageId },
{ itemIdentifier: records[1].messageId },
],
});
});
});
});
Loading

0 comments on commit 74198ef

Please sign in to comment.