Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moved simulation logging to the simulation kernel #991

Merged
merged 4 commits into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 5 additions & 39 deletions flow/core/experiment.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
"""Contains an experiment class for running simulations."""
from flow.core.util import emission_to_csv
from flow.utils.registry import make_create_env
from flow.utils.data_pipeline import write_dict_to_csv, get_extra_info
from collections import defaultdict
from datetime import datetime
import logging
import time
import os
import numpy as np
import uuid


class Experiment:
Expand Down Expand Up @@ -140,20 +135,10 @@ def rl_actions(*_):
t = time.time()
times = []

# data pipeline
extra_info = defaultdict(list)
source_id = 'flow_{}'.format(uuid.uuid4().hex)

if convert_to_csv and self.env.simulator == "traci":
dir_path = self.env.sim_params.emission_path
trajectory_table_path = os.path.join(dir_path, '{}.csv'.format(source_id))

for i in range(num_runs):
ret = 0
vel = []
custom_vals = {key: [] for key in self.custom_callables.keys()}
run_id = "run_{}".format(i)
self.env.pipeline_params = (extra_info, source_id, run_id)
state = self.env.reset()
for j in range(num_steps):
t0 = time.time()
Expand All @@ -166,14 +151,6 @@ def rl_actions(*_):
vel.append(np.mean(self.env.k.vehicle.get_speed(veh_ids)))
ret += reward

# collect additional information for the data pipeline
get_extra_info(self.env.k.vehicle, extra_info, veh_ids, source_id, run_id)

# write to disk every 100 steps
if convert_to_csv and self.env.simulator == "traci" and j % 100 == 0:
write_dict_to_csv(trajectory_table_path, extra_info, not j and not i)
extra_info.clear()

# Compute the results for the custom callables.
for (key, lambda_func) in self.custom_callables.items():
custom_vals[key].append(lambda_func(self.env))
Expand All @@ -191,6 +168,11 @@ def rl_actions(*_):

print("Round {0}, return: {1}".format(i, ret))

# Save emission data at the end of every rollout. This is skipped
# by the internal method if no emission path was specified.
if self.env.simulator == "traci":
self.env.k.simulation.save_emission(run_id=i)

