-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCAV.test.py
190 lines (142 loc) · 4.61 KB
/
CAV.test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# AUM SHREEGANESHAAYA NAMAH|| AUM NAMAH SHIVAAYA||
import os
import config
from CAV import CAV
def test_cav_init():
print("######## TEST CAV INIT ########")
print("###############################\n")
c = CAV({
"id" : 0,
"closest_wp" : 0, "dest" : 3,
"x" : 90, "y" : 906, "angle" : 0.785, "speed" : 10, "acc" : -1
})
print(c)
print()
def test_cav_sp_and_fp():
print("######## TEST CAV SHORTEST PATH AND FUTURE PATH ########")
print("########################################################\n")
for car in config.cars:
cav = CAV(car)
cav.a_brake, cav.v_max = -5, 5
cav.compute_future_path()
print(f"ID = {cav.ID}")
print(f"Shortest Path = {cav.SP}")
print(f"Future Path = {cav.FP}")
print()
print()
def test_cav_find_conflict_zones():
print("######## TEST CAV FIND CONFLICT ZONES ########")
print("##############################################\n")
CAVs, l_cars = [], len(config.cars)
for car in config.cars:
cav = CAV(car)
cav.a_brake, cav.v_max = -5, 5
cav.compute_future_path()
CAVs.append(cav)
for i in range(l_cars):
CAVs[i].broadcast_info()
for i in range(l_cars):
CAVs[i].receive_others_info()
CAVs[i].find_conflict_zones_all_CAVs()
print(f"ID = {CAVs[i].ID}")
print(CAVs[i].CZ)
print()
print()
def test_cav_upto_construct_CDG():
print("######## TEST CAV UPTO CONSTRUCT CDG #########")
print("##############################################\n")
CAVs, l_cars = [], len(config.cars)
for car in config.cars:
cav = CAV(car)
cav.a_brake, cav.v_max = -5, 10
cav.compute_future_path()
CAVs.append(cav)
for i in range(l_cars):
CAVs[i].broadcast_info()
for i in range(l_cars):
CAVs[i].receive_others_info()
CAVs[i].find_conflict_zones_all_CAVs()
for i in range(l_cars):
CAVs[i].broadcast_PDG()
for i in range(l_cars):
CAVs[i].receive_others_PDGs()
CAVs[i].construct_CDG()
print(f"ID = {CAVs[i].ID}")
print(f"PDG =", CAVs[i].PDG)
print(f"Others_PDGs =", CAVs[i].Others_PDG)
print(f"CDG =", CAVs[i].CDG)
print()
print()
def test_cav_upto_motion_controller():
print("######## TEST CAV UPTO MOTION CONTROLLER #########")
print("##############################################\n")
CAVs, l_cars = [], len(config.cars)
for car in config.cars:
cav = CAV(car)
cav.a_brake, cav.v_max = -5, 10
CAVs.append(cav)
for i in range(l_cars):
CAVs[i].compute_future_path()
CAVs[i].broadcast_info()
for i in range(l_cars):
CAVs[i].receive_others_info()
CAVs[i].find_conflict_zones_all_CAVs()
for i in range(l_cars):
CAVs[i].broadcast_PDG()
for i in range(l_cars):
CAVs[i].receive_others_PDGs()
CAVs[i].construct_CDG()
CAVs[i].deadlock_resolution()
CAVs[i].motion_planner()
CAVs[i].motion_controller()
print("TEST COMPLETE SEE LOG FILES.\n")
def test_cav_full_cycle():
print("######## TEST CAV FULL CYCLE #########")
print("######################################\n")
CAVs, l_cars = [], len(config.cars)
for car in config.cars:
cav = CAV(car)
cav.a_brake, cav.v_max = -5, 10
CAVs.append(cav)
for j in range(15):
for i in range(l_cars):
CAVs[i].compute_future_path()
CAVs[i].broadcast_info()
for i in range(l_cars):
CAVs[i].receive_others_info()
CAVs[i].find_conflict_zones_all_CAVs()
for i in range(l_cars):
CAVs[i].broadcast_PDG()
for i in range(l_cars):
CAVs[i].receive_others_PDGs()
CAVs[i].construct_CDG()
CAVs[i].deadlock_resolution()
CAVs[i].motion_planner()
CAVs[i].motion_controller()
print("TEST COMPLETE SEE LOG FILES.\n")
def test_cav_execute():
print("######## TEST CAV EXECUTE #########")
print("######################################\n")
CAVs, l_cars = [], len(config.cars)
for car in config.cars: # Initiialize
cav = CAV(car)
CAVs.append(cav)
for i in range(l_cars): # Start
CAVs[i].thread.start()
for i in range(l_cars): # Wait for End
CAVs[i].thread.join()
print("TEST COMPLETE SEE LOG FILES.\n")
if __name__ == "__main__":
# straight_1, curve_1, sharp_curve_1
# straight_2_block: second is immovable (do not allow to reach dest)
# straight_2: second is slower
# sharp_curve_2: second is slower
config.importParentGraph(os.path.join(os.getcwd(), "samples/sharp_curve_2.json"))
config.startServer() # config.S is now the functioning server
# test_cav_init()
# test_cav_sp_and_fp()
# test_cav_find_conflict_zones()
# test_cav_upto_construct_CDG()
# test_cav_upto_motion_controller()
# test_cav_full_cycle()
test_cav_execute()