Skip to content

Commit

Permalink
test(ldfi): fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jan 18, 2021
1 parent f5a3a66 commit 788d661
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 62 deletions.
53 changes: 37 additions & 16 deletions src/ldfi/ldfi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,23 @@ def load(self, config: Config) -> Data:
crashes = set()

for run_id in config.run_ids:
prods = []
self.c.execute("""SELECT faults FROM faults
WHERE test_id = (?)
AND run_id = (?)""", (config.test_id, run_id))
for r in self.c:
for fault in eval(r['faults'])['faults']:
# NOTE: eval introduces a space after the colon in a
# dict, we need to remove this otherwise the variables
# of the SAT expression will differ.
prods.append(str(fault).replace(": ", ":"))
logging.debug("fault: '%s'", str(fault).replace(": ", ":"))
previous_faults.append(prods)
# The last run id will not have any previous faults.
if run_id != config.run_ids[-1]:
prods = []
self.c.execute("""SELECT faults FROM faults
WHERE test_id = (?)
AND run_id = (?)""", (config.test_id, run_id))
for r in self.c:
for fault in eval(r['faults'])['faults']:
# NOTE: eval introduces a space after the colon in a
# dict, we need to remove this otherwise the variables
# of the SAT expression will differ.
prods.append(str(fault).replace(": ", ":"))
logging.debug("fault: '%s'", str(fault).replace(": ", ":"))
previous_faults.append(prods)

