diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 38fc1ca75fc..2a7a956a19d 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -85,7 +85,7 @@ def success_handler(self, record: Any, result: Any): self.success_messages.append(record) return entry - def failure_handler(self, record: Any, exception: Exception): + def failure_handler(self, record: Any, exception: Tuple): """ Failure callback @@ -94,8 +94,9 @@ def failure_handler(self, record: Any, exception: Exception): tuple "fail", exceptions args, original record """ - entry = ("fail", exception.args, record) - logger.debug(f"Record processing exception: {exception}") + exception_string = f"{exception[0]}:{exception[1]}" + entry = ("fail", exception_string, record) + logger.debug(f"Record processing exception: {exception_string}") self.exceptions.append(exception) self.fail_messages.append(record) return entry diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py index 3e456eacec4..c2ead04a7b1 100644 --- a/aws_lambda_powertools/utilities/batch/exceptions.py +++ b/aws_lambda_powertools/utilities/batch/exceptions.py @@ -1,7 +1,25 @@ """ Batch processing exceptions """ +import traceback class SQSBatchProcessingError(Exception): """When at least one message within a batch could not be processed""" + + def __init__(self, msg="", child_exceptions=()): + super().__init__(msg) + self.msg = msg + self.child_exceptions = child_exceptions + + # Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent + # errors being lost. See https://github.com/awslabs/aws-lambda-powertools-python/issues/275 + def __str__(self): + parent_exception_str = super(SQSBatchProcessingError, self).__str__() + exception_list = [f"{parent_exception_str}\n"] + for exception in self.child_exceptions: + extype, ex, tb = exception + formatted = "".join(traceback.format_exception(extype, ex, tb)) + exception_list.append(formatted) + + return "\n".join(exception_list) diff --git a/aws_lambda_powertools/utilities/batch/sqs.py b/aws_lambda_powertools/utilities/batch/sqs.py index 7da8ab52288..82c24ece10a 100644 --- a/aws_lambda_powertools/utilities/batch/sqs.py +++ b/aws_lambda_powertools/utilities/batch/sqs.py @@ -4,6 +4,7 @@ Batch SQS utilities """ import logging +import sys from typing import Callable, Dict, List, Optional, Tuple import boto3 @@ -90,10 +91,10 @@ def _process_record(self, record) -> Tuple: An object to be processed. """ try: - result = self.handler(record) - return self.success_handler(record, result) - except Exception as exc: - return self.failure_handler(record, exc) + result = self.handler(record=record) + return self.success_handler(record=record, result=result) + except Exception: + return self.failure_handler(record=record, exception=sys.exc_info()) def _prepare(self): """ @@ -123,7 +124,11 @@ def _clean(self): logger.debug(f"{len(self.fail_messages)} records failed processing, but exceptions are suppressed") else: logger.debug(f"{len(self.fail_messages)} records failed processing, raising exception") - raise SQSBatchProcessingError(list(self.exceptions)) + raise SQSBatchProcessingError( + msg=f"Not all records processed succesfully. {len(self.exceptions)} individual errors logged " + f"separately below.", + child_exceptions=self.exceptions, + ) return delete_message_response diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 234f90e9f16..a453f0bfe07 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -85,7 +85,7 @@ def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_ha with partial_processor(records, record_handler) as ctx: ctx.process() - assert len(error.value.args[0]) == 1 + assert len(error.value.child_exceptions) == 1 stubber.assert_no_pending_responses() @@ -144,7 +144,7 @@ def lambda_handler(event, context): with pytest.raises(SQSBatchProcessingError) as error: lambda_handler(event, {}) - assert len(error.value.args[0]) == 2 + assert len(error.value.child_exceptions) == 2 stubber.assert_no_pending_responses() @@ -171,7 +171,7 @@ def lambda_handler(event, context): with pytest.raises(SQSBatchProcessingError) as error: lambda_handler(event, {}) - assert len(error.value.args[0]) == 1 + assert len(error.value.child_exceptions) == 1 stubber.assert_no_pending_responses() @@ -203,7 +203,7 @@ def lambda_handler(event, context): stubber.assert_no_pending_responses() - assert len(error.value.args[0]) == 1 + assert len(error.value.child_exceptions) == 1 assert capsys.readouterr().out == "Oh no ! It's a failure.\n" @@ -289,4 +289,4 @@ def test_partial_sqs_processor_context_only_failure(sqs_event_factory, record_ha with partial_processor(records, record_handler) as ctx: ctx.process() - assert len(error.value.args[0]) == 2 + assert len(error.value.child_exceptions) == 2 diff --git a/tests/unit/test_utilities_batch.py b/tests/unit/test_utilities_batch.py index 136e6ff2e8c..c491f0829cb 100644 --- a/tests/unit/test_utilities_batch.py +++ b/tests/unit/test_utilities_batch.py @@ -81,8 +81,8 @@ def test_partial_sqs_process_record_success(mocker, partial_sqs_processor): result = partial_sqs_processor._process_record(record) - handler_mock.assert_called_once_with(record) - success_handler_mock.assert_called_once_with(record, success_result) + handler_mock.assert_called_once_with(record=record) + success_handler_mock.assert_called_once_with(record=record, result=success_result) assert result == expected_value @@ -98,9 +98,13 @@ def test_partial_sqs_process_record_failure(mocker, partial_sqs_processor): result = partial_sqs_processor._process_record(record) - handler_mock.assert_called_once_with(record) - failure_handler_mock.assert_called_once_with(record, failure_result) + handler_mock.assert_called_once_with(record=record) + _, failure_handler_called_with_args = failure_handler_mock.call_args + failure_handler_mock.assert_called_once() + assert (failure_handler_called_with_args["record"]) == record + assert isinstance(failure_handler_called_with_args["exception"], tuple) + assert failure_handler_called_with_args["exception"][1] == failure_result assert result == expected_value