Skip to content

Commit

Permalink
TST: be smart about tests to run after a failure
Browse files Browse the repository at this point in the history
Use pytest incremental marker to skip running full length simulation
if reduced simulations fail.
  • Loading branch information
Blake Caldwell authored and jasmainak committed Nov 11, 2020
1 parent 0c4beea commit 2b217e2
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 64 deletions.
59 changes: 59 additions & 0 deletions hnn_core/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
""" Example from pytest documentation
https://pytest.org/en/stable/example/simple.html#incremental-testing-test-steps
"""

from typing import Dict, Tuple
import pytest

# First failure seen for each test class, keyed by class name and then by
# the parametrize index tuple (empty tuple when parametrize is not used).
_test_failed_incremental: Dict[str, Dict[Tuple[int, ...], str]] = {}


def pytest_runtest_makereport(item, call):
    """Hook: remember the first genuinely failing "incremental" test.

    Records the failing test's original name, keyed by class name and
    parametrize index, so ``pytest_runtest_setup`` can xfail the remaining
    tests of the same class.
    """
    if "incremental" not in item.keywords:
        # marker not used on this test; nothing to track
        return
    excinfo = call.excinfo
    if excinfo is None or excinfo.typename == "Skipped":
        # only genuine failures count; passes and skips are ignored
        return

    # class name identifies the incremental group
    cls_name = str(item.cls)
    # parametrize index distinguishes parametrized variants (if any)
    if hasattr(item, "callspec"):
        parametrize_index = tuple(item.callspec.indices.values())
    else:
        parametrize_index = ()
    failed_name = item.originalname or item.name
    # double setdefault: never overwrite the *first* recorded failure
    _test_failed_incremental.setdefault(cls_name, {}).setdefault(
        parametrize_index, failed_name)


def pytest_runtest_setup(item):
    """Hook: xfail an "incremental" test whose prerequisite already failed.

    If an earlier test in the same class (and same parametrize index)
    was recorded as failed by ``pytest_runtest_makereport``, this test
    is marked xfail instead of being run.
    """
    if "incremental" not in item.keywords:
        return

    # look up any recorded failure for this test's class
    cls_name = str(item.cls)
    failures_for_cls = _test_failed_incremental.get(cls_name)
    if failures_for_cls is None:
        return

    # parametrize index distinguishes parametrized variants (if any)
    parametrize_index = (tuple(item.callspec.indices.values())
                         if hasattr(item, "callspec") else ())
    first_failed = failures_for_cls.get(parametrize_index, None)
    if first_failed is not None:
        # a prerequisite failed: xfail rather than run this test
        pytest.xfail("previous test failed ({})".format(first_failed))
21 changes: 15 additions & 6 deletions hnn_core/tests/test_mpi_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,19 @@ def test_empty_data():


def test_data_len_mismatch():
"""Test that an unexpected data length raises RuntimeError"""
data_bytes = b'\0'
expected_len = 2
"""Test that padded data can be unpickled with warning for length """

with MPISimulation(skip_mpi_import=True) as mpi_sim:
pickled_bytes = mpi_sim._pickle_data({})

expected_len = len(pickled_bytes) + 1

backend = MPIBackend()
with pytest.raises(RuntimeError, match="Failed to receive all data from "
"the child MPI process. Expecting 2 bytes, got 1"):
backend._process_child_data(data_bytes, expected_len)
with pytest.warns(UserWarning) as record:
backend._process_child_data(pickled_bytes, expected_len)

expected_string = "Length of received data unexpected. " + \
"Expecting %d bytes, got %d" % (expected_len, len(pickled_bytes))

assert len(record) == 1
assert record[0].message.args[0] == expected_string
146 changes: 88 additions & 58 deletions hnn_core/tests/test_parallel_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,37 @@
from hnn_core import MPIBackend, JoblibBackend


def run_hnn_core_reduced(backend=None, n_jobs=1):
    """Simulate dipoles for a reduced model (3x3 grid, 2 trials).

    Parameters
    ----------
    backend : str | None
        Which parallel backend to use: 'mpi', 'joblib', or None for the
        default (serial) backend.
    n_jobs : int
        Number of jobs for the joblib backend (ignored otherwise).

    Returns
    -------
    The list of dipoles produced by ``simulate_dipole``.
    """
    hnn_core_root = op.dirname(hnn_core.__file__)

    # start from the default parameter set
    params = read_params(op.join(hnn_core_root, 'param', 'default.json'))

    # shrink the network and shorten the simulation so it runs quickly
    params_reduced = params.copy()
    params_reduced.update({'N_pyr_x': 3,
                           'N_pyr_y': 3,
                           'tstop': 25,
                           't_evprox_1': 5,
                           't_evdist_1': 10,
                           't_evprox_2': 20,
                           'N_trials': 2})
    net_reduced = Network(params_reduced)

    # dispatch on the requested backend; None means the default backend
    if backend == 'mpi':
        backend_ctx = MPIBackend(mpi_cmd='mpiexec')
    elif backend == 'joblib':
        backend_ctx = JoblibBackend(n_jobs=n_jobs)
    else:
        return simulate_dipole(net_reduced)

    with backend_ctx:
        return simulate_dipole(net_reduced)


def run_hnn_core(backend=None, n_jobs=1):
"""Test to check if hnn-core does not break."""
# small snippet of data on data branch for now. To be deleted
# later. Data branch should have only commit so it does not
# pollute the history.
Expand All @@ -29,30 +58,18 @@ def run_hnn_core(backend=None, n_jobs=1):
# default params
params_fname = op.join(hnn_core_root, 'param', 'default.json')
params = read_params(params_fname)
params_reduced = params.copy()
params_reduced.update({'N_pyr_x': 3,
'N_pyr_y': 3,
'tstop': 25,
't_evprox_1': 5,
't_evdist_1': 10,
't_evprox_2': 20,
'N_trials': 2})

