forked from PaulKMueller/llama_traffic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
waymo_utils.py
118 lines (88 loc) · 3.61 KB
/
waymo_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import os
import numpy as np
from scipy import interpolate
import pandas as pd
def get_scenario_list():
"""Returns an array of strings with all the paths to the available scenarios."""
try:
folder_path = "/mrtstorage/datasets/tmp/waymo_open_motion_v_1_2_0/uncompressed/tf_example/training"
all_entries = os.listdir(folder_path)
# Filter the list, keeping only files (not directories)
files = [
entry
for entry in all_entries
if os.path.isfile(os.path.join(folder_path, entry))
]
return files
except FileNotFoundError:
return f"Error: The folder {folder_path} does not exist."
except PermissionError:
return f"Error: Permission denied for accessing {folder_path}."
except Exception as e:
return f"Error: An unexpected error occurred - {str(e)}."
def get_scenario_index(scenario_name):
"""Returns the scenario id for the given scenario name.
Args:
scenario_name (str): The name of the scenario
"""
scenario_list = get_scenario_list()
scenario_index = scenario_list.index(scenario_name)
return scenario_index
def get_scenario_id(scenario_name):
"""Returns the scenario id for the given scenario name.
Args:
scenario_name (str): The name of the scenario
"""
scenario_list = get_scenario_list()
scenario_index = scenario_list.index(scenario_name)
return scenario_index
def get_spline_for_coordinates(coordinates):
"""Returns the splined coordinates for the given trajectory coordinates.
Args:
coordinates (pd.DataFrame): The coordinates of a vehicle represented as a DataFrame
"""
# Get the x and y coordinates
x = coordinates["X"]
y = coordinates["Y"]
filtered_x = [x[0]]
filtered_y = [y[0]]
threshold = 1e-5
for i in range(1, len(x)):
if np.sqrt((x[i] - x[i - 1]) ** 2 + (y[i] - y[i - 1]) ** 2) > threshold:
filtered_x.append(x[i])
filtered_y.append(y[i])
# Check if there are more data points than the minimum required for a spline
if len(filtered_x) < 4 or len(filtered_y) < 4:
return get_adjusted_coordinates(coordinates)
# Check if the coordinates are constant
if len(set(filtered_x)) <= 1 and len(set(filtered_y)) <= 1:
print("Both x and y are constant. Cannot fit a spline.")
return get_adjusted_coordinates(coordinates)
elif len(set(filtered_x)) <= 1:
print("x is constant. Cannot fit a spline.")
return get_adjusted_coordinates(coordinates)
elif len(set(filtered_y)) <= 1:
print("y is constant. Cannot fit a spline.")
return get_adjusted_coordinates(coordinates)
else:
# Call splprep
tck, u = interpolate.splprep([filtered_x, filtered_y], s=120)
# Get the spline for the x and y coordinates
unew = np.arange(0, 1.01, 0.01)
spline = interpolate.splev(unew, tck)
result = pd.DataFrame({"X": spline[0], "Y": spline[1]})
return result
def get_adjusted_coordinates(coordinates):
"""For given coordinates returns their adjusted coordinates.
This means that the coordinates are adapted to have 101 X coordinates and 101 Y coordinates.
Args:
coordinates (pd.DataFrame): The coordinates of a vehicle represented as a DataFrame
"""
adjusted_coordinates = pd.DataFrame(columns=["X", "Y"])
x = coordinates["X"]
y = coordinates["Y"]
# Copy the first X coordinate 101 times
adjusted_coordinates["X"] = [x[0]] * 101
# Copy the first Y coordinate 101 times
adjusted_coordinates["Y"] = [y[0]] * 101
return adjusted_coordinates