-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvehic_pos_processor.py
255 lines (210 loc) · 9.48 KB
/
vehic_pos_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import argparse
import time
import json
import logging
import os
from pathlib import Path
from collections import deque
from urllib.error import URLError
from urllib.request import Request, urlopen
# Names of the JSON/GeoJSON members this script reads and writes.
class GI:
    """String constants for JSON member names and GeoJSON type tags.

    Keeping them in one namespace avoids scattering string literals over
    the code that indexes API responses and builds the output documents.
    """

    # Values written into the "type" member of generated objects.
    feature = "feature"
    featureCollection = "FeatureCollection"
    lineString = "LineString"

    # Member names shared by the input and output documents.
    type = "type"
    features = "features"
    geometry = "geometry"
    properties = "properties"
    coordinates = "coordinates"

    # Member names looked up in the downloaded API responses.
    trip = "trip"
    shapes = "shapes"
    gtfs_trip_id = "gtfs_trip_id"
    gtfs_route_short_name = "gtfs_route_short_name"
    shape_dist_traveled = "shape_dist_traveled"
# Texts of every message the script writes to its log.
class Log:
    """Fixed log messages plus small builders for per-trip messages.

    Each builder expects the trip identifier as a ``str``; any other type
    raises ``TypeError``.
    """

    start = "Program has started."
    net_err = "Network error: "
    sleep = "Sleep failed, "
    io_err = "Write to file failed: "
    vpf_up = "Vehicle positions file updated"

    @staticmethod
    def trip_updater(trip_id):
        """Message logged after a trip's position history was updated."""
        return "".join(("Trip ", trip_id, " positions file updated"))

    @staticmethod
    def del_files(trip):
        """Message logged after the files of a finished trip were deleted."""
        return "".join(("Shape of trip ", trip, " file removed"))

    @staticmethod
    def deL_fail(trip):
        """Message logged when a file of a finished trip was already missing."""
        return "".join(("Some file for trip ", trip, " not found."))

    @staticmethod
    def new_shape(trip):
        """Message logged after the shape file of a new trip was written."""
        return "".join(("New shape of trip ", trip, " file exported"))
def update_json_file(json_data, path, mode, log_message):
    """Serialise *json_data* as JSON and write it to *path*.

    Args:
        json_data: Any JSON-serialisable object.
        path: Destination file (``str`` or ``pathlib.Path``).
        mode: Mode string passed to ``open`` (callers use ``"w+"``).
        log_message: Text logged at INFO level after a successful write;
            an empty string disables the log entry.

    Raises:
        IOError: If serialising or writing fails for any reason. The
            original exception is attached as ``__cause__``.
    """
    try:
        # Serialise before opening so a serialisation error cannot leave a
        # freshly truncated, empty file behind.
        serialized = json.dumps(json_data)
        # The context manager closes the file; no explicit close is needed,
        # and none may be attempted when open() itself failed.
        with open(path, mode) as f:
            f.write(serialized)
    except Exception as e:
        raise IOError(e) from e
    if log_message != "":
        logging.info(log_message)
def downloadURL(request):
    """Fetch *request* and return the response body parsed as JSON.

    Args:
        request: A URL string or ``urllib.request.Request``; it is passed
            to ``urlopen`` unchanged.

    Returns:
        The decoded JSON document.

    Raises:
        urllib.error.URLError: Propagated from ``urlopen``.
        ValueError: If the body is not valid JSON.
    """
    # Use the response as a context manager so the connection is released
    # even when reading or decoding fails.
    with urlopen(request) as response:
        response_body = response.read()
        # Fall back to UTF-8 when the server does not declare a charset.
        encoding = response.info().get_content_charset('utf-8')
    return json.loads(response_body.decode(encoding))
def prepare_geojson_lfms():
    """Return an empty single-feature collection holding one LineString.

    The caller fills the feature's ``geometry.coordinates`` list in place.
    """
    empty_line = {
        GI.type: GI.lineString,
        GI.coordinates: [],
    }
    single_feature = {
        GI.type: GI.feature,
        GI.properties: {},
        GI.geometry: empty_line,
    }
    return {
        GI.type: GI.featureCollection,
        GI.features: [single_feature],
    }
