From 22ea62c1fb0bd71c51546a34d6fc43bc0c57c3d4 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 18 Nov 2024 13:28:32 -0500 Subject: [PATCH] numpy.int64 type is not serialized correctly in Python 3.11 and Python 3.12 (#33137) * Added the validation to __getstate__ * fix the comments * formatting * fixed the lint * fixes isort --- sdks/python/apache_beam/coders/coder_impl.py | 3 +++ sdks/python/apache_beam/coders/coders_test.py | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index dfdb247d781d..55a49f9b2c06 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -510,6 +510,9 @@ def encode_special_deterministic(self, value, stream): stream.write_byte(NESTED_STATE_TYPE) self.encode_type(type(value), stream) state_value = value.__getstate__() + if value is not None and state_value is None: + # https://github.com/apache/beam/issues/33020 + raise TypeError(self._deterministic_encoding_error_msg(value)) try: self.encode_to_stream(state_value, stream, True) except Exception as e: diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 4d8e8fe9bcb8..dc9780e36be3 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -23,11 +23,13 @@ import proto import pytest +import apache_beam as beam from apache_beam import typehints from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders from apache_beam.coders.avro_record import AvroRecord from apache_beam.coders.typecoders import registry as coders_registry +from apache_beam.testing.test_pipeline import TestPipeline class PickleCoderTest(unittest.TestCase): @@ -242,6 +244,20 @@ def test_to_type_hint(self): assert coder.to_type_hint() is bytes +class NumpyIntAsKeyTest(unittest.TestCase): + def 
test_numpy_int(self): + # this type is not supported as the key + import numpy as np + + with self.assertRaises(TypeError): + with TestPipeline() as p: + indata = p | "Create" >> beam.Create([(a, int(a)) + for a in np.arange(3)]) + + # Apply CombinePerKey to sum values for each key. + _ = indata | "CombinePerKey" >> beam.CombinePerKey(sum) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()