Skip to content

Commit

Permalink
[BEAM-3713] Move validatesRunnerBatchTests and validatesRunnerStreami…
Browse files Browse the repository at this point in the history
…ng tests from nose to pytest (#14788)
  • Loading branch information
benWize authored May 20, 2021
1 parent e0e3432 commit 02a2222
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_VR_Dataflow', 'Run Python
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 200)

publishers {
archiveJunit('**/nosetests*.xml')
archiveJunit('**/pytest*.xml')
}

// Execute gradle task to test Python SDK.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_VR_Dataflow_V2', 'Run Pyt
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 200)

publishers {
archiveJunit('**/nosetests*.xml')
archiveJunit('**/pytest*.xml')
}

// Execute gradle task to test Python SDK.
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import unittest

import hamcrest as hc
from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
from apache_beam import metrics
Expand Down Expand Up @@ -150,7 +150,7 @@ def test_general_urn_metric_name_str(self):
"urn=my_urn, labels={'key': 'value'})")
self.assertEqual(str(mn), expected_str)

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_user_counter_using_pardo(self):
class SomeDoFn(beam.DoFn):
"""A custom dummy DoFn using yield."""
Expand Down
9 changes: 4 additions & 5 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from builtins import object
from builtins import range

from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
from apache_beam import typehints
Expand Down Expand Up @@ -64,7 +64,6 @@
from apache_beam.utils.timestamp import MIN_TIMESTAMP

# TODO(BEAM-1555): Test is failing on the service, with FakeSource.
# from nose.plugins.attrib import attr


class FakeSource(NativeSource):
Expand Down Expand Up @@ -263,7 +262,7 @@ def test_create_singleton_pcollection(self):
assert_that(pcoll, equal_to([[1, 2, 3]]))

# TODO(BEAM-1555): Test is failing on the service, with FakeSource.
# @attr('ValidatesRunner')
# @pytest.mark.it_validatesrunner
def test_metrics_in_fake_source(self):
pipeline = TestPipeline()
pcoll = pipeline | Read(FakeSource([1, 2, 3, 4, 5, 6]))
Expand Down Expand Up @@ -723,7 +722,7 @@ def process(self, element, prefix, suffix=DoFn.SideInputParam):
TestDoFn(), prefix, suffix=AsSingleton(suffix))
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_element_param(self):
pipeline = TestPipeline()
input = [1, 2]
Expand All @@ -734,7 +733,7 @@ def test_element_param(self):
assert_that(pcoll, equal_to(input))
pipeline.run()

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_key_param(self):
pipeline = TestPipeline()
pcoll = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import unittest
import uuid

import pytest
from hamcrest.core.core.allof import all_of
from nose.plugins.attrib import attr

from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.runners.dataflow import dataflow_exercise_streaming_metrics_pipeline
Expand Down Expand Up @@ -113,7 +113,9 @@ def run_pipeline(self):
return dataflow_exercise_streaming_metrics_pipeline.run(argv)

