From 9d2e1b055cf7ed02eba725b86d476c24a718178d Mon Sep 17 00:00:00 2001 From: Ned Batchelder Date: Tue, 3 Jan 2023 06:51:59 -0500 Subject: [PATCH] mypy: test_concurrency.py, test_python.py --- tests/mixins.py | 25 ++++++---- tests/test_concurrency.py | 102 ++++++++++++++++++++------------------ tests/test_python.py | 3 +- tox.ini | 5 +- 4 files changed, 76 insertions(+), 59 deletions(-) diff --git a/tests/mixins.py b/tests/mixins.py index 0f578637d..f8cc50850 100644 --- a/tests/mixins.py +++ b/tests/mixins.py @@ -13,6 +13,7 @@ import sys from typing import Iterator, Tuple +from typing import Iterable, Optional import pytest @@ -24,26 +25,26 @@ class PytestBase: """A base class to connect to pytest in a test class hierarchy.""" @pytest.fixture(autouse=True) - def connect_to_pytest(self, request, monkeypatch): + def connect_to_pytest(self, request, monkeypatch) -> None: """Captures pytest facilities for use by other test helpers.""" # pylint: disable=attribute-defined-outside-init self._pytest_request = request self._monkeypatch = monkeypatch self.setUp() - def setUp(self): + def setUp(self) -> None: """Per-test initialization. Override this as you wish.""" pass - def addCleanup(self, fn, *args): + def addCleanup(self, fn, *args) -> None: """Like unittest's addCleanup: code to call when the test is done.""" self._pytest_request.addfinalizer(lambda: fn(*args)) - def set_environ(self, name, value): + def set_environ(self, name, value) -> None: """Set an environment variable `name` to be `value`.""" self._monkeypatch.setenv(name, value) - def del_environ(self, name): + def del_environ(self, name) -> None: """Delete an environment variable, unless we set it.""" self._monkeypatch.delenv(name, raising=False) @@ -72,7 +73,13 @@ def _temp_dir(self, tmp_path_factory: pytest.TempPathFactory) -> Iterator[None]: else: yield - def make_file(self, filename, text="", bytes=b"", newline=None): + def make_file( + self, + filename: str, + text: str="", + bytes: bytes=b"", + newline: Optional[str]=None, + ) -> str: """Make a file. See `tests.helpers.make_file`""" # pylint: disable=redefined-builtin # bytes assert self.run_in_temp_dir, "Only use make_file when running in a temp dir" @@ -83,7 +90,7 @@ class RestoreModulesMixin: """Auto-restore the imported modules at the end of each test.""" @pytest.fixture(autouse=True) - def _module_saving(self): + def _module_saving(self) -> Iterable[None]: """Remove modules we imported during the test.""" self._sys_module_saver = SysModuleSaver() try: @@ -91,7 +98,7 @@ def _module_saving(self): finally: self._sys_module_saver.restore() - def clean_local_file_imports(self): + def clean_local_file_imports(self) -> None: """Clean up the results of calls to `import_local_file`. Use this if you need to `import_local_file` the same file twice in @@ -120,7 +127,7 @@ class StdStreamCapturingMixin: """ @pytest.fixture(autouse=True) - def _capcapsys(self, capsys): + def _capcapsys(self, capsys: pytest.CaptureFixture[str]) -> None: """Grab the fixture so our methods can use it.""" self.capsys = capsys diff --git a/tests/test_concurrency.py b/tests/test_concurrency.py index d08ed1efb..e6910dc8c 100644 --- a/tests/test_concurrency.py +++ b/tests/test_concurrency.py @@ -13,6 +13,9 @@ import threading import time +from types import ModuleType +from typing import Iterable, Optional + from flaky import flaky import pytest @@ -44,7 +47,7 @@ greenlet = None -def measurable_line(l): +def measurable_line(l: str) -> bool: """Is this a line of code coverage will measure? 
Not blank, not a comment, and not "else" @@ -59,12 +62,12 @@ def measurable_line(l): return True -def line_count(s): +def line_count(s: str) -> int: """How many measurable lines are in `s`?""" return len(list(filter(measurable_line, s.splitlines()))) -def print_simple_annotation(code, linenos): +def print_simple_annotation(code: str, linenos: Iterable[int]) -> None: """Print the lines in `code` with X for each line number in `linenos`.""" for lineno, line in enumerate(code.splitlines(), start=1): print(" {} {}".format("X" if lineno in linenos else " ", line)) @@ -75,7 +78,7 @@ class LineCountTest(CoverageTest): run_in_temp_dir = False - def test_line_count(self): + def test_line_count(self) -> None: CODE = """ # Hey there! x = 1 @@ -169,7 +172,7 @@ def sum_range(limit): """ -def cant_trace_msg(concurrency, the_module): +def cant_trace_msg(concurrency: str, the_module: Optional[ModuleType]) -> Optional[str]: """What might coverage.py say about a concurrency setting and imported module?""" # In the concurrency choices, "multiprocessing" doesn't count, so remove it. if "multiprocessing" in concurrency: @@ -197,7 +200,13 @@ class ConcurrencyTest(CoverageTest): QLIMIT = 1000 - def try_some_code(self, code, concurrency, the_module, expected_out=None): + def try_some_code( + self, + code: str, + concurrency: str, + the_module: ModuleType, + expected_out: Optional[str]=None, + ) -> None: """Run some concurrency testing code and see that it was all covered. `code` is the Python code to execute. `concurrency` is the name of @@ -232,39 +241,40 @@ def try_some_code(self, code, concurrency, the_module, expected_out=None): # If the test fails, it's helpful to see this info: fname = abs_file("try_it.py") linenos = data.lines(fname) + assert linenos is not None print(f"{len(linenos)}: {linenos}") print_simple_annotation(code, linenos) lines = line_count(code) assert line_counts(data)['try_it.py'] == lines - def test_threads(self): + def test_threads(self) -> None: code = (THREAD + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) self.try_some_code(code, "thread", threading) - def test_threads_simple_code(self): + def test_threads_simple_code(self) -> None: code = SIMPLE.format(QLIMIT=self.QLIMIT) self.try_some_code(code, "thread", threading) - def test_eventlet(self): + def test_eventlet(self) -> None: code = (EVENTLET + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) self.try_some_code(code, "eventlet", eventlet) - def test_eventlet_simple_code(self): + def test_eventlet_simple_code(self) -> None: code = SIMPLE.format(QLIMIT=self.QLIMIT) self.try_some_code(code, "eventlet", eventlet) # https://github.com/nedbat/coveragepy/issues/663 @pytest.mark.skipif(env.WINDOWS, reason="gevent has problems on Windows: #663") - def test_gevent(self): + def test_gevent(self) -> None: code = (GEVENT + SUM_RANGE_Q + PRINT_SUM_RANGE).format(QLIMIT=self.QLIMIT) self.try_some_code(code, "gevent", gevent) - def test_gevent_simple_code(self): + def test_gevent_simple_code(self) -> None: code = SIMPLE.format(QLIMIT=self.QLIMIT) self.try_some_code(code, "gevent", gevent) - def test_greenlet(self): + def test_greenlet(self) -> None: GREENLET = """\ from greenlet import greenlet @@ -282,11 +292,11 @@ def test2(u): """ self.try_some_code(GREENLET, "greenlet", greenlet, "hello world\n42\n") - def test_greenlet_simple_code(self): + def test_greenlet_simple_code(self) -> None: code = SIMPLE.format(QLIMIT=self.QLIMIT) self.try_some_code(code, "greenlet", greenlet) - def test_bug_330(self): + def 
test_bug_330(self) -> None: BUG_330 = """\ from weakref import WeakKeyDictionary import eventlet @@ -304,7 +314,7 @@ def do(): """ self.try_some_code(BUG_330, "eventlet", eventlet, "0\n") - def test_threads_with_gevent(self): + def test_threads_with_gevent(self) -> None: self.make_file("both.py", """\ import queue import threading @@ -345,25 +355,25 @@ def gwork(q): last_line = self.squeezed_lines(out)[-1] assert re.search(r"TOTAL \d+ 0 100%", last_line) - def test_bad_concurrency(self): + def test_bad_concurrency(self) -> None: with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"): self.command_line("run --concurrency=nothing prog.py") - def test_bad_concurrency_in_config(self): + def test_bad_concurrency_in_config(self) -> None: self.make_file(".coveragerc", "[run]\nconcurrency = nothing\n") with pytest.raises(ConfigError, match="Unknown concurrency choices: nothing"): self.command_line("run prog.py") - def test_no_multiple_light_concurrency(self): + def test_no_multiple_light_concurrency(self) -> None: with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"): self.command_line("run --concurrency=gevent,eventlet prog.py") - def test_no_multiple_light_concurrency_in_config(self): + def test_no_multiple_light_concurrency_in_config(self) -> None: self.make_file(".coveragerc", "[run]\nconcurrency = gevent, eventlet\n") with pytest.raises(ConfigError, match="Conflicting concurrency settings: eventlet, gevent"): self.command_line("run prog.py") - def test_multiprocessing_needs_config_file(self): + def test_multiprocessing_needs_config_file(self) -> None: with pytest.raises(ConfigError, match="multiprocessing requires a configuration file"): self.command_line("run --concurrency=multiprocessing prog.py") @@ -372,9 +382,9 @@ class WithoutConcurrencyModuleTest(CoverageTest): """Tests of what happens if the requested concurrency isn't installed.""" @pytest.mark.parametrize("module", ["eventlet", "gevent", "greenlet"]) - def test_missing_module(self, module): + def test_missing_module(self, module: str) -> None: self.make_file("prog.py", "a = 1") - sys.modules[module] = None + sys.modules[module] = None # type: ignore[assignment] msg = f"Couldn't trace with concurrency={module}, the module isn't installed." with pytest.raises(ConfigError, match=msg): self.command_line(f"run --concurrency={module} prog.py") @@ -428,9 +438,9 @@ def process_worker_main(args): @pytest.fixture(params=["fork", "spawn"], name="start_method") -def start_method_fixture(request): +def start_method_fixture(request: pytest.FixtureRequest) -> str: """Parameterized fixture to choose the start_method for multiprocessing.""" - start_method = request.param + start_method: str = request.param if start_method not in multiprocessing.get_all_start_methods(): # Windows doesn't support "fork". pytest.skip(f"start_method={start_method} not supported here") @@ -443,14 +453,14 @@ class MultiprocessingTest(CoverageTest): def try_multiprocessing_code( self, - code, - expected_out, - the_module, - nprocs, - start_method, - concurrency="multiprocessing", - args="", - ): + code: str, + expected_out: Optional[str], + the_module: ModuleType, + nprocs: int, + start_method: str, + concurrency: str="multiprocessing", + args: str="", + ) -> None: """Run code using multiprocessing, it should produce `expected_out`.""" self.make_file("multi.py", code) self.make_file(".coveragerc", f"""\ @@ -459,9 +469,7 @@ def try_multiprocessing_code( source = . 
""") - cmd = "coverage run {args} multi.py {start_method}".format( - args=args, start_method=start_method, - ) + cmd = f"coverage run {args} multi.py {start_method}" out = self.run_command(cmd) expected_cant_trace = cant_trace_msg(concurrency, the_module) @@ -489,7 +497,7 @@ def try_multiprocessing_code( last_line = self.squeezed_lines(out)[-1] assert re.search(r"TOTAL \d+ 0 100%", last_line) - def test_multiprocessing_simple(self, start_method): + def test_multiprocessing_simple(self, start_method: str) -> None: nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) @@ -503,7 +511,7 @@ def test_multiprocessing_simple(self, start_method): start_method=start_method, ) - def test_multiprocessing_append(self, start_method): + def test_multiprocessing_append(self, start_method: str) -> None: nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) @@ -518,7 +526,7 @@ def test_multiprocessing_append(self, start_method): start_method=start_method, ) - def test_multiprocessing_and_gevent(self, start_method): + def test_multiprocessing_and_gevent(self, start_method: str) -> None: nprocs = 3 upto = 30 code = ( @@ -535,7 +543,7 @@ def test_multiprocessing_and_gevent(self, start_method): start_method=start_method, ) - def test_multiprocessing_with_branching(self, start_method): + def test_multiprocessing_with_branching(self, start_method: str) -> None: nprocs = 3 upto = 30 code = (SQUARE_OR_CUBE_WORK + MULTI_CODE).format(NPROCS=nprocs, UPTO=upto) @@ -559,7 +567,7 @@ def test_multiprocessing_with_branching(self, start_method): last_line = self.squeezed_lines(out)[-1] assert re.search(r"TOTAL \d+ 0 \d+ 0 100%", last_line) - def test_multiprocessing_bootstrap_error_handling(self): + def test_multiprocessing_bootstrap_error_handling(self) -> None: # An exception during bootstrapping will be reported. self.make_file("multi.py", """\ import multiprocessing @@ -576,7 +584,7 @@ def test_multiprocessing_bootstrap_error_handling(self): assert "Exception during multiprocessing bootstrap init" in out assert "Exception: Crashing because called by _bootstrap" in out - def test_bug_890(self): + def test_bug_890(self) -> None: # chdir in multiprocessing shouldn't keep us from finding the # .coveragerc file. self.make_file("multi.py", """\ @@ -596,11 +604,11 @@ def test_bug_890(self): assert out.splitlines()[-1] == "ok" -def test_coverage_stop_in_threads(): +def test_coverage_stop_in_threads() -> None: has_started_coverage = [] has_stopped_coverage = [] - def run_thread(): # pragma: nested + def run_thread() -> None: # pragma: nested """Check that coverage is stopping properly in threads.""" deadline = time.time() + 5 ident = threading.current_thread().ident @@ -648,7 +656,7 @@ def test_thread_safe_save_data(tmp_path: pathlib.Path) -> None: for module_name in module_names: import_local_file(module_name) - def random_load(): # pragma: nested + def random_load() -> None: # pragma: nested """Import modules randomly to stress coverage.""" while should_run[0]: module_name = random.choice(module_names) @@ -695,7 +703,7 @@ class SigtermTest(CoverageTest): """Tests of our handling of SIGTERM.""" @pytest.mark.parametrize("sigterm", [False, True]) - def test_sigterm_saves_data(self, sigterm): + def test_sigterm_saves_data(self, sigterm: bool) -> None: # A terminated process should save its coverage data. 
self.make_file("clobbered.py", """\ import multiprocessing @@ -741,7 +749,7 @@ def subproc(x): expected = "clobbered.py 17 5 71% 5-10" assert self.squeezed_lines(out)[2] == expected - def test_sigterm_still_runs(self): + def test_sigterm_still_runs(self) -> None: # A terminated process still runs its own SIGTERM handler. self.make_file("handler.py", """\ import multiprocessing diff --git a/tests/test_python.py b/tests/test_python.py index c8c58f4ef..14dbebef8 100644 --- a/tests/test_python.py +++ b/tests/test_python.py @@ -24,13 +24,14 @@ class GetZipBytesTest(CoverageTest): "encoding", ["utf-8", "gb2312", "hebrew", "shift_jis", "cp1252"], ) - def test_get_encoded_zip_files(self, encoding): + def test_get_encoded_zip_files(self, encoding: str) -> None: # See igor.py, do_zipmods, for the text of these files. zip_file = "tests/zipmods.zip" sys.path.append(zip_file) # So we can import the files. filename = zip_file + "/encoded_" + encoding + ".py" filename = os_sep(filename) zip_data = get_zip_bytes(filename) + assert zip_data is not None zip_text = zip_data.decode(encoding) assert 'All OK' in zip_text # Run the code to see that we really got it encoded properly. diff --git a/tox.ini b/tox.ini index a5cf258cb..fc255dc25 100644 --- a/tox.ini +++ b/tox.ini @@ -101,8 +101,9 @@ setenv = C_FN=coverage/files.py coverage/inorout.py coverage/jsonreport.py coverage/lcovreport.py coverage/multiproc.py coverage/numbits.py C_OP=coverage/parser.py coverage/phystokens.py coverage/plugin.py coverage/plugin_support.py coverage/python.py C_QZ=coverage/report.py coverage/results.py coverage/sqldata.py coverage/tomlconfig.py coverage/types.py coverage/version.py coverage/xmlreport.py - T_AN=tests/test_annotate.py tests/test_api.py tests/test_arcs.py tests/test_cmdline.py tests/test_collector.py tests/goldtest.py tests/helpers.py tests/test_html.py tests/test_xml.py - TYPEABLE={env:C__B} {env:C_CC} {env:C_DE} {env:C_FN} {env:C_OP} {env:C_QZ} {env:T_AN} + T_AC=tests/test_annotate.py tests/test_api.py tests/test_arcs.py tests/test_cmdline.py tests/test_collector.py tests/test_concurrency.py + T_DZ=tests/goldtest.py tests/helpers.py tests/test_html.py tests/test_python.py tests/test_xml.py + TYPEABLE={env:C__B} {env:C_CC} {env:C_DE} {env:C_FN} {env:C_OP} {env:C_QZ} {env:T_AC} {env:T_DZ} commands = # PYVERSIONS