From f5ba953c6173f17678bcc64c5a700efaacade34a Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Fri, 13 Dec 2024 18:17:24 -0800
Subject: [PATCH 1/3] Faster default coder for unknown windows.

This will get used in a windowed reshuffle, among other places.
---
Review notes:
  * _OrderedUnionCoder.is_deterministic() now calls c.is_deterministic() on
    each component coder; the bound method object itself is always truthy,
    so non-deterministic components were never detected.
  * Hunk sizes and the diffstat below account for the added comments and
    test lines. The blob ids on the "index" lines predate these edits.

 sdks/python/apache_beam/coders/coder_impl.pxd | 12 ++++++
 sdks/python/apache_beam/coders/coder_impl.py  | 40 +++++++++++++++++
 sdks/python/apache_beam/coders/coders.py      | 43 ++++++++++++++++++-
 .../apache_beam/coders/coders_test_common.py  | 13 ++++++
 4 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 52889fa2fd92..8a28499555c1 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -219,6 +219,18 @@ cdef libc.stdint.int64_t MIN_TIMESTAMP_micros
 cdef libc.stdint.int64_t MAX_TIMESTAMP_micros
 
 
+cdef class _OrderedUnionCoderImpl(StreamCoderImpl):
+  cdef tuple _types
+  cdef tuple _coder_impls
+  cdef CoderImpl _fallback_coder_impl
+
+  @cython.locals(ix=int, c=CoderImpl)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+  @cython.locals(ix=int, c=CoderImpl)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+
 cdef class WindowedValueCoderImpl(StreamCoderImpl):
   """A coder for windowed values."""
   cdef CoderImpl _value_coder
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index 5262e6adf8a6..5dff35052901 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -1421,6 +1421,46 @@ def estimate_size(self, value, nested=False):
     return size
 
 
+class _OrderedUnionCoderImpl(StreamCoderImpl):
+  """A coder impl that dispatches on the exact type of the value.
+
+  Each value is written as a one-byte tag followed by the payload of the
+  coder impl selected by that tag. The tag is the index of the first entry
+  whose type is exactly type(value) (subclasses do not match), or 0xFF if
+  the value is handed to the fallback coder impl.
+  """
+  def __init__(self, coder_impl_types, fallback_coder_impl):
+    # The index is written as a single byte; 0xFF marks the fallback.
+    assert len(coder_impl_types) < 128
+    self._types, self._coder_impls = zip(*coder_impl_types)
+    self._fallback_coder_impl = fallback_coder_impl
+
+  def encode_to_stream(self, value, out, nested):
+    value_t = type(value)
+    for (ix, t) in enumerate(self._types):
+      if value_t is t:
+        out.write_byte(ix)
+        c = self._coder_impls[ix]  # for typing
+        c.encode_to_stream(value, out, nested)
+        break
+    else:
+      # No exact type match: defer to the fallback coder impl, if any.
+      if self._fallback_coder_impl is None:
+        raise ValueError("No fallback.")
+      out.write_byte(0xFF)
+      self._fallback_coder_impl.encode_to_stream(value, out, nested)
+
+  def decode_from_stream(self, in_stream, nested):
+    ix = in_stream.read_byte()
+    if ix == 0xFF:
+      if self._fallback_coder_impl is None:
+        raise ValueError("No fallback.")
+      return self._fallback_coder_impl.decode_from_stream(in_stream, nested)
+    else:
+      c = self._coder_impls[ix]  # for typing
+      return c.decode_from_stream(in_stream, nested)
+
+
 class WindowedValueCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees.
 
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 724f268a8312..f221d1f940dd 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -1350,12 +1350,53 @@ def __hash__(self):
     common_urns.coders.INTERVAL_WINDOW.urn, IntervalWindowCoder)
 
 