# Need not run streaming test in batch mode.
@attr('IT', 'ValidatesRunner', 'sickbay-batch')
@pytest.mark.it_validatesrunner
@pytest.mark.no_sickbay_batch
@pytest.mark.no_xdist
def test_streaming_pipeline_returns_expected_user_metrics_fnapi_it(self):
"""
Runs streaming Dataflow job and verifies that user metrics are reported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
from typing import Tuple

import hamcrest # pylint: disable=ungrouped-imports
import pytest
from hamcrest.core.matcher import Matcher
from hamcrest.core.string_description import StringDescription
from nose.plugins.attrib import attr
from tenacity import retry
from tenacity import stop_after_attempt

Expand Down Expand Up @@ -1989,7 +1989,7 @@ def __reduce__(self):
return (self.__class__, (self.num_elements, 'x' * self.num_elements))


@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
class FnApiBasedStateBackedCoderTest(unittest.TestCase):
def create_pipeline(self):
return beam.Pipeline(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import logging
import unittest

from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
from apache_beam import runners
Expand Down Expand Up @@ -249,7 +249,7 @@ def assert_is_topologically_sorted(transform_id, visited_pcolls):
assert_is_topologically_sorted(
optimized_pipeline_proto.root_transform_ids[0], set())

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_run_packable_combine_per_key(self):
class MultipleCombines(beam.PTransform):
def annotations(self):
Expand Down Expand Up @@ -279,7 +279,7 @@ def expand(self, pcoll):
| Create([('a', x) for x in vals])
| 'multiple-combines' >> MultipleCombines())

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_run_packable_combine_globally(self):
class MultipleCombines(beam.PTransform):
def annotations(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import unittest
from functools import wraps

from nose.plugins.attrib import attr
import pytest
from parameterized import parameterized_class

from apache_beam.options.pipeline_options import DebugOptions
Expand Down Expand Up @@ -54,7 +54,7 @@ def wrapped(*args, **kwargs):
return wrapped


@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
class CombineFnLifecycleTest(unittest.TestCase):
def setUp(self):
self.pipeline = TestPipeline(is_integration_test=True)
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import unittest

import hamcrest as hc
from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
import apache_beam.transforms.combiners as combine
Expand Down Expand Up @@ -811,7 +811,7 @@ def test_with_input_types_decorator_violation(self):
#
# Test cases for streaming.
#
@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
class TimestampCombinerTest(unittest.TestCase):
def test_combiner_earliest(self):
"""Test TimestampCombiner with EARLIEST."""
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/transforms/deduplicate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import unittest

from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
from apache_beam.coders import coders
Expand All @@ -39,7 +39,9 @@
# TestStream is only supported in streaming pipeline. The Deduplicate transform
# also requires Timer support. Sickbaying this testsuite until dataflow runner
# supports both TestStream and user timer.
@attr('ValidatesRunner', 'sickbay-batch', 'sickbay-streaming')
@pytest.mark.no_sickbay_batch
@pytest.mark.no_sickbay_streaming
@pytest.mark.it_validatesrunner
class DeduplicateTest(unittest.TestCase):
def __init__(self, *args, **kwargs):
self.runner = None
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/dofn_lifecycle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import unittest

from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
Expand Down Expand Up @@ -74,7 +74,7 @@ def teardown(self):
self._teardown_called = True


@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
class DoFnLifecycleTest(unittest.TestCase):
def test_dofn_lifecycle(self):
with TestPipeline() as p:
Expand Down
26 changes: 14 additions & 12 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from unittest.mock import patch

import hamcrest as hc
from nose.plugins.attrib import attr
import pytest

import apache_beam as beam
import apache_beam.pvalue as pvalue
Expand Down Expand Up @@ -179,15 +179,16 @@ def test_do_with_multiple_outputs_maintains_unique_name(self):
assert_that(r1.m, equal_to([2, 3, 4]), label='r1')
assert_that(r2.m, equal_to([3, 4, 5]), label='r2')

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_impulse(self):
with TestPipeline() as pipeline:
result = pipeline | beam.Impulse() | beam.Map(lambda _: 0)
assert_that(result, equal_to([0]))

# TODO(BEAM-3544): Disable this test in streaming temporarily.
# Remove sickbay-streaming tag after it's resolved.
@attr('ValidatesRunner', 'sickbay-streaming')
@pytest.mark.no_sickbay_streaming
@pytest.mark.it_validatesrunner
def test_read_metrics(self):
from apache_beam.io.utils import CountingSource

Expand All @@ -213,7 +214,7 @@ def process(self, element):
self.assertEqual(outputs_counter.committed, 100)
self.assertEqual(outputs_counter.attempted, 100)

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_par_do_with_multiple_outputs_and_using_yield(self):
class SomeDoFn(beam.DoFn):
"""A custom DoFn using yield."""
Expand All @@ -232,7 +233,7 @@ def process(self, element):
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
assert_that(results.even, equal_to([2, 4]), label='assert:even')

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_par_do_with_multiple_outputs_and_using_return(self):
def some_fn(v):
if v % 2 == 0:
Expand All @@ -247,7 +248,7 @@ def some_fn(v):
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
assert_that(results.even, equal_to([2, 4]), label='assert:even')

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_undeclared_outputs(self):
with TestPipeline() as pipeline:
nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
Expand All @@ -261,7 +262,7 @@ def test_undeclared_outputs(self):
assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
assert_that(results.even, equal_to([2, 4]), label='assert:even')

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_multiple_empty_outputs(self):
with TestPipeline() as pipeline:
nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5])
Expand Down Expand Up @@ -646,7 +647,7 @@ def test_partition_followed_by_flatten_and_groupbykey(self):
grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_flatten_pcollections(self):
with TestPipeline() as pipeline:
pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
Expand All @@ -661,7 +662,7 @@ def test_flatten_no_pcollections(self):
result = () | 'Empty' >> beam.Flatten(pipeline=pipeline)
assert_that(result, equal_to([]))

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_flatten_one_single_pcollection(self):
with TestPipeline() as pipeline:
input = [0, 1, 2, 3]
Expand All @@ -670,7 +671,8 @@ def test_flatten_one_single_pcollection(self):
assert_that(result, equal_to(input))

# TODO(BEAM-9002): Does not work in streaming mode on Dataflow.
@attr('ValidatesRunner', 'sickbay-streaming')
@pytest.mark.no_sickbay_streaming
@pytest.mark.it_validatesrunner
def test_flatten_same_pcollections(self):
with TestPipeline() as pipeline:
pc = pipeline | beam.Create(['a', 'b'])
Expand All @@ -683,7 +685,7 @@ def test_flatten_pcollections_in_iterable(self):
result = [pcoll for pcoll in (pcoll_1, pcoll_2)] | beam.Flatten()
assert_that(result, equal_to([0, 1, 2, 3, 4, 5, 6, 7]))

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_flatten_a_flattened_pcollection(self):
with TestPipeline() as pipeline:
pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])
Expand All @@ -705,7 +707,7 @@ def test_flatten_input_type_must_be_iterable_of_pcolls(self):
with self.assertRaises(TypeError):
set([1, 2, 3]) | beam.Flatten()

@attr('ValidatesRunner')
@pytest.mark.it_validatesrunner
def test_flatten_multiple_pcollections_having_multiple_consumers(self):
with TestPipeline() as pipeline:
input = pipeline | 'Start' >> beam.Create(['AA', 'BBB', 'CC'])
Expand Down
Loading

0 comments on commit 02a2222

Please sign in to comment.