Skip to content

Commit

Permalink
Replace convert_to_typing_type with PEP 585 equivalent (#33538)
Browse files Browse the repository at this point in the history
* Replace convert_to_typing_type with PEP 585 equivalent

* linting
  • Loading branch information
jrmccluskey authored Jan 13, 2025
1 parent 0b2e57c commit 202709b
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 46 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/transforms/external.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def convert_to_typing_type(type_):
if isinstance(type_, row_type.RowTypeConstraint):
return named_tuple_from_schema(named_fields_to_schema(type_._fields))
else:
return native_type_compatibility.convert_to_typing_type(type_)
return native_type_compatibility.convert_to_python_type(type_)


def _is_optional_or_none(typehint):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from apache_beam.portability.api import external_transforms_pb2
from apache_beam.pvalue import Row
from apache_beam.transforms import ptransform
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import convert_to_python_type
from apache_beam.typehints.schemas import named_fields_to_schema
from apache_beam.typehints.trivial_inference import instance_to_type
from apache_beam.utils import python_callable
Expand Down Expand Up @@ -100,11 +100,11 @@ def _resolve(cls, fully_qualified_name):

def to_runner_api_parameter(self, unused_context):
_args_schema = named_fields_to_schema([
(f'arg{ix}', convert_to_typing_type(instance_to_type(value)))
(f'arg{ix}', convert_to_python_type(instance_to_type(value)))
for (ix, value) in enumerate(self._args)
])
_kwargs_schema = named_fields_to_schema([
(key, convert_to_typing_type(instance_to_type(value)))
(key, convert_to_python_type(instance_to_type(value)))
for (key, value) in self._kwargs.items()
])
payload_schema = named_fields_to_schema({
Expand Down
32 changes: 16 additions & 16 deletions sdks/python/apache_beam/typehints/native_type_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ def convert_to_beam_types(args):
return [convert_to_beam_type(v) for v in args]


def convert_to_typing_type(typ):
"""Converts a given Beam type to a typing type.
def convert_to_python_type(typ):
"""Converts a given Beam type to a python type.
This is the reverse of convert_to_beam_type.
Expand Down Expand Up @@ -482,33 +482,33 @@ def convert_to_typing_type(typ):
if isinstance(typ, typehints.AnyTypeConstraint):
return typing.Any
if isinstance(typ, typehints.DictConstraint):
return typing.Dict[convert_to_typing_type(typ.key_type),
convert_to_typing_type(typ.value_type)]
return dict[convert_to_python_type(typ.key_type),
convert_to_python_type(typ.value_type)]
if isinstance(typ, typehints.ListConstraint):
return typing.List[convert_to_typing_type(typ.inner_type)]
return list[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.IterableTypeConstraint):
return typing.Iterable[convert_to_typing_type(typ.inner_type)]
return collections.abc.Iterable[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.UnionConstraint):
if not typ.union_types:
# Gracefully handle the empty union type.
return typing.Any
return typing.Union[tuple(convert_to_typing_types(typ.union_types))]
return typing.Union[tuple(convert_to_python_types(typ.union_types))]
if isinstance(typ, typehints.SetTypeConstraint):
return typing.Set[convert_to_typing_type(typ.inner_type)]
return set[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.FrozenSetTypeConstraint):
return typing.FrozenSet[convert_to_typing_type(typ.inner_type)]
return frozenset[convert_to_python_type(typ.inner_type)]
if isinstance(typ, typehints.TupleConstraint):
return typing.Tuple[tuple(convert_to_typing_types(typ.tuple_types))]
return tuple[tuple(convert_to_python_types(typ.tuple_types))]
if isinstance(typ, typehints.TupleSequenceConstraint):
return typing.Tuple[convert_to_typing_type(typ.inner_type), ...]
return tuple[convert_to_python_type(typ.inner_type), ...]
if isinstance(typ, typehints.IteratorTypeConstraint):
return typing.Iterator[convert_to_typing_type(typ.yielded_type)]
return collections.abc.Iterator[convert_to_python_type(typ.yielded_type)]

raise ValueError('Failed to convert Beam type: %s' % typ)


def convert_to_typing_types(args):
"""Convert the given list or dictionary of args to typing types.
def convert_to_python_types(args):
"""Convert the given list or dictionary of args to python types.
Args:
args: Either an iterable of types, or a dictionary where the values are
Expand All @@ -519,6 +519,6 @@ def convert_to_typing_types(args):
a dictionary with the same keys, and values which have been converted.
"""
if isinstance(args, dict):
return {k: convert_to_typing_type(v) for k, v in args.items()}
return {k: convert_to_python_type(v) for k, v in args.items()}
else:
return [convert_to_typing_type(v) for v in args]
return [convert_to_python_type(v) for v in args]
100 changes: 76 additions & 24 deletions sdks/python/apache_beam/typehints/native_type_compatibility_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@
from apache_beam.typehints.native_type_compatibility import convert_builtin_to_typing
from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
from apache_beam.typehints.native_type_compatibility import convert_to_beam_types
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import convert_to_typing_types
from apache_beam.typehints.native_type_compatibility import convert_to_python_type
from apache_beam.typehints.native_type_compatibility import convert_to_python_types
from apache_beam.typehints.native_type_compatibility import convert_typing_to_builtin
from apache_beam.typehints.native_type_compatibility import is_any

_TestNamedTuple = typing.NamedTuple(
'_TestNamedTuple', [('age', int), ('name', bytes)])
_TestFlatAlias = typing.Tuple[bytes, float]
_TestNestedAlias = typing.List[_TestFlatAlias]
_TestFlatAlias = tuple[bytes, float]
_TestNestedAlias = list[_TestFlatAlias]
_TestFlatAliasTyping = typing.Tuple[bytes, float]
_TestNestedAliasTyping = typing.List[_TestFlatAliasTyping]


class _TestClass(object):
Expand Down Expand Up @@ -68,50 +70,51 @@ def test_convert_to_beam_type(self):
('raw int', int, int),
('raw float', float, float),
('any', typing.Any, typehints.Any),
('simple dict', typing.Dict[bytes, int],
('simple dict', dict[bytes, int],
typehints.Dict[bytes, int]),
('simple list', typing.List[int], typehints.List[int]),
('simple iterable', typing.Iterable[int], typehints.Iterable[int]),
('simple list', list[int], typehints.List[int]),
('simple iterable', collections.abc.Iterable[int],
typehints.Iterable[int]),
('simple optional', typing.Optional[int], typehints.Optional[int]),
('simple set', typing.Set[float], typehints.Set[float]),
('simple set', set[float], typehints.Set[float]),
('simple frozenset',
typing.FrozenSet[float],
frozenset[float],
typehints.FrozenSet[float]),
('simple unary tuple', typing.Tuple[bytes],
('simple unary tuple', tuple[bytes],
typehints.Tuple[bytes]),
('simple union', typing.Union[int, bytes, float],
typehints.Union[int, bytes, float]),
('namedtuple', _TestNamedTuple, _TestNamedTuple),
('test class', _TestClass, _TestClass),
('test class in list', typing.List[_TestClass],
('test class in list', list[_TestClass],
typehints.List[_TestClass]),
('generic bare', _TestGeneric, _TestGeneric),
('generic subscripted', _TestGeneric[int], _TestGeneric[int]),
('complex tuple', typing.Tuple[bytes, typing.List[typing.Tuple[
('complex tuple', tuple[bytes, list[tuple[
bytes, typing.Union[int, bytes, float]]]],
typehints.Tuple[bytes, typehints.List[typehints.Tuple[
bytes, typehints.Union[int, bytes, float]]]]),
('arbitrary-length tuple', typing.Tuple[int, ...],
('arbitrary-length tuple', tuple[int, ...],
typehints.Tuple[int, ...]),
('flat alias', _TestFlatAlias, typehints.Tuple[bytes, float]), # type: ignore[misc]
('nested alias', _TestNestedAlias,
typehints.List[typehints.Tuple[bytes, float]]),
('complex dict',
typing.Dict[bytes, typing.List[typing.Tuple[bytes, _TestClass]]],
dict[bytes, list[tuple[bytes, _TestClass]]],
typehints.Dict[bytes, typehints.List[typehints.Tuple[
bytes, _TestClass]]]),
('type var', typing.TypeVar('T'), typehints.TypeVariable('T')),
('nested type var',
typing.Tuple[typing.TypeVar('K'), typing.TypeVar('V')],
tuple[typing.TypeVar('K'), typing.TypeVar('V')],
typehints.Tuple[typehints.TypeVariable('K'),
typehints.TypeVariable('V')]),
('iterator', typing.Iterator[typing.Any],
('iterator', collections.abc.Iterator[typing.Any],
typehints.Iterator[typehints.Any]),
('nested generic bare', typing.List[_TestGeneric],
('nested generic bare', list[_TestGeneric],
typehints.List[_TestGeneric]),
('nested generic subscripted', typing.List[_TestGeneric[int]],
('nested generic subscripted', list[_TestGeneric[int]],
typehints.List[_TestGeneric[int]]),
('nested generic with any', typing.List[_TestPair[typing.Any]],
('nested generic with any', list[_TestPair[typing.Any]],
typehints.List[_TestPair[typing.Any]]),
('raw enum', _TestEnum, _TestEnum),
]
Expand All @@ -125,9 +128,58 @@ def test_convert_to_beam_type(self):
expected_beam_type = test_case[2]
converted_beam_type = convert_to_beam_type(typing_type)
self.assertEqual(converted_beam_type, expected_beam_type, description)
converted_typing_type = convert_to_typing_type(converted_beam_type)
converted_typing_type = convert_to_python_type(converted_beam_type)
self.assertEqual(converted_typing_type, typing_type, description)

def test_convert_to_beam_type_with_typing_types(self):
test_cases = [
('simple dict', typing.Dict[bytes, int],
typehints.Dict[bytes, int]),
('simple list', typing.List[int], typehints.List[int]),
('simple iterable', typing.Iterable[int], typehints.Iterable[int]),
('simple optional', typing.Optional[int], typehints.Optional[int]),
('simple set', typing.Set[float], typehints.Set[float]),
('simple frozenset',
typing.FrozenSet[float],
typehints.FrozenSet[float]),
('simple unary tuple', typing.Tuple[bytes],
typehints.Tuple[bytes]),
('test class in list', typing.List[_TestClass],
typehints.List[_TestClass]),
('complex tuple', typing.Tuple[bytes, typing.List[typing.Tuple[
bytes, typing.Union[int, bytes, float]]]],
typehints.Tuple[bytes, typehints.List[typehints.Tuple[
bytes, typehints.Union[int, bytes, float]]]]),
('arbitrary-length tuple', typing.Tuple[int, ...],
typehints.Tuple[int, ...]),
('flat alias', _TestFlatAliasTyping, typehints.Tuple[bytes, float]), # type: ignore[misc]
('nested alias', _TestNestedAliasTyping,
typehints.List[typehints.Tuple[bytes, float]]),
('complex dict',
typing.Dict[bytes, typing.List[typing.Tuple[bytes, _TestClass]]],
typehints.Dict[bytes, typehints.List[typehints.Tuple[
bytes, _TestClass]]]),
('nested type var',
typing.Tuple[typing.TypeVar('K'), typing.TypeVar('V')],
typehints.Tuple[typehints.TypeVariable('K'),
typehints.TypeVariable('V')]),
('iterator', typing.Iterator[typing.Any],
typehints.Iterator[typehints.Any]),
('nested generic bare', typing.List[_TestGeneric],
typehints.List[_TestGeneric]),
('nested generic subscripted', typing.List[_TestGeneric[int]],
typehints.List[_TestGeneric[int]]),
('nested generic with any', typing.List[_TestPair[typing.Any]],
typehints.List[_TestPair[typing.Any]]),
]

for test_case in test_cases:
description = test_case[0]
builtins_type = test_case[1]
expected_beam_type = test_case[2]
converted_beam_type = convert_to_beam_type(builtins_type)
self.assertEqual(converted_beam_type, expected_beam_type, description)

def test_convert_to_beam_type_with_builtin_types(self):
test_cases = [
('builtin dict', dict[str, int], typehints.Dict[str, int]),
Expand Down Expand Up @@ -312,9 +364,9 @@ def test_convert_bare_types_fail(self):
def test_convert_to_beam_types(self):
typing_types = [
bytes,
typing.List[bytes],
typing.List[typing.Tuple[bytes, int]],
typing.Union[int, typing.List[int]]
list[bytes],
list[tuple[bytes, int]],
typing.Union[int, list[int]]
]
beam_types = [
bytes,
Expand All @@ -324,7 +376,7 @@ def test_convert_to_beam_types(self):
]
converted_beam_types = convert_to_beam_types(typing_types)
self.assertEqual(converted_beam_types, beam_types)
converted_typing_types = convert_to_typing_types(converted_beam_types)
converted_typing_types = convert_to_python_types(converted_beam_types)
self.assertEqual(converted_typing_types, typing_types)

def test_is_any(self):
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
from apache_beam.typehints.native_type_compatibility import _match_is_optional
from apache_beam.typehints.native_type_compatibility import _safe_issubclass
from apache_beam.typehints.native_type_compatibility import convert_to_typing_type
from apache_beam.typehints.native_type_compatibility import convert_to_python_type
from apache_beam.typehints.native_type_compatibility import extract_optional_type
from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
from apache_beam.typehints.schema_registry import SCHEMA_REGISTRY
Expand Down Expand Up @@ -293,7 +293,7 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
return self.typing_to_runner_api(row_type_constraint)

if isinstance(type_, typehints.TypeConstraint):
type_ = convert_to_typing_type(type_)
type_ = convert_to_python_type(type_)

# All concrete types (other than NamedTuple sub-classes) should map to
# a supported primitive type.
Expand Down

0 comments on commit 202709b

Please sign in to comment.