+class _OrderedUnionCoder(FastCoder):
+  def __init__(
+      self,
+      *coder_types: Iterable[Tuple[type, Coder]],
+      fallback_coder: Optional[Coder]):
+    self._coder_types = coder_types
+    self._fallback_coder = fallback_coder
+
+  def _create_impl(self):
+    return coder_impl._OrderedUnionCoderImpl(
+        [(t, c.get_impl()) for t, c in self._coder_types],
+        fallback_coder_impl=self._fallback_coder.get_impl()
+        if self._fallback_coder else None)
+
+  def is_deterministic(self) -> bool:
+    # Every component coder, and the fallback if present, must be
+    # deterministic for the union to be deterministic.
+    return (
+        all(c.is_deterministic() for _, c in self._coder_types) and (
+            self._fallback_coder is None or
+            self._fallback_coder.is_deterministic()))
+
+  def to_type_hint(self):
+    return Any
+
+  def __eq__(self, other):
+    return (
+        type(self) == type(other) and
+        self._coder_types == other._coder_types and
+        self._fallback_coder == other._fallback_coder)
+
+  def __hash__(self):
+    return hash((type(self), tuple(self._coder_types), self._fallback_coder))
+
+
 class WindowedValueCoder(FastCoder):
   """Coder for windowed values."""
   def __init__(self, wrapped_value_coder, window_coder=None):
     # type: (Coder, Optional[Coder]) -> None
     if not window_coder:
-      window_coder = PickleCoder()
+      # Avoid circular imports.
+      from apache_beam.transforms import window
+      # Common window types get dedicated coders; others use PickleCoder.
+      window_coder = _OrderedUnionCoder(
+          (window.GlobalWindow, GlobalWindowCoder()),
+          (window.IntervalWindow, IntervalWindowCoder()),
+          fallback_coder=PickleCoder())
     self.wrapped_value_coder = wrapped_value_coder
     self.timestamp_coder = TimestampCoder()
     self.window_coder = window_coder
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 4bd9698dd57b..f3381cdb1d69 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -769,6 +769,19 @@ def test_decimal_coder(self):
           test_encodings[idx],
           base64.b64encode(test_coder.encode(value)).decode().rstrip("="))
 
+  def test_OrderedUnionCoder(self):
+    test_coder = coders._OrderedUnionCoder((str, coders.StrUtf8Coder()),
+                                           (int, coders.VarIntCoder()),
+                                           fallback_coder=coders.FloatCoder())
+    self.check_coder(test_coder, 's')
+    self.check_coder(test_coder, 123)
+    self.check_coder(test_coder, 1.5)
+    # A non-deterministic component makes the whole union non-deterministic.
+    self.assertFalse(
+        coders._OrderedUnionCoder(
+            (str, coders.PickleCoder()),
+            fallback_coder=None).is_deterministic())
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

From b0c0f6b4152147669e3c820946199b70b9891cd0 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Fri, 13 Dec 2024 22:42:48 -0800
Subject: [PATCH 2/3] mypy

---
 sdks/python/apache_beam/coders/coders.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f221d1f940dd..1103529bd734 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -1353,7 +1353,7 @@ def __hash__(self):
 class _OrderedUnionCoder(FastCoder):
   def __init__(
       self,
-      *coder_types: Iterable[Tuple[type, Coder]],
+      *coder_types: Tuple[type, Coder],
       fallback_coder: Optional[Coder]):
     self._coder_types = coder_types
     self._fallback_coder = fallback_coder

From fad1563c9bb1b3c5a7c2b2b8cd2d8d9bb058d9a0 Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Mon, 16 Dec 2024 12:12:42 -0800
Subject: [PATCH 3/3] yapf

---
 sdks/python/apache_beam/coders/coders.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1103529bd734..e52c6048a15c 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -1352,9 +1352,7 @@ def __hash__(self):
 
 class _OrderedUnionCoder(FastCoder):
   def __init__(
-      self,
-      *coder_types: Tuple[type, Coder],
-      fallback_coder: Optional[Coder]):
+      self, *coder_types: Tuple[type, Coder], fallback_coder: Optional[Coder]):
     self._coder_types = coder_types
     self._fallback_coder = fallback_coder
