From 193372f31f6e0d65b51fbc9f2dca2280a3263a14 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Tue, 8 Oct 2024 13:03:44 -0400 Subject: [PATCH] Support Protobuf 5.x (#32679) * Force protobuf 5.x * restore lower bound * Add bypass for microsecond conversion bounds * linting * fix incorrect unit, standardize micros_to_nanos * remove unused constant * add extra unit tests * formatting --- sdks/python/apache_beam/transforms/window.py | 12 ++-- sdks/python/apache_beam/utils/proto_utils.py | 28 +++++++- .../apache_beam/utils/proto_utils_test.py | 67 +++++++++++++++++++ sdks/python/setup.py | 2 +- 4 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 sdks/python/apache_beam/utils/proto_utils_test.py diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 592164a5ef49..fc20174ca1e2 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -449,8 +449,8 @@ def to_runner_api_parameter(self, context): standard_window_fns_pb2.FixedWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context) -> 'FixedWindows': return FixedWindows( - size=Duration(micros=fn_parameter.size.ToMicroseconds()), - offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds())) + size=Duration(micros=proto_utils.to_micros(fn_parameter.size)), + offset=Timestamp(micros=proto_utils.to_micros(fn_parameter.offset))) class SlidingWindows(NonMergingWindowFn): @@ -522,9 +522,9 @@ def to_runner_api_parameter(self, context): def from_runner_api_parameter( fn_parameter, unused_context) -> 'SlidingWindows': return SlidingWindows( - size=Duration(micros=fn_parameter.size.ToMicroseconds()), - offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()), - period=Duration(micros=fn_parameter.period.ToMicroseconds())) + size=Duration(micros=proto_utils.to_micros(fn_parameter.size)), + offset=Timestamp(micros=proto_utils.to_micros(fn_parameter.offset)), + period=Duration(micros=proto_utils.to_micros(fn_parameter.period))) class Sessions(WindowFn): @@ -589,4 +589,4 @@ def to_runner_api_parameter(self, context): standard_window_fns_pb2.SessionWindowsPayload) def from_runner_api_parameter(fn_parameter, unused_context) -> 'Sessions': return Sessions( - gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds())) + gap_size=Duration(micros=proto_utils.to_micros(fn_parameter.gap_size))) diff --git a/sdks/python/apache_beam/utils/proto_utils.py b/sdks/python/apache_beam/utils/proto_utils.py index cc637dead477..9a93c9e48ea3 100644 --- a/sdks/python/apache_beam/utils/proto_utils.py +++ b/sdks/python/apache_beam/utils/proto_utils.py @@ -36,6 +36,9 @@ message_types = (message.Message, ) +_SECONDS_TO_MICROS = 10**6 +_MICROS_TO_NANOS = 10**3 + @overload def pack_Any(msg: message.Message) -> any_pb2.Any: @@ -115,8 +118,29 @@ def pack_Struct(**kwargs) -> struct_pb2.Struct: def from_micros(cls: Type[TimeMessageT], micros: int) -> TimeMessageT: result = cls() - result.FromMicroseconds(micros) - return result + if isinstance(result, duration_pb2.Duration): + result.FromMicroseconds(micros) + return result + # Protobuf 5.x enforces a maximum timestamp value less than the Beam + # maximum allowable timestamp, so we cannot use the built-in conversion. + elif isinstance(result, timestamp_pb2.Timestamp): + result.seconds = micros // _SECONDS_TO_MICROS + result.nanos = (micros % _SECONDS_TO_MICROS) * _MICROS_TO_NANOS + return result + else: + raise RuntimeError('cannot convert the micro seconds to %s' % cls) + + +def to_micros(value: Union[duration_pb2.Duration, timestamp_pb2.Timestamp]): + if isinstance(value, duration_pb2.Duration): + return value.ToMicroseconds() + # Protobuf 5.x enforces a maximum timestamp value less than the Beam + # maximum allowable timestamp, so we cannot use the built-in conversion. + elif isinstance(value, timestamp_pb2.Timestamp): + micros = value.seconds * _SECONDS_TO_MICROS + return micros + (value.nanos // _MICROS_TO_NANOS) + else: + raise RuntimeError('cannot convert %s to micro seconds' % value) def to_Timestamp(time: Union[int, float]) -> timestamp_pb2.Timestamp: diff --git a/sdks/python/apache_beam/utils/proto_utils_test.py b/sdks/python/apache_beam/utils/proto_utils_test.py new file mode 100644 index 000000000000..c40967cd2c0f --- /dev/null +++ b/sdks/python/apache_beam/utils/proto_utils_test.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from google.protobuf import duration_pb2 +from google.protobuf import timestamp_pb2 + +from apache_beam.utils import proto_utils +from apache_beam.utils.timestamp import MAX_TIMESTAMP + + +class TestProtoUtils(unittest.TestCase): + def test_from_micros_duration(self): + ts = proto_utils.from_micros(duration_pb2.Duration, MAX_TIMESTAMP.micros) + expected = duration_pb2.Duration( + seconds=MAX_TIMESTAMP.seconds(), nanos=775000000) + self.assertEqual(ts, expected) + + def test_from_micros_timestamp(self): + ts = proto_utils.from_micros(timestamp_pb2.Timestamp, MAX_TIMESTAMP.micros) + expected = timestamp_pb2.Timestamp( + seconds=MAX_TIMESTAMP.seconds(), nanos=775000000) + self.assertEqual(ts, expected) + + def test_to_micros_duration(self): + dur = duration_pb2.Duration( + seconds=MAX_TIMESTAMP.seconds(), nanos=775000000) + ts = proto_utils.to_micros(dur) + expected = MAX_TIMESTAMP.micros + self.assertEqual(ts, expected) + + def test_to_micros_timestamp(self): + dur = timestamp_pb2.Timestamp( + seconds=MAX_TIMESTAMP.seconds(), nanos=775000000) + ts = proto_utils.to_micros(dur) + expected = MAX_TIMESTAMP.micros + self.assertEqual(ts, expected) + + def test_round_trip_duration(self): + expected = 919336704 + dur = proto_utils.from_micros(duration_pb2.Duration, expected) + ms = proto_utils.to_micros(dur) + self.assertEqual(ms, expected) + + def test_round_trip_timestamp(self): + expected = 919336704 + ts = proto_utils.from_micros(timestamp_pb2.Timestamp, expected) + ms = proto_utils.to_micros(ts) + self.assertEqual(ms, expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 721cb4c1a8dd..ac72d06604ef 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -381,7 +381,7 @@ def get_portability_package_data(): # # 3. Exclude protobuf 4 versions that leak memory, see: # https://github.com/apache/beam/issues/28246 - 'protobuf>=3.20.3,<4.26.0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long + 'protobuf>=3.20.3,<6.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long 'pydot>=1.2.0,<2', 'python-dateutil>=2.8.0,<3', 'pytz>=2018.3',