# Print the averages/std for all variables in the info_dict.
for key in info_dict.keys():
print("Average, std {}: {}, {}".format(
Expand All @@ -200,20 +182,4 @@ def rl_actions(*_):
print("steps/second:", np.mean(times))
self.env.terminate()

if convert_to_csv and self.env.simulator == "traci":
# wait a short period of time to ensure the xml file is readable
time.sleep(0.1)

# collect the location of the emission file
emission_filename = \
"{0}-emission.xml".format(self.env.network.name)
emission_path = os.path.join(dir_path, emission_filename)

# convert the emission file into a csv
emission_to_csv(emission_path)

# Delete the .xml version of the emission file.
os.remove(emission_path)
write_dict_to_csv(trajectory_table_path, extra_info)

return info_dict
182 changes: 165 additions & 17 deletions flow/core/kernel/simulation/traci.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import logging
import subprocess
import signal
import csv


# Number of retries on restarting SUMO before giving up
Expand All @@ -21,6 +22,32 @@ class TraCISimulation(KernelSimulation):
"""Sumo simulation kernel.

Extends flow.core.kernel.simulation.KernelSimulation

Attributes
----------
sumo_proc : subprocess.Popen
contains the subprocess.Popen instance used to start traci
sim_step : float
seconds per simulation step
emission_path : str or None
Path to the folder in which to create the emissions output. Emissions
output is not generated if this value is not specified
time : float
used to internally keep track of the simulation time
stored_data : dict <str, dict <float, dict <str, Any>>>
a dict object used to store additional data if an emission file is
provided. The first key corresponds to the name of the vehicle, the
second corresponds to the time the sample was issued, and the final
keys represent the additional data stored at every given time for every
vehicle, and consists of the following keys:

* acceleration (no noise): the accelerations issued to the vehicle,
excluding noise
* acceleration (requested): the requested acceleration by the vehicle,
including noise
* acceleration (actual): the actual acceleration by the vehicle,
collected by computing the difference between the speeds of the
vehicle and dividing it by the sim_step term
"""

def __init__(self, master_kernel):
Expand All @@ -33,8 +60,12 @@ def __init__(self, master_kernel):
sub-kernels)
"""
KernelSimulation.__init__(self, master_kernel)
# contains the subprocess.Popen instance used to start traci

self.sumo_proc = None
self.sim_step = None
self.emission_path = None
self.time = 0
self.stored_data = dict()

def pass_api(self, kernel_api):
"""See parent class.
Expand Down Expand Up @@ -62,10 +93,61 @@ def simulation_step(self):

def update(self, reset):
"""See parent class."""
pass
if reset:
self.time = 0
else:
self.time += self.sim_step

# Collect the additional data to store in the emission file.
if self.emission_path is not None:
kv = self.master_kernel.vehicle
for veh_id in self.master_kernel.vehicle.get_ids():
t = round(self.time, 2)

# some miscellaneous pre-processing
position = kv.get_2d_position(veh_id)

# Make sure dictionaries corresponding to the vehicle and
# time are available.
if veh_id not in self.stored_data.keys():
self.stored_data[veh_id] = dict()
if t not in self.stored_data[veh_id].keys():
self.stored_data[veh_id][t] = dict()

# Add the speed, position, and lane data.
self.stored_data[veh_id][t].update({
"speed": kv.get_speed(veh_id),
"lane_number": kv.get_lane(veh_id),
"edge_id": kv.get_edge(veh_id),
"relative_position": kv.get_position(veh_id),
"x": position[0],
"y": position[1],
"headway": kv.get_headway(veh_id),
"leader_id": kv.get_leader(veh_id),
"follower_id": kv.get_follower(veh_id),
"leader_rel_speed":
kv.get_speed(kv.get_leader(veh_id))
- kv.get_speed(veh_id),
"target_accel_with_noise_with_failsafe":
kv.get_accel(veh_id, noise=True, failsafe=True),
"target_accel_no_noise_no_failsafe":
kv.get_accel(veh_id, noise=False, failsafe=False),
"target_accel_with_noise_no_failsafe":
kv.get_accel(veh_id, noise=True, failsafe=False),
"target_accel_no_noise_with_failsafe":
kv.get_accel(veh_id, noise=False, failsafe=True),
"realized_accel":
kv.get_realized_accel(veh_id),
"road_grade": kv.get_road_grade(veh_id),
"distance": kv.get_distance(veh_id),
})
AboudyKreidieh marked this conversation as resolved.
Show resolved Hide resolved

def close(self):
"""See parent class."""
# Save the emission data to a csv.
if self.emission_path is not None:
self.save_emission()

self.kernel_api.close()

def check_collision(self):
Expand All @@ -75,10 +157,24 @@ def check_collision(self):
def start_simulation(self, network, sim_params):
"""Start a sumo simulation instance.

This method uses the configuration files created by the network class
to initialize a sumo instance. Also initializes a traci connection to
interface with sumo from Python.
This method performs the following operations:

1. It collect the simulation step size and the emission path
information. If an emission path is specifies, it ensures that the
path exists.
2. It also uses the configuration files created by the network class to
initialize a sumo instance.
3. Finally, It initializes a traci connection to interface with sumo
from Python and returns the connection.
"""
# Save the simulation step size (for later use).
self.sim_step = sim_params.sim_step

# Update the emission path term.
self.emission_path = sim_params.emission_path
if self.emission_path is not None:
ensure_dir(self.emission_path)

error = None
for _ in range(RETRIES_ON_ERROR):
try:
Expand Down Expand Up @@ -109,17 +205,6 @@ def start_simulation(self, network, sim_params):
sumo_call.append("--lateral-resolution")
sumo_call.append(str(sim_params.lateral_resolution))

# add the emission path to the sumo command (if requested)
if sim_params.emission_path is not None:
ensure_dir(sim_params.emission_path)
emission_out = os.path.join(
sim_params.emission_path,
"{0}-emission.xml".format(network.name))
sumo_call.append("--emission-output")
sumo_call.append(emission_out)
else:
emission_out = None

if sim_params.overtake_right:
sumo_call.append("--lanechange.overtake-right")
sumo_call.append("true")
Expand All @@ -146,7 +231,7 @@ def start_simulation(self, network, sim_params):
if sim_params.num_clients > 1:
logging.info(" Num clients are" +
str(sim_params.num_clients))
logging.debug(" Emission file: " + str(emission_out))
logging.debug(" Emission file: " + str(self.emission_path))
logging.debug(" Step length: " + str(sim_params.sim_step))

# Opening the I/O thread to SUMO
Expand Down Expand Up @@ -180,3 +265,66 @@ def teardown_sumo(self):
os.killpg(self.sumo_proc.pid, signal.SIGTERM)
except Exception as e:
print("Error during teardown: {}".format(e))

def save_emission(self, run_id=0):
"""Save any collected emission data to a csv file.

If not data was collected, nothing happens. Moreover, any internally
stored data by this class is clear whenever data is stored.

Parameters
----------
run_id : int
the rollout number, appended to the name of the emission file. Used
to store emission files from multiple rollouts run sequentially.
"""
# If there is no stored data, ignore this operation. This is to ensure
# that data isn't deleted if the operation is called twice.
if len(self.stored_data) == 0:
return

# Get a csv name for the emission file.
name = "{}-{}_emission.csv".format(
self.master_kernel.network.network.name, run_id)

# The name of all stored data-points (excluding id and time)
stored_ids = [
"speed",
"lane_number",
"edge_id",
"relative_position",
"x",
"y",
"headway",
"leader_id",
"follower_id",
"leader_rel_speed",
"target_accel_with_noise_with_failsafe",
"target_accel_no_noise_no_failsafe",
"target_accel_with_noise_no_failsafe",
"target_accel_no_noise_with_failsafe",
"realized_accel",
"road_grade",
"distance",
]

# Update the stored data to push to the csv file.
final_data = {"id": [], "time": []}
final_data.update({key: [] for key in stored_ids})

for veh_id in self.stored_data.keys():
for t in self.stored_data[veh_id].keys():
final_data['id'].append(veh_id)
final_data['time'].append(t)
for key in stored_ids:
final_data[key].append(self.stored_data[veh_id][t][key])

with open(os.path.join(self.emission_path, name), "w") as f:
print(os.path.join(self.emission_path, name), self.emission_path)
writer = csv.writer(f, delimiter=',')
writer.writerow(final_data.keys())
writer.writerows(zip(*final_data.values()))

# Clear all memory from the stored data. This is useful if this
# function is called in between resets.
self.stored_data.clear()
52 changes: 0 additions & 52 deletions flow/utils/data_pipeline.py

This file was deleted.

Loading