Skip to content

Commit

Permalink
Support Protobuf 5.x (#32679)
Browse files Browse the repository at this point in the history
* Force protobuf 5.x

* restore lower bound

* Add bypass for microsecond conversion bounds

* linting

* fix incorrect unit, standardize micros_to_nanos

* remove unused constant

* add extra unit tests

* formatting
  • Loading branch information
jrmccluskey authored Oct 8, 2024
1 parent bcd785f commit 193372f
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 9 deletions.
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/transforms/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ def to_runner_api_parameter(self, context):
standard_window_fns_pb2.FixedWindowsPayload)
def from_runner_api_parameter(fn_parameter, unused_context) -> 'FixedWindows':
return FixedWindows(
size=Duration(micros=fn_parameter.size.ToMicroseconds()),
offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()))
size=Duration(micros=proto_utils.to_micros(fn_parameter.size)),
offset=Timestamp(micros=proto_utils.to_micros(fn_parameter.offset)))


class SlidingWindows(NonMergingWindowFn):
Expand Down Expand Up @@ -522,9 +522,9 @@ def to_runner_api_parameter(self, context):
def from_runner_api_parameter(
fn_parameter, unused_context) -> 'SlidingWindows':
return SlidingWindows(
size=Duration(micros=fn_parameter.size.ToMicroseconds()),
offset=Timestamp(micros=fn_parameter.offset.ToMicroseconds()),
period=Duration(micros=fn_parameter.period.ToMicroseconds()))
size=Duration(micros=proto_utils.to_micros(fn_parameter.size)),
offset=Timestamp(micros=proto_utils.to_micros(fn_parameter.offset)),
period=Duration(micros=proto_utils.to_micros(fn_parameter.period)))


class Sessions(WindowFn):
Expand Down Expand Up @@ -589,4 +589,4 @@ def to_runner_api_parameter(self, context):
standard_window_fns_pb2.SessionWindowsPayload)
def from_runner_api_parameter(fn_parameter, unused_context) -> 'Sessions':
return Sessions(
gap_size=Duration(micros=fn_parameter.gap_size.ToMicroseconds()))
gap_size=Duration(micros=proto_utils.to_micros(fn_parameter.gap_size)))
28 changes: 26 additions & 2 deletions sdks/python/apache_beam/utils/proto_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

message_types = (message.Message, )

# Number of microseconds in one second.
_SECONDS_TO_MICROS = 10**6
# Number of nanoseconds in one microsecond.
_MICROS_TO_NANOS = 10**3


@overload
def pack_Any(msg: message.Message) -> any_pb2.Any:
Expand Down Expand Up @@ -115,8 +118,29 @@ def pack_Struct(**kwargs) -> struct_pb2.Struct:

def from_micros(cls: Type[TimeMessageT], micros: int) -> TimeMessageT:
  """Builds a protobuf Duration or Timestamp message from microseconds.

  Args:
    cls: the message class to instantiate; either duration_pb2.Duration or
      timestamp_pb2.Timestamp.
    micros: the value to store, expressed in microseconds.

  Returns:
    A new instance of ``cls`` holding the converted value.

  Raises:
    RuntimeError: if ``cls`` is neither a Duration nor a Timestamp.
  """
  result = cls()
  if isinstance(result, duration_pb2.Duration):
    result.FromMicroseconds(micros)
    return result
  # Protobuf 5.x enforces a maximum timestamp value less than the Beam
  # maximum allowable timestamp, so we cannot use the built-in conversion.
  if isinstance(result, timestamp_pb2.Timestamp):
    # Floor division paired with a non-negative modulo keeps nanos within
    # [0, 10**9) even when micros is negative.
    result.seconds = micros // _SECONDS_TO_MICROS
    result.nanos = (micros % _SECONDS_TO_MICROS) * _MICROS_TO_NANOS
    return result
  raise RuntimeError('cannot convert the micro seconds to %s' % cls)


