diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index ab35331f2ba9..4e66a290842c 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -18,7 +18,6 @@ """PTransform and descendants.""" # pylint: disable=wildcard-import -from __future__ import absolute_import from apache_beam.transforms import combiners from apache_beam.transforms.core import * diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 750961bce653..496fb041bd45 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -19,17 +19,12 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import copy import heapq import operator import random import sys import warnings -from builtins import object -from builtins import zip from typing import Any from typing import Dict from typing import Iterable @@ -39,8 +34,6 @@ from typing import TypeVar from typing import Union -from past.builtins import long - from apache_beam import typehints from apache_beam.transforms import core from apache_beam.transforms import cy_combiners @@ -96,7 +89,7 @@ def expand(self, pcoll): # TODO(laolu): This type signature is overly restrictive. This should be # more general. -@with_input_types(Union[float, int, long]) +@with_input_types(Union[float, int]) @with_output_types(float) class MeanCombineFn(core.CombineFn): """CombineFn for computing an arithmetic mean.""" diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index daece5c0e68f..f8d25d32256e 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -18,15 +18,11 @@ """Unit tests for our libraries of combine PTransforms.""" # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import itertools import random import unittest import hamcrest as hc -from future.builtins import range from nose.plugins.attrib import attr import apache_beam as beam diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 564450bab023..a89cefabb8e9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -19,19 +19,12 @@ # pytype: skip-file -from __future__ import absolute_import - import copy import inspect import logging import random import types import typing -from builtins import map -from builtins import object -from builtins import range - -from past.builtins import unicode from apache_beam import coders from apache_beam import pvalue @@ -80,11 +73,6 @@ from apache_beam.transforms.trigger import DefaultTrigger from apache_beam.transforms.trigger import TriggerFn -try: - import funcsigs # Python 2 only. -except ImportError: - funcsigs = None - __all__ = [ 'DoFn', 'CombineFn', @@ -385,12 +373,7 @@ def get_function_args_defaults(f): it doesn't include bound arguments and may follow function wrappers. """ signature = get_signature(f) - # Fall back on funcsigs if inspect module doesn't have 'Parameter'; prefer - # inspect.Parameter over funcsigs.Parameter if both are available. - try: - parameter = inspect.Parameter - except AttributeError: - parameter = funcsigs.Parameter + parameter = inspect.Parameter # TODO(BEAM-5878) support kwonlyargs on Python 3. _SUPPORTED_ARG_TYPES = [ parameter.POSITIONAL_ONLY, parameter.POSITIONAL_OR_KEYWORD @@ -1618,18 +1601,10 @@ def MapTuple(fn, *args, **kwargs): # pylint: disable=invalid-name beam.MapTuple(lambda a, b, ...: ...) - is equivalent to Python 2 - - beam.Map(lambda (a, b, ...), ...: ...) - In other words beam.MapTuple(fn) - is equivalent to - - beam.Map(lambda element, ...: fn(\*element, ...)) - This can be useful when processing a PCollection of tuples (e.g. key-value pairs). @@ -2904,7 +2879,7 @@ def __init__(self, values, reshuffle=True): values: An object of values for the PCollection """ super(Create, self).__init__() - if isinstance(values, (unicode, str, bytes)): + if isinstance(values, (str, bytes)): raise TypeError( 'PTransform Create: Refusing to treat string as ' 'an iterable. (string=%r)' % values) diff --git a/sdks/python/apache_beam/transforms/create_source.py b/sdks/python/apache_beam/transforms/create_source.py index e25a383f3de5..2fbc925afdda 100644 --- a/sdks/python/apache_beam/transforms/create_source.py +++ b/sdks/python/apache_beam/transforms/create_source.py @@ -17,13 +17,6 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - -from builtins import map -from builtins import next -from builtins import range - from apache_beam.io import iobase from apache_beam.transforms.core import Create diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py index fe9f84fdb31e..6dd642bcbb21 100644 --- a/sdks/python/apache_beam/transforms/create_test.py +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -18,12 +18,8 @@ """Unit tests for the Create and _CreateSource classes.""" # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import logging import unittest -from builtins import range from apache_beam import Create from apache_beam.coders import FastPrimitivesCoder diff --git a/sdks/python/apache_beam/transforms/cy_combiners.py b/sdks/python/apache_beam/transforms/cy_combiners.py index 7e4cb0e6dbc7..dbbff0798ec2 100644 --- a/sdks/python/apache_beam/transforms/cy_combiners.py +++ b/sdks/python/apache_beam/transforms/cy_combiners.py @@ -24,11 +24,7 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import operator -from builtins import object from apache_beam.transforms import core diff --git a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py index f072cd104b2c..eda658e245c6 100644 --- a/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py +++ b/sdks/python/apache_beam/transforms/dataflow_distribution_counter_test.py @@ -16,8 +16,6 @@ # pytype: skip-file -from __future__ import absolute_import - import unittest from mock import Mock diff --git a/sdks/python/apache_beam/transforms/deduplicate.py b/sdks/python/apache_beam/transforms/deduplicate.py index 743412e3b5fe..8c348986ad60 100644 --- a/sdks/python/apache_beam/transforms/deduplicate.py +++ b/sdks/python/apache_beam/transforms/deduplicate.py @@ -19,9 +19,6 @@ """a collection of ptransforms for deduplicating elements.""" -from __future__ import absolute_import -from __future__ import division - import typing from apache_beam import typehints diff --git a/sdks/python/apache_beam/transforms/deduplicate_test.py b/sdks/python/apache_beam/transforms/deduplicate_test.py index 5bea89591ee1..36f61a78f88c 100644 --- a/sdks/python/apache_beam/transforms/deduplicate_test.py +++ b/sdks/python/apache_beam/transforms/deduplicate_test.py @@ -19,8 +19,6 @@ """Unit tests for deduplicate transform by using TestStream.""" -from __future__ import absolute_import - import unittest from nose.plugins.attrib import attr diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index b88f6b1a4b32..2d3c6183b7b6 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -38,19 +38,14 @@ # pytype: skip-file -from __future__ import absolute_import - import calendar import inspect import json -from builtins import object from datetime import datetime from datetime import timedelta from typing import TYPE_CHECKING from typing import List -from past.builtins import unicode - from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 @@ -231,7 +226,6 @@ class DisplayDataItem(object): """ typeDict = { str: 'STRING', - unicode: 'STRING', int: 'INTEGER', float: 'FLOAT', bool: 'BOOLEAN', diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py index 840fc6d581bd..a7605b09a3ad 100644 --- a/sdks/python/apache_beam/transforms/display_test.py +++ b/sdks/python/apache_beam/transforms/display_test.py @@ -19,15 +19,12 @@ # pytype: skip-file -from __future__ import absolute_import - import unittest from datetime import datetime # pylint: disable=ungrouped-imports import hamcrest as hc from hamcrest.core.base_matcher import BaseMatcher -from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions @@ -167,7 +164,7 @@ def test_unicode_type_display_data(self): class MyDoFn(beam.DoFn): def display_data(self): return { - 'unicode_string': unicode('my string'), + 'unicode_string': 'my string', 'unicode_literal_string': u'my literal string' } diff --git a/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py b/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py index ec446a1bd1da..f50ebd0c768f 100644 --- a/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py +++ b/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py @@ -19,8 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import - import unittest from nose.plugins.attrib import attr diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 6876d965080a..717e96788321 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -21,8 +21,6 @@ # pytype: skip-file -from __future__ import absolute_import - import json import logging import sys diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py index 020478d81103..e40b2ad95435 100644 --- a/sdks/python/apache_beam/transforms/environments_test.py +++ b/sdks/python/apache_beam/transforms/environments_test.py @@ -20,8 +20,6 @@ # pytype: skip-file -from __future__ import absolute_import - import logging import unittest diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 19170916f067..dfff07765cca 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -21,9 +21,6 @@ """ # pytype: skip-file -from __future__ import absolute_import -from __future__ import print_function - import contextlib import copy import functools @@ -150,8 +147,6 @@ def _get_named_tuple_instance(self): class AnnotationBasedPayloadBuilder(SchemaBasedPayloadBuilder): """ Build a payload based on an external transform's type annotations. - - Supported in python 3 only. """ def __init__(self, transform, **values): """ @@ -174,8 +169,6 @@ def _get_named_tuple_instance(self): class DataclassBasedPayloadBuilder(SchemaBasedPayloadBuilder): """ Build a payload based on an external transform that uses dataclasses. - - Supported in python 3 only. """ def __init__(self, transform): """ diff --git a/sdks/python/apache_beam/transforms/external_it_test.py b/sdks/python/apache_beam/transforms/external_it_test.py index d99c21819716..b1eda0a68822 100644 --- a/sdks/python/apache_beam/transforms/external_it_test.py +++ b/sdks/python/apache_beam/transforms/external_it_test.py @@ -19,8 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import - import unittest from nose.plugins.attrib import attr diff --git a/sdks/python/apache_beam/transforms/external_java.py b/sdks/python/apache_beam/transforms/external_java.py index a9cb723147fd..f0a963864c1d 100644 --- a/sdks/python/apache_beam/transforms/external_java.py +++ b/sdks/python/apache_beam/transforms/external_java.py @@ -17,8 +17,6 @@ """Tests for the Java external transforms.""" -from __future__ import absolute_import - import argparse import logging import subprocess @@ -26,7 +24,6 @@ import grpc from mock import patch -from past.builtins import unicode import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions @@ -139,7 +136,7 @@ def run_pipeline(pipeline_options, expansion_service, wait_until_finish=True): res = ( p | beam.Create(list('aaabccxyyzzz')) - | beam.Map(unicode) + | beam.Map(str) | beam.ExternalTransform( TEST_FILTER_URN, ImplicitSchemaPayloadBuilder({'data': u'middle'}), diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py index 169091f8b3e9..f473eacd62d3 100644 --- a/sdks/python/apache_beam/transforms/external_test.py +++ b/sdks/python/apache_beam/transforms/external_test.py @@ -19,14 +19,11 @@ # pytype: skip-file -from __future__ import absolute_import - +import dataclasses import logging import typing import unittest -from past.builtins import unicode - import apache_beam as beam from apache_beam import Pipeline from apache_beam.coders import RowCoder @@ -36,14 +33,17 @@ from apache_beam.runners.portability.expansion_service_test import FibTransform from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +from apache_beam.transforms.external import AnnotationBasedPayloadBuilder from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder from apache_beam.typehints import typehints from apache_beam.typehints.native_type_compatibility import convert_to_beam_type -def get_payload(args): - return ExternalConfigurationPayload(configuration=args) +def get_payload(cls): + payload = ExternalConfigurationPayload() + payload.ParseFromString(cls._payload) + return payload class PayloadBase(object): @@ -83,32 +83,12 @@ def test_typing_payload_builder(self): for key, value in self.values.items(): self.assertEqual(getattr(decoded, key), value) - # TODO(BEAM-7372): Drop py2 specific "bytes" tests - def test_typing_payload_builder_with_bytes(self): - """ - string_utf8 coder will be used even if values are not unicode in python 2.x - """ - result = self.get_payload_from_typing_hints(self.bytes_values) - decoded = RowCoder(result.schema).decode(result.payload) - for key, value in self.values.items(): - self.assertEqual(getattr(decoded, key), value) - def test_typehints_payload_builder(self): result = self.get_payload_from_typing_hints(self.values) decoded = RowCoder(result.schema).decode(result.payload) for key, value in self.values.items(): self.assertEqual(getattr(decoded, key), value) - # TODO(BEAM-7372): Drop py2 specific "bytes" tests - def test_typehints_payload_builder_with_bytes(self): - """ - string_utf8 coder will be used even if values are not unicode in python 2.x - """ - result = self.get_payload_from_typing_hints(self.bytes_values) - decoded = RowCoder(result.schema).decode(result.payload) - for key, value in self.values.items(): - self.assertEqual(getattr(decoded, key), value) - def test_optional_error(self): """ value can only be None if typehint is Optional @@ -124,9 +104,9 @@ def get_payload_from_typing_hints(self, values): [ ('integer_example', int), ('boolean', bool), - ('string_example', unicode), - ('list_of_strings', typing.List[unicode]), - ('mapping', typing.Mapping[unicode, float]), + ('string_example', str), + ('list_of_strings', typing.List[str]), + ('mapping', typing.Mapping[str, float]), ('optional_integer', typing.Optional[int]), ]) @@ -279,6 +259,96 @@ def test_external_transform_finder_leaf(self): self.assertTrue(pipeline.contains_external_transforms) +class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase): + def get_payload_from_typing_hints(self, values): + class AnnotatedTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + def __init__( + self, + integer_example: int, + boolean: bool, + string_example: str, + list_of_strings: typing.List[str], + mapping: typing.Mapping[str, float], + optional_integer: typing.Optional[int] = None, + expansion_service=None): + super(AnnotatedTransform, self).__init__( + self.URN, + AnnotationBasedPayloadBuilder( + self, + integer_example=integer_example, + boolean=boolean, + string_example=string_example, + list_of_strings=list_of_strings, + mapping=mapping, + optional_integer=optional_integer, + ), + expansion_service) + + return get_payload(AnnotatedTransform(**values)) + + def get_payload_from_beam_typehints(self, values): + class AnnotatedTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + def __init__( + self, + integer_example: int, + boolean: bool, + string_example: str, + list_of_strings: typehints.List[str], + mapping: typehints.Dict[str, float], + optional_integer: typehints.Optional[int] = None, + expansion_service=None): + super(AnnotatedTransform, self).__init__( + self.URN, + AnnotationBasedPayloadBuilder( + self, + integer_example=integer_example, + boolean=boolean, + string_example=string_example, + list_of_strings=list_of_strings, + mapping=mapping, + optional_integer=optional_integer, + ), + expansion_service) + + return get_payload(AnnotatedTransform(**values)) + + +class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase): + def get_payload_from_typing_hints(self, values): + @dataclasses.dataclass + class DataclassTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + integer_example: int + boolean: bool + string_example: str + list_of_strings: typing.List[str] + mapping: typing.Mapping[str, float] = dataclasses.field(default=dict) + optional_integer: typing.Optional[int] = None + expansion_service: dataclasses.InitVar[typing.Optional[str]] = None + + return get_payload(DataclassTransform(**values)) + + def get_payload_from_beam_typehints(self, values): + @dataclasses.dataclass + class DataclassTransform(beam.ExternalTransform): + URN = 'beam:external:fakeurn:v1' + + integer_example: int + boolean: bool + string_example: str + list_of_strings: typehints.List[str] + mapping: typehints.Dict[str, float] = {} + optional_integer: typehints.Optional[int] = None + expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None + + return get_payload(DataclassTransform(**values)) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py deleted file mode 100644 index 2549eee5c474..000000000000 --- a/sdks/python/apache_beam/transforms/external_test_py3.py +++ /dev/null @@ -1,99 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the transform.external classes.""" - -# pytype: skip-file - -from __future__ import absolute_import - -import typing -import unittest - -import apache_beam as beam -from apache_beam import typehints -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms.external import AnnotationBasedPayloadBuilder -from apache_beam.transforms.external_test import PayloadBase - - -def get_payload(cls): - payload = ExternalConfigurationPayload() - payload.ParseFromString(cls._payload) - return payload - - -class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase): - def get_payload_from_typing_hints(self, values): - class AnnotatedTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - def __init__( - self, - integer_example: int, - boolean: bool, - string_example: str, - list_of_strings: typing.List[str], - mapping: typing.Mapping[str, float], - optional_integer: typing.Optional[int] = None, - expansion_service=None): - super(AnnotatedTransform, self).__init__( - self.URN, - AnnotationBasedPayloadBuilder( - self, - integer_example=integer_example, - boolean=boolean, - string_example=string_example, - list_of_strings=list_of_strings, - mapping=mapping, - optional_integer=optional_integer, - ), - expansion_service) - - return get_payload(AnnotatedTransform(**values)) - - def get_payload_from_beam_typehints(self, values): - class AnnotatedTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - def __init__( - self, - integer_example: int, - boolean: bool, - string_example: str, - list_of_strings: typehints.List[str], - mapping: typehints.Dict[str, float], - optional_integer: typehints.Optional[int] = None, - expansion_service=None): - super(AnnotatedTransform, self).__init__( - self.URN, - AnnotationBasedPayloadBuilder( - self, - integer_example=integer_example, - boolean=boolean, - string_example=string_example, - list_of_strings=list_of_strings, - mapping=mapping, - optional_integer=optional_integer, - ), - expansion_service) - - return get_payload(AnnotatedTransform(**values)) - - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py deleted file mode 100644 index 52eec45ba09c..000000000000 --- a/sdks/python/apache_beam/transforms/external_test_py37.py +++ /dev/null @@ -1,73 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the transform.external classes.""" - -# pytype: skip-file - -from __future__ import absolute_import - -import dataclasses -import typing -import unittest - -import apache_beam as beam -from apache_beam import typehints -from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload -from apache_beam.transforms.external_test import PayloadBase - - -def get_payload(cls): - payload = ExternalConfigurationPayload() - payload.ParseFromString(cls._payload) - return payload - - -class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase): - def get_payload_from_typing_hints(self, values): - @dataclasses.dataclass - class DataclassTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - integer_example: int - boolean: bool - string_example: str - list_of_strings: typing.List[str] - mapping: typing.Mapping[str, float] = dataclasses.field(default=dict) - optional_integer: typing.Optional[int] = None - expansion_service: dataclasses.InitVar[typing.Optional[str]] = None - - return get_payload(DataclassTransform(**values)) - - def get_payload_from_beam_typehints(self, values): - @dataclasses.dataclass - class DataclassTransform(beam.ExternalTransform): - URN = 'beam:external:fakeurn:v1' - - integer_example: int - boolean: bool - string_example: str - list_of_strings: typehints.List[str] - mapping: typehints.Dict[str, float] = {} - optional_integer: typehints.Optional[int] = None - expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None - - return get_payload(DataclassTransform(**values)) - - -if __name__ == '__main__': - unittest.main() diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index f3fc344bd6ba..9fc902992d40 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -15,9 +15,6 @@ # limitations under the License. # -from __future__ import absolute_import -from __future__ import division - import math import time diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index 0c96c579e3df..7e6549e60890 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -19,14 +19,9 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - import inspect import time import unittest -from builtins import range import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index e823a1ce26d3..e859b382e050 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -36,8 +36,6 @@ class and wrapper class that allows lambda functions to be used as # pytype: skip-file -from __future__ import absolute_import - import copy import itertools import logging @@ -45,9 +43,6 @@ class and wrapper class that allows lambda functions to be used as import os import sys import threading -from builtins import hex -from builtins import object -from builtins import zip from functools import reduce from functools import wraps from typing import TYPE_CHECKING diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index e540197bbb11..8e5be547c580 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -19,10 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - import collections import operator import pickle @@ -30,16 +26,11 @@ import re import typing import unittest -from builtins import map -from builtins import range -from builtins import zip from functools import reduce from typing import Iterable from typing import Optional from unittest.mock import patch -# patches unittest.TestCase to be python3 compatible -import future.tests.base # pylint: disable=unused-import import hamcrest as hc from nose.plugins.attrib import attr diff --git a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py index 0be434879c78..61e0c87a2d7d 100644 --- a/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py +++ b/sdks/python/apache_beam/transforms/py_dataflow_distribution_counter.py @@ -19,11 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import - -from builtins import object -from builtins import range - globals()['INT64_MAX'] = 2**63 - 1 globals()['INT64_MIN'] = -2**63 diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 9832f0f01084..4088d3e827b0 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -26,10 +26,7 @@ # pytype: skip-file -from __future__ import absolute_import - import re -from builtins import object from typing import TYPE_CHECKING from typing import Any from typing import Callable diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 1ed2f296b5de..5f4f49fb88ec 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -19,8 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import - import itertools import logging import unittest diff --git a/sdks/python/apache_beam/transforms/sql.py b/sdks/python/apache_beam/transforms/sql.py index 244cd17ef049..30d546443d06 100644 --- a/sdks/python/apache_beam/transforms/sql.py +++ b/sdks/python/apache_beam/transforms/sql.py @@ -19,12 +19,8 @@ # pytype: skip-file -from __future__ import absolute_import - import typing -from past.builtins import unicode - from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external import ExternalTransform from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder @@ -32,8 +28,7 @@ __all__ = ['SqlTransform'] SqlTransformSchema = typing.NamedTuple( - 'SqlTransformSchema', [('query', unicode), - ('dialect', typing.Optional[unicode])]) + 'SqlTransformSchema', [('query', str), ('dialect', typing.Optional[str])]) class SqlTransform(ExternalTransform): diff --git a/sdks/python/apache_beam/transforms/sql_test.py b/sdks/python/apache_beam/transforms/sql_test.py index c38fb9b03068..e7a718f28047 100644 --- a/sdks/python/apache_beam/transforms/sql_test.py +++ b/sdks/python/apache_beam/transforms/sql_test.py @@ -19,14 +19,11 @@ # pytype: skip-file -from __future__ import absolute_import - import logging import typing import unittest from nose.plugins.attrib import attr -from past.builtins import unicode import apache_beam as beam from apache_beam import coders @@ -37,14 +34,14 @@ from apache_beam.transforms.sql import SqlTransform SimpleRow = typing.NamedTuple( - "SimpleRow", [("id", int), ("str", unicode), ("flt", float)]) + "SimpleRow", [("id", int), ("str", str), ("flt", float)]) coders.registry.register_coder(SimpleRow, coders.RowCoder) -Enrich = typing.NamedTuple("Enrich", [("id", int), ("metadata", unicode)]) +Enrich = typing.NamedTuple("Enrich", [("id", int), ("metadata", str)]) coders.registry.register_coder(Enrich, coders.RowCoder) Shopper = typing.NamedTuple( - "Shopper", [("shopper", unicode), ("cart", typing.Mapping[unicode, int])]) + "Shopper", [("shopper", str), ("cart", typing.Mapping[str, int])]) coders.registry.register_coder(Shopper, coders.RowCoder) @@ -149,7 +146,7 @@ def test_row(self): out = ( p | beam.Create([1, 2, 10]) - | beam.Map(lambda x: beam.Row(a=x, b=unicode(x))) + | beam.Map(lambda x: beam.Row(a=x, b=str(x))) | SqlTransform("SELECT a*a as s, LENGTH(b) AS c FROM PCOLLECTION")) assert_that(out, equal_to([(1, 1), (4, 1), (100, 2)])) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 003d7744f2fb..cbd79f474727 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -30,16 +30,12 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import hashlib import heapq import itertools import logging import math import typing -from builtins import round from typing import Any from typing import Callable from typing import List @@ -145,8 +141,7 @@ def _get_sample_size_from_est_error(est_err): Calculate sample size from estimation error """ - #math.ceil in python2.7 returns a float, while it returns an int in python3. - return int(math.ceil(4.0 / math.pow(est_err, 2.0))) + return math.ceil(4.0 / math.pow(est_err, 2.0)) @typehints.with_input_types(T) @typehints.with_output_types(int) diff --git a/sdks/python/apache_beam/transforms/stats_test.py b/sdks/python/apache_beam/transforms/stats_test.py index 1cd8c8fa7d69..739438035c88 100644 --- a/sdks/python/apache_beam/transforms/stats_test.py +++ b/sdks/python/apache_beam/transforms/stats_test.py @@ -18,14 +18,10 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import math import random import sys import unittest -from builtins import range from collections import defaultdict import hamcrest as hc diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py index 4525f9db8edc..87294b0dcf4d 100644 --- a/sdks/python/apache_beam/transforms/timeutil.py +++ b/sdks/python/apache_beam/transforms/timeutil.py @@ -19,13 +19,8 @@ # pytype: skip-file -from __future__ import absolute_import - from abc import ABCMeta from abc import abstractmethod -from builtins import object - -from future.utils import with_metaclass from apache_beam.portability.api import beam_runner_api_pb2 @@ -63,7 +58,7 @@ def is_event_time(domain): return TimeDomain.from_string(domain) == TimeDomain.WATERMARK -class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): # type: ignore[misc] +class TimestampCombinerImpl(metaclass=ABCMeta): """Implementation of TimestampCombiner.""" @abstractmethod def assign_output_time(self, window, input_timestamp): @@ -88,7 +83,7 @@ def merge(self, unused_result_window, merging_timestamps): return self.combine_all(merging_timestamps) -class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): # type: ignore[misc] +class DependsOnlyOnWindow(TimestampCombinerImpl, metaclass=ABCMeta): """TimestampCombinerImpl that only depends on the window.""" def merge(self, result_window, unused_merging_timestamps): # Since we know that the result only depends on the window, we can ignore diff --git a/sdks/python/apache_beam/transforms/transforms_keyword_only_args_test_py3.py b/sdks/python/apache_beam/transforms/transforms_keyword_only_args_test.py similarity index 99% rename from sdks/python/apache_beam/transforms/transforms_keyword_only_args_test_py3.py rename to sdks/python/apache_beam/transforms/transforms_keyword_only_args_test.py index 374f7c41b072..80e0aff642c5 100644 --- a/sdks/python/apache_beam/transforms/transforms_keyword_only_args_test_py3.py +++ b/sdks/python/apache_beam/transforms/transforms_keyword_only_args_test.py @@ -19,8 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import - import logging import unittest diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 863cb4b13af3..556789532ca5 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -22,19 +22,13 @@ # pytype: skip-file -from __future__ import absolute_import - import collections import copy import logging import numbers from abc import ABCMeta from abc import abstractmethod -from builtins import object - -from future.moves.itertools import zip_longest -from future.utils import iteritems -from future.utils import with_metaclass +from itertools import zip_longest from apache_beam.coders import coder_impl from apache_beam.coders import observable @@ -79,7 +73,7 @@ class AccumulationMode(object): # RETRACTING = 3 -class _StateTag(with_metaclass(ABCMeta, object)): # type: ignore[misc] +class _StateTag(metaclass=ABCMeta): """An identifier used to store and retrieve typed, combinable state. The given tag must be unique for this step.""" @@ -164,7 +158,7 @@ def with_prefix(self, prefix): # pylint: disable=unused-argument # TODO(robertwb): Provisional API, Java likely to change as well. -class TriggerFn(with_metaclass(ABCMeta, object)): # type: ignore[misc] +class TriggerFn(metaclass=ABCMeta): """A TriggerFn determines when window (panes) are emitted. See https://beam.apache.org/documentation/programming-guide/#triggers @@ -657,7 +651,7 @@ def has_ontime_pane(self): return self.underlying.has_ontime_pane() -class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): # type: ignore[misc] +class _ParallelTriggerFn(TriggerFn, metaclass=ABCMeta): def __init__(self, *triggers): self.triggers = triggers @@ -903,7 +897,7 @@ def clear_state(self, tag): # pylint: disable=unused-argument -class SimpleState(with_metaclass(ABCMeta, object)): # type: ignore[misc] +class SimpleState(metaclass=ABCMeta): """Basic state storage interface used for triggering. Only timers must hold the watermark (by their timestamp). @@ -1093,7 +1087,7 @@ def create_trigger_driver( return driver -class TriggerDriver(with_metaclass(ABCMeta, object)): # type: ignore[misc] +class TriggerDriver(metaclass=ABCMeta): """Breaks a series of bundle and timer firings into window (pane)s.""" @abstractmethod def process_elements( @@ -1529,7 +1523,7 @@ def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): def get_earliest_hold(self): earliest_hold = MAX_TIMESTAMP - for unused_window, tagged_states in iteritems(self.state): + for unused_window, tagged_states in self.state.items(): # TODO(BEAM-2519): currently, this assumes that the watermark hold tag is # named "watermark". This is currently only true because the only place # watermark holds are set is in the GeneralTriggerDriver, where we use diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index c0463067b0e0..a3dd4385c74c 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -19,19 +19,13 @@ # pytype: skip-file -from __future__ import absolute_import - import collections import json import os.path import pickle import random import unittest -from builtins import range -from builtins import zip -# patches unittest.TestCase to be python3 compatible -import future.tests.base # pylint: disable=unused-import import yaml import apache_beam as beam diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py index 82ebd6ffc06e..84184d4bde02 100644 --- a/sdks/python/apache_beam/transforms/userstate.py +++ b/sdks/python/apache_beam/transforms/userstate.py @@ -23,11 +23,8 @@ # pytype: skip-file # mypy: disallow-untyped-defs -from __future__ import absolute_import - import collections import types -from builtins import object from typing import TYPE_CHECKING from typing import Any from typing import Callable diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index fac9bff0694c..702d5c367228 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -18,13 +18,10 @@ """Unit tests for the Beam State and Timer API interfaces.""" # pytype: skip-file -from __future__ import absolute_import - import unittest from typing import Any from typing import List -# patches unittest.TestCase to be python3 compatible import mock import apache_beam as beam diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index ec0012e55fc5..a4ed004fe25d 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -20,9 +20,6 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import collections import contextlib import random @@ -30,10 +27,6 @@ import threading import time import uuid -from builtins import filter -from builtins import object -from builtins import range -from builtins import zip from typing import TYPE_CHECKING from typing import Any from typing import Iterable @@ -42,8 +35,6 @@ from typing import TypeVar from typing import Union -from future.utils import itervalues - from apache_beam import coders from apache_beam import typehints from apache_beam.metrics import Metrics @@ -162,7 +153,7 @@ def __init__(self, **kwargs): def _extract_input_pvalues(self, pvalueish): try: # If this works, it's a dict. - return pvalueish, tuple(itervalues(pvalueish)) + return pvalueish, tuple(pvalueish.values()) except AttributeError: pcolls = tuple(pvalueish) return pcolls, pcolls diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 50710ee0a03c..df76d15ec794 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -19,9 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import logging import math import random @@ -29,11 +26,7 @@ import time import unittest import warnings -from builtins import object -from builtins import range -# patches unittest.TestCase to be python3 compatible -import future.tests.base # pylint: disable=unused-import from nose.plugins.attrib import attr import apache_beam as beam diff --git a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py index e24587bc2839..1a2eee7c7823 100644 --- a/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py +++ b/sdks/python/apache_beam/transforms/validate_runner_xlang_test.py @@ -50,15 +50,12 @@ for further details. """ -from __future__ import absolute_import - import logging import os import typing import unittest from nose.plugins.attrib import attr -from past.builtins import unicode import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline @@ -94,7 +91,7 @@ def run_prefix(self, pipeline): with pipeline as p: res = ( p - | beam.Create(['a', 'b']).with_output_types(unicode) + | beam.Create(['a', 'b']).with_output_types(str) | beam.ExternalTransform( TEST_PREFIX_URN, ImplicitSchemaPayloadBuilder({'data': u'0'}), @@ -113,10 +110,10 @@ def run_multi_input_output_with_sideinput(self, pipeline): """ with pipeline as p: main1 = p | 'Main1' >> beam.Create( - ['a', 'bb'], reshuffle=False).with_output_types(unicode) + ['a', 'bb'], reshuffle=False).with_output_types(str) main2 = p | 'Main2' >> beam.Create( - ['x', 'yy', 'zzz'], reshuffle=False).with_output_types(unicode) - side = p | 'Side' >> beam.Create(['s']).with_output_types(unicode) + ['x', 'yy', 'zzz'], reshuffle=False).with_output_types(str) + side = p | 'Side' >> beam.Create(['s']).with_output_types(str) res = dict( main1=main1, main2=main2, side=side) | beam.ExternalTransform( TEST_MULTI_URN, None, self.expansion_service) @@ -138,7 +135,7 @@ def run_group_by_key(self, pipeline): p | beam.Create([(0, "1"), (0, "2"), (1, "3")], reshuffle=False).with_output_types( - typing.Tuple[int, unicode]) + typing.Tuple[int, str]) | beam.ExternalTransform(TEST_GBK_URN, None, self.expansion_service) | beam.Map(lambda x: "{}:{}".format(x[0], ','.join(sorted(x[1]))))) assert_that(res, equal_to(['0:1,2', '1:3'])) @@ -156,10 +153,10 @@ def run_cogroup_by_key(self, pipeline): with pipeline as p: col1 = p | 'create_col1' >> beam.Create( [(0, "1"), (0, "2"), (1, "3")], reshuffle=False).with_output_types( - typing.Tuple[int, unicode]) + typing.Tuple[int, str]) col2 = p | 'create_col2' >> beam.Create( [(0, "4"), (1, "5"), (1, "6")], reshuffle=False).with_output_types( - typing.Tuple[int, unicode]) + typing.Tuple[int, str]) res = ( dict(col1=col1, col2=col2) | beam.ExternalTransform(TEST_CGBK_URN, None, self.expansion_service) @@ -197,8 +194,8 @@ def run_combine_per_key(self, pipeline): with pipeline as p: res = ( p - | beam.Create([('a', 1), ('a', 2), ('b', 3)]).with_output_types( - typing.Tuple[unicode, int]) + | beam.Create([('a', 1), ('a', 2), + ('b', 3)]).with_output_types(typing.Tuple[str, int]) | beam.ExternalTransform( TEST_COMPK_URN, None, self.expansion_service)) assert_that(res, equal_to([('a', 3), ('b', 3)])) diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index 39151924c39e..de16c73079b9 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -49,18 +49,13 @@ # pytype: skip-file -from __future__ import absolute_import - import abc -from builtins import object -from builtins import range from functools import total_ordering from typing import Any from typing import Iterable from typing import List from typing import Optional -from future.utils import with_metaclass from google.protobuf import duration_pb2 from google.protobuf import timestamp_pb2 @@ -121,7 +116,7 @@ def get_impl(timestamp_combiner, window_fn): raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner) -class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): # type: ignore[misc] +class WindowFn(urns.RunnerApiFn, metaclass=abc.ABCMeta): """An abstract windowing function defining a basic assign and merge.""" class AssignContext(object): """Context passed to WindowFn.assign().""" diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index 3b56a3eb20de..7369090729d6 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -18,11 +18,7 @@ """Unit tests for the windowing classes.""" # pytype: skip-file -from __future__ import absolute_import -from __future__ import division - import unittest -from builtins import range import apache_beam as beam from apache_beam.coders import coders diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index 7ab656324831..ce402d8d3062 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -19,8 +19,6 @@ # pytype: skip-file -from __future__ import absolute_import - import logging import unittest