-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
integration-tests: implement framework for simulator-based integratio…
…n tests The py/viaems directory now contains code to: - interface with the hosted-mode simulator - define the concept of a test scenario with inputs - a Nminus1+cam test trigger for scenarios - validation for output angles and durations Additionally, py/integration-tests/smoke-tests.py an initial example of using this infrastructure to create a basic smoke test for engine cranking and startup.
- Loading branch information
Showing
22 changed files
with
749 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
6 changes: 1 addition & 5 deletions
6
integration/interface-tests.py → py/integration-tests/interface-tests.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import unittest | ||
import sys | ||
import cbor | ||
|
||
from viaems.connector import ViaemsWrapper | ||
from viaems.decoder import CrankNMinus1PlusCam_Wheel | ||
from viaems.scenario import Scenario | ||
from viaems.testcase import TestCase | ||
from viaems.util import ticks_for_rpm_degrees, ms_ticks | ||
from viaems.validation import validate_outputs | ||
|
||
|
||
|
||
class NMinus1DecoderTests(TestCase): | ||
|
||
def test_start_stop_start(self): | ||
scenario = Scenario("start_stop_start", CrankNMinus1PlusCam_Wheel(36)) | ||
scenario.set_brv(12.5) | ||
scenario.set_map(102); | ||
scenario.wait_milliseconds(1000) | ||
t1 = scenario.now() | ||
|
||
# simulate a crank condition | ||
scenario.set_rpm(300); | ||
scenario.set_brv(9.0) | ||
scenario.set_map(80.0) | ||
scenario.wait_milliseconds(1000) | ||
|
||
# engine catch ramp-up | ||
t2 = scenario.now() | ||
for rpm in range(300, 800, 100): | ||
scenario.set_rpm(rpm) | ||
scenario.wait_milliseconds(100) | ||
|
||
t3 = scenario.now() | ||
|
||
scenario.set_map(35) | ||
scenario.set_brv(14.4) | ||
scenario.wait_milliseconds(1000) | ||
|
||
t4 = scenario.now() | ||
|
||
scenario.set_rpm(0); | ||
scenario.set_map(102) | ||
scenario.set_brv(12) | ||
scenario.wait_milliseconds(5000) | ||
|
||
t5 = scenario.now() | ||
scenario.end() | ||
|
||
results = self.conn.execute_scenario(scenario) | ||
|
||
# t1 - t2 is cranking. validate: | ||
# - we gain sync within 500 ms | ||
# - we get first output within 1 cycles | ||
# - (after sync) advance is reasonable | ||
# - (after sync) pw is reasonable | ||
|
||
first_sync = next(filter(lambda i: i.values['sync'] == 1, | ||
results.filter_between(t1, t2).filter_feeds())) | ||
|
||
self.assertLessEqual(first_sync.time - t1, ms_ticks(500)) | ||
|
||
for f in results.filter_between(first_sync.time + ms_ticks(10), t2).filter_feeds(): | ||
# start looking 10 ms later, since actual calculations race with decoder | ||
self.assertWithin(f.values['advance'], 10, 20) | ||
self.assertEqual(f.values['sync'], 1) | ||
self.assertWithin(f.values['fuel_pulsewidth_us'], 4400, 4500) | ||
|
||
first_output = next(results.filter_between(t1, t2).filter_outputs()) | ||
self.assertEqual(first_output.cycle, 1) | ||
|
||
|
||
# t2-t4: | ||
# - should have sync | ||
# - fully validate output | ||
for f in results.filter_between(t2, t4).filter_feeds(): | ||
self.assertEqual(f.values['sync'], 1) | ||
|
||
# t4-t5: | ||
# - we lose sync within X ms, and it stays lost | ||
# - last output within Y ms | ||
last_sync = next(filter(lambda i: i.values['sync'] == 0, | ||
results.filter_between(t4, t5).filter_feeds())) | ||
|
||
self.assertWithin(last_sync.time - t4, ms_ticks(1), ms_ticks(100)) | ||
remaining_syncs = filter(lambda i: i.values['sync'] == 1, | ||
results.filter_between(last_sync.time, t5).filter_feeds()) | ||
self.assertEqual(len(list(remaining_syncs)), 0) | ||
|
||
outputs = list(results.filter_between(t4, t5).filter_outputs()) | ||
if len(outputs) > 0: | ||
self.assertWithin(outputs[-1].time, ms_ticks(1), ms_ticks(100)) | ||
|
||
# Finally, validate all the outputs are associated with an event and match | ||
# the expected angles/durations | ||
is_valid, msg = validate_outputs(results.filter_between(t1, t5)) | ||
self.assertTrue(is_valid, msg) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[project] | ||
name = "viaems" | ||
version = "0.0.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,5 @@ | ||
pip | ||
cbor | ||
pyvcd | ||
|
||
. |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
import cbor | ||
import json | ||
import subprocess | ||
import tempfile | ||
import random | ||
import os | ||
|
||
from viaems.vcd import dump_vcd | ||
from viaems.validation import enrich_log | ||
|
||
|
||
class ViaemsWrapper: | ||
def __init__(self, binary): | ||
self.binary = binary | ||
self.process = None | ||
|
||
def start(self, replay=None): | ||
args = [self.binary] | ||
if replay: | ||
args.append("-i") | ||
args.append(replay) | ||
|
||
self.process = subprocess.Popen( | ||
args, | ||
bufsize=-1, | ||
stdin=subprocess.PIPE, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
) | ||
|
||
def kill(self): | ||
if not self.process: | ||
return | ||
self.process.kill() | ||
self.process.communicate() | ||
self.process.wait() | ||
|
||
def send(self, payload): | ||
binary = cbor.dumps(payload) | ||
self.process.stdin.write(binary) | ||
self.process.stdin.flush() | ||
|
||
def recv(self): | ||
result = cbor.load(self.process.stdout) | ||
return result | ||
|
||
def recv_until_id(self, id, max_messages=100000): | ||
while True: | ||
result = cbor.load(self.process.stdout) | ||
if "id" in result and int(result["id"]) == id: | ||
return result | ||
max_messages -= 1 | ||
if max_messages == 0: | ||
return None | ||
|
||
def structure(self): | ||
id = random.randint(0, 1024) | ||
self.send( | ||
{ | ||
"id": id, | ||
"type": "request", | ||
"method": "structure", | ||
} | ||
) | ||
result = self.recv_until_id(id) | ||
return result | ||
|
||
def get(self, path): | ||
id = random.randint(0, 1024) | ||
self.send( | ||
{ | ||
"id": id, | ||
"type": "request", | ||
"method": "get", | ||
"path": path, | ||
} | ||
) | ||
result = self.recv_until_id(id) | ||
return result | ||
|
||
def set(self, path, value): | ||
id = random.randint(0, 1024) | ||
self.send( | ||
{ | ||
"id": id, | ||
"type": "request", | ||
"method": "set", | ||
"path": path, | ||
"value": value, | ||
} | ||
) | ||
result = self.recv_until_id(id) | ||
return result | ||
|
||
def execute_scenario(self, scenario): | ||
tf = open(f"scenario_{scenario.name}.inputs", "w") | ||
for ev in scenario.events: | ||
tf.write(ev.render()) | ||
tf.close() | ||
|
||
self.start(replay=tf.name) | ||
results = [] | ||
while True: | ||
try: | ||
msg = self.recv() | ||
results.append(msg) | ||
except EOFError: | ||
break | ||
|
||
for line in self.process.stderr: | ||
print(line) | ||
|
||
results.sort(key=lambda x: x["time"]) | ||
dump_vcd(results, f"scenario_{scenario.name}.vcd") | ||
enriched_log = enrich_log(scenario.events, results) | ||
return enriched_log |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from viaems.util import ticks_for_rpm_degrees, clamp_angle | ||
|
||
|
||
class CrankNMinus1PlusCam_Wheel: | ||
def __init__(self, N): | ||
self.N = N | ||
self.degrees_per_tooth = 360.0 / N | ||
self.cycle = 0 | ||
self.index = 0 | ||
|
||
# Populate even tooth wheel for trigger 0 | ||
self.wheel = [ | ||
(0, x * self.degrees_per_tooth) | ||
for x in range(N * 2) | ||
# Except for a missing tooth each rev | ||
# The first tooth *after* the gap is the 0 degree mark, and we want | ||
# the first tooth to be angle 0 | ||
if x != 35 and x != (N + 35) | ||
] | ||
|
||
# Add a cam sync at 45 degrees | ||
self.wheel.append((1, 45.0)) | ||
self.wheel.sort(key=lambda x: x[1]) | ||
|
||
def _next_index(self): | ||
return (self.index + 1) % len(self.wheel) | ||
|
||
def time_to_next_trigger(self, rpm): | ||
current_angle = self.wheel[self.index][1] | ||
next_angle = self.wheel[self._next_index()][1] | ||
diff = clamp_angle(next_angle - current_angle) | ||
return ticks_for_rpm_degrees(rpm, diff) | ||
|
||
def next(self, rpm): | ||
self.index = self._next_index() | ||
trigger, angle = self.wheel[self.index] | ||
if angle == 0: | ||
self.cycle += 1 | ||
return (trigger, angle) |
Oops, something went wrong.