Skip to content

Commit

Permalink
Avoid pickling unstable reference to moved proto classes. (#23739)
Browse files Browse the repository at this point in the history
CloudPickle notices that schema_pb2 (from the closure of the locally
defined __reduce__ method) is not importable under its declared name
(org.apache.beam...) and tries to pickle it (which fails due to proto
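classes themselves being unpicklable).  This avoids that error by
moving the __reduce__ definition out to a module-level helper whose
closure holds only the serialized schema bytes.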
  • Loading branch information
robertwb authored Oct 21, 2022
1 parent 2d151c3 commit 8dd8749
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,19 +463,24 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:

# Define a reduce function, otherwise these types can't be pickled
# (See BEAM-9574)
def __reduce__(self):
return (
_hydrate_namedtuple_instance,
(schema.SerializeToString(), tuple(self)))

setattr(user_type, '__reduce__', __reduce__)
setattr(
user_type,
'__reduce__',
_named_tuple_reduce_method(schema.SerializeToString()))

self.schema_registry.add(user_type, schema)
coders.registry.register_coder(user_type, coders.RowCoder)

return user_type


def _named_tuple_reduce_method(serialized_schema):
  """Build a ``__reduce__`` method tied to an already-serialized schema.

  The returned function is meant to be attached to a generated named tuple
  type. It is created here, at module level, so that the only value it
  closes over is ``serialized_schema`` rather than anything from the scope
  in which the type itself is generated.

  Args:
    serialized_schema: The schema in serialized form, as handed to
      ``_hydrate_namedtuple_instance`` when the instance is rebuilt.

  Returns:
    A function ``__reduce__(self)`` returning the pair
    ``(_hydrate_namedtuple_instance, (serialized_schema, tuple(self)))``.
  """
  def __reduce__(self):
    # Reduce to the hydration callable plus the arguments it needs: the
    # serialized schema and the instance's field values as a plain tuple.
    hydrate_args = (serialized_schema, tuple(self))
    return (_hydrate_namedtuple_instance, hydrate_args)

  return __reduce__


def _hydrate_namedtuple_instance(encoded_schema, values):
  """Rebuild a named tuple instance from an encoded schema and its values.

  This is the callable that the generated ``__reduce__`` methods return, so
  it is what runs when such an instance is reconstructed.

  Args:
    encoded_schema: Serialized schema, parsed here as a ``schema_pb2.Schema``.
    values: Iterable of field values, passed positionally to the type
      obtained from the schema.

  Returns:
    An instance of the type returned by ``named_tuple_from_schema`` for the
    parsed schema, constructed from ``values``.
  """
  # Step 1: decode the bytes back into a schema proto.
  schema_proto = proto_utils.parse_Bytes(encoded_schema, schema_pb2.Schema)
  # Step 2: look up (or generate) the user type for that schema.
  user_type = named_tuple_from_schema(schema_proto)
  # Step 3: instantiate it with the original field values.
  return user_type(*values)
Expand Down

0 comments on commit 8dd8749

Please sign in to comment.