-
Notifications
You must be signed in to change notification settings - Fork 9
/
test_control.py
196 lines (158 loc) · 6.56 KB
/
test_control.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python
# Copyright (c) 2018 Christoph Pilz
#
# This work is licensed under the terms of the MIT license.
# For a copy, see <https://opensource.org/licenses/MIT>.
import datetime
import rospy
import select
import sys
import termios
import threading
import time
import tty
from support.control import InputController
from support.present import ClockHandler, MondeoPlayerAgentHandler
from scenario_parser import OpenScenarioParser
from simulator_control import CarlaSimulatorControl
from timed_event_handler import TimedEventHandler
class TestControl():
def __init__(self, simulatorType, simulatorIP, simulatorPort, simulatorTimeout, scenarioFileType):
self.__actors = []
self.__scenarioParser = None
self.__simulatorControl = None
self.__logProcessor = None
self.__simulatorIP = simulatorIP
self.__simulatorPort = simulatorPort
self.__simulatorTimeout = simulatorTimeout
self.__wakeUpOnScenarioEnd = threading.Event()
self.isSkipCurrentTest = False
self.isAbortAllFurtherTests = False
if scenarioFileType == "OpenScenario":
self.__scenarioParser = OpenScenarioParser()
else:
raise NotImplementedError("Scenarios of Type \"" + scenarioFileType + "\" are not yet supported")
if simulatorType == "Carla":
self.__simulatorControl = CarlaSimulatorControl(simulatorIP, simulatorPort, simulatorTimeout)
else:
raise NotImplementedError("Simulator of Type \"" + simulatorType + "\" is not yet supported")
# ROS part, not so fancy yet
rospy.init_node('control_listener', anonymous=True)
ClockHandler()
InputController()
MondeoPlayerAgentHandler()
# parses config; returns on error
def setupTestWithConfig(self, fileName):
# prepare system
print("# prepare system")
TimedEventHandler().clear()
# parse scenario
print("# parse scenario")
if not self.__scenarioParser.parseScenario(fileName):
return False
self.__actors = self.__scenarioParser.getActors()
# self.__scenarioParser.getSceneDescription()
# self.__scenarioParser.getSimTimeEvents()
# self.__scenarioParser.getStateEvents()
# setup carla
print("# setup carla")
if not self.__simulatorControl.connect():
return False
# setup world
print("# setup world - skipped")
# TODO load Scene
# setup actors
print("# setup actors")
isAllActorsConnected = True
for actor in self.__actors:
status = actor.connectToSimulatorAndEvenHandler(self.__simulatorIP, self.__simulatorPort, self.__simulatorTimeout, self.__wakeUpOnScenarioEnd)
isAllActorsConnected = isAllActorsConnected and status
if not isAllActorsConnected:
return False
# setup timedEventHandler
print("# start timedEventHandler - skipped events")
TimedEventHandler().start()
return True
def executeTest(self):
# run timedEventHandler
print("# run timedEventHandler")
# run actors
print("# run actors")
for actor in self.__actors:
actor.startActing()
# run Test - implement logic
isAllRunning = True
for actor in self.__actors:
isAllRunning = isAllRunning and actor.getIsRunning()
if isAllRunning:
startTime = datetime.datetime.now()
print("# test started at", startTime)
print("# test running ...")
self.__wakeUpOnScenarioEnd.clear()
waitForKeyboardThread = threading.Thread(target=self._wakeUpOnKeyPress)
waitForKeyboardThread.start()
self.__wakeUpOnScenarioEnd.wait()
endTime = datetime.datetime.now()
waitForKeyboardThread.join()
isTestWithoutCollision = True
for actor in self.__actors:
if actor.collisionEvent is not None:
print(actor.getName(), "crashed with", actor.collisionEvent, "at", actor.collisionEvent.actor.get_transform())
isTestWithoutCollision = False
if isTestWithoutCollision:
print("# SUCCESS at", endTime, "runtime was", endTime-startTime)
else:
print("# FAILED at", endTime, "runtime was", endTime-startTime)
else:
print("# test failed starting")
# stop actors
print("# stop actors")
for actor in self.__actors:
actor.stopActing()
# stop timedEventHandler
print("# stop timedEventHandler - skipped events")
TimedEventHandler().stop()
return False
def cleanupTest(self):
# cleanup timedEventHandler
print("# cleanup timedEventHandler")
TimedEventHandler().cleanup()
# cleanup actors
print("# cleanup actors")
for actor in self.__actors:
actor.disconnectFromSimulatorAndEventHandler()
# cleanup world
print("# setup world - skipped")
# TODO default Scene ?
# cleanup carla
print("# cleanup carla")
self.__simulatorControl.disconnect()
return True
def _wakeUpOnKeyPress(self):
old_settings = termios.tcgetattr(sys.stdin)
try:
tty.setcbreak(sys.stdin.fileno())
while True:
if self.__wakeUpOnScenarioEnd.is_set():
break
if select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
c = sys.stdin.read(1)
if c == 's':
if not self.__wakeUpOnScenarioEnd.is_set():
self.__wakeUpOnScenarioEnd.set()
self.isSkipCurrentTest = True
break
elif c == 'q' or c == '\x1b': # x1b is ESC
if not self.__wakeUpOnScenarioEnd.is_set():
self.__wakeUpOnScenarioEnd.set()
self.isAbortAllFurtherTests = True
break
else:
print("# press <s> to skip test")
print("# press <q> or <ESC> to abort all tests")
time.sleep(0.1)
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)
def __getConfigDictFromFile(self, fileName):
print("[WARNING][TestControl::getConfigDictFromFile] Not yet implemented")
return False