From 2edbc3219ef945cb8692968fb5e0303e63780c9c Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 16 Nov 2024 20:12:49 -0500 Subject: [PATCH 1/5] Added the validation to __getstate__ --- sdks/python/apache_beam/coders/coder_impl.py | 4 ++++ sdks/python/apache_beam/coders/coders_test.py | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index dfdb247d781d..79cf864c9653 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -510,6 +510,10 @@ def encode_special_deterministic(self, value, stream): stream.write_byte(NESTED_STATE_TYPE) self.encode_type(type(value), stream) state_value = value.__getstate__() + if value is not None and state_value is None: + # https://github.com/apache/beam/issues/33020 + # it seems that __getstate__ might be defined correctly + raise TypeError(self._deterministic_encoding_error_msg(value)) try: self.encode_to_stream(state_value, stream, True) except Exception as e: diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 4d8e8fe9bcb8..9208b4676268 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -23,11 +23,14 @@ import proto import pytest +import apache_beam as beam + from apache_beam import typehints from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders from apache_beam.coders.avro_record import AvroRecord from apache_beam.coders.typecoders import registry as coders_registry +from apache_beam.testing.test_pipeline import TestPipeline class PickleCoderTest(unittest.TestCase): @@ -241,6 +244,18 @@ def test_to_type_hint(self): coder = coders.LengthPrefixCoder(coders.BytesCoder()) assert coder.to_type_hint() is bytes +class NumpyIntAsKeyTest(unittest.TestCase): + def test_numpy_int(self): + # this type is not supported as the key + import numpy as np + + with self.assertRaises(TypeError): + with TestPipeline() as p: + indata = p | "Create" >> beam.Create([(a, int(a)) for a in np.arange(3)]) + + # Apply CombinePerkey to sum values for each key. + indata | "CombinePerKey" >> beam.CombinePerKey(sum) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From d809b3b0859c8c50ab7299a9ea7e0783e88d3d08 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 16 Nov 2024 20:24:14 -0500 Subject: [PATCH 2/5] fix the comments --- sdks/python/apache_beam/coders/coder_impl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index 79cf864c9653..55a49f9b2c06 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -512,7 +512,6 @@ def encode_special_deterministic(self, value, stream): state_value = value.__getstate__() if value is not None and state_value is None: # https://github.com/apache/beam/issues/33020 - # it seems that __getstate__ might be defined correctly raise TypeError(self._deterministic_encoding_error_msg(value)) try: self.encode_to_stream(state_value, stream, True) From baea2a20363ebacce2da3f7db937844c0c0a3bef Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 18 Nov 2024 09:35:23 -0500 Subject: [PATCH 3/5] formatting --- sdks/python/apache_beam/coders/coders_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 9208b4676268..49c6db79cc64 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -244,6 +244,7 @@ def test_to_type_hint(self): coder = coders.LengthPrefixCoder(coders.BytesCoder()) assert coder.to_type_hint() is bytes + class NumpyIntAsKeyTest(unittest.TestCase): def test_numpy_int(self): # this type is not supported as the key @@ -251,7 +252,8 @@ def test_numpy_int(self): with self.assertRaises(TypeError): with TestPipeline() as p: - indata = p | "Create" >> beam.Create([(a, int(a)) for a in np.arange(3)]) + indata = p | "Create" >> beam.Create([(a, int(a)) + for a in np.arange(3)]) # Apply CombinePerkey to sum values for each key. indata | "CombinePerKey" >> beam.CombinePerKey(sum) From 7e7247df7dc53d9393a77a975bf035c72cae72bc Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 18 Nov 2024 10:38:37 -0500 Subject: [PATCH 4/5] fixed the lint --- sdks/python/apache_beam/coders/coders_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index 49c6db79cc64..c7eafe4dc7a0 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -256,7 +256,7 @@ def test_numpy_int(self): for a in np.arange(3)]) # Apply CombinePerkey to sum values for each key. - indata | "CombinePerKey" >> beam.CombinePerKey(sum) + _ = indata | "CombinePerKey" >> beam.CombinePerKey(sum) if __name__ == '__main__': From fdda1174fca425a5c6956edf62db9f13df10c1b6 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 18 Nov 2024 11:29:19 -0500 Subject: [PATCH 5/5] fixes isort --- sdks/python/apache_beam/coders/coders_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index c7eafe4dc7a0..dc9780e36be3 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -24,7 +24,6 @@ import pytest import apache_beam as beam - from apache_beam import typehints from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message from apache_beam.coders import coders