Skip to content

Commit

Permalink
Use other fallback coders for protobuf Message base class.
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Dec 20, 2024
1 parent a51a0e1 commit aacbd60
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
9 changes: 7 additions & 2 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -1039,11 +1039,16 @@ def __hash__(self):

@classmethod
def from_type_hint(cls, typehint, unused_registry):
if issubclass(typehint, proto_utils.message_types):
# The typehint must be a subclass of google.protobuf.message.Message.
# Using message.Message itself prevents ProtoCoder usage, as required APIs
# are not implemented in the base class. If this occurs, an error is raised
# and the system defaults to other fallback coders.
if (issubclass(typehint, proto_utils.message_types) and
typehint != message.Message):
return cls(typehint)
else:
raise ValueError((
'Expected a subclass of google.protobuf.message.Message'
'Expected a strict subclass of google.protobuf.message.Message'
', but got a %s' % typehint))

def to_type_hint(self):
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/coders/coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import unittest

from google.protobuf import message
import proto
import pytest

Expand Down Expand Up @@ -86,6 +87,23 @@ def test_proto_coder(self):
self.assertEqual(ma, real_coder.decode(real_coder.encode(ma)))
self.assertEqual(ma.__class__, real_coder.to_type_hint())

def test_proto_coder_on_protobuf_message_subclasses(self):
# This replicates a scenario where users provide message.Message as the
# output typehint for a Map function, even though the actual output messages
# are subclasses of message.Message.
ma = test_message.MessageA()
mb = ma.field2.add()
mb.field1 = True
ma.field1 = 'hello world'

coder = coders_registry.get_coder(message.Message)
# For messages of google.protobuf.message.Message, the fallback coder will
# be FastPrimitiveCoder other than ProtoCoder.
# See the comment on ProtoCoder.from_type_hint() for further details.
self.assertEqual(coder, coders.FastPrimitivesCoder())

self.assertEqual(ma, coder.decode(coder.encode(ma)))


class DeterministicProtoCoderTest(unittest.TestCase):
def test_deterministic_proto_coder(self):
Expand Down

0 comments on commit aacbd60

Please sign in to comment.