def transform_bus_list(bus_input_list):
    """Reduce one vehicle feature from the API to a compact Point feature.

    Only the trip id, the route short name and the coordinates are kept.

    Args:
        bus_input_list: One element of the ``features`` list of the
            vehicle-positions response.

    Returns:
        A new feature dict; the coordinates object is shared with the input.

    Raises:
        KeyError: If an expected member is missing from the input.
    """
    trip_info = bus_input_list[GI.properties][GI.trip]
    return {
        GI.type: GI.feature,
        GI.properties: {
            GI.gtfs_trip_id: trip_info[GI.gtfs_trip_id],
            GI.gtfs_route_short_name: trip_info[GI.gtfs_route_short_name],
        },
        GI.geometry: {
            GI.coordinates: bus_input_list[GI.geometry][GI.coordinates],
            GI.type: "Point",
        },
    }
def transform_shape_json_file(old_json_data):
    """Convert a trip response with shapes into a one-LineString collection.

    Args:
        old_json_data: Trip document whose ``shapes`` member is a list of
            point features; each contributes its ``geometry.coordinates``.

    Returns:
        A feature collection with a single feature whose LineString lists
        the shape points in input order.

    Raises:
        KeyError: If ``shapes`` or a point's geometry members are missing.
    """
    line_coordinates = [
        shape_point[GI.geometry][GI.coordinates]
        for shape_point in old_json_data[GI.shapes]
    ]
    return {
        GI.type: GI.featureCollection,
        GI.features: [
            {
                GI.type: GI.feature,
                # "properties" belongs to the feature, not to its geometry;
                # this also matches the layout built by prepare_geojson_lfms.
                GI.properties: {},
                GI.geometry: {
                    GI.type: GI.lineString,
                    GI.coordinates: line_coordinates,
                },
            }
        ],
    }
def is_old(coordinates, now=None, max_age_seconds=10 * 60):
    """Tell whether a recorded position is older than the allowed age.

    Args:
        coordinates: Position record whose element ``[1]`` is the time of
            day of the sample, formatted ``HH:MM:SS``.
            NOTE(review): assumed to be in the same time zone as the local
            clock - confirm against the data source.
        now: ``datetime`` to compare against; defaults to the current local
            time. Only its time of day is used.
        max_age_seconds: Age above which the record counts as old.

    Returns:
        ``True`` when the record is more than *max_age_seconds* old.

    Raises:
        ValueError: If the recorded time does not match ``HH:MM:SS``.
    """
    from datetime import datetime

    fmt = '%H:%M:%S'
    recorded = datetime.strptime(coordinates[1], fmt)
    if now is None:
        now = datetime.now()

    recorded_s = recorded.hour * 3600 + recorded.minute * 60 + recorded.second
    now_s = now.hour * 3600 + now.minute * 60 + now.second

    # Only times of day are known, so the difference is ambiguous by whole
    # days. Fold it into [-12 h, 12 h): a sample taken shortly before
    # midnight then ages normally after midnight instead of appearing to
    # come from the future for the rest of the day.
    half_day = 12 * 60 * 60
    elapsed = (now_s - recorded_s + half_day) % (2 * half_day) - half_day
    return elapsed > max_age_seconds
def parse_arguments():
    """Parse the command line and return the resulting namespace.

    Every option is optional and falls back to the default listed below.
    """
    # (flag, default, value type, help text), registered in this order.
    option_table = (
        ("--file_name", "../data/last_positions", str, "The last generated output file"),
        ("--update_time", 20, int, "Time to next request"),
        ("--update_error", 20, int, "Update time if network error occurred"),
        ("--log", "../veh_pos_proc.log", str, "Name of logging file"),
        ("--trips_folder", "../data/trips", str, "Name of trips folder"),
    )
    parser = argparse.ArgumentParser()
    for flag, fallback, value_type, description in option_table:
        parser.add_argument(flag, default=fallback, type=value_type, help=description)
    return parser.parse_args()
