From 4cbf2577199bc916867a34e8e043fb61d306123b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 18 Dec 2024 10:46:33 -0500 Subject: [PATCH] Revert three commits related to supporting custom coder in reshuffle - Fix custom coder not being used in Reshuffle (global window) (#33339) - Fix custom coders not being used in Reshuffle (non global window) #33363 - Add missing to_type_hint to WindowedValueCoder #33403 --- sdks/python/apache_beam/coders/coders.py | 11 ---- sdks/python/apache_beam/coders/coders_test.py | 6 --- sdks/python/apache_beam/coders/typecoders.py | 2 - sdks/python/apache_beam/transforms/util.py | 13 +---- .../apache_beam/transforms/util_test.py | 54 ------------------- .../typehints/native_type_compatibility.py | 26 --------- .../python/apache_beam/typehints/typehints.py | 9 ---- 7 files changed, 2 insertions(+), 119 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 22d041f34f8b..57d8197a3a00 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1438,17 +1438,6 @@ def __hash__(self): return hash( (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) - @classmethod - def from_type_hint(cls, typehint, registry): - # type: (Any, CoderRegistry) -> WindowedValueCoder - # Ideally this'd take two parameters so that one could hint at - # the window type as well instead of falling back to the - # pickle coders. - return cls(registry.get_coder(typehint.inner_type)) - - def to_type_hint(self): - return typehints.WindowedValue[self.wrapped_value_coder.to_type_hint()] - Coder.register_structured_urn( common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index bddd2cb57e06..dc9780e36be3 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -258,12 +258,6 @@ def test_numpy_int(self): _ = indata | "CombinePerKey" >> beam.CombinePerKey(sum) -class WindowedValueCoderTest(unittest.TestCase): - def test_to_type_hint(self): - coder = coders.WindowedValueCoder(coders.VarIntCoder()) - self.assertEqual(coder.to_type_hint(), typehints.WindowedValue[int]) # type: ignore[misc] - - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 892f508d0136..1667cb7a916a 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -94,8 +94,6 @@ def register_standard_coders(self, fallback_coder): self._register_coder_internal(str, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) self._register_coder_internal(typehints.DictConstraint, coders.MapCoder) - self._register_coder_internal( - typehints.WindowedTypeConstraint, coders.WindowedValueCoder) # Default fallback coders applied in that order until the first matching # coder found. default_fallback_coders = [ diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index c9fd2c76b0db..a03652de2496 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -33,7 +33,6 @@ from typing import Callable from typing import Iterable from typing import List -from typing import Optional from typing import Tuple from typing import TypeVar from typing import Union @@ -74,13 +73,11 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.typehints import trivial_inference from apache_beam.typehints.decorators import get_signature -from apache_beam.typehints.native_type_compatibility import TypedWindowedValue from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared from apache_beam.utils import windowed_value from apache_beam.utils.annotations import deprecated from apache_beam.utils.sharded_key import ShardedKey -from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: from apache_beam.runners.pipeline_context import PipelineContext @@ -956,10 +953,6 @@ def restore_timestamps(element): window.GlobalWindows.windowed_value((key, value), timestamp) for (value, timestamp) in values ] - - ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types( - Tuple[K, Tuple[V, Optional[Timestamp]]]) else: # typing: All conditional function variants must have identical signatures @@ -973,8 +966,7 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types(Tuple[K, TypedWindowedValue[V]]) + ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. This is to mitigate the Dataflow Java Runner @@ -1026,8 +1018,7 @@ def expand(self, pcoll): pcoll | 'AddRandomKeys' >> Map(lambda t: (random.randrange(0, self.num_buckets), t) ).with_input_types(T).with_output_types(Tuple[int, T]) - | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types( - Tuple[int, T]) + | ReshufflePerKey() | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( Tuple[int, T]).with_output_types(T)) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index db73310dfe25..d86509c7dde3 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1010,60 +1010,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): equal_to(expected_data), label="formatted_after_reshuffle") - global _Unpicklable - global _UnpicklableCoder - - class _Unpicklable(object): - def __init__(self, value): - self.value = value - - def __getstate__(self): - raise NotImplementedError() - - def __setstate__(self, state): - raise NotImplementedError() - - class _UnpicklableCoder(beam.coders.Coder): - def encode(self, value): - return str(value.value).encode() - - def decode(self, encoded): - return _Unpicklable(int(encoded.decode())) - - def to_type_hint(self): - return _Unpicklable - - def is_deterministic(self): - return True - - def test_reshuffle_unpicklable_in_global_window(self): - beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) - - with TestPipeline() as pipeline: - data = [_Unpicklable(i) for i in range(5)] - expected_data = [0, 10, 20, 30, 40] - result = ( - pipeline - | beam.Create(data) - | beam.WindowInto(GlobalWindows()) - | beam.Reshuffle() - | beam.Map(lambda u: u.value * 10)) - assert_that(result, equal_to(expected_data)) - - def test_reshuffle_unpicklable_in_non_global_window(self): - beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) - - with TestPipeline() as pipeline: - data = [_Unpicklable(i) for i in range(5)] - expected_data = [0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40] - result = ( - pipeline - | beam.Create(data) - | beam.WindowInto(window.SlidingWindows(size=3, period=1)) - | beam.Reshuffle() - | beam.Map(lambda u: u.value * 10)) - assert_that(result, equal_to(expected_data)) - class WithKeysTest(unittest.TestCase): def setUp(self): diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 381d4f7aae2b..6f704b37a969 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -24,13 +24,9 @@ import sys import types import typing -from typing import Generic -from typing import TypeVar from apache_beam.typehints import typehints -T = TypeVar('T') - _LOGGER = logging.getLogger(__name__) # Describes an entry in the type map in convert_to_beam_type. @@ -220,18 +216,6 @@ def convert_collections_to_typing(typ): return typ -# During type inference of WindowedValue, we need to pass in the inner value -# type. This cannot be achieved immediately with WindowedValue class because it -# is not parameterized. Changing it to a generic class (e.g. WindowedValue[T]) -# could work in theory. However, the class is cythonized and it seems that -# cython does not handle generic classes well. -# The workaround here is to create a separate class solely for the type -# inference purpose. This class should never be used for creating instances. -class TypedWindowedValue(Generic[T]): - def __init__(self, *args, **kwargs): - raise NotImplementedError("This class is solely for type inference") - - def convert_to_beam_type(typ): """Convert a given typing type to a Beam type. @@ -283,12 +267,6 @@ def convert_to_beam_type(typ): # TODO(https://github.com/apache/beam/issues/20076): Currently unhandled. _LOGGER.info('Converting NewType type hint to Any: "%s"', typ) return typehints.Any - elif typ_module == 'apache_beam.typehints.native_type_compatibility' and \ - getattr(typ, "__name__", typ.__origin__.__name__) == 'TypedWindowedValue': - # Need to pass through WindowedValue class so that it can be converted - # to the correct type constraint in Beam - # This is needed to fix https://github.com/apache/beam/issues/33356 - pass elif (typ_module != 'typing') and (typ_module != 'collections.abc'): # Only translate types from the typing and collections.abc modules. return typ @@ -346,10 +324,6 @@ def convert_to_beam_type(typ): match=_match_is_exactly_collection, arity=1, beam_type=typehints.Collection), - _TypeMapEntry( - match=_match_issubclass(TypedWindowedValue), - arity=1, - beam_type=typehints.WindowedValue), ] # Find the first matching entry. diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index a65a0f753826..0e18e887c2a0 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -1213,15 +1213,6 @@ def type_check(self, instance): repr(self.inner_type), instance.value.__class__.__name__)) - def bind_type_variables(self, bindings): - bound_inner_type = bind_type_variables(self.inner_type, bindings) - if bound_inner_type == self.inner_type: - return self - return WindowedValue[bound_inner_type] - - def __repr__(self): - return 'WindowedValue[%s]' % repr(self.inner_type) - class GeneratorHint(IteratorHint): """A Generator type hint.