def to_micros(
    value: Union[duration_pb2.Duration, timestamp_pb2.Timestamp]) -> int:
  """Converts a protobuf Duration or Timestamp message to microseconds.

  Args:
    value: the duration_pb2.Duration or timestamp_pb2.Timestamp to convert.

  Returns:
    The value in whole microseconds. For a Timestamp, any sub-microsecond
    part of ``nanos`` is dropped by the integer division.

  Raises:
    RuntimeError: if ``value`` is neither a Duration nor a Timestamp.
  """
  if isinstance(value, duration_pb2.Duration):
    return value.ToMicroseconds()
  # Protobuf 5.x enforces a maximum timestamp value less than the Beam
  # maximum allowable timestamp, so we cannot use the built-in conversion.
  if isinstance(value, timestamp_pb2.Timestamp):
    micros = value.seconds * _SECONDS_TO_MICROS
    return micros + (value.nanos // _MICROS_TO_NANOS)
  # Wrap the argument in a 1-tuple: a bare tuple on the right of % would be
  # unpacked as the format arguments and could raise TypeError instead.
  raise RuntimeError('cannot convert %s to micro seconds' % (value, ))


def to_Timestamp(time: Union[int, float]) -> timestamp_pb2.Timestamp:
Expand Down
67 changes: 67 additions & 0 deletions sdks/python/apache_beam/utils/proto_utils_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest

from google.protobuf import duration_pb2
from google.protobuf import timestamp_pb2

from apache_beam.utils import proto_utils
from apache_beam.utils.timestamp import MAX_TIMESTAMP


class TestProtoUtils(unittest.TestCase):
  """Exercises proto_utils.from_micros and proto_utils.to_micros."""
  def test_from_micros_duration(self):
    """MAX_TIMESTAMP micros convert to the matching Duration message."""
    actual = proto_utils.from_micros(
        duration_pb2.Duration, MAX_TIMESTAMP.micros)
    self.assertEqual(
        actual,
        duration_pb2.Duration(
            seconds=MAX_TIMESTAMP.seconds(), nanos=775000000))

  def test_from_micros_timestamp(self):
    """MAX_TIMESTAMP micros convert to the matching Timestamp message."""
    actual = proto_utils.from_micros(
        timestamp_pb2.Timestamp, MAX_TIMESTAMP.micros)
    self.assertEqual(
        actual,
        timestamp_pb2.Timestamp(
            seconds=MAX_TIMESTAMP.seconds(), nanos=775000000))

  def test_to_micros_duration(self):
    """A Duration built from MAX_TIMESTAMP converts back to its micros."""
    duration = duration_pb2.Duration(
        seconds=MAX_TIMESTAMP.seconds(), nanos=775000000)
    actual_micros = proto_utils.to_micros(duration)
    self.assertEqual(actual_micros, MAX_TIMESTAMP.micros)

  def test_to_micros_timestamp(self):
    """A Timestamp built from MAX_TIMESTAMP converts back to its micros."""
    timestamp = timestamp_pb2.Timestamp(
        seconds=MAX_TIMESTAMP.seconds(), nanos=775000000)
    actual_micros = proto_utils.to_micros(timestamp)
    self.assertEqual(actual_micros, MAX_TIMESTAMP.micros)

  def test_round_trip_duration(self):
    """from_micros followed by to_micros returns the input for Duration."""
    original_micros = 919336704
    round_tripped = proto_utils.to_micros(
        proto_utils.from_micros(duration_pb2.Duration, original_micros))
    self.assertEqual(round_tripped, original_micros)

  def test_round_trip_timestamp(self):
    """from_micros followed by to_micros returns the input for Timestamp."""
    original_micros = 919336704
    round_tripped = proto_utils.to_micros(
        proto_utils.from_micros(timestamp_pb2.Timestamp, original_micros))
    self.assertEqual(round_tripped, original_micros)


# Run this module's test cases when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
2 changes: 1 addition & 1 deletion sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def get_portability_package_data():
#
# 3. Exclude protobuf 4 versions that leak memory, see:
# https://github.com/apache/beam/issues/28246
'protobuf>=3.20.3,<4.26.0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long
'protobuf>=3.20.3,<6.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long
'pydot>=1.2.0,<2',
'python-dateutil>=2.8.0,<3',
'pytz>=2018.3',
Expand Down

0 comments on commit 193372f

Please sign in to comment.