# The following header is necessary for requesting the Golemio API.
# Header layout copied from
# https://golemioapi.docs.apiary.io/#reference/public-transport/vehicle-positions/get-all-vehicle-positions
# Access tokens are generated at https://api.golemio.cz/api-keys/auth/sign-in
# Get your own token and export it as GOLEMIO_ACCESS_TOKEN.
#
# NOTE(review): a credential committed to source control should be treated
# as compromised. The embedded token is kept only as a fallback so existing
# deployments keep working when the environment variable is not set.
_EMBEDDED_ACCESS_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImNpem1hcmZpbGlwQGdtYWlsLmNvbSIsImlkIjo3NiwibmFtZSI6bnVsbCwic3VybmFtZSI6bnVsbCwiaWF0IjoxNTcwNTQ2MTU2LCJleHAiOjExNTcwNTQ2MTU2LCJpc3MiOiJnb2xlbWlvIiwianRpIjoiMzAxYWNhNDUtNGRlNC00ZDRmLWI4NzAtMzQwMDQ5OTM1MzBhIn0.4rCELzCNY8XOSvjqQA7cKocPGJ8D2ezhXiWUkIRUNjg'
headers = {
    'Content-Type': 'application/json; charset=utf-8',
    # An unset or empty variable falls back to the embedded token.
    'x-access-token': os.environ.get('GOLEMIO_ACCESS_TOKEN') or _EMBEDDED_ACCESS_TOKEN,
}
_VEHICLE_POSITIONS_URL = 'https://api.golemio.cz/v1/vehiclepositions'
_TRIP_URL_PREFIX = 'https://api.golemio.cz/v1/gtfs/trips/'
_TRIP_URL_SUFFIX = '?includeShapes=true'
_TRIP_FILE_SUFFIXES = (".shape", ".lfms")


def _trips_on_disk(trips_folder):
    """Return the ids of trips that already have a file in *trips_folder*."""
    trips = set()
    for entry in os.listdir(trips_folder):
        stem, suffix = os.path.splitext(entry)
        # Strip the real suffix: the two file kinds have suffixes of
        # different lengths, so a fixed-width slice would mangle the id.
        if suffix in _TRIP_FILE_SUFFIXES:
            trips.add(stem)
    return trips


def _sleep_rest_of_cycle(period, cycle_start):
    """Sleep until *period* seconds have passed since *cycle_start*."""
    remaining = period - (time.time() - cycle_start)
    if remaining > 0:
        time.sleep(remaining)
    else:
        # time.sleep() rejects negative values; report the overrun instead.
        logging.warning(Log.sleep + "cycle took %.1f s longer than the %s s period" % (-remaining, period))


def _record_position(active_trips, trip_id, vehicle):
    """Append the vehicle's latest position to its trip and drop old ones."""
    last_position = vehicle[GI.properties]["last_position"]
    positions = active_trips.setdefault(trip_id, deque())
    positions.append([
        vehicle[GI.geometry][GI.coordinates],
        last_position["origin_time"],
        last_position["gtfs_shape_dist_traveled"],
    ])
    # The history window is defined by is_old().
    while positions and is_old(positions[0]):
        positions.popleft()


def _remove_trip_files(trips_folder, trip):
    """Delete the shape and lfms files of *trip*, logging the outcome."""
    any_missing = False
    for suffix in _TRIP_FILE_SUFFIXES:
        # Remove each file independently so that one missing file does not
        # leave the other one behind.
        try:
            os.remove(trips_folder / (trip + suffix))
        except FileNotFoundError:
            any_missing = True
    if any_missing:
        logging.warning(Log.deL_fail(trip))
    else:
        logging.info(Log.del_files(trip))


def _download_trip(trip):
    """Download the trip document (including its shape points) for *trip*."""
    return downloadURL(Request(_TRIP_URL_PREFIX + trip + _TRIP_URL_SUFFIX, headers=headers))


def _build_lfms(shape_points, positions):
    """Build the line covering the shape stretch between the oldest and newest position."""
    geojson_lfms = prepare_geojson_lfms()
    line = geojson_lfms[GI.features][0][GI.geometry][GI.coordinates]
    if positions:
        oldest, newest = positions[0], positions[-1]
        line.append(oldest[0])
        for shape_point in shape_points:
            distance = float(shape_point[GI.properties][GI.shape_dist_traveled])
            # Keep shape points strictly between the two recorded distances.
            if float(oldest[2]) < distance < float(newest[2]):
                line.append(shape_point[GI.geometry][GI.coordinates])
        line.append(newest[0])
    return geojson_lfms


