diff --git a/hnn_core/tests/conftest.py b/hnn_core/tests/conftest.py new file mode 100644 index 000000000..38bfd5503 --- /dev/null +++ b/hnn_core/tests/conftest.py @@ -0,0 +1,59 @@ +""" Example from pytest documentation + +https://pytest.org/en/stable/example/simple.html#incremental-testing-test-steps +""" + +from typing import Dict, Tuple +import pytest + +# store history of failures per test class name and per index in parametrize +# (if parametrize used) +_test_failed_incremental: Dict[str, Dict[Tuple[int, ...], str]] = {} + + +def pytest_runtest_makereport(item, call): + + if "incremental" in item.keywords: + # incremental marker is used + if call.excinfo is not None and not call.excinfo.typename == "Skipped": + # the test has failed, but was not skiped + + # retrieve the class name of the test + cls_name = str(item.cls) + # retrieve the index of the test (if parametrize is used in + # combination with incremental) + parametrize_index = ( + tuple(item.callspec.indices.values()) + if hasattr(item, "callspec") + else () + ) + # retrieve the name of the test function + test_name = item.originalname or item.name + # store in _test_failed_incremental the original name of the + # failed test + _test_failed_incremental.setdefault(cls_name, {}).setdefault( + parametrize_index, test_name + ) + + +def pytest_runtest_setup(item): + if "incremental" in item.keywords: + # retrieve the class name of the test + cls_name = str(item.cls) + # check if a previous test has failed for this class + if cls_name in _test_failed_incremental: + # retrieve the index of the test (if parametrize is used in + # combination with incremental) + parametrize_index = ( + tuple(item.callspec.indices.values()) + if hasattr(item, "callspec") + else () + ) + # retrieve the name of the first test function to fail for this + # class name and index + test_name = _test_failed_incremental[cls_name].get( + parametrize_index, None) + # if name found, test has failed for the combination of class name + # and test name + if test_name is not None: + pytest.xfail("previous test failed ({})".format(test_name)) diff --git a/hnn_core/tests/test_mpi_child.py b/hnn_core/tests/test_mpi_child.py index 0199f5750..f0df9e68f 100644 --- a/hnn_core/tests/test_mpi_child.py +++ b/hnn_core/tests/test_mpi_child.py @@ -57,10 +57,19 @@ def test_empty_data(): def test_data_len_mismatch(): - """Test that an unexpected data length raises RuntimeError""" - data_bytes = b'\0' - expected_len = 2 + """Test that padded data can be unpickled with warning for length """ + + with MPISimulation(skip_mpi_import=True) as mpi_sim: + pickled_bytes = mpi_sim._pickle_data({}) + + expected_len = len(pickled_bytes) + 1 + backend = MPIBackend() - with pytest.raises(RuntimeError, match="Failed to receive all data from " - "the child MPI process. Expecting 2 bytes, got 1"): - backend._process_child_data(data_bytes, expected_len) + with pytest.warns(UserWarning) as record: + backend._process_child_data(pickled_bytes, expected_len) + + expected_string = "Length of received data unexpected. " + \ + "Expecting %d bytes, got %d" % (expected_len, len(pickled_bytes)) + + assert len(record) == 1 + assert record[0].message.args[0] == expected_string diff --git a/hnn_core/tests/test_parallel_backends.py b/hnn_core/tests/test_parallel_backends.py index df7fddf72..c33f99b32 100644 --- a/hnn_core/tests/test_parallel_backends.py +++ b/hnn_core/tests/test_parallel_backends.py @@ -13,8 +13,37 @@ from hnn_core import MPIBackend, JoblibBackend +def run_hnn_core_reduced(backend=None, n_jobs=1): + hnn_core_root = op.dirname(hnn_core.__file__) + + # default params + params_fname = op.join(hnn_core_root, 'param', 'default.json') + params = read_params(params_fname) + params_reduced = params.copy() + params_reduced.update({'N_pyr_x': 3, + 'N_pyr_y': 3, + 'tstop': 25, + 't_evprox_1': 5, + 't_evdist_1': 10, + 't_evprox_2': 20, + 'N_trials': 2}) + + # run the simulation a reduced model (2 trials) + net_reduced = Network(params_reduced) + + if backend == 'mpi': + with MPIBackend(mpi_cmd='mpiexec'): + dpls_reduced = simulate_dipole(net_reduced) + elif backend == 'joblib': + with JoblibBackend(n_jobs=n_jobs): + dpls_reduced = simulate_dipole(net_reduced) + else: + dpls_reduced = simulate_dipole(net_reduced) + + return dpls_reduced + + def run_hnn_core(backend=None, n_jobs=1): - """Test to check if hnn-core does not break.""" # small snippet of data on data branch for now. To be deleted # later. Data branch should have only commit so it does not # pollute the history. @@ -29,30 +58,18 @@ def run_hnn_core(backend=None, n_jobs=1): # default params params_fname = op.join(hnn_core_root, 'param', 'default.json') params = read_params(params_fname) - params_reduced = params.copy() - params_reduced.update({'N_pyr_x': 3, - 'N_pyr_y': 3, - 'tstop': 25, - 't_evprox_1': 5, - 't_evdist_1': 10, - 't_evprox_2': 20, - 'N_trials': 2}) - # run the simulation on full model (1 trial) and a reduced model (2 trials) + # run the simulation on full model (1 trial) net = Network(params) - net_reduced = Network(params_reduced) if backend == 'mpi': with MPIBackend(mpi_cmd='mpiexec'): dpl = simulate_dipole(net)[0] - dpls_reduced = simulate_dipole(net_reduced) elif backend == 'joblib': with JoblibBackend(n_jobs=n_jobs): dpl = simulate_dipole(net)[0] - dpls_reduced = simulate_dipole(net_reduced) else: dpl = simulate_dipole(net)[0] - dpls_reduced = simulate_dipole(net_reduced) # write the dipole to a file and compare fname = './dpl2.txt' @@ -79,65 +96,78 @@ def run_hnn_core(backend=None, n_jobs=1): 'L5_basket': 85, 'evdist1': 234, 'evprox2': 269} - return dpls_reduced - - -def test_compare_across_backends(): - """Test that trials are generated consistently across parallel backends.""" - - # test consistency between default backend simulation and master - dpls_reduced_default = run_hnn_core(None) - - try: - import mpi4py - mpi4py.__file__ - # test consistency between mpi backend simulation & master - dpls_reduced_mpi = run_hnn_core(backend='mpi') - except ImportError: - print("Skipping MPIBackend test and dipole comparison because mpi4py " - "could not be imported...") - dpls_reduced_mpi = None - - # test consistency between joblib backend simulation (n_jobs=2) with master - dpls_reduced_joblib = run_hnn_core(backend='joblib', n_jobs=2) - # test consistency across all parallel backends for multiple trials - assert_raises(AssertionError, assert_array_equal, - dpls_reduced_default[0].data['agg'], - dpls_reduced_default[1].data['agg']) - for trial_idx in range(len(dpls_reduced_default)): - # account for rounding error incured during MPI parallelization - if dpls_reduced_mpi: +# The purpose of this incremental mark is to avoid running the full length +# simulation when there are failures in previous (faster) tests. When a test +# in the sequence fails, all subsequent tests will be marked "xfailed" rather +# than skipped. + + +@pytest.mark.incremental +class TestParallelBackends(): + dpls_reduced_mpi = None + dpls_reduced_default = None + dpls_reduced_joblib = None + + def test_run_default(self): + """Test consistency between default backend simulation and master""" + global dpls_reduced_default + dpls_reduced_default = run_hnn_core_reduced(None) + # test consistency across all parallel backends for multiple trials + assert_raises(AssertionError, assert_array_equal, + dpls_reduced_default[0].data['agg'], + dpls_reduced_default[1].data['agg']) + + def test_run_joblibbackend(self): + """Test consistency between joblib backend simulation with master""" + global dpls_reduced_default, dpls_reduced_joblib + + dpls_reduced_joblib = run_hnn_core_reduced(backend='joblib', n_jobs=2) + + for trial_idx in range(len(dpls_reduced_default)): + assert_array_equal(dpls_reduced_default[trial_idx].data['agg'], + dpls_reduced_joblib[trial_idx].data['agg']) + + def test_mpi_nprocs(self): + """Test that MPIBackend can use more than 1 processor""" + # if only 1 processor is available, then MPIBackend tests will not + # be valid + pytest.importorskip("mpi4py", reason="mpi4py not available") + + backend = MPIBackend() + assert backend.n_procs > 1 + + def test_run_mpibackend(self): + global dpls_reduced_default, dpls_reduced_mpi + pytest.importorskip("mpi4py", reason="mpi4py not available") + dpls_reduced_mpi = run_hnn_core_reduced(backend='mpi') + for trial_idx in range(len(dpls_reduced_default)): + # account for rounding error incured during MPI parallelization assert_allclose(dpls_reduced_default[trial_idx].data['agg'], dpls_reduced_mpi[trial_idx].data['agg'], rtol=0, atol=1e-14) - assert_array_equal(dpls_reduced_default[trial_idx].data['agg'], - dpls_reduced_joblib[trial_idx].data['agg']) + def test_compare_hnn_core(self): + """Test to check if hnn-core does not break.""" + # run one trial of each + run_hnn_core(backend='mpi') + run_hnn_core(backend='joblib') + +# there are no dependencies if this unit tests fails, so not necessary to +# be part of incremental class def test_mpi_failure(): - """Test that an MPI failure is handled and error messages pass through""" + """Test that an MPI failure is handled and messages are printed""" pytest.importorskip("mpi4py", reason="mpi4py not available") - # this MPI paramter will cause a MPI job with more than one process to fail + # this MPI paramter will cause a MPI job to fail environ["OMPI_MCA_btl"] = "self" with io.StringIO() as buf, redirect_stdout(buf): with pytest.raises(RuntimeError, match="MPI simulation failed"): - run_hnn_core(backend='mpi') + run_hnn_core_reduced(backend='mpi') stdout = buf.getvalue() assert "MPI processes are unable to reach each other" in stdout del environ["OMPI_MCA_btl"] - - -def test_mpi_nprocs(): - """Test that MPIBackend can use more than 1 processor""" - - # if only 1 processor is available, then MPIBackend tests will not - # be valid - pytest.importorskip("mpi4py", reason="mpi4py not available") - - backend = MPIBackend() - assert backend.n_procs > 1 diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 000000000..c00948689 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + incremental: run tests with prerequisites in incremental order