diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 0ef0a1366cb..ada05766ab4 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -522,14 +522,19 @@ You might want to bring custom logic to the existing `BatchProcessor` to slightl For these scenarios, you can subclass `BatchProcessor` and quickly override `success_handler` and `failure_handler` methods: -* **`success_handler()`** – Keeps track of successful batch records -* **`failure_handler()`** – Keeps track of failed batch records +* **`success_handler()`** is called for each successfully processed record +* **`failure_handler()`** is called for each failed record -???+ example - Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing +???+ note + These functions have a common `record` argument. For backward compatibility reasons, their type is not the same: -```python hl_lines="8 9 16-19 22 38" title="Extending failure handling mechanism in BatchProcessor" ---8<-- "examples/batch_processing/src/extending_failure.py" + - `success_handler`: `record` type is `dict[str, Any]`, the raw record data. + - `failure_handler`: `record` type can be an Event Source Data Class or your [Pydantic model](#pydantic-integration). During Pydantic validation errors, we fall back and serialize `record` to Event Source Data Class to not break the processing pipeline. + +Let's suppose you'd like to add metrics to track successes and failures of your batch records. 
+ +```python hl_lines="8-10 18-25 28 44" title="Extending success and failure handling mechanisms in BatchProcessor" +--8<-- "examples/batch_processing/src/extending_processor_handlers.py" ``` ### Create your own partial processor diff --git a/examples/batch_processing/src/extending_failure.py b/examples/batch_processing/src/extending_processor_handlers.py similarity index 79% rename from examples/batch_processing/src/extending_failure.py rename to examples/batch_processing/src/extending_processor_handlers.py index 424c9a5189b..73af5710981 100644 --- a/examples/batch_processing/src/extending_failure.py +++ b/examples/batch_processing/src/extending_processor_handlers.py @@ -1,4 +1,5 @@ import json +from typing import Any from aws_lambda_powertools import Logger, Metrics, Tracer from aws_lambda_powertools.metrics import MetricUnit @@ -9,11 +10,16 @@ FailureResponse, process_partial_response, ) +from aws_lambda_powertools.utilities.batch.base import SuccessResponse from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord from aws_lambda_powertools.utilities.typing import LambdaContext class MyProcessor(BatchProcessor): + def success_handler(self, record: dict[str, Any], result: Any) -> SuccessResponse: + metrics.add_metric(name="BatchRecordSuccesses", unit=MetricUnit.Count, value=1) + return super().success_handler(record, result) + def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse: metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1) return super().failure_handler(record, exception)