sums = []
logging.debug("previous_faults: %s", previous_faults)
self.c.execute("""SELECT * FROM network_trace
WHERE test_id = (?)
AND run_id = (?)
Expand All @@ -109,6 +112,7 @@ def load(self, config: Config) -> Data:
omission = ("{'kind':'omission', 'from':'%s', 'to':'%s', 'at':%d}"
% (r['from'], r['to'], r['at']))
if omission not in previous_faults:
logging.debug("added: %s", omission)
sums.append(omission)
if config.max_crashes > 0:
crash = ("{'kind':'crash', 'from':'%s', 'at':%d}"
Expand All @@ -118,6 +122,7 @@ def load(self, config: Config) -> Data:
crashes.add(crash)
potential_faults.append(sums)

logging.debug("potential_faults: %s", potential_faults)
return Data(previous_faults, potential_faults, crashes)

def store(self, event: Event):
Expand All @@ -133,19 +138,30 @@ def sanity_check(data):
else:
len_previous_faults = len(data.previous_faults)

logging.debug("data.previous_faults: %s (len: %d)",
data.previous_faults, len_previous_faults)
logging.debug("data.potential_faults: %s (len: %d)",
data.potential_faults, len(data.potential_faults) + 1)

assert(len(data.potential_faults) == len_previous_faults + 1)

# previous_faults = [[oab1]]
# potential_faults = [[oab1, oac1], [oac1]]
# data.potential_faults[-1] + data.previous_faults[-1] = data.potential_faults[-2]

def create_sat_formula(config, data):
potential_faults = []
for i, sum in enumerate(data.potential_faults):
logging.debug("i: %d", i)
kept = z3.Bools(sum)
logging.debug("kept: " + str(kept))
drop = []
if data.previous_faults[i-1]:
if i != 0 and data.previous_faults[i-1]:
drop = z3.Bools(data.previous_faults[i-1])
logging.debug("drop: " + str(drop))
if drop:
potential_faults.append(z3.Or(z3.Or(kept), z3.Not(z3.And(drop))))
potential_faults.append(z3.Or(z3.Or(kept),
z3.Not(z3.And(drop))))
else:
potential_faults.append(z3.Or(kept))

Expand All @@ -163,10 +179,15 @@ def sat_solve(formula):
solver = z3.Solver()
solver.add(formula)
result = solver.check()
model = solver.model()
statistics = solver.statistics()
if result == z3.sat:
model = solver.model()
statistics = solver.statistics()
else:
model = None
statistics = None
return (result, model, statistics)


def order(d: dict) -> str:
return("%s %s %s %d" % (d['kind'], d['from'], d.get('to', ""), d['at']))

Expand Down
2 changes: 1 addition & 1 deletion src/ldfi/release.nix
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ pythonPackages.buildPythonApplication rec {
checkInputs = with pythonPackages; [ pytest pytestrunner ];
propagatedBuildInputs = with pythonPackages; [ z3-solver setuptools setuptools_scm ];

checkPhase = "pytest";
checkPhase = "pytest --capture=tee-sys";
}
148 changes: 103 additions & 45 deletions src/ldfi/tests/test_ldfi.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,112 @@
import unittest
import logging
import ldfi
import z3
from z3 import (And, Or, Not, Bool)

class TestSortedFaults(unittest.TestCase):
def test_sorted_faults(self):
l = sorted([{"kind": "omission", "from": "frontend", "to": "register2", "at": 2},
{"kind": "crash", "from": "frontend", "at": 1},
{"kind": "crash", "from": "frontend", "at": 0},
{"kind": "omission", "from": "frontend", "to": "register2", "at": 1}],
key=ldfi.order)
assert(l ==
[{"kind": "crash", "from": "frontend", "at": 0},
def test_sorted_faults():
l = sorted([{"kind": "omission", "from": "frontend", "to": "register2", "at": 2},
{"kind": "crash", "from": "frontend", "at": 1},
{"kind": "omission", "from": "frontend", "to": "register2", "at": 1},
{"kind": "omission", "from": "frontend", "to": "register2", "at": 2}])
{"kind": "crash", "from": "frontend", "at": 0},
{"kind": "omission", "from": "frontend", "to": "register2", "at": 1}],
key=ldfi.order)
assert(l ==
[{"kind": "crash", "from": "frontend", "at": 0},
{"kind": "crash", "from": "frontend", "at": 1},
{"kind": "omission", "from": "frontend", "to": "register2", "at": 1},
{"kind": "omission", "from": "frontend", "to": "register2", "at": 2}])

def o(f, t, at):
return ('{"kind": "omission", "from": "%s", "to": "%s", "at": %d}' % (f, t, at))

class TestCreateFormula(unittest.TestCase):
def test_create_formula(self):
config = ldfi.Config(-1, [-1], 2, 0)
previous_faults = [[]]
oab1 = o("A", "B", 1)
oac1 = o("A", "C", 1)

# First run
potential_faults = [[oab1, oac1]]
crashes = set()
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
assert(formula == And(Or(Bool(oab1), Bool(oac1))))
(result, model, statistics) = ldfi.sat_solve(formula)
assert result == z3.sat
event = ldfi.create_log_event(config, result, model, statistics)
assert event.faults == ('{"faults": [%s]}' % oab1)

# Second run
previous_faults = [[oab1]]
potential_faults = [[oab1, oac1], [oac1]]
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
print(formula)
assert(formula == And(Or(Or(Bool(oab1), Bool(oac1)),
Not(And(Bool(oab1)))),
Or(Or(Bool(oac1)),
Not(And(Bool(oab1))))))

if __name__ == '__main__':
unittest.main()
def test_create_formula(caplog):
caplog.set_level(logging.DEBUG)

config = ldfi.Config(-1, [-1], 2, 0)
oab1 = o("A", "B", 1)
oac1 = o("A", "C", 1)
ab1 = z3.Bool(oab1)
ac1 = z3.Bool(oac1)

crashes = set()

# First run
previous_faults = [[]]
potential_faults = [[oab1, oac1]]
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
assert(formula == And(Or(ab1, ac1)))
(result, model, statistics) = ldfi.sat_solve(formula)
assert result == z3.sat
event = ldfi.create_log_event(config, result, model, statistics)
assert event.faults == ('{"faults": [%s]}' % oab1)

# Second run
previous_faults = [[oab1]]
potential_faults = [[oab1, oac1], [oac1]]
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
expected_formula = And(Or(ab1, ac1),
Or(Or(ac1),
Not(And(ab1))))
assert(formula == expected_formula)
(result, model, statistics) = ldfi.sat_solve(formula)
assert result == z3.sat
event = ldfi.create_log_event(config, result, model, statistics)
assert event.faults == ('{"faults": [%s]}' % oac1)

# Third run
previous_faults = [[oab1], [oac1]]
potential_faults = [[oab1, oac1], [oac1], [oab1]]
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
expected_formula = And(Or(ab1, ac1),
Or(Or(ac1),
Not(And(ab1))),
Or(Or(ab1),
Not(And(ac1))))
assert(formula == expected_formula)
(result, model, statistics) = ldfi.sat_solve(formula)
assert result == z3.sat
event = ldfi.create_log_event(config, result, model, statistics)
assert event.faults == ('{"faults": [%s, %s]}' % (oab1, oac1))

# Forth run
oab2 = o("A", "B", 2) # newly discovered edge
ab2 = z3.Bool(oab2)
previous_faults = [[oab1], [oac1], [oab1, oac1]]
potential_faults = [[oab1, oac1], [oac1], [oab1], [oab2]] # solver finds: oab2
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
expected_formula = And(Or(ab1, ac1),
Or(Or(ac1),
Not(And(ab1))),
Or(Or(ab1),
Not(And(ac1))),
Or(Or(ab2),
Not(And(ab1, ac1))))
(result, model, statistics) = ldfi.sat_solve(formula)
assert result == z3.sat
event = ldfi.create_log_event(config, result, model, statistics)
assert event.faults == ('{"faults": [%s, %s, %s]}' % (oab1, oab2, oac1))

# Fifth run
previous_faults = [[oab1], [oac1], [oab1, oac1], [oab2]]
potential_faults = [[oab1, oac1], [oac1], [oab1], [oab2], []] # done
data = ldfi.Data(previous_faults, potential_faults, crashes)
ldfi.sanity_check(data)
formula = ldfi.create_sat_formula(config, data)
expected_formula = And(Or(ab1, ac1),
Or(Or(ac1),
Not(And(ab1))),
Or(Or(ab1),
Not(And(ac1))),
Or(Or(ab2),
Not(And(ab1, ac1))),
Or(Not(And(ab1, ab2, ac1))))

(result, model, statistics) = ldfi.sat_solve(formula)
assert result == z3.unsat

0 comments on commit 788d661

Please sign in to comment.