# run the simulation on full model (1 trial) and a reduced model (2 trials)
# run the simulation on full model (1 trial)
net = Network(params)
net_reduced = Network(params_reduced)

if backend == 'mpi':
with MPIBackend(mpi_cmd='mpiexec'):
dpl = simulate_dipole(net)[0]
dpls_reduced = simulate_dipole(net_reduced)
elif backend == 'joblib':
with JoblibBackend(n_jobs=n_jobs):
dpl = simulate_dipole(net)[0]
dpls_reduced = simulate_dipole(net_reduced)
else:
dpl = simulate_dipole(net)[0]
dpls_reduced = simulate_dipole(net_reduced)

# write the dipole to a file and compare
fname = './dpl2.txt'
Expand All @@ -79,65 +96,78 @@ def run_hnn_core(backend=None, n_jobs=1):
'L5_basket': 85,
'evdist1': 234,
'evprox2': 269}
return dpls_reduced


def test_compare_across_backends():
"""Test that trials are generated consistently across parallel backends."""

# test consistency between default backend simulation and master
dpls_reduced_default = run_hnn_core(None)

try:
import mpi4py
mpi4py.__file__
# test consistency between mpi backend simulation & master
dpls_reduced_mpi = run_hnn_core(backend='mpi')
except ImportError:
print("Skipping MPIBackend test and dipole comparison because mpi4py "
"could not be imported...")
dpls_reduced_mpi = None

# test consistency between joblib backend simulation (n_jobs=2) with master
dpls_reduced_joblib = run_hnn_core(backend='joblib', n_jobs=2)

# test consistency across all parallel backends for multiple trials
assert_raises(AssertionError, assert_array_equal,
dpls_reduced_default[0].data['agg'],
dpls_reduced_default[1].data['agg'])

for trial_idx in range(len(dpls_reduced_default)):
# account for rounding error incurred during MPI parallelization
if dpls_reduced_mpi:
# The purpose of this incremental mark is to avoid running the full length
# simulation when there are failures in previous (faster) tests. When a test
# in the sequence fails, all subsequent tests will be marked "xfailed" rather
# than skipped.


@pytest.mark.incremental
class TestParallelBackends():
dpls_reduced_mpi = None
dpls_reduced_default = None
dpls_reduced_joblib = None

def test_run_default(self):
"""Test consistency between default backend simulation and master"""
global dpls_reduced_default
dpls_reduced_default = run_hnn_core_reduced(None)
# test consistency across all parallel backends for multiple trials
assert_raises(AssertionError, assert_array_equal,
dpls_reduced_default[0].data['agg'],
dpls_reduced_default[1].data['agg'])

def test_run_joblibbackend(self):
"""Test consistency between joblib backend simulation with master"""
global dpls_reduced_default, dpls_reduced_joblib

dpls_reduced_joblib = run_hnn_core_reduced(backend='joblib', n_jobs=2)

for trial_idx in range(len(dpls_reduced_default)):
assert_array_equal(dpls_reduced_default[trial_idx].data['agg'],
dpls_reduced_joblib[trial_idx].data['agg'])

def test_mpi_nprocs(self):
"""Test that MPIBackend can use more than 1 processor"""
# if only 1 processor is available, then MPIBackend tests will not
# be valid
pytest.importorskip("mpi4py", reason="mpi4py not available")

backend = MPIBackend()
assert backend.n_procs > 1

def test_run_mpibackend(self):
global dpls_reduced_default, dpls_reduced_mpi
pytest.importorskip("mpi4py", reason="mpi4py not available")
dpls_reduced_mpi = run_hnn_core_reduced(backend='mpi')
for trial_idx in range(len(dpls_reduced_default)):
# account for rounding error incurred during MPI parallelization
assert_allclose(dpls_reduced_default[trial_idx].data['agg'],
dpls_reduced_mpi[trial_idx].data['agg'], rtol=0,
atol=1e-14)
assert_array_equal(dpls_reduced_default[trial_idx].data['agg'],
dpls_reduced_joblib[trial_idx].data['agg'])

def test_compare_hnn_core(self):
"""Test to check if hnn-core does not break."""
# run one trial of each
run_hnn_core(backend='mpi')
run_hnn_core(backend='joblib')


# there are no dependencies if this unit tests fails, so not necessary to
# be part of incremental class
def test_mpi_failure():
"""Test that an MPI failure is handled and error messages pass through"""
"""Test that an MPI failure is handled and messages are printed"""
pytest.importorskip("mpi4py", reason="mpi4py not available")

# this MPI parameter will cause an MPI job with more than one process to fail
# this MPI parameter will cause an MPI job to fail
environ["OMPI_MCA_btl"] = "self"

with io.StringIO() as buf, redirect_stdout(buf):
with pytest.raises(RuntimeError, match="MPI simulation failed"):
run_hnn_core(backend='mpi')
run_hnn_core_reduced(backend='mpi')
stdout = buf.getvalue()
assert "MPI processes are unable to reach each other" in stdout

del environ["OMPI_MCA_btl"]


def test_mpi_nprocs():
    """Test that MPIBackend can use more than 1 processor"""
    # MPIBackend tests are only meaningful with multiple processors;
    # skip outright when mpi4py is missing
    pytest.importorskip("mpi4py", reason="mpi4py not available")

    assert MPIBackend().n_procs > 1
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[pytest]
markers =
incremental: run tests with prerequisites in incremental order

0 comments on commit 2b217e2

Please sign in to comment.