diff --git a/sdks/python/apache_beam/transforms/error_handling_test.py b/sdks/python/apache_beam/transforms/error_handling_test.py index 935446986500..4d8c2d23dc14 100644 --- a/sdks/python/apache_beam/transforms/error_handling_test.py +++ b/sdks/python/apache_beam/transforms/error_handling_test.py @@ -86,6 +86,32 @@ def test_error_handling_pardo(self): assert_that(result, equal_to(['A', 'Bb']), label='CheckGood') assert_that(error_pcoll, equal_to(['error: cccc']), label='CheckBad') + def test_error_handling_pardo_with_exception_handling_kwargs(self): + def side_effect(*args): + beam._test_error_handling_pardo_with_exception_handling_kwargs_val = True + + def check_side_effect(): + return getattr( + beam, + '_test_error_handling_pardo_with_exception_handling_kwargs_val', + False) + + self.assertFalse(check_side_effect()) + + with beam.Pipeline() as p: + pcoll = p | beam.Create(['a', 'bb', 'cccc']) + with error_handling.ErrorHandler( + beam.Map(lambda x: "error: %s" % x[0])) as error_handler: + result = pcoll | beam.Map( + exception_throwing_map, limit=3).with_error_handler( + error_handler, on_failure_callback=side_effect) + error_pcoll = error_handler.output() + + assert_that(result, equal_to(['A', 'Bb']), label='CheckGood') + assert_that(error_pcoll, equal_to(['error: cccc']), label='CheckBad') + + self.assertTrue(check_side_effect()) + def test_error_on_unclosed_error_handler(self): with self.assertRaisesRegex(RuntimeError, r'.*Unclosed error handler.*'): with beam.Pipeline() as p: