diff --git a/integration/decoder-tests.py b/integration/decoder-tests.py index 8d842fbc..bdec09d4 100644 --- a/integration/decoder-tests.py +++ b/integration/decoder-tests.py @@ -3,6 +3,8 @@ import sys import cbor +from validation import validate_outputs + class ToothEvent: def __init__(self, time, delay, trigger, angle=None, rpm=None, cycle=None): self.time = time @@ -255,16 +257,30 @@ def test_start_stop_start(self): first_output = next(results.filter_between(t1, t2).filter_outputs()) self.assertEqual(first_output.cycle, 1) - -# t1-t5: + +# t2-t4: # - should have sync # - ignition: dwell and advance match # - fuel: pw matches + for f in results.filter_between(t2, t4).filter_feeds(): + self.assertEqual(f.values['sync'], 1) -# -# t5: -# - we lose sync within X ms -# + validate_outputs(results.filter_between(t2, t4)) + +# t4-t5: +# - we lose sync within X ms, and it stays lost +# - last output within Y ms + last_sync = next(filter(lambda i: i.values['sync'] == 0, + results.filter_between(t4, t5).filter_feeds())) + + self.assertWithin(last_sync.time - t4, ms_ticks(1), ms_ticks(100)) + remaining_syncs = filter(lambda i: i.values['sync'] == 1, + results.filter_between(last_sync.time, t5).filter_feeds()) + self.assertEqual(len(list(remaining_syncs)), 0) + + outputs = list(results.filter_between(t4, t5).filter_outputs()) + if len(outputs) > 0: + self.assertWithin(outputs[-1].time, ms_ticks(1), ms_ticks(100)) if __name__ == "__main__": diff --git a/integration/validation.py b/integration/validation.py index c8670c35..7b4eb504 100644 --- a/integration/validation.py +++ b/integration/validation.py @@ -24,6 +24,7 @@ def filter_between(self, start, end): filter(lambda i: i.time >= start and i.time <= end, self.log) ) + def __next__(self): return next(self.log) @@ -33,6 +34,8 @@ def __iter__(self): + + def degrees_for_tick_rpm(ticks, rpm): ticks_per_degree = (4000000 / 6.0) / rpm return ticks / ticks_per_degree @@ -44,6 +47,67 @@ def clamp_angle(angle): angle += 720 return angle +class OutputConfig: + FUEL = 1 + IGN = 2 + + def __init__(self, pin, typ, angle): + self.pin = pin + self.typ = typ + self.angle = angle + + def _offset(self, angle): + adv = angle - self.angle + + # normalize to (-360, 360) for easy comparison with bounds + if adv >= 360: + adv -= 720 + if adv <= -360: + adv += 720 + return adv + +class Config: + def __init__(self): + self.outputs = [ + OutputConfig(pin=0, typ=OutputConfig.IGN, angle=0), + OutputConfig(pin=1, typ=OutputConfig.IGN, angle=120), + OutputConfig(pin=2, typ=OutputConfig.IGN, angle=240), + OutputConfig(pin=0, typ=OutputConfig.IGN, angle=360), + OutputConfig(pin=1, typ=OutputConfig.IGN, angle=480), + OutputConfig(pin=2, typ=OutputConfig.IGN, angle=600), + + OutputConfig(pin=8, typ=OutputConfig.FUEL, angle=700), + OutputConfig(pin=9, typ=OutputConfig.FUEL, angle=460), + OutputConfig(pin=10, typ=OutputConfig.FUEL, angle=220), + ] + + + def _offset_within(self, oc, angle, lower, upper): + """Validate that the advance is within bounds. lower and upper must + both be within (-360, 360).""" + + adv = oc._offset(angle) + if adv >= lower and adv <= upper: + return True + return False + + def lookup(self, pin, end_angle): + # TODO maybe don't hardcode this, but until then: + # - assume fuel is +/- 10 degrees + # - assume ignition is -50 to +10 degrees + for oc in self.outputs: + if pin != oc.pin: + continue + + if oc.typ == OutputConfig.IGN: + if self._offset_within(oc, end_angle, -50, 10): + return oc + + if oc.typ == OutputConfig.FUEL: + if self._offset_within(oc, end_angle, -10, 10): + return oc + +config = Config() class OutputEvent: def __init__(self, time, pin, duration_us, end_angle, cycle): @@ -52,9 +116,9 @@ def __init__(self, time, pin, duration_us, end_angle, cycle): self.duration_us = duration_us self.end_angle = end_angle self.cycle = cycle - -# self.relative_angle -# self.type = + self.oc = config.lookup(pin, end_angle) + if self.oc: + self.advance = -self.oc._offset(end_angle) class FeedEvent: @@ -122,3 +186,43 @@ def enrich_log(inputs, log) -> EnrichedLog: entry["values"])) return EnrichedLog(result) + +def validate_outputs(log): + """Validate that all outputs are associated with a configured output, + and that there are no gaps or missing outputs. Each cycle should have + the full count of configured outputs, except for the first and last. + + If feed values containing advance and durations (dwell, pw) are present, + also evaluate that those values are reasonably close.""" + + is_first_cycle = True + current_cycle = None + current_cycle_outputs = 0 + cycles = [] + for o in log.filter_outputs(): + if o.oc is None: + return False + + if is_first_cycle: + is_first_cycle = False + current_cycle = o.cycle + if o.cycle != current_cycle: + if o.cycle > current_cycle + 1: + # We skipped a cycle + return False + cycles.append(current_cycle_outputs) + current_cycle_outputs = 0 + current_cycle = o.cycle + + current_cycle_outputs += 1 + + expected_count = len(config.outputs) + + if cycles[0] <= expected_count and \ + cycles[-1] <= expected_count and ( \ + len(cycles) < 3 or \ + all(map(lambda c: c == expected_count, cycles[1:-2]))): + return True + + return False +