def main():
    """Poll the vehicle-positions API forever and keep the output files fresh.

    Each cycle rewrites the vehicle positions file, deletes the files of
    trips that disappeared, writes a ``.shape`` file for every new trip and
    rewrites the ``.lfms`` file of every current trip.
    """
    args = parse_arguments()
    logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO, filename=args.log, filemode='w')
    logging.info(Log.start)

    trips_folder = Path(args.trips_folder)
    active_trips_set = _trips_on_disk(trips_folder)
    # Recent positions per trip; each record is
    # [coordinates, origin_time, gtfs_shape_dist_traveled].
    active_trips = {trip: deque() for trip in active_trips_set}

    while True:
        # Download the updated information about buses.
        req_start = time.time()
        try:
            json_vehiclepositions = downloadURL(Request(_VEHICLE_POSITIONS_URL, headers=headers))
        except URLError as e:
            logging.error(Log.net_err + str(e))
            _sleep_rest_of_cycle(args.update_error, req_start)
            continue

        # Process the json data and generate the geojson document.
        geojson_vehiclepositions = {
            GI.type: GI.featureCollection,
            "timestamp": time.strftime("%Y-%m-%d-%H:%M:%S"),
            GI.features: [],
        }
        current_trips_set = set()
        for bus_input_list in json_vehiclepositions[GI.features]:
            geojson_vehiclepositions[GI.features].append(transform_bus_list(bus_input_list))
            current_trip_gtfs_id = bus_input_list[GI.properties][GI.trip][GI.gtfs_trip_id]
            current_trips_set.add(current_trip_gtfs_id)
            _record_position(active_trips, current_trip_gtfs_id, bus_input_list)
            logging.info(Log.trip_updater(current_trip_gtfs_id))

        try:
            update_json_file(geojson_vehiclepositions, Path(args.file_name), "w+", Log.vpf_up)
        except IOError as e:
            logging.error(Log.io_err + str(e))
            # Wait before retrying so a persistent write failure does not
            # turn into a tight loop of API requests.
            _sleep_rest_of_cycle(args.update_error, req_start)
            continue

        # Trips that are no longer reported: forget them and delete their files.
        for trip in active_trips_set - current_trips_set:
            active_trips.pop(trip, None)
            _remove_trip_files(trips_folder, trip)

        # Trips seen for the first time: export their full shape.
        for trip in current_trips_set - active_trips_set:
            try:
                geojson_shape = transform_shape_json_file(_download_trip(trip))
                update_json_file(geojson_shape, trips_folder / (trip + ".shape"), "w+", Log.new_shape(trip))
            except URLError as e:
                logging.error(Log.net_err + str(e))
                # Not marking the trip as active makes the next cycle retry it.
                current_trips_set.discard(trip)
            except IOError as e:
                logging.error(Log.io_err + str(e))
                current_trips_set.discard(trip)

        # Both names refer to the same set from here on, so the discards
        # below also remove the trip from the active set.
        active_trips_set = current_trips_set

        # NOTE(review): every current trip is downloaded again on each cycle
        # although the same document was fetched when the trip first
        # appeared; caching the shape points would avoid these requests.
        # Iterate over a snapshot because failures shrink the set.
        for trip in list(current_trips_set):
            try:
                json_data_trip = _download_trip(trip)
                geojson_lfms = _build_lfms(json_data_trip[GI.shapes], active_trips[trip])
                update_json_file(geojson_lfms, trips_folder / (trip + '.lfms'), "w+", "")
            except URLError as e:
                logging.error(Log.net_err + str(e))
                current_trips_set.discard(trip)
            except IOError as e:
                logging.error(Log.io_err + str(e))
                current_trips_set.discard(trip)

        _sleep_rest_of_cycle(args.update_time, req_start)


if __name__ == "__main__":
    main()