From f7a278c585e0076e12ce9436a340e4d3d87aa1c6 Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Thu, 28 May 2020 15:27:18 -0400 Subject: [PATCH] Network update (#953) Add new I210 models and envs. --- docs/source/flow_setup.rst | 16 + examples/exp_configs/non_rl/highway_single.py | 12 +- .../exp_configs/non_rl/i210_subnetwork.py | 249 +- examples/exp_configs/non_rl/straight_road.py | 7 +- .../rl/multiagent/multiagent_i210.py | 273 +- .../rl/multiagent/multiagent_straight_road.py | 58 +- .../i210_with_ghost_cell_with_downstream.xml | 10 +- ...0_with_ghost_cell_with_downstream_test.xml | 5719 +++++++++++++++++ examples/train.py | 94 +- flow/algorithms/centralized_PPO.py | 547 ++ flow/algorithms/custom_ppo.py | 318 + flow/controllers/car_following_models.py | 1 + flow/controllers/velocity_controllers.py | 84 +- flow/core/kernel/vehicle/base.py | 1 + flow/core/rewards.py | 4 +- flow/envs/base.py | 2 + flow/envs/multiagent/__init__.py | 1 + flow/envs/multiagent/base.py | 3 + flow/envs/multiagent/i210.py | 201 +- flow/networks/i210_subnetwork_ghost_cell.py | 162 + flow/visualize/time_space_diagram.py | 29 +- flow/visualize/visualizer_rllib.py | 27 +- scripts/ray_autoscale.yaml | 16 +- 23 files changed, 7550 insertions(+), 284 deletions(-) create mode 100644 examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream_test.xml create mode 100644 flow/algorithms/centralized_PPO.py create mode 100644 flow/algorithms/custom_ppo.py create mode 100644 flow/networks/i210_subnetwork_ghost_cell.py diff --git a/docs/source/flow_setup.rst b/docs/source/flow_setup.rst index 60734b7b1..cbe585d36 100644 --- a/docs/source/flow_setup.rst +++ b/docs/source/flow_setup.rst @@ -112,6 +112,22 @@ If you are a Mac user and the above command gives you the error ``FXApp:openDisplay: unable to open display :0.0``, make sure to open the application XQuartz. +*Troubleshooting*: +If you are a Mac user and the above command gives you the error +``Segmentation fault: 11``, make sure to reinstall ``fox`` using brew. 
+:: + + # Uninstall Catalina bottle of fox: + $ brew uninstall --ignore-dependencies fox + + # Edit brew Formula of fox: + $ brew edit fox + + # Comment out or delete the following line: sha256 "c6697be294c9a0458580564d59f8db32791beb5e67a05a6246e0b969ffc068bc" => :catalina + # Install Mojave bottle of fox: + $ brew install fox + + Testing your SUMO and Flow installation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/examples/exp_configs/non_rl/highway_single.py b/examples/exp_configs/non_rl/highway_single.py index 7e0a5eb49..0a9a6774b 100644 --- a/examples/exp_configs/non_rl/highway_single.py +++ b/examples/exp_configs/non_rl/highway_single.py @@ -1,4 +1,5 @@ """Example of an open network with human-driven vehicles and a wave.""" + import numpy as np from flow.controllers import IDMController @@ -10,8 +11,8 @@ from flow.core.params import VehicleParams from flow.core.params import SumoParams from flow.core.params import SumoLaneChangeParams +from flow.core.rewards import miles_per_gallon, miles_per_megajoule from flow.core.params import SumoCarFollowingParams -from flow.core.rewards import miles_per_megajoule from flow.networks import HighwayNetwork from flow.envs import TestEnv from flow.networks.highway import ADDITIONAL_NET_PARAMS @@ -23,7 +24,7 @@ # the inflow rate of vehicles TRAFFIC_FLOW = 2215 # the simulation time horizon (in steps) -HORIZON = 1500 +HORIZON = 1000 # whether to include noise in the car-following models INCLUDE_NOISE = True @@ -64,13 +65,13 @@ ), ) -inflows = InFlows() if PENETRATION_RATE > 0.0: vehicles.add( "av", + color='red', num_vehicles=0, - acceleration_controller=(FollowerStopper, {"v_des": 6.0}), + acceleration_controller=(FollowerStopper, {"v_des": 5.0, "control_length": [500, 2300]}), ) inflows = InFlows() @@ -145,5 +146,8 @@ env.k.vehicle.get_outflow_rate(120)), "miles_per_megajoule": lambda env: np.nan_to_num( miles_per_megajoule(env, env.k.vehicle.get_ids(), gain=1.0) + ), + "miles_per_gallon": lambda env: np.nan_to_num( + miles_per_gallon(env, env.k.vehicle.get_ids(), gain=1.0) ) } diff --git a/examples/exp_configs/non_rl/i210_subnetwork.py b/examples/exp_configs/non_rl/i210_subnetwork.py index 25565bb49..65131a6bd 100644 --- a/examples/exp_configs/non_rl/i210_subnetwork.py +++ b/examples/exp_configs/non_rl/i210_subnetwork.py @@ -2,8 +2,9 @@ import os import numpy as np -from flow.controllers import IDMController -from flow.controllers import I210Router +from flow.controllers.car_following_models import IDMController +from flow.controllers.velocity_controllers import FollowerStopper +from flow.controllers.routing_controllers import I210Router from flow.core.params import SumoParams from flow.core.params import EnvParams from flow.core.params import NetParams @@ -11,94 +12,181 @@ from flow.core.params import VehicleParams from flow.core.params import InitialConfig from flow.core.params import InFlows +from flow.core.rewards import miles_per_gallon, miles_per_megajoule + import flow.config as config from flow.envs import TestEnv -from flow.networks.i210_subnetwork import I210SubNetwork, EDGES_DISTRIBUTION -# =========================================================================== # -# Specify some configurable constants. 
# -# =========================================================================== # +# Instantiate which conditions we want to be true about the network -# whether to include the upstream ghost edge in the network +# whether to include a ghost cell at the entrance WANT_GHOST_CELL = True -# whether to include the downstream slow-down edge in the network -WANT_DOWNSTREAM_BOUNDARY = True # whether to include vehicles on the on-ramp -ON_RAMP = True -# the inflow rate of vehicles (in veh/hr) -INFLOW_RATE = 5 * 2215 +ON_RAMP = False +# fraction of vehicles that are follower-stoppers. 0.10 corresponds to 10% +PENETRATION_RATE = 0.0 +# desired speed of the follower stopper vehicles +V_DES = 5.0 +# horizon over which to run the env +HORIZON = 1000 +# steps to run before follower-stopper is allowed to take control +WARMUP_STEPS = 600 + +# Number of vehicles/hour/lane +inflow_rate = 2050 # the speed of inflowing vehicles from the main edge (in m/s) -INFLOW_SPEED = 24.1 +inflow_speed = 25.5 -# =========================================================================== # -# Specify the path to the network template. # -# =========================================================================== # +accel_data = (IDMController, {'a': 1.3, 'b': 2.0, 'noise': 0.3}) + +if WANT_GHOST_CELL: + from flow.networks.i210_subnetwork_ghost_cell import I210SubNetworkGhostCell, EDGES_DISTRIBUTION -if WANT_DOWNSTREAM_BOUNDARY: - net_template = os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_" - "downstream.xml") -elif WANT_GHOST_CELL: - net_template = os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/i210_with_ghost_cell.xml") + highway_start_edge = 'ghost0' else: - net_template = os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/test2.net.xml") + from flow.networks.i210_subnetwork import I210SubNetwork, EDGES_DISTRIBUTION -# If the ghost cell is not being used, remove it from the initial edges that -# vehicles can be placed on. -edges_distribution = EDGES_DISTRIBUTION.copy() -if not WANT_GHOST_CELL: - edges_distribution.remove("ghost0") - -# =========================================================================== # -# Specify vehicle-specific information and inflows. 
# -# =========================================================================== # + highway_start_edge = "119257914" vehicles = VehicleParams() -vehicles.add( - "human", - num_vehicles=0, - lane_change_params=SumoLaneChangeParams( - lane_change_mode="strategic", - ), - acceleration_controller=(IDMController, { - "a": 1.3, - "b": 2.0, - "noise": 0.3, - }), - routing_controller=(I210Router, {}) if ON_RAMP else None, -) inflow = InFlows() -# main highway -inflow.add( - veh_type="human", - edge="ghost0" if WANT_GHOST_CELL else "119257914", - vehs_per_hour=INFLOW_RATE, - departLane="best", - departSpeed=INFLOW_SPEED) -# on ramp + if ON_RAMP: + vehicles.add( + "human", + num_vehicles=0, + color="white", + lane_change_params=SumoLaneChangeParams( + lane_change_mode="strategic", + ), + acceleration_controller=accel_data, + routing_controller=(I210Router, {}) + ) + if PENETRATION_RATE > 0.0: + vehicles.add( + "av", + num_vehicles=0, + color="red", + acceleration_controller=(FollowerStopper, {"v_des": V_DES, + "no_control_edges": ["ghost0", "119257908#3"] + }), + routing_controller=(I210Router, {}) + ) + + # inflow.add( + # veh_type="human", + # edge=highway_start_edge, + # vehs_per_hour=inflow_rate, + # departLane="best", + # departSpeed=inflow_speed) + + lane_list = ['0', '1', '2', '3', '4'] + + for lane in lane_list: + inflow.add( + veh_type="human", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * (1 - PENETRATION_RATE)), + departLane=lane, + departSpeed=inflow_speed) + inflow.add( veh_type="human", edge="27414345", - vehs_per_hour=500, + vehs_per_hour=int(500 * (1 - PENETRATION_RATE)), departLane="random", departSpeed=10) inflow.add( veh_type="human", edge="27414342#0", - vehs_per_hour=500, + vehs_per_hour=int(500 * (1 - PENETRATION_RATE)), departLane="random", departSpeed=10) -# =========================================================================== # -# Generate the flow_params dict with all relevant simulation information. 
# -# =========================================================================== # + if PENETRATION_RATE > 0.0: + for lane in lane_list: + inflow.add( + veh_type="av", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * PENETRATION_RATE), + departLane=lane, + departSpeed=inflow_speed) + + inflow.add( + veh_type="av", + edge="27414345", + vehs_per_hour=int(500 * PENETRATION_RATE), + departLane="random", + departSpeed=10) + inflow.add( + veh_type="av", + edge="27414342#0", + vehs_per_hour=int(500 * PENETRATION_RATE), + departLane="random", + departSpeed=10) + +else: + # create the base vehicle type that will be used for inflows + vehicles.add( + "human", + num_vehicles=0, + lane_change_params=SumoLaneChangeParams( + lane_change_mode="strategic", + ), + acceleration_controller=accel_data, + ) + if PENETRATION_RATE > 0.0: + vehicles.add( + "av", + color="red", + num_vehicles=0, + acceleration_controller=(FollowerStopper, {"v_des": V_DES, + "no_control_edges": ["ghost0", "119257908#3"] + }), + ) + + # If you want to turn off the fail safes uncomment this: + + # vehicles.add( + # 'human', + # num_vehicles=0, + # lane_change_params=SumoLaneChangeParams( + # lane_change_mode='strategic', + # ), + # acceleration_controller=accel_data, + # car_following_params=SumoCarFollowingParams(speed_mode='19') + # ) + + lane_list = ['0', '1', '2', '3', '4'] + + for lane in lane_list: + inflow.add( + veh_type="human", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * (1 - PENETRATION_RATE)), + departLane=lane, + departSpeed=inflow_speed) + + if PENETRATION_RATE > 0.0: + for lane in lane_list: + inflow.add( + veh_type="av", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * PENETRATION_RATE), + departLane=lane, + departSpeed=inflow_speed) + +network_xml_file = "examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream_test.xml" + +# network_xml_file = "examples/exp_configs/templates/sumo/i210_with_congestion.xml" + +NET_TEMPLATE = os.path.join(config.PROJECT_PATH, network_xml_file) + +if WANT_GHOST_CELL: + network = I210SubNetworkGhostCell +else: + network = I210SubNetwork flow_params = dict( # name of the experiment @@ -108,7 +196,7 @@ env_name=TestEnv, # name of the network class the experiment is running on - network=I210SubNetwork, + network=network, # simulator that is used by the experiment simulator='traci', @@ -117,24 +205,23 @@ sim=SumoParams( sim_step=0.4, render=False, - color_by_speed=True, + color_by_speed=False, use_ballistic=True ), # environment related parameters (see flow.core.params.EnvParams) env=EnvParams( - horizon=10000, + horizon=HORIZON, + warmup_steps=WARMUP_STEPS, + sims_per_step=3 ), # network-related parameters (see flow.core.params.NetParams and the # network's documentation or ADDITIONAL_NET_PARAMS component) net=NetParams( inflows=inflow, - template=net_template, - additional_params={ - "on_ramp": ON_RAMP, - "ghost_edge": WANT_GHOST_CELL, - } + template=NET_TEMPLATE, + additional_params={"on_ramp": ON_RAMP, "ghost_edge": WANT_GHOST_CELL} ), # vehicles to be placed in the network at the start of a rollout (see @@ -144,7 +231,7 @@ # parameters specifying the positioning of vehicles upon initialization/ # reset (see flow.core.params.InitialConfig) initial=InitialConfig( - edges_distribution=edges_distribution, + edges_distribution=EDGES_DISTRIBUTION, ), ) @@ -153,14 +240,20 @@ # =========================================================================== # edge_id = "119257908#1-AddedOnRampEdge" + +def valid_ids(env, veh_ids): + return 
[veh_id for veh_id in veh_ids if env.k.vehicle.get_edge(veh_id) not in ["ghost0", "119257908#3"]] + custom_callables = { "avg_merge_speed": lambda env: np.nan_to_num(np.mean( - env.k.vehicle.get_speed(env.k.vehicle.get_ids()))), + env.k.vehicle.get_speed(valid_ids(env, env.k.vehicle.get_ids())))), "avg_outflow": lambda env: np.nan_to_num( env.k.vehicle.get_outflow_rate(120)), - # we multiply by 5 to account for the vehicle length and by 1000 to convert - # into veh/km - "avg_density": lambda env: 5 * 1000 * len(env.k.vehicle.get_ids_by_edge( - edge_id)) / (env.k.network.edge_length(edge_id) - * env.k.network.num_lanes(edge_id)), + # # we multiply by 5 to account for the vehicle length and by 1000 to convert + # # into veh/km + # "avg_density": lambda env: 5 * 1000 * len(env.k.vehicle.get_ids_by_edge( + # edge_id)) / (env.k.network.edge_length(edge_id) + # * env.k.network.num_lanes(edge_id)), + "mpg": lambda env: miles_per_gallon(env, valid_ids(env, env.k.vehicle.get_ids()), gain=1.0), + "mpj": lambda env: miles_per_megajoule(env, valid_ids(env, env.k.vehicle.get_ids()), gain=1.0), } diff --git a/examples/exp_configs/non_rl/straight_road.py b/examples/exp_configs/non_rl/straight_road.py index c557ce836..1669bb896 100644 --- a/examples/exp_configs/non_rl/straight_road.py +++ b/examples/exp_configs/non_rl/straight_road.py @@ -9,6 +9,7 @@ from flow.controllers.velocity_controllers import FollowerStopper from flow.core.params import EnvParams, NetParams, InitialConfig, InFlows, \ VehicleParams, SumoParams, SumoLaneChangeParams +from flow.core.rewards import miles_per_gallon from flow.networks import HighwayNetwork from flow.envs import TestEnv from flow.networks.highway import ADDITIONAL_NET_PARAMS @@ -58,7 +59,7 @@ vehicles.add( "av", num_vehicles=0, - acceleration_controller=(FollowerStopper, {"v_des": 18.0}), + acceleration_controller=(FollowerStopper, {"v_des": 12.0}), ) # add human vehicles on the highway @@ -98,7 +99,7 @@ # environment related parameters (see flow.core.params.EnvParams) env=EnvParams( horizon=HORIZON, - warmup_steps=0, + warmup_steps=400, sims_per_step=1, ), @@ -128,4 +129,6 @@ custom_callables = { "avg_speed": lambda env: np.nan_to_num(np.mean( env.k.vehicle.get_speed(env.k.vehicle.get_ids_by_edge(['highway_0', 'highway_1'])))), + "mpg": lambda env: miles_per_gallon(env, env.k.vehicle.get_ids(), gain=1.0) + } diff --git a/examples/exp_configs/rl/multiagent/multiagent_i210.py b/examples/exp_configs/rl/multiagent/multiagent_i210.py index 01b9e6082..f55917e49 100644 --- a/examples/exp_configs/rl/multiagent/multiagent_i210.py +++ b/examples/exp_configs/rl/multiagent/multiagent_i210.py @@ -9,6 +9,7 @@ from ray.tune.registry import register_env from flow.controllers import RLController +from flow.controllers.routing_controllers import I210Router from flow.controllers.car_following_models import IDMController import flow.config as config from flow.core.params import EnvParams @@ -25,20 +26,32 @@ from flow.utils.registry import make_create_env # SET UP PARAMETERS FOR THE SIMULATION +WANT_GHOST_CELL = True +# WANT_DOWNSTREAM_BOUNDARY = True +ON_RAMP = False +PENETRATION_RATE = 0.10 +V_DES = 7.0 +HORIZON = 1000 +WARMUP_STEPS = 600 -# number of steps per rollout -HORIZON = 2000 +inflow_rate = 2050 +inflow_speed = 25.5 + +accel_data = (IDMController, {'a': 1.3, 'b': 2.0, 'noise': 0.3}) VEH_PER_HOUR_BASE_119257914 = 10800 VEH_PER_HOUR_BASE_27414345 = 321 VEH_PER_HOUR_BASE_27414342 = 421 -# percentage of autonomous vehicles compared to human vehicles on highway -PENETRATION_RATE = 10 
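(Editorial sketch, not part of the patch.) Both the updated non-RL ``i210_subnetwork.py`` config above and this multiagent config now express the penetration rate as a fraction in [0, 1] (the old config used a percentage) and split each lane's inflow between the ``human`` and ``av`` vehicle types accordingly. A minimal sketch of that arithmetic, using the 2050 veh/hour/lane inflow and the 10% penetration used in the multiagent config:

    # Sketch of how the per-lane inflows are partitioned by penetration rate.
    # Mirrors the inflow.add(...) calls in the updated configs.
    inflow_rate = 2050          # veh/hour/lane, as in the new configs
    PENETRATION_RATE = 0.10     # fraction of AVs (the old config expressed this as 10)
    lane_list = ['0', '1', '2', '3', '4']

    for lane in lane_list:
        # humans get (1 - p) of the flow on each lane, AVs get p of it
        human_vph = int(inflow_rate * (1 - PENETRATION_RATE))   # 1845 veh/hour on this lane
        av_vph = int(inflow_rate * PENETRATION_RATE)            # 205 veh/hour on this lane
        print("lane %s: human %d veh/hr, av %d veh/hr" % (lane, human_vph, av_vph))
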
+if WANT_GHOST_CELL: + from flow.networks.i210_subnetwork_ghost_cell import I210SubNetworkGhostCell, EDGES_DISTRIBUTION -# TODO: temporary fix -edges_distribution = EDGES_DISTRIBUTION.copy() -edges_distribution.remove("ghost0") + edges_distribution = EDGES_DISTRIBUTION + highway_start_edge = 'ghost0' +else: + from flow.networks.i210_subnetwork import I210SubNetwork, EDGES_DISTRIBUTION + edges_distribution = EDGES_DISTRIBUTION + highway_start_edge = "119257914" # SET UP PARAMETERS FOR THE ENVIRONMENT additional_env_params = ADDITIONAL_ENV_PARAMS.copy() @@ -49,84 +62,180 @@ 'lead_obs': True, # whether to add in a reward for the speed of nearby vehicles "local_reward": True, + # whether to use the MPG reward. Otherwise, defaults to a target velocity reward + "mpg_reward": False, + # whether to use the MPJ reward. Otherwise, defaults to a target velocity reward + "mpj_reward": False, + # how many vehicles to look back for the MPG reward + "look_back_length": 1, # whether to reroute vehicles once they have exited "reroute_on_exit": True, - 'target_velocity': 18, + 'target_velocity': 8.0, + # how many AVs there can be at once (this is only for centralized critics) + "max_num_agents": 10, + # which edges we shouldn't apply control on + "no_control_edges": ["ghost0", "119257908#3"], + + # whether to add a slight reward for opening up a gap that will be annealed out N iterations in + "headway_curriculum": False, + # how many timesteps to anneal the headway curriculum over + "headway_curriculum_iters": 100, + # weight of the headway reward + "headway_reward_gain": 2.0, + # desired time headway + "min_time_headway": 2.0, + + # whether to add a slight reward for traveling at a desired speed + "speed_curriculum": True, + # how many timesteps to anneal the headway curriculum over + "speed_curriculum_iters": 20, + # weight of the headway reward + "speed_reward_gain": 0.5, + # penalize stopped vehicles + "penalize_stops": True, + + # penalize accels + "penalize_accel": True }) # CREATE VEHICLE TYPES AND INFLOWS # no vehicles in the network vehicles = VehicleParams() -vehicles.add( - "human", - num_vehicles=0, - lane_change_params=SumoLaneChangeParams(lane_change_mode="strategic"), - acceleration_controller=(IDMController, {"a": .3, "b": 2.0, "noise": 0.5}), - car_following_params=SumoCarFollowingParams(speed_mode="no_collide"), -) -vehicles.add( - "av", - acceleration_controller=(RLController, {}), - num_vehicles=0, - color='red' -) inflow = InFlows() -# main highway -pen_rate = PENETRATION_RATE / 100 -assert pen_rate < 1.0, "your penetration rate is over 100%" -assert pen_rate > 0.0, "your penetration rate should be above zero" -inflow.add( - veh_type="human", - edge="119257914", - vehs_per_hour=int(VEH_PER_HOUR_BASE_119257914 * (1 - pen_rate)), - # probability=1.0, - depart_lane="random", - departSpeed=20) -# # on ramp -# inflow.add( -# veh_type="human", -# edge="27414345", -# vehs_per_hour=321 * pen_rate, -# depart_lane="random", -# depart_speed=20) -# inflow.add( -# veh_type="human", -# edge="27414342#0", -# vehs_per_hour=421 * pen_rate, -# depart_lane="random", -# depart_speed=20) - -# Now add the AVs -# main highway -inflow.add( - veh_type="av", - edge="119257914", - vehs_per_hour=int(VEH_PER_HOUR_BASE_119257914 * pen_rate), - # probability=1.0, - depart_lane="random", - depart_speed=20) -# # on ramp -# inflow.add( -# veh_type="av", -# edge="27414345", -# vehs_per_hour=int(VEH_PER_HOUR_BASE_27414345 * pen_rate), -# depart_lane="random", -# depart_speed=20) -# inflow.add( -# veh_type="av", -# 
edge="27414342#0", -# vehs_per_hour=int(VEH_PER_HOUR_BASE_27414342 * pen_rate), -# depart_lane="random", -# depart_speed=20) - -NET_TEMPLATE = os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/test2.net.xml") - -warmup_steps = 0 -if additional_env_params['reroute_on_exit']: - warmup_steps = 400 + +if ON_RAMP: + vehicles.add( + "human", + num_vehicles=0, + color="white", + lane_change_params=SumoLaneChangeParams( + lane_change_mode="strategic", + ), + acceleration_controller=accel_data, + routing_controller=(I210Router, {}) + ) + if PENETRATION_RATE > 0.0: + vehicles.add( + "av", + num_vehicles=0, + color="red", + acceleration_controller=(RLController, {}), + routing_controller=(I210Router, {}) + ) + + # inflow.add( + # veh_type="human", + # edge=highway_start_edge, + # vehs_per_hour=inflow_rate, + # departLane="best", + # departSpeed=inflow_speed) + + lane_list = ['0', '1', '2', '3', '4'] + + for lane in lane_list: + inflow.add( + veh_type="human", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * (1 - PENETRATION_RATE)), + departLane=lane, + departSpeed=inflow_speed) + + inflow.add( + veh_type="human", + edge="27414345", + vehs_per_hour=int(500 * (1 - PENETRATION_RATE)), + departLane="random", + departSpeed=10) + inflow.add( + veh_type="human", + edge="27414342#0", + vehs_per_hour=int(500 * (1 - PENETRATION_RATE)), + departLane="random", + departSpeed=10) + + if PENETRATION_RATE > 0.0: + for lane in lane_list: + inflow.add( + veh_type="av", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * PENETRATION_RATE), + departLane=lane, + departSpeed=inflow_speed) + + inflow.add( + veh_type="av", + edge="27414345", + vehs_per_hour=int(500 * PENETRATION_RATE), + departLane="random", + departSpeed=10) + inflow.add( + veh_type="av", + edge="27414342#0", + vehs_per_hour=int(500 * PENETRATION_RATE), + departLane="random", + departSpeed=10) + +else: + # create the base vehicle type that will be used for inflows + vehicles.add( + "human", + num_vehicles=0, + lane_change_params=SumoLaneChangeParams( + lane_change_mode="strategic", + ), + acceleration_controller=accel_data, + ) + if PENETRATION_RATE > 0.0: + vehicles.add( + "av", + color="red", + num_vehicles=0, + acceleration_controller=(RLController, {}), + ) + + # If you want to turn off the fail safes uncomment this: + + # vehicles.add( + # 'human', + # num_vehicles=0, + # lane_change_params=SumoLaneChangeParams( + # lane_change_mode='strategic', + # ), + # acceleration_controller=accel_data, + # car_following_params=SumoCarFollowingParams(speed_mode='19') + # ) + + lane_list = ['0', '1', '2', '3', '4'] + + for lane in lane_list: + inflow.add( + veh_type="human", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * (1 - PENETRATION_RATE)), + departLane=lane, + departSpeed=inflow_speed) + + if PENETRATION_RATE > 0.0: + for lane in lane_list: + inflow.add( + veh_type="av", + edge=highway_start_edge, + vehs_per_hour=int(inflow_rate * PENETRATION_RATE), + departLane=lane, + departSpeed=inflow_speed) + + +network_xml_file = "examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream_test.xml" + +# network_xml_file = "examples/exp_configs/templates/sumo/i210_with_congestion.xml" + +NET_TEMPLATE = os.path.join(config.PROJECT_PATH, network_xml_file) + +if WANT_GHOST_CELL: + network = I210SubNetworkGhostCell +else: + network = I210SubNetwork flow_params = dict( # name of the experiment @@ -136,14 +245,14 @@ env_name=I210MultiEnv, # name of the network class the experiment is running on - 
network=I210SubNetwork, + network=network, # simulator that is used by the experiment simulator='traci', # simulation-related parameters sim=SumoParams( - sim_step=0.5, + sim_step=0.4, render=False, color_by_speed=False, restart_instance=True, @@ -154,8 +263,8 @@ # environment related parameters (see flow.core.params.EnvParams) env=EnvParams( horizon=HORIZON, - sims_per_step=1, - warmup_steps=warmup_steps, + sims_per_step=3, + warmup_steps=WARMUP_STEPS, additional_params=additional_env_params, done_at_exit=False ), @@ -166,8 +275,8 @@ inflows=inflow, template=NET_TEMPLATE, additional_params={ - "on_ramp": False, - "ghost_edge": False + "on_ramp": ON_RAMP, + "ghost_edge": WANT_GHOST_CELL } ), diff --git a/examples/exp_configs/rl/multiagent/multiagent_straight_road.py b/examples/exp_configs/rl/multiagent/multiagent_straight_road.py index ec71a2f42..5816d3fe7 100644 --- a/examples/exp_configs/rl/multiagent/multiagent_straight_road.py +++ b/examples/exp_configs/rl/multiagent/multiagent_straight_road.py @@ -6,14 +6,13 @@ from flow.controllers import RLController, IDMController from flow.core.params import EnvParams, NetParams, InitialConfig, InFlows, \ VehicleParams, SumoParams, SumoLaneChangeParams, SumoCarFollowingParams -from flow.envs.ring.accel import ADDITIONAL_ENV_PARAMS from flow.networks import HighwayNetwork +from flow.envs.ring.accel import ADDITIONAL_ENV_PARAMS from flow.envs.multiagent import MultiStraightRoad from flow.networks.highway import ADDITIONAL_NET_PARAMS from flow.utils.registry import make_create_env from ray.tune.registry import register_env - # SET UP PARAMETERS FOR THE SIMULATION # the speed of vehicles entering the network @@ -23,7 +22,7 @@ # the inflow rate of vehicles HIGHWAY_INFLOW_RATE = 2215 # the simulation time horizon (in steps) -HORIZON = 1500 +HORIZON = 1000 # whether to include noise in the car-following models INCLUDE_NOISE = True @@ -54,11 +53,42 @@ additional_env_params.update({ 'max_accel': 2.6, 'max_decel': 4.5, - 'target_velocity': 18, + 'target_velocity': 6.0, 'local_reward': True, 'lead_obs': True, + 'control_range': [500, 2300], # whether to reroute vehicles once they have exited - "reroute_on_exit": True + "reroute_on_exit": True, + # whether to use the MPG reward. Otherwise, defaults to a target velocity reward + "mpg_reward": False, + # whether to use the joules reward. 
Otherwise, defaults to a target velocity reward + "mpj_reward": False, + # how many vehicles to look back for the MPG reward + "look_back_length": 3, + # how many AVs there can be at once (this is only for centralized critics) + "max_num_agents": 10, + + # whether to add a slight reward for opening up a gap that will be annealed out N iterations in + "headway_curriculum": False, + # how many timesteps to anneal the headway curriculum over + "headway_curriculum_iters": 100, + # weight of the headway reward + "headway_reward_gain": 2.0, + # desired time headway + "min_time_headway": 2.0, + + # whether to add a slight reward for traveling at a desired speed + "speed_curriculum": True, + # how many timesteps to anneal the headway curriculum over + "speed_curriculum_iters": 20, + # weight of the headway reward + "speed_reward_gain": 1.0, + + # penalize stopped vehicles + "penalize_stops": True, + + # penalize accels + "penalize_accel": True }) @@ -66,8 +96,6 @@ vehicles = VehicleParams() inflows = InFlows() - -# human vehicles vehicles.add( "human", acceleration_controller=(IDMController, { @@ -96,7 +124,7 @@ edge="highway_0", vehs_per_hour=int(HIGHWAY_INFLOW_RATE * (1 - PENETRATION_RATE / 100)), depart_lane="free", - depart_speed="23.0", + depart_speed=TRAFFIC_SPEED, name="idm_highway_inflow") # add autonomous vehicles on the highway @@ -106,13 +134,13 @@ edge="highway_0", vehs_per_hour=int(HIGHWAY_INFLOW_RATE * (PENETRATION_RATE / 100)), depart_lane="free", - depart_speed="23.0", + depart_speed=TRAFFIC_SPEED, name="rl_highway_inflow") # SET UP FLOW PARAMETERS warmup_steps = 0 if additional_env_params['reroute_on_exit']: - warmup_steps = 400 + warmup_steps = 500 flow_params = dict( # name of the experiment @@ -131,16 +159,16 @@ env=EnvParams( horizon=HORIZON, warmup_steps=warmup_steps, - sims_per_step=1, # do not put more than one - additional_params=additional_env_params, + sims_per_step=3, + additional_params=additional_env_params ), # sumo-related parameters (see flow.core.params.SumoParams) sim=SumoParams( - sim_step=0.5, + sim_step=0.4, render=False, - use_ballistic=True, - restart_instance=True + restart_instance=True, + use_ballistic=True ), # network-related parameters (see flow.core.params.NetParams and the diff --git a/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream.xml b/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream.xml index 10d4d8d45..b9b2db479 100644 --- a/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream.xml +++ b/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream.xml @@ -3501,11 +3501,11 @@ - - - - - + + + + + diff --git a/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream_test.xml b/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream_test.xml new file mode 100644 index 000000000..ee508b730 --- /dev/null +++ b/examples/exp_configs/templates/sumo/i210_with_ghost_cell_with_downstream_test.xml @@ -0,0 +1,5719 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/train.py b/examples/train.py index 1689d846f..74a6cd71a 100644 --- a/examples/train.py +++ b/examples/train.py @@ -21,17 +21,19 @@ from stable_baselines.common.vec_env import DummyVecEnv, SubprocVecEnv from stable_baselines import PPO2 except ImportError: - print("Stable-baselines not installed") + print("Stable-baselines not installed. Please install it if you need it.") +import ray from ray import tune from ray.rllib.env.group_agents_wrapper import _GroupAgentsWrapper try: from ray.rllib.agents.agent import get_agent_class except ImportError: from ray.rllib.agents.registry import get_agent_class +from ray.tune.registry import register_env from flow.core.util import ensure_dir -from flow.core.rewards import energy_consumption +from flow.core.rewards import miles_per_gallon, miles_per_megajoule from flow.utils.registry import env_constructor from flow.utils.rllib import FlowParamsEncoder, get_flow_params from flow.utils.registry import make_create_env @@ -58,7 +60,7 @@ def parse_args(args): parser.add_argument( 'exp_title', type=str, - help='Title to give the run.') + help='Name of experiment that results will be stored in') # optional input parameters parser.add_argument( @@ -66,7 +68,8 @@ def parse_args(args): help='the RL trainer to use. either rllib or Stable-Baselines') parser.add_argument( '--algorithm', type=str, default="PPO", - help='RL algorithm to use. Options are PPO, TD3, MATD3 (MADDPG w/ TD3) right now.' + help='RL algorithm to use. Options are PPO, TD3, and CENTRALIZEDPPO (which uses a centralized value function)' + ' right now.' ) parser.add_argument( '--num_cpus', type=int, default=1, @@ -172,37 +175,68 @@ def setup_exps_rllib(flow_params, dict training configuration parameters """ - from ray.tune.registry import register_env horizon = flow_params['env'].horizon alg_run = flags.algorithm.upper() if alg_run == "PPO": - agent_cls = get_agent_class(alg_run) - config = deepcopy(agent_cls._default_config) + from flow.algorithms.custom_ppo import CustomPPOTrainer + from ray.rllib.agents.ppo import DEFAULT_CONFIG + alg_run = CustomPPOTrainer + config = deepcopy(DEFAULT_CONFIG) config["num_workers"] = n_cpus config["horizon"] = horizon - config["model"].update({"fcnet_hiddens": [32, 32, 32]}) + config["model"].update({"fcnet_hiddens": [32, 32]}) config["train_batch_size"] = horizon * n_rollouts - config["gamma"] = 0.999 # discount rate + config["gamma"] = 0.995 # discount rate config["use_gae"] = True config["lambda"] = 0.97 config["kl_target"] = 0.02 config["num_sgd_iter"] = 10 + if flags.grid_search: + config["lambda"] = tune.grid_search([0.5, 0.9]) + config["lr"] = tune.grid_search([5e-4, 5e-5]) + elif alg_run == "CENTRALIZEDPPO": + from flow.algorithms.centralized_PPO import CCTrainer, CentralizedCriticModel + from ray.rllib.agents.ppo import DEFAULT_CONFIG + from ray.rllib.models import ModelCatalog + alg_run = CCTrainer + config = deepcopy(DEFAULT_CONFIG) + config['model']['custom_model'] = "cc_model" + config["model"]["custom_options"]["max_num_agents"] = flow_params['env'].additional_params['max_num_agents'] + config["model"]["custom_options"]["central_vf_size"] = 100 + + ModelCatalog.register_custom_model("cc_model", CentralizedCriticModel) + + config["num_workers"] = n_cpus + config["horizon"] = horizon + config["model"].update({"fcnet_hiddens": [32, 32]}) + 
config["train_batch_size"] = horizon * n_rollouts + config["gamma"] = 0.995 # discount rate + config["use_gae"] = True + config["lambda"] = 0.97 + config["kl_target"] = 0.02 + config["num_sgd_iter"] = 10 + if flags.grid_search: + config["lambda"] = tune.grid_search([0.5, 0.9]) + config["lr"] = tune.grid_search([5e-4, 5e-5]) + elif alg_run == "TD3": agent_cls = get_agent_class(alg_run) config = deepcopy(agent_cls._default_config) config["num_workers"] = n_cpus config["horizon"] = horizon + config["learning_starts"] = 10000 config["buffer_size"] = 20000 # reduced to test if this is the source of memory problems if flags.grid_search: config["prioritized_replay"] = tune.grid_search(['True', 'False']) config["actor_lr"] = tune.grid_search([1e-3, 1e-4]) config["critic_lr"] = tune.grid_search([1e-3, 1e-4]) config["n_step"] = tune.grid_search([1, 10]) + else: sys.exit("We only support PPO, TD3, right now.") @@ -210,27 +244,59 @@ def setup_exps_rllib(flow_params, def on_episode_start(info): episode = info["episode"] episode.user_data["avg_speed"] = [] + episode.user_data["avg_speed_avs"] = [] episode.user_data["avg_energy"] = [] + episode.user_data["avg_mpg"] = [] + episode.user_data["avg_mpj"] = [] + def on_episode_step(info): episode = info["episode"] env = info["env"].get_unwrapped()[0] if isinstance(env, _GroupAgentsWrapper): env = env.env - speed = np.mean([speed for speed in env.k.vehicle.get_speed(env.k.vehicle.get_ids()) if speed >= 0]) + if hasattr(env, 'no_control_edges'): + veh_ids = [veh_id for veh_id in env.k.vehicle.get_ids() if (env.k.vehicle.get_speed(veh_id) >= 0 + and env.k.vehicle.get_edge(veh_id) + not in env.no_control_edges)] + rl_ids = [veh_id for veh_id in env.k.vehicle.get_rl_ids() if (env.k.vehicle.get_speed(veh_id) >= 0 + and env.k.vehicle.get_edge(veh_id) + not in env.no_control_edges)] + else: + veh_ids = [veh_id for veh_id in env.k.vehicle.get_ids() if env.k.vehicle.get_speed(veh_id) >= 0] + rl_ids = [veh_id for veh_id in env.k.vehicle.get_rl_ids() if env.k.vehicle.get_speed(veh_id) >= 0] + + speed = np.mean([speed for speed in env.k.vehicle.get_speed(veh_ids)]) if not np.isnan(speed): episode.user_data["avg_speed"].append(speed) - episode.user_data["avg_energy"].append(energy_consumption(env)) + av_speed = np.mean([speed for speed in env.k.vehicle.get_speed(rl_ids) if speed >= 0]) + if not np.isnan(av_speed): + episode.user_data["avg_speed_avs"].append(av_speed) + episode.user_data["avg_mpg"].append(miles_per_gallon(env, veh_ids, gain=1.0)) + episode.user_data["avg_mpj"].append(miles_per_megajoule(env, veh_ids, gain=1.0)) + def on_episode_end(info): episode = info["episode"] avg_speed = np.mean(episode.user_data["avg_speed"]) episode.custom_metrics["avg_speed"] = avg_speed + avg_speed_avs = np.mean(episode.user_data["avg_speed_avs"]) + episode.custom_metrics["avg_speed_avs"] = avg_speed_avs episode.custom_metrics["avg_energy_per_veh"] = np.mean(episode.user_data["avg_energy"]) + episode.custom_metrics["avg_mpg_per_veh"] = np.mean(episode.user_data["avg_mpg"]) + episode.custom_metrics["avg_mpj_per_veh"] = np.mean(episode.user_data["avg_mpj"]) + + def on_train_result(info): + """Store the mean score of the episode, and increment or decrement how many adversaries are on""" + trainer = info["trainer"] + trainer.workers.foreach_worker( + lambda ev: ev.foreach_env( + lambda env: env.set_iteration_num())) config["callbacks"] = {"on_episode_start": tune.function(on_episode_start), "on_episode_step": tune.function(on_episode_step), - "on_episode_end": 
tune.function(on_episode_end)} + "on_episode_end": tune.function(on_episode_end), + "on_train_result": tune.function(on_train_result)} # save the flow params for replay flow_json = json.dumps( @@ -240,7 +306,6 @@ def on_episode_end(info): # multiagent configuration if policy_graphs is not None: - print("policy_graphs", policy_graphs) config['multiagent'].update({'policies': policy_graphs}) if policy_mapping_fn is not None: config['multiagent'].update({'policy_mapping_fn': tune.function(policy_mapping_fn)}) @@ -255,7 +320,6 @@ def on_episode_end(info): def train_rllib(submodule, flags): """Train policies using the PPO algorithm in RLlib.""" - import ray flow_params = submodule.flow_params flow_params['sim'].render = flags.render @@ -280,7 +344,7 @@ def trial_str_creator(trial): ray.init() exp_dict = { "run_or_experiment": alg_run, - "name": gym_name, + "name": flags.exp_title, "config": config, "checkpoint_freq": flags.checkpoint_freq, "checkpoint_at_end": True, diff --git a/flow/algorithms/centralized_PPO.py b/flow/algorithms/centralized_PPO.py new file mode 100644 index 000000000..8f3b9f261 --- /dev/null +++ b/flow/algorithms/centralized_PPO.py @@ -0,0 +1,547 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +"""An example of customizing PPO to leverage a centralized critic.""" + +import argparse +import numpy as np + +from gym.spaces import Dict + +from ray import tune +from ray.rllib.agents.ppo.ppo import PPOTrainer +from flow.algorithms.custom_ppo import CustomPPOTFPolicy +from ray.rllib.evaluation.postprocessing import compute_advantages, \ + Postprocessing +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.tf_policy import LearningRateSchedule, \ + EntropyCoeffSchedule, ACTION_LOGP +from ray.rllib.models.modelv2 import ModelV2 +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from ray.rllib.models.tf.recurrent_tf_modelv2 import RecurrentTFModelV2 +from ray.rllib.models.model import restore_original_dimensions +from ray.rllib.utils.annotations import override +from ray.rllib.models.tf.fcnet_v2 import FullyConnectedNetwork +from ray.rllib.utils.explained_variance import explained_variance +from ray.rllib.utils import try_import_tf + + +tf = try_import_tf() + +# Frozen logits of the policy that computed the action +BEHAVIOUR_LOGITS = "behaviour_logits" + +CENTRAL_OBS = "central_obs" +OPPONENT_ACTION = "opponent_action" + +parser = argparse.ArgumentParser() +parser.add_argument("--stop", type=int, default=100000) + +#TODOy + +class CentralizedCriticModel(TFModelV2): + """Multi-agent model that implements a centralized VF.""" + # TODO(@evinitsky) make this work with more than boxes + + def __init__(self, obs_space, action_space, num_outputs, model_config, + name): + super(CentralizedCriticModel, self).__init__( + obs_space, action_space, num_outputs, model_config, name) + # Base of the model + self.model = FullyConnectedNetwork(obs_space, action_space, + num_outputs, model_config, name) + self.register_variables(self.model.variables()) + + # Central VF maps (obs, opp_ops, opp_act) -> vf_pred + self.max_num_agents = model_config['custom_options']['max_num_agents'] + self.obs_space_shape = obs_space.shape[0] + self.obs_space = obs_space + other_obs = tf.keras.layers.Input(shape=(obs_space.shape[0] * self.max_num_agents, ), name="central_obs") + central_vf_dense = tf.keras.layers.Dense( + model_config['custom_options']['central_vf_size'], activation=tf.nn.tanh, name="c_vf_dense")(other_obs) + 
central_vf_out = tf.keras.layers.Dense( + 1, activation=None, name="c_vf_out")(central_vf_dense) + self.central_vf = tf.keras.Model( + inputs=[other_obs], outputs=central_vf_out) + self.register_variables(self.central_vf.variables) + + def forward(self, input_dict, state, seq_lens): + return self.model.forward(input_dict, state, seq_lens) + + def central_value_function(self, central_obs): + return tf.reshape( + self.central_vf( + [central_obs]), [-1]) + + def value_function(self): + return self.model.value_function() # not used + + +# TODO(@evinitsky) support recurrence +class CentralizedCriticModelRNN(RecurrentTFModelV2): + """Example of using the Keras functional API to define a RNN model.""" + + def __init__(self, + obs_space, + action_space, + num_outputs, + model_config, + name, + hiddens_size=64, + cell_size=64): + super(CentralizedCriticModelRNN, self).__init__(obs_space, action_space, num_outputs, + model_config, name) + self.cell_size = cell_size + + # Define input layers + input_layer = tf.keras.layers.Input( + shape=(None, obs_space.shape[0]), name="inputs") + state_in_h = tf.keras.layers.Input(shape=(cell_size, ), name="h") + state_in_c = tf.keras.layers.Input(shape=(cell_size, ), name="c") + seq_in = tf.keras.layers.Input(shape=(), name="seq_in") + + # Preprocess observation with a hidden layer and send to LSTM cell + dense1 = tf.keras.layers.Dense( + hiddens_size, activation=tf.nn.relu, name="dense1")(input_layer) + lstm_out, state_h, state_c = tf.keras.layers.LSTM( + cell_size, return_sequences=True, return_state=True, name="lstm")( + inputs=dense1, + mask=tf.sequence_mask(seq_in), + initial_state=[state_in_h, state_in_c]) + + # Postprocess LSTM output with another hidden layer and compute values + logits = tf.keras.layers.Dense( + self.num_outputs, + activation=tf.keras.activations.linear, + name="logits")(lstm_out) + values = tf.keras.layers.Dense( + 1, activation=None, name="values")(lstm_out) + + # Create the RNN model + self.model = tf.keras.Model( + inputs=[input_layer, seq_in, state_in_h, state_in_c], + outputs=[logits, values, state_h, state_c]) + self.register_variables(self.model.variables) + self.model.summary() + + #TODO(@evinitsky) add layer sharing to the VF + # Create the centralized VF + # Central VF maps (obs, opp_ops, opp_act) -> vf_pred + self.max_num_agents = model_config.get("max_num_agents", 120) + self.obs_space_shape = obs_space.shape[0] + other_obs = tf.keras.layers.Input(shape=(obs_space.shape[0] * self.max_num_agents,), name="all_agent_obs") + central_vf_dense = tf.keras.layers.Dense( + model_config.get("central_vf_size", 64), activation=tf.nn.tanh, name="c_vf_dense")(other_obs) + central_vf_dense2 = tf.keras.layers.Dense( + model_config.get("central_vf_size", 64), activation=tf.nn.tanh, name="c_vf_dense")(central_vf_dense) + central_vf_out = tf.keras.layers.Dense( + 1, activation=None, name="c_vf_out")(central_vf_dense2) + self.central_vf = tf.keras.Model( + inputs=[other_obs], outputs=central_vf_out) + self.register_variables(self.central_vf.variables) + + @override(RecurrentTFModelV2) + def forward_rnn(self, inputs, state, seq_lens): + model_out, self._value_out, h, c = self.model([inputs, seq_lens] + + state) + return model_out, [h, c] + + @override(ModelV2) + def get_initial_state(self): + return [ + np.zeros(self.cell_size, np.float32), + np.zeros(self.cell_size, np.float32), + ] + + def central_value_function(self, central_obs): + return tf.reshape( + self.central_vf( + [central_obs]), [-1]) + + def value_function(self): + return 
tf.reshape(self._value_out, [-1]) # not used + + +class CentralizedValueMixin(object): + """Add methods to evaluate the central value function from the model.""" + + def __init__(self): + # TODO(@evinitsky) clean up naming + self.central_value_function = self.model.central_value_function( + self.get_placeholder(CENTRAL_OBS) + ) + + def compute_central_vf(self, central_obs): + feed_dict = { + self.get_placeholder(CENTRAL_OBS): central_obs, + } + return self.get_session().run(self.central_value_function, feed_dict) + + +# Grabs the opponent obs/act and includes it in the experience train_batch, +# and computes GAE using the central vf predictions. +def centralized_critic_postprocessing(policy, + sample_batch, + other_agent_batches=None, + episode=None): + if policy.loss_initialized(): + assert other_agent_batches is not None + + # time_span = (sample_batch['t'][0], sample_batch['t'][-1]) + # # there's a new problem here, namely that a segment might not be continuous due to the rerouting + # other_agent_timespans = {agent_id: + # (other_agent_batches[agent_id][1]["t"][0], + # other_agent_batches[agent_id][1]["t"][-1]) + # for agent_id in other_agent_batches.keys()} + other_agent_times = {agent_id: other_agent_batches[agent_id][1]["t"] + for agent_id in other_agent_batches.keys()} + agent_time = sample_batch['t'] + # # find agents whose time overlaps with the current agent + rel_agents = {agent_id: other_agent_time for agent_id, other_agent_time in other_agent_times.items()} + # if len(rel_agents) > 0: + other_obs = {agent_id: + other_agent_batches[agent_id][1]["obs"].copy() + for agent_id in other_agent_batches.keys()} + # padded_agent_obs = {agent_id: + # overlap_and_pad_agent( + # time_span, + # rel_agent_time, + # other_obs[agent_id]) + # for agent_id, + # rel_agent_time in rel_agents.items()} + padded_agent_obs = {agent_id: + fill_missing( + agent_time, + other_agent_times[agent_id], + other_obs[agent_id]) + for agent_id, + rel_agent_time in rel_agents.items()} + # okay, now we need to stack and sort + central_obs_list = [padded_obs for padded_obs in padded_agent_obs.values()] + try: + central_obs_batch = np.hstack((sample_batch["obs"], np.hstack(central_obs_list))) + except: + # TODO(@ev) this is a bug and needs to be fixed + central_obs_batch = sample_batch["obs"] + max_vf_agents = policy.model.max_num_agents + num_agents = len(rel_agents) + 1 + if num_agents < max_vf_agents: + diff = max_vf_agents - num_agents + zero_pad = np.zeros((central_obs_batch.shape[0], + policy.model.obs_space_shape * diff)) + central_obs_batch = np.hstack((central_obs_batch, + zero_pad)) + elif num_agents > max_vf_agents: + print("Too many agents!") + + # also record the opponent obs and actions in the trajectory + sample_batch[CENTRAL_OBS] = central_obs_batch + + # overwrite default VF prediction with the central VF + sample_batch[SampleBatch.VF_PREDS] = policy.compute_central_vf(sample_batch[CENTRAL_OBS]) + else: + # policy hasn't initialized yet, use zeros + #TODO(evinitsky) put in the right shape + obs_shape = sample_batch[SampleBatch.CUR_OBS].shape[1] + obs_shape = (1, obs_shape * (policy.model.max_num_agents)) + sample_batch[CENTRAL_OBS] = np.zeros(obs_shape) + # TODO(evinitsky) put in the right shape. 
Will break if actions aren't 1 + sample_batch[SampleBatch.VF_PREDS] = np.zeros(1, dtype=np.float32) + + completed = sample_batch["dones"][-1] + + # if not completed and policy.loss_initialized(): + # last_r = 0.0 + # else: + # next_state = [] + # for i in range(policy.num_state_tensors()): + # next_state.append([sample_batch["state_out_{}".format(i)][-1]]) + # last_r = policy.compute_central_vf(sample_batch[CENTRAL_OBS][-1][np.newaxis, ...])[0] + + batch = compute_advantages( + sample_batch, + 0.0, + policy.config["gamma"], + policy.config["lambda"], + use_gae=policy.config["use_gae"]) + return batch + + + +def time_overlap(time_span, agent_time): + """Check if agent_time overlaps with time_span""" + if agent_time[0] <= time_span[1] and agent_time[1] >= time_span[0]: + return True + else: + return False + + +def fill_missing(agent_time, other_agent_time, obs): + # shortcut, the two overlap perfectly + if np.sum(agent_time == other_agent_time) == agent_time.shape[0]: + return obs + new_obs = np.zeros((agent_time.shape[0], obs.shape[1])) + other_agent_time_set = set(other_agent_time) + for i, time in enumerate(agent_time): + if time in other_agent_time_set: + new_obs[i] = obs[np.where(other_agent_time == time)] + return new_obs + + +def overlap_and_pad_agent(time_span, agent_time, obs): + """take the part of obs that overlaps, pad to length time_span + Arguments: + time_span (tuple): tuple of the first and last time that the agent + of interest is in the system + agent_time (tuple): tuple of the first and last time that the + agent whose obs we are padding is in the system + obs (np.ndarray): observations of the agent whose time is + agent_time + """ + assert time_overlap(time_span, agent_time) + print(time_span) + print(agent_time) + if time_span[0] == 7 or agent_time[0] == 7: + import ipdb; ipdb.set_trace() + # FIXME(ev) some of these conditions can be combined + # no padding needed + if agent_time[0] == time_span[0] and agent_time[1] == time_span[1]: + if obs.shape[0] < 200: + import ipdb; ipdb.set_trace() + return obs + # agent enters before time_span starts and exits before time_span end + if agent_time[0] < time_span[0] and agent_time[1] < time_span[1]: + non_overlap_time = time_span[0] - agent_time[0] + missing_time = time_span[1] - agent_time[1] + overlap_obs = obs[non_overlap_time:] + padding = np.zeros((missing_time, obs.shape[1])) + obs_concat = np.concatenate((overlap_obs, padding)) + if obs_concat.shape[0] < 200: + import ipdb; ipdb.set_trace() + return obs_concat + # agent enters after time_span starts and exits after time_span ends + elif agent_time[0] > time_span[0] and agent_time[1] > time_span[1]: + non_overlap_time = agent_time[1] - time_span[1] + overlap_obs = obs[:-non_overlap_time] + missing_time = agent_time[0] - time_span[0] + padding = np.zeros((missing_time, obs.shape[1])) + obs_concat = np.concatenate((padding, overlap_obs)) + if obs_concat.shape[0] < 200: + import ipdb; ipdb.set_trace() + return obs_concat + # agent time is entirely contained in time_span + elif agent_time[0] >= time_span[0] and agent_time[1] <= time_span[1]: + missing_left = agent_time[0] - time_span[0] + missing_right = time_span[1] - agent_time[1] + obs_concat = obs + if missing_left > 0: + padding = np.zeros((missing_left, obs.shape[1])) + obs_concat = np.concatenate((padding, obs_concat)) + if missing_right > 0: + padding = np.zeros((missing_right, obs.shape[1])) + obs_concat = np.concatenate((obs_concat, padding)) + if obs_concat.shape[0] < 200: + import ipdb; ipdb.set_trace() + return 
obs_concat + # agent time totally contains time_span + elif agent_time[0] <= time_span[0] and agent_time[1] >= time_span[1]: + non_overlap_left = time_span[0] - agent_time[0] + non_overlap_right = agent_time[1] - time_span[1] + overlap_obs = obs + if non_overlap_left > 0: + overlap_obs = overlap_obs[non_overlap_left:] + if non_overlap_right > 0: + overlap_obs = overlap_obs[:-non_overlap_right] + if overlap_obs.shape[0] < 200: + import ipdb; ipdb.set_trace() + return overlap_obs + + +# Copied from PPO but optimizing the central value function +def loss_with_central_critic(policy, model, dist_class, train_batch): + CentralizedValueMixin.__init__(policy) + + logits, state = model.from_batch(train_batch) + action_dist = dist_class(logits, model) + + policy.loss_obj = PPOLoss( + policy.action_space, + dist_class, + model, + train_batch[Postprocessing.VALUE_TARGETS], + train_batch[Postprocessing.ADVANTAGES], + train_batch[SampleBatch.ACTIONS], + train_batch[BEHAVIOUR_LOGITS], + train_batch[ACTION_LOGP], + train_batch[SampleBatch.VF_PREDS], + action_dist, + policy.central_value_function, + policy.kl_coeff, + tf.ones_like(train_batch[Postprocessing.ADVANTAGES], dtype=tf.bool), + entropy_coeff=policy.entropy_coeff, + clip_param=policy.config["clip_param"], + vf_clip_param=policy.config["vf_clip_param"], + vf_loss_coeff=policy.config["vf_loss_coeff"], + use_gae=policy.config["use_gae"], + model_config=policy.config["model"]) + + return policy.loss_obj.loss + + +class PPOLoss(object): + def __init__(self, + action_space, + dist_class, + model, + value_targets, + advantages, + actions, + prev_logits, + prev_actions_logp, + vf_preds, + curr_action_dist, + value_fn, + cur_kl_coeff, + valid_mask, + entropy_coeff=0, + clip_param=0.1, + vf_clip_param=0.1, + vf_loss_coeff=1.0, + use_gae=True, + model_config=None): + """Constructs the loss for Proximal Policy Objective. + + Arguments: + action_space: Environment observation space specification. + dist_class: action distribution class for logits. + value_targets (Placeholder): Placeholder for target values; used + for GAE. + actions (Placeholder): Placeholder for actions taken + from previous model evaluation. + advantages (Placeholder): Placeholder for calculated advantages + from previous model evaluation. + prev_logits (Placeholder): Placeholder for logits output from + previous model evaluation. + prev_actions_logp (Placeholder): Placeholder for prob output from + previous model evaluation. + vf_preds (Placeholder): Placeholder for value function output + from previous model evaluation. + curr_action_dist (ActionDistribution): ActionDistribution + of the current model. + value_fn (Tensor): Current value function output Tensor. + cur_kl_coeff (Variable): Variable holding the current PPO KL + coefficient. + valid_mask (Tensor): A bool mask of valid input elements (#2992). + entropy_coeff (float): Coefficient of the entropy regularizer. + clip_param (float): Clip parameter + vf_clip_param (float): Clip parameter for the value function + vf_loss_coeff (float): Coefficient of the value function loss + use_gae (bool): If true, use the Generalized Advantage Estimator. + model_config (dict): (Optional) model config for use in specifying + action distributions. + """ + + def reduce_mean_valid(t): + return tf.reduce_mean(tf.boolean_mask(t, valid_mask)) + + prev_dist = dist_class(prev_logits, model) + # Make loss functions. 
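+        # What follows is the standard PPO clipped surrogate objective,
+        #   L = E[min(r * A, clip(r, 1 - clip_param, 1 + clip_param) * A)],
+        # with ratio r = exp(logp_new - logp_old). The mean KL and entropy are
+        # also tracked here so the stats and KL-coefficient logic can report them.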
+ logp_ratio = tf.exp(curr_action_dist.logp(actions) - prev_actions_logp) + action_kl = prev_dist.kl(curr_action_dist) + self.mean_kl = reduce_mean_valid(action_kl) + + curr_entropy = curr_action_dist.entropy() + self.mean_entropy = reduce_mean_valid(curr_entropy) + + surrogate_loss = tf.minimum( + advantages * logp_ratio, + advantages * tf.clip_by_value(logp_ratio, 1 - clip_param, + 1 + clip_param)) + self.mean_policy_loss = reduce_mean_valid(-surrogate_loss) + + if use_gae: + vf_loss1 = tf.square(value_fn - value_targets) + vf_clipped = vf_preds + tf.clip_by_value( + value_fn - vf_preds, -vf_clip_param, vf_clip_param) + vf_loss2 = tf.square(vf_clipped - value_targets) + vf_loss = tf.maximum(vf_loss1, vf_loss2) + self.mean_vf_loss = reduce_mean_valid(vf_loss) + loss = reduce_mean_valid( + -surrogate_loss + + vf_loss_coeff * vf_loss - entropy_coeff * curr_entropy) + else: + self.mean_vf_loss = tf.constant(0.0) + loss = reduce_mean_valid(-surrogate_loss - + entropy_coeff * curr_entropy) + self.loss = loss + + +def new_ppo_surrogate_loss(policy, model, dist_class, train_batch): + loss = loss_with_central_critic(policy, model, dist_class, train_batch) + return loss + + +class KLCoeffMixin(object): + def __init__(self, config): + # KL Coefficient + self.kl_coeff_val = config["kl_coeff"] + self.kl_target = config["kl_target"] + self.kl_coeff = tf.get_variable( + initializer=tf.constant_initializer(self.kl_coeff_val), + name="kl_coeff", + shape=(), + trainable=False, + dtype=tf.float32) + def update_kl(self, blah): + pass + + +def setup_mixins(policy, obs_space, action_space, config): + # copied from PPO + KLCoeffMixin.__init__(policy, config) + + EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"], + config["entropy_coeff_schedule"]) + LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"]) + # hack: put in a noop VF so some of the inherited PPO code runs + policy.value_function = tf.zeros( + tf.shape(policy.get_placeholder(SampleBatch.CUR_OBS))[0]) + + +def central_vf_stats(policy, train_batch, grads): + # Report the explained variance of the central value function. 
+ return { + "vf_explained_var": explained_variance( + train_batch[Postprocessing.VALUE_TARGETS], + policy.central_value_function), + } + +def kl_and_loss_stats(policy, train_batch): + return { + "cur_kl_coeff": tf.cast(policy.kl_coeff, tf.float64), + "cur_lr": tf.cast(policy.cur_lr, tf.float64), + "total_loss": policy.loss_obj.loss, + "policy_loss": policy.loss_obj.mean_policy_loss, + "vf_loss": policy.loss_obj.mean_vf_loss, + "vf_explained_var": explained_variance( + train_batch[Postprocessing.VALUE_TARGETS], + policy.model.value_function()), + "vf_preds": train_batch[Postprocessing.VALUE_TARGETS], + "kl": policy.loss_obj.mean_kl, + "entropy": policy.loss_obj.mean_entropy, + "entropy_coeff": tf.cast(policy.entropy_coeff, tf.float64), + } + +CCPPO = CustomPPOTFPolicy.with_updates( + name="CCPPO", + postprocess_fn=centralized_critic_postprocessing, + loss_fn=new_ppo_surrogate_loss, + stats_fn=kl_and_loss_stats, + before_loss_init=setup_mixins, + grad_stats_fn=central_vf_stats, + mixins=[ + LearningRateSchedule, EntropyCoeffSchedule, + CentralizedValueMixin, KLCoeffMixin + ]) + +CCTrainer = PPOTrainer.with_updates(name="CCPPOTrainer", default_policy=CCPPO) \ No newline at end of file diff --git a/flow/algorithms/custom_ppo.py b/flow/algorithms/custom_ppo.py new file mode 100644 index 000000000..a98af6c2d --- /dev/null +++ b/flow/algorithms/custom_ppo.py @@ -0,0 +1,318 @@ +"""PPO but we add in the outflow after the reward to the final reward""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import logging + +import numpy as np +import ray +from ray.rllib.agents.ppo.ppo import PPOTrainer +from ray.rllib.evaluation.postprocessing import compute_advantages, \ + Postprocessing +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.tf_policy import LearningRateSchedule, \ + EntropyCoeffSchedule, ACTION_LOGP +from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.utils.explained_variance import explained_variance +from ray.rllib.utils.tf_ops import make_tf_callable +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + +logger = logging.getLogger(__name__) + +# Frozen logits of the policy that computed the action +BEHAVIOUR_LOGITS = "behaviour_logits" + + +class PPOLoss(object): + def __init__(self, + action_space, + dist_class, + model, + value_targets, + advantages, + actions, + prev_logits, + prev_actions_logp, + vf_preds, + curr_action_dist, + value_fn, + cur_kl_coeff, + valid_mask, + entropy_coeff=0, + clip_param=0.1, + vf_clip_param=0.1, + vf_loss_coeff=1.0, + use_gae=True, + model_config=None): + """Constructs the loss for Proximal Policy Objective. + + Arguments: + action_space: Environment observation space specification. + dist_class: action distribution class for logits. + value_targets (Placeholder): Placeholder for target values; used + for GAE. + actions (Placeholder): Placeholder for actions taken + from previous model evaluation. + advantages (Placeholder): Placeholder for calculated advantages + from previous model evaluation. + prev_logits (Placeholder): Placeholder for logits output from + previous model evaluation. + prev_actions_logp (Placeholder): Placeholder for prob output from + previous model evaluation. + vf_preds (Placeholder): Placeholder for value function output + from previous model evaluation. + curr_action_dist (ActionDistribution): ActionDistribution + of the current model. + value_fn (Tensor): Current value function output Tensor. 
+ cur_kl_coeff (Variable): Variable holding the current PPO KL + coefficient. + valid_mask (Tensor): A bool mask of valid input elements (#2992). + entropy_coeff (float): Coefficient of the entropy regularizer. + clip_param (float): Clip parameter + vf_clip_param (float): Clip parameter for the value function + vf_loss_coeff (float): Coefficient of the value function loss + use_gae (bool): If true, use the Generalized Advantage Estimator. + model_config (dict): (Optional) model config for use in specifying + action distributions. + """ + + def reduce_mean_valid(t): + return tf.reduce_mean(tf.boolean_mask(t, valid_mask)) + + prev_dist = dist_class(prev_logits, model) + # Make loss functions. + logp_ratio = tf.exp(curr_action_dist.logp(actions) - prev_actions_logp) + action_kl = prev_dist.kl(curr_action_dist) + self.mean_kl = reduce_mean_valid(action_kl) + + curr_entropy = curr_action_dist.entropy() + self.mean_entropy = reduce_mean_valid(curr_entropy) + + surrogate_loss = tf.minimum( + advantages * logp_ratio, + advantages * tf.clip_by_value(logp_ratio, 1 - clip_param, + 1 + clip_param)) + self.mean_policy_loss = reduce_mean_valid(-surrogate_loss) + + if use_gae: + vf_loss1 = tf.square(value_fn - value_targets) + vf_clipped = vf_preds + tf.clip_by_value( + value_fn - vf_preds, -vf_clip_param, vf_clip_param) + vf_loss2 = tf.square(vf_clipped - value_targets) + vf_loss = tf.maximum(vf_loss1, vf_loss2) + self.mean_vf_loss = reduce_mean_valid(vf_loss) + loss = reduce_mean_valid( + -surrogate_loss + + vf_loss_coeff * vf_loss - entropy_coeff * curr_entropy) + else: + self.mean_vf_loss = tf.constant(0.0) + loss = reduce_mean_valid(-surrogate_loss -entropy_coeff * curr_entropy) + self.loss = loss + + +def ppo_surrogate_loss(policy, model, dist_class, train_batch): + logits, state = model.from_batch(train_batch) + action_dist = dist_class(logits, model) + + if state: + max_seq_len = tf.reduce_max(train_batch["seq_lens"]) + mask = tf.sequence_mask(train_batch["seq_lens"], max_seq_len) + mask = tf.reshape(mask, [-1]) + else: + mask = tf.ones_like( + train_batch[Postprocessing.ADVANTAGES], dtype=tf.bool) + + policy.loss_obj = PPOLoss( + policy.action_space, + dist_class, + model, + train_batch[Postprocessing.VALUE_TARGETS], + train_batch[Postprocessing.ADVANTAGES], + train_batch[SampleBatch.ACTIONS], + train_batch[BEHAVIOUR_LOGITS], + train_batch[ACTION_LOGP], + train_batch[SampleBatch.VF_PREDS], + action_dist, + model.value_function(), + policy.kl_coeff, + mask, + entropy_coeff=policy.entropy_coeff, + clip_param=policy.config["clip_param"], + vf_clip_param=policy.config["vf_clip_param"], + vf_loss_coeff=policy.config["vf_loss_coeff"], + use_gae=policy.config["use_gae"], + model_config=policy.config["model"]) + + return policy.loss_obj.loss + + +def kl_and_loss_stats(policy, train_batch): + return { + "cur_kl_coeff": tf.cast(policy.kl_coeff, tf.float64), + "cur_lr": tf.cast(policy.cur_lr, tf.float64), + "total_loss": policy.loss_obj.loss, + "policy_loss": policy.loss_obj.mean_policy_loss, + "vf_loss": policy.loss_obj.mean_vf_loss, + "vf_explained_var": explained_variance( + train_batch[Postprocessing.VALUE_TARGETS], + policy.model.value_function()), + "vf_preds": train_batch[Postprocessing.VALUE_TARGETS], + "kl": policy.loss_obj.mean_kl, + "entropy": policy.loss_obj.mean_entropy, + "entropy_coeff": tf.cast(policy.entropy_coeff, tf.float64), + "advantages": train_batch[Postprocessing.ADVANTAGES], + "rewards": train_batch["rewards"] + } + + +def vf_preds_and_logits_fetches(policy): + """Adds value 
function and logits outputs to experience train_batches.""" + return { + SampleBatch.VF_PREDS: policy.model.value_function(), + BEHAVIOUR_LOGITS: policy.model.last_output(), + } + + +def postprocess_ppo_gae(policy, + sample_batch, + other_agent_batches=None, + episode=None): + """Adds the policy logits, VF preds, and advantages to the trajectory.""" + + completed = sample_batch["dones"][-1] + if completed: + last_r = 0.0 + else: + next_state = [] + for i in range(policy.num_state_tensors()): + next_state.append([sample_batch["state_out_{}".format(i)][-1]]) + last_r = policy._value(sample_batch[SampleBatch.NEXT_OBS][-1], + sample_batch[SampleBatch.ACTIONS][-1], + sample_batch[SampleBatch.REWARDS][-1], + *next_state) + + batch = compute_advantages( + sample_batch, + last_r, + policy.config["gamma"], + policy.config["lambda"], + use_gae=policy.config["use_gae"]) + return batch + + +def clip_gradients(policy, optimizer, loss): + variables = policy.model.trainable_variables() + if policy.config["grad_clip"] is not None: + grads_and_vars = optimizer.compute_gradients(loss, variables) + grads = [g for (g, v) in grads_and_vars] + policy.grads, _ = tf.clip_by_global_norm(grads, + policy.config["grad_clip"]) + clipped_grads = list(zip(policy.grads, variables)) + return clipped_grads + else: + return optimizer.compute_gradients(loss, variables) + + +class ValueNetworkMixin(object): + def __init__(self, obs_space, action_space, config): + if config["use_gae"]: + + @make_tf_callable(self.get_session()) + def value(ob, prev_action, prev_reward, *state): + model_out, _ = self.model({ + SampleBatch.CUR_OBS: tf.convert_to_tensor([ob]), + SampleBatch.PREV_ACTIONS: tf.convert_to_tensor( + [prev_action]), + SampleBatch.PREV_REWARDS: tf.convert_to_tensor( + [prev_reward]), + "is_training": tf.convert_to_tensor(False), + }, [tf.convert_to_tensor([s]) for s in state], + tf.convert_to_tensor([1])) + return self.model.value_function()[0] + + else: + + @make_tf_callable(self.get_session()) + def value(ob, prev_action, prev_reward, *state): + return tf.constant(0.0) + + self._value = value + + +def setup_config(policy, obs_space, action_space, config): + # auto set the model option for layer sharing + config["model"]["vf_share_layers"] = config["vf_share_layers"] + + +def setup_mixins(policy, obs_space, action_space, config): + KLCoeffMixin.__init__(policy, config) + ValueNetworkMixin.__init__(policy, obs_space, action_space, config) + EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"], + config["entropy_coeff_schedule"]) + LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"]) + + +class KLCoeffMixin(object): + def __init__(self, config): + # KL Coefficient + self.kl_coeff_val = config["kl_coeff"] + self.kl_target = config["kl_target"] + self.kl_coeff = tf.get_variable( + initializer=tf.constant_initializer(self.kl_coeff_val), + name="kl_coeff", + shape=(), + trainable=False, + dtype=tf.float32) + def update_kl(self, blah): + pass + + +CustomPPOTFPolicy = build_tf_policy( + name="CustomPPOTFPolicy", + get_default_config=lambda: ray.rllib.agents.ppo.ppo.DEFAULT_CONFIG, + loss_fn=ppo_surrogate_loss, + stats_fn=kl_and_loss_stats, + extra_action_fetches_fn=vf_preds_and_logits_fetches, + postprocess_fn=postprocess_ppo_gae, + gradients_fn=clip_gradients, + before_init=setup_config, + before_loss_init=setup_mixins, + mixins=[ + LearningRateSchedule, EntropyCoeffSchedule, + ValueNetworkMixin, KLCoeffMixin + ]) + +def validate_config(config): + if config["entropy_coeff"] < 0: + raise 
DeprecationWarning("entropy_coeff must be >= 0") + if isinstance(config["entropy_coeff"], int): + config["entropy_coeff"] = float(config["entropy_coeff"]) + if config["batch_mode"] == "truncate_episodes" and not config["use_gae"]: + raise ValueError( + "Episode truncation is not supported without a value " + "function. Consider setting batch_mode=complete_episodes.") + if config["multiagent"]["policies"] and not config["simple_optimizer"]: + logger.info( + "In multi-agent mode, policies will be optimized sequentially " + "by the multi-GPU optimizer. Consider setting " + "simple_optimizer=True if this doesn't work for you.") + if config["simple_optimizer"]: + logger.warning( + "Using the simple minibatch optimizer. This will significantly " + "reduce performance, consider simple_optimizer=False.") + elif tf and tf.executing_eagerly(): + config["simple_optimizer"] = True # multi-gpu not supported + +from ray.rllib.agents.trainer_template import build_trainer +from ray.rllib.agents.ppo.ppo import choose_policy_optimizer, DEFAULT_CONFIG, update_kl, \ + warn_about_bad_reward_scales +CustomPPOTrainer = build_trainer( + name="CustomPPOTrainer", + default_config=DEFAULT_CONFIG, + default_policy=CustomPPOTFPolicy, + make_policy_optimizer=choose_policy_optimizer, + validate_config=validate_config, + after_train_result=warn_about_bad_reward_scales) \ No newline at end of file diff --git a/flow/controllers/car_following_models.py b/flow/controllers/car_following_models.py index 42c9b2a9b..280c94d37 100755 --- a/flow/controllers/car_following_models.py +++ b/flow/controllers/car_following_models.py @@ -647,6 +647,7 @@ def __init__(self, def get_accel(self, env): """See parent class.""" + # without generating waves. lead_id = env.k.vehicle.get_leader(self.veh_id) if not lead_id: # no car ahead if self.want_max_accel: diff --git a/flow/controllers/velocity_controllers.py b/flow/controllers/velocity_controllers.py index c3da6136d..62ce15beb 100644 --- a/flow/controllers/velocity_controllers.py +++ b/flow/controllers/velocity_controllers.py @@ -25,7 +25,10 @@ class FollowerStopper(BaseController): def __init__(self, veh_id, car_following_params, - v_des=15): + v_des=15, + danger_edges=None, + control_length=None, + no_control_edges=None): """Instantiate FollowerStopper.""" BaseController.__init__( self, veh_id, car_following_params, delay=0.0, @@ -45,6 +48,10 @@ def __init__(self, self.d_2 = 1.0 self.d_3 = 0.5 + self.danger_edges = danger_edges if danger_edges else {} + self.control_length = control_length + self.no_control_edges = no_control_edges + def find_intersection_dist(self, env): """Find distance to intersection. 
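As a reference for the new FollowerStopper arguments introduced in the hunk above, here is a minimal, hypothetical usage sketch. The edge names and numeric bounds are placeholders rather than values from the patch, and in practice these keyword arguments would normally be supplied through the (controller class, kwargs) tuple of an experiment config rather than by instantiating the controller directly.

    from flow.controllers.velocity_controllers import FollowerStopper
    from flow.core.params import SumoCarFollowingParams

    # Illustrative only: "edge_A"/"edge_B" and the numeric bounds are placeholders.
    controller = FollowerStopper(
        veh_id="av_0",
        car_following_params=SumoCarFollowingParams(),
        v_des=5.0,
        # edges on which control is suppressed when close to an intersection
        danger_edges={"edge_A"},
        # only actuate while the absolute position lies within [500 m, 2300 m]
        control_length=[500, 2300],
        # edges on which the controller returns None and defers to the simulator
        no_control_edges=["edge_B"],
    )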
@@ -72,41 +79,54 @@ def find_intersection_dist(self, env): def get_accel(self, env): """See parent class.""" - lead_id = env.k.vehicle.get_leader(self.veh_id) - this_vel = env.k.vehicle.get_speed(self.veh_id) - lead_vel = env.k.vehicle.get_speed(lead_id) - - if self.v_des is None: + if env.time_counter < env.env_params.warmup_steps * env.env_params.sims_per_step: return None - - if lead_id is None: - v_cmd = self.v_des else: - dx = env.k.vehicle.get_headway(self.veh_id) - dv_minus = min(lead_vel - this_vel, 0) + lead_id = env.k.vehicle.get_leader(self.veh_id) + this_vel = env.k.vehicle.get_speed(self.veh_id) + lead_vel = env.k.vehicle.get_speed(lead_id) - dx_1 = self.dx_1_0 + 1 / (2 * self.d_1) * dv_minus**2 - dx_2 = self.dx_2_0 + 1 / (2 * self.d_2) * dv_minus**2 - dx_3 = self.dx_3_0 + 1 / (2 * self.d_3) * dv_minus**2 - v = min(max(lead_vel, 0), self.v_des) - # compute the desired velocity - if dx <= dx_1: - v_cmd = 0 - elif dx <= dx_2: - v_cmd = v * (dx - dx_1) / (dx_2 - dx_1) - elif dx <= dx_3: - v_cmd = v + (self.v_des - this_vel) * (dx - dx_2) \ - / (dx_3 - dx_2) - else: - v_cmd = self.v_des - - edge = env.k.vehicle.get_edge(self.veh_id) + if self.v_des is None: + return None - if edge == "": - return None - else: - # compute the acceleration from the desired velocity - return np.clip((v_cmd - this_vel) / env.sim_step, -np.abs(self.max_deaccel), self.max_accel) + if lead_id is None: + v_cmd = self.v_des + else: + dx = env.k.vehicle.get_headway(self.veh_id) + dv_minus = min(lead_vel - this_vel, 0) + + dx_1 = self.dx_1_0 + 1 / (2 * self.d_1) * dv_minus**2 + dx_2 = self.dx_2_0 + 1 / (2 * self.d_2) * dv_minus**2 + dx_3 = self.dx_3_0 + 1 / (2 * self.d_3) * dv_minus**2 + v = min(max(lead_vel, 0), self.v_des) + # compute the desired velocity + if dx <= dx_1: + v_cmd = 0 + elif dx <= dx_2: + v_cmd = v * (dx - dx_1) / (dx_2 - dx_1) + elif dx <= dx_3: + v_cmd = v + (self.v_des - this_vel) * (dx - dx_2) \ + / (dx_3 - dx_2) + else: + v_cmd = self.v_des + + edge = env.k.vehicle.get_edge(self.veh_id) + + if edge == "": + return None + + if (self.find_intersection_dist(env) <= 10 and \ + env.k.vehicle.get_edge(self.veh_id) in self.danger_edges) or \ + env.k.vehicle.get_edge(self.veh_id)[0] == ":"\ + or (self.control_length and (env.k.vehicle.get_x_by_id(self.veh_id) < self.control_length[0] + or env.k.vehicle.get_x_by_id(self.veh_id) > self.control_length[1]))\ + or edge in self.no_control_edges: + # TODO(@evinitsky) put back + # or env.k.vehicle.get_edge(self.veh_id) in self.no_control_edges: + return None + else: + # compute the acceleration from the desired velocity + return np.clip((v_cmd - this_vel) / env.sim_step, -np.abs(self.max_deaccel), self.max_accel) class NonLocalFollowerStopper(FollowerStopper): diff --git a/flow/core/kernel/vehicle/base.py b/flow/core/kernel/vehicle/base.py index 9ca83ab40..1c5ed271a 100644 --- a/flow/core/kernel/vehicle/base.py +++ b/flow/core/kernel/vehicle/base.py @@ -323,6 +323,7 @@ def get_fuel_consumption(selfself, veh_id, error=-1001): vehicle id, or list of vehicle ids error : any, optional value that is returned if the vehicle is not found + Returns ------- float diff --git a/flow/core/rewards.py b/flow/core/rewards.py index 1434636e6..5aada2d8e 100755 --- a/flow/core/rewards.py +++ b/flow/core/rewards.py @@ -333,7 +333,6 @@ def energy_consumption(env, gain=.001): return -gain * power - def vehicle_energy_consumption(env, veh_id, gain=.001): """Calculate power consumption of a vehicle. 
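The re-indented get_accel logic above implements the usual FollowerStopper velocity envelope. The standalone sketch below restates that command law for reference; the dx_*_0 and d_1 defaults are assumed values for illustration, since only d_2 = 1.0 and d_3 = 0.5 appear in the diff.

    def follower_stopper_v_cmd(dx, this_vel, lead_vel, v_des,
                               dx_1_0=4.5, dx_2_0=5.25, dx_3_0=6.0,
                               d_1=1.5, d_2=1.0, d_3=0.5):
        """Piecewise velocity command, as computed in FollowerStopper.get_accel."""
        dv_minus = min(lead_vel - this_vel, 0)
        # gap thresholds grow quadratically with the closing speed
        dx_1 = dx_1_0 + 1 / (2 * d_1) * dv_minus ** 2
        dx_2 = dx_2_0 + 1 / (2 * d_2) * dv_minus ** 2
        dx_3 = dx_3_0 + 1 / (2 * d_3) * dv_minus ** 2
        v = min(max(lead_vel, 0), v_des)
        if dx <= dx_1:
            return 0.0                                  # too close: command a stop
        elif dx <= dx_2:
            return v * (dx - dx_1) / (dx_2 - dx_1)      # ramp from 0 up to v
        elif dx <= dx_3:
            return v + (v_des - this_vel) * (dx - dx_2) / (dx_3 - dx_2)
        return v_des                                    # large gap: drive at v_des

The controller then converts v_cmd to an acceleration as (v_cmd - this_vel) / sim_step, clipped to its deceleration and acceleration bounds, as shown in the hunk above.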
@@ -352,6 +351,7 @@ def vehicle_energy_consumption(env, veh_id, gain=.001): if veh_id not in env.k.vehicle.previous_speeds: return 0 + speed = env.k.vehicle.get_speed(veh_id) prev_speed = env.k.vehicle.get_previous_speed(veh_id) @@ -389,7 +389,7 @@ def miles_per_megajoule(env, veh_ids=None, gain=.001): speed = env.k.vehicle.get_speed(veh_id) # convert to be positive since the function called is a penalty power = -vehicle_energy_consumption(env, veh_id, gain=1.0) - if power > 0 and speed >= 0.0: + if power > 0 and speed >= 0.1: counter += 1 # meters / joule is (v * \delta t) / (power * \delta t) mpj += speed / power diff --git a/flow/envs/base.py b/flow/envs/base.py index cf1674355..fbc57f33b 100644 --- a/flow/envs/base.py +++ b/flow/envs/base.py @@ -148,6 +148,8 @@ def __init__(self, self.state = None self.obs_var_labels = [] + self.num_training_iters = 0 + # track IDs that have ever been observed in the system self.observed_ids = set() self.observed_rl_ids = set() diff --git a/flow/envs/multiagent/__init__.py b/flow/envs/multiagent/__init__.py index 818d6662b..8c5552580 100644 --- a/flow/envs/multiagent/__init__.py +++ b/flow/envs/multiagent/__init__.py @@ -12,6 +12,7 @@ from flow.envs.multiagent.merge import MultiAgentMergePOEnv from flow.envs.multiagent.i210 import I210MultiEnv, MultiStraightRoad + __all__ = [ 'MultiEnv', 'AdversarialAccelEnv', diff --git a/flow/envs/multiagent/base.py b/flow/envs/multiagent/base.py index 594fb2fdb..7104138de 100644 --- a/flow/envs/multiagent/base.py +++ b/flow/envs/multiagent/base.py @@ -322,3 +322,6 @@ def apply_rl_actions(self, rl_actions=None): # clip according to the action space requirements clipped_actions = self.clip_actions(rl_actions) self._apply_rl_actions(clipped_actions) + + def set_iteration_num(self): + self.num_training_iters += 1 diff --git a/flow/envs/multiagent/i210.py b/flow/envs/multiagent/i210.py index a6e39cdec..c9b63b23a 100644 --- a/flow/envs/multiagent/i210.py +++ b/flow/envs/multiagent/i210.py @@ -1,8 +1,13 @@ """Environment for training vehicles to reduce congestion in the I210.""" -from gym.spaces import Box +from collections import OrderedDict +from copy import deepcopy +from time import time + +from gym.spaces import Box, Discrete, Dict import numpy as np +from flow.core.rewards import miles_per_gallon, miles_per_megajoule from flow.envs.multiagent.base import MultiEnv # largest number of lanes on any given edge in the network @@ -19,6 +24,7 @@ "lead_obs": True, # whether the reward should come from local vehicles instead of global rewards "local_reward": True, + # desired velocity "target_velocity": 25 } @@ -66,10 +72,35 @@ def __init__(self, env_params, sim_params, network, simulator='traci'): self.reroute_on_exit = env_params.additional_params.get("reroute_on_exit") self.max_lanes = MAX_LANES self.num_enter_lanes = 5 - self.entrance_edge = "119257914" - self.exit_edge = "119257908#3" + self.entrance_edge = "ghost0" + self.exit_edge = "119257908#2" + self.control_range = env_params.additional_params.get('control_range', None) + self.no_control_edges = env_params.additional_params.get('no_control_edges', []) + self.mpg_reward = env_params.additional_params["mpg_reward"] + self.mpj_reward = env_params.additional_params["mpj_reward"] + self.look_back_length = env_params.additional_params["look_back_length"] + + # whether to add a slight reward for opening up a gap that will be annealed out N iterations in + self.headway_curriculum = env_params.additional_params["headway_curriculum"] + # how many timesteps to anneal the 
headway curriculum over + self.headway_curriculum_iters = env_params.additional_params["headway_curriculum_iters"] + self.headway_reward_gain = env_params.additional_params["headway_reward_gain"] + self.min_time_headway = env_params.additional_params["min_time_headway"] + + # whether to add a slight reward for opening up a gap that will be annealed out N iterations in + self.speed_curriculum = env_params.additional_params["speed_curriculum"] + # how many timesteps to anneal the headway curriculum over + self.speed_curriculum_iters = env_params.additional_params["speed_curriculum_iters"] + self.speed_reward_gain = env_params.additional_params["speed_reward_gain"] + self.num_training_iters = 0 self.leader = [] + # penalize stops + self.penalize_stops = env_params.additional_params["penalize_stops"] + + # penalize accel + self.penalize_accel = env_params.additional_params.get("penalize_accel", False) + @property def observation_space(self): """See class definition.""" @@ -109,6 +140,8 @@ def action_space(self): def _apply_rl_actions(self, rl_actions): """See class definition.""" # in the warmup steps, rl_actions is None + id_list = [] + accel_list = [] if rl_actions: for rl_id, actions in rl_actions.items(): accel = actions[0] @@ -117,15 +150,28 @@ def _apply_rl_actions(self, rl_actions): # lane_change_softmax /= np.sum(lane_change_softmax) # lane_change_action = np.random.choice([-1, 0, 1], # p=lane_change_softmax) + id_list.append(rl_id) + accel_list.append(accel) + self.k.vehicle.apply_acceleration(id_list, accel_list) + # self.k.vehicle.apply_lane_change(rl_id, lane_change_action) + # print('time to apply actions is ', time() - t) - self.k.vehicle.apply_acceleration(rl_id, accel) - # self.k.vehicle.apply_lane_change(rl_id, lane_change_action) + def in_control_range(self, veh_id): + """Return if a veh_id is on an edge that is allowed to be controlled. 
+ + If control range is defined it uses control range, otherwise it searches over a set of edges + """ + return (self.control_range and self.k.vehicle.get_x_by_id(veh_id) < self.control_range[1] \ + and self.k.vehicle.get_x_by_id(veh_id) > self.control_range[0]) or \ + (len(self.no_control_edges) > 0 and self.k.vehicle.get_edge(veh_id) not in + self.no_control_edges) def get_state(self): """See class definition.""" + valid_ids = [rl_id for rl_id in self.k.vehicle.get_rl_ids() if self.in_control_range(rl_id)] if self.lead_obs: veh_info = {} - for rl_id in self.k.vehicle.get_rl_ids(): + for rl_id in valid_ids: speed = self.k.vehicle.get_speed(rl_id) lead_id = self.k.vehicle.get_leader(rl_id) if lead_id in ["", None]: @@ -140,7 +186,7 @@ def get_state(self): else: veh_info = {rl_id: np.concatenate((self.state_util(rl_id), self.veh_statistics(rl_id))) - for rl_id in self.k.vehicle.get_rl_ids()} + for rl_id in valid_ids} return veh_info def compute_reward(self, rl_actions, **kwargs): @@ -150,27 +196,107 @@ def compute_reward(self, rl_actions, **kwargs): return {} rewards = {} + valid_ids = [rl_id for rl_id in self.k.vehicle.get_rl_ids() if self.in_control_range(rl_id)] + if self.env_params.additional_params["local_reward"]: des_speed = self.env_params.additional_params["target_velocity"] - for rl_id in self.k.vehicle.get_rl_ids(): + for rl_id in valid_ids: rewards[rl_id] = 0 - speeds = [] - follow_speed = self.k.vehicle.get_speed(self.k.vehicle.get_follower(rl_id)) - if follow_speed >= 0: - speeds.append(follow_speed) - if self.k.vehicle.get_speed(rl_id) >= 0: - speeds.append(self.k.vehicle.get_speed(rl_id)) - if len(speeds) > 0: - # rescale so the critic can estimate it quickly - rewards[rl_id] = np.mean([(des_speed - np.abs(speed - des_speed)) ** 2 - for speed in speeds]) / (des_speed ** 2) + if self.mpg_reward: + rewards[rl_id] = miles_per_gallon(self, rl_id, gain=1.0) / 100.0 + follow_id = rl_id + for i in range(self.look_back_length): + follow_id = self.k.vehicle.get_follower(follow_id) + if follow_id not in ["", None]: + rewards[rl_id] += miles_per_gallon(self, follow_id, gain=1.0) / 100.0 + else: + break + elif self.mpj_reward: + rewards[rl_id] = miles_per_megajoule(self, rl_id, gain=1.0) / 100.0 + follow_id = rl_id + for i in range(self.look_back_length): + follow_id = self.k.vehicle.get_follower(follow_id) + if follow_id not in ["", None]: + # if self.time_counter > 700 and miles_per_megajoule(self, follow_id, gain=1.0) > 1.0: + # import ipdb; ipdb.set_trace() + rewards[rl_id] += miles_per_megajoule(self, follow_id, gain=1.0) / 100.0 + else: + break + else: + speeds = [] + follow_speed = self.k.vehicle.get_speed(self.k.vehicle.get_follower(rl_id)) + if follow_speed >= 0: + speeds.append(follow_speed) + if self.k.vehicle.get_speed(rl_id) >= 0: + speeds.append(self.k.vehicle.get_speed(rl_id)) + if len(speeds) > 0: + # rescale so the critic can estimate it quickly + rewards[rl_id] = np.mean([(des_speed - np.abs(speed - des_speed)) ** 2 + for speed in speeds]) / (des_speed ** 2) else: - speeds = self.k.vehicle.get_speed(self.k.vehicle.get_ids()) + if self.mpg_reward: + reward = np.nan_to_num(miles_per_gallon(self, self.k.vehicle.get_ids(), gain=1.0)) / 100.0 + else: + speeds = self.k.vehicle.get_speed(self.k.vehicle.get_ids()) + des_speed = self.env_params.additional_params["target_velocity"] + # rescale so the critic can estimate it quickly + if self.reroute_on_exit: + reward = np.nan_to_num(np.mean([(des_speed - np.abs(speed - des_speed)) + for speed in speeds]) / (des_speed)) + else: + 
reward = np.nan_to_num(np.mean([(des_speed - np.abs(speed - des_speed)) ** 2 + for speed in speeds]) / (des_speed ** 2)) + rewards = {rl_id: reward for rl_id in valid_ids} + + # curriculum over time-gaps + if self.headway_curriculum and self.num_training_iters <= self.headway_curriculum_iters: + t_min = self.min_time_headway # smallest acceptable time headway + for veh_id, rew in rewards.items(): + lead_id = self.k.vehicle.get_leader(veh_id) + penalty = 0 + if lead_id not in ["", None] \ + and self.k.vehicle.get_speed(veh_id) > 0: + t_headway = max( + self.k.vehicle.get_headway(veh_id) / + self.k.vehicle.get_speed(veh_id), 0) + # print('time headway is {}, headway is {}'.format(t_headway, self.k.vehicle.get_headway(veh_id))) + scaling_factor = max(0, 1 - self.num_training_iters / self.headway_curriculum_iters) + penalty += scaling_factor * self.headway_reward_gain * min((t_headway - t_min) / t_min, 0) + # print('penalty is ', penalty) + + rewards[veh_id] += penalty + + if self.speed_curriculum and self.num_training_iters <= self.speed_curriculum_iters: des_speed = self.env_params.additional_params["target_velocity"] - # rescale so the critic can estimate it quickly - reward = np.nan_to_num(np.mean([(des_speed - np.abs(speed - des_speed)) ** 2 - for speed in speeds]) / (des_speed ** 2)) - rewards = {rl_id: reward for rl_id in self.k.vehicle.get_rl_ids()} + + for veh_id, rew in rewards.items(): + speed = self.k.vehicle.get_speed(veh_id) + speed_reward = 0.0 + follow_id = veh_id + for i in range(self.look_back_length): + follow_id = self.k.vehicle.get_follower(follow_id) + if follow_id not in ["", None]: + if self.reroute_on_exit: + speed_reward += ((des_speed - np.abs(speed - des_speed))) / (des_speed) + else: + speed_reward += ((des_speed - np.abs(speed - des_speed)) ** 2) / (des_speed ** 2) + else: + break + scaling_factor = max(0, 1 - self.num_training_iters / self.speed_curriculum_iters) + + rewards[veh_id] += speed_reward * scaling_factor * self.speed_reward_gain + + for veh_id in rewards.keys(): + speed = self.k.vehicle.get_speed(veh_id) + if self.penalize_stops: + if speed < 1.0: + rewards[veh_id] -= .01 + if self.penalize_accel and veh_id in self.k.vehicle.previous_speeds: + prev_speed = self.k.vehicle.get_previous_speed(veh_id) + abs_accel = abs(speed - prev_speed) / self.sim_step + rewards[veh_id] -= abs_accel / 400.0 + + # print('time to get reward is ', time() - t) return rewards def additional_command(self): @@ -191,6 +317,7 @@ def additional_command(self): and not self.env_params.evaluate: veh_ids = self.k.vehicle.get_ids() edges = self.k.vehicle.get_edge(veh_ids) + valid_lanes = list(range(self.num_enter_lanes)) for veh_id, edge in zip(veh_ids, edges): if edge == "": continue @@ -200,28 +327,38 @@ def additional_command(self): if edge == self.exit_edge and \ (self.k.vehicle.get_position(veh_id) > self.k.network.edge_length(self.exit_edge) - 100) \ and self.k.vehicle.get_leader(veh_id) is None: + # if self.step_counter > 6000: + # import ipdb; ipdb.set_trace() type_id = self.k.vehicle.get_type(veh_id) # remove the vehicle self.k.vehicle.remove(veh_id) - lane = np.random.randint(low=0, high=self.num_enter_lanes) + index = np.random.randint(low=0, high=len(valid_lanes)) + lane = valid_lanes[index] + del valid_lanes[index] # reintroduce it at the start of the network # TODO(@evinitsky) select the lane and speed a bit more cleanly # Note, the position is 10 so you are not overlapping with the inflow car that is being removed. # this allows the vehicle to be immediately inserted. 
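+                    # Reinsertion details: each recycled vehicle is placed on a
+                    # distinct entrance lane (valid_lanes shrinks as lanes are
+                    # used, and the loop stops once every lane has been taken),
+                    # and the add() call is wrapped in try/except so a failed
+                    # insertion does not crash the rollout.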
- self.k.vehicle.add( - veh_id=veh_id, - edge=self.entrance_edge, - type_id=str(type_id), - lane=str(lane), - pos="10.0", - speed="23.0") + try: + self.k.vehicle.add( + veh_id=veh_id, + edge=self.entrance_edge, + type_id=str(type_id), + lane=str(lane), + pos="20.0", + speed="23.0") + except Exception as e: + print(e) + if len(valid_lanes) == 0: + break departed_ids = self.k.vehicle.get_departed_ids() - if len(departed_ids) > 0: + if isinstance(departed_ids, tuple) and len(departed_ids) > 0: for veh_id in departed_ids: if veh_id not in self.observed_ids: self.k.vehicle.remove(veh_id) + def state_util(self, rl_id): """Return an array of headway, tailway, leader speed, follower speed. diff --git a/flow/networks/i210_subnetwork_ghost_cell.py b/flow/networks/i210_subnetwork_ghost_cell.py new file mode 100644 index 000000000..8a45b4d91 --- /dev/null +++ b/flow/networks/i210_subnetwork_ghost_cell.py @@ -0,0 +1,162 @@ +"""Contains the I-210 sub-network class.""" + +from flow.networks.base import Network + +EDGES_DISTRIBUTION = [ + # Main highway + "ghost0", + "119257914", + "119257908#0", + "119257908#1-AddedOnRampEdge", + "119257908#1", + "119257908#1-AddedOffRampEdge", + "119257908#2", + "119257908#3", + + # On-ramp + "27414345", + "27414342#0", + "27414342#1-AddedOnRampEdge", + + # Off-ramp + "173381935", +] + + +class I210SubNetworkGhostCell(Network): + """A network used to simulate the I-210 sub-network. + + Usage + ----- + >>> from flow.core.params import NetParams + >>> from flow.core.params import VehicleParams + >>> from flow.core.params import InitialConfig + >>> from flow.networks import I210SubNetwork + >>> + >>> network = I210SubNetwork( + >>> name='I-210_subnetwork', + >>> vehicles=VehicleParams(), + >>> net_params=NetParams() + >>> ) + """ + + def specify_routes(self, net_params): + """See parent class. + + Routes for vehicles moving through the I210. 
+ """ + if net_params.additional_params["on_ramp"]: + rts = { + # Main highway + "ghost0": [ + (["ghost0", "119257914", "119257908#0", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1 - 17 / 8378), # HOV: 1509 (on ramp: 57), Non HOV: 6869 (onramp: 16) + (["119257914", "119257908#0", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "173381935"], + 17 / 8378) + ], + "119257914": [ + (["119257914", "119257908#0", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1 - 17 / 8378), # HOV: 1509 (on ramp: 57), Non HOV: 6869 (onramp: 16) + (["119257914", "119257908#0", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "173381935"], + 17 / 8378) + ], + "119257908#0": [ + (["119257908#0", "119257908#1-AddedOnRampEdge", "119257908#1", + "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1.0), + # (["119257908#0", "119257908#1-AddedOnRampEdge", "119257908#1", + # "119257908#1-AddedOffRampEdge", "173381935"], + # 0.5), + ], + "119257908#1-AddedOnRampEdge": [ + (["119257908#1-AddedOnRampEdge", "119257908#1", + "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1.0), + # (["119257908#1-AddedOnRampEdge", "119257908#1", + # "119257908#1-AddedOffRampEdge", "173381935"], + # 0.5), + ], + "119257908#1": [ + (["119257908#1", "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1.0), + # (["119257908#1", "119257908#1-AddedOffRampEdge", "173381935"], + # 0.5), + ], + "119257908#1-AddedOffRampEdge": [ + (["119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1.0), + # (["119257908#1-AddedOffRampEdge", "173381935"], + # 0.5), + ], + "119257908#2": [ + (["119257908#2", "119257908#3"], 1), + ], + "119257908#3": [ + (["119257908#3"], 1), + ], + + # On-ramp + "27414345": [ + (["27414345", "27414342#1-AddedOnRampEdge", + "27414342#1", + "119257908#1-AddedOnRampEdge", "119257908#1", + "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1 - 9 / 321), + (["27414345", "27414342#1-AddedOnRampEdge", + "27414342#1", + "119257908#1-AddedOnRampEdge", "119257908#1", + "119257908#1-AddedOffRampEdge", "173381935"], + 9 / 321), + ], + "27414342#0": [ + (["27414342#0", "27414342#1-AddedOnRampEdge", + "27414342#1", + "119257908#1-AddedOnRampEdge", "119257908#1", + "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1 - 20 / 421), + (["27414342#0", "27414342#1-AddedOnRampEdge", + "27414342#1", + "119257908#1-AddedOnRampEdge", "119257908#1", + "119257908#1-AddedOffRampEdge", "173381935"], + 20 / 421), + ], + "27414342#1-AddedOnRampEdge": [ + (["27414342#1-AddedOnRampEdge", "27414342#1", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 0.5), + (["27414342#1-AddedOnRampEdge", "27414342#1", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "173381935"], + 0.5), + ], + + # Off-ramp + "173381935": [ + (["173381935"], 1), + ], + } + + else: + rts = { + # Main highway + "ghost0": [ + (["ghost0", "119257914", "119257908#0", "119257908#1-AddedOnRampEdge", + "119257908#1", "119257908#1-AddedOffRampEdge", "119257908#2", + "119257908#3"], + 1), + ], + } + + return rts diff --git a/flow/visualize/time_space_diagram.py b/flow/visualize/time_space_diagram.py index 9ac6938d4..004172765 100644 --- a/flow/visualize/time_space_diagram.py +++ b/flow/visualize/time_space_diagram.py @@ 
-256,12 +256,31 @@ def _highway(data, params, all_time): time step. Set to zero if the vehicle is not present in the network at that time step. """ - length = params['net'].additional_params['length'] - num_edges = params['net'].additional_params['num_edges'] - edge_len = length / num_edges + junction_length = 0.1 + length = params['net'].additional_params["length"] + num_edges = params['net'].additional_params.get("num_edges", 1) edge_starts = {} - for i in range(num_edges): - edge_starts.update({"highway_{}".format(i): i * edge_len, ":edge_{}_0".format(i): i * edge_len}) + # Add the main edges. + edge_starts.update({ + "highway_{}".format(i): + i * (length / num_edges + junction_length) + for i in range(num_edges) + }) + + if params['net'].additional_params["use_ghost_edge"]: + edge_starts.update({"highway_end": length + num_edges * junction_length}) + + edge_starts.update({ + ":edge_{}".format(i + 1): + (i + 1) * length / num_edges + i * junction_length + for i in range(num_edges - 1) + }) + + if params['net'].additional_params["use_ghost_edge"]: + edge_starts.update({ + ":edge_{}".format(num_edges): + length + (num_edges - 1) * junction_length + }) # compute the absolute position for veh_id in data.keys(): diff --git a/flow/visualize/visualizer_rllib.py b/flow/visualize/visualizer_rllib.py index c1dd83193..5c52e196f 100644 --- a/flow/visualize/visualizer_rllib.py +++ b/flow/visualize/visualizer_rllib.py @@ -26,6 +26,7 @@ from ray.rllib.agents.registry import get_agent_class from ray.tune.registry import register_env +from flow.core.rewards import miles_per_gallon, miles_per_megajoule from flow.core.util import emission_to_csv from flow.utils.registry import make_create_env from flow.utils.rllib import get_flow_params @@ -90,6 +91,14 @@ def visualizer_rllib(args): sys.exit(1) if args.run: agent_cls = get_agent_class(args.run) + elif config['env_config']['run'] == "": + from flow.algorithms.centralized_PPO import CCTrainer, CentralizedCriticModel + from ray.rllib.models import ModelCatalog + agent_cls = CCTrainer + ModelCatalog.register_custom_model("cc_model", CentralizedCriticModel) + elif config['env_config']['run'] == "": + from flow.algorithms.custom_ppo import CustomPPOTrainer + agent_cls = CustomPPOTrainer elif config_run: agent_cls = get_agent_class(config_run) else: @@ -160,6 +169,10 @@ def visualizer_rllib(args): else: env = gym.make(env_name) + # reroute on exit is a training hack, it should be turned off at test time. 
+ if hasattr(env, "reroute_on_exit"): + env.reroute_on_exit = False + if args.render_mode == 'sumo_gui': env.sim_params.render = True # set to True after initializing agent and env @@ -197,6 +210,8 @@ def visualizer_rllib(args): # Simulate and collect metrics final_outflows = [] final_inflows = [] + mpg = [] + mpj = [] mean_speed = [] std_speed = [] for i in range(args.num_rollouts): @@ -214,6 +229,9 @@ def visualizer_rllib(args): if speeds: vel.append(np.mean(speeds)) + mpg.append(miles_per_gallon(env.unwrapped, vehicles.get_ids(), gain=1.0)) + mpj.append(miles_per_megajoule(env.unwrapped, vehicles.get_ids(), gain=1.0)) + if multiagent: action = {} for agent_id in state.keys(): @@ -279,10 +297,11 @@ def visualizer_rllib(args): print(mean_speed) print('Average, std: {}, {}'.format(np.mean(mean_speed), np.std( mean_speed))) - print("\nSpeed, std (m/s):") - print(std_speed) - print('Average, std: {}, {}'.format(np.mean(std_speed), np.std( - std_speed))) + + print('Average, std miles per gallon: {}, {}'.format(np.mean(mpg), np.std(mpg))) + + print('Average, std miles per megajoule: {}, {}'.format(np.mean(mpj), np.std(mpj))) + # Compute arrival rate of vehicles in the last 500 sec of the run print("\nOutflows (veh/hr):") diff --git a/scripts/ray_autoscale.yaml b/scripts/ray_autoscale.yaml index 5cf0eca96..18e25154d 100644 --- a/scripts/ray_autoscale.yaml +++ b/scripts/ray_autoscale.yaml @@ -1,4 +1,4 @@ -# cluster.yaml ========================================= +# cluster.yaml ========================================= # An unique identifier for the head node and workers of this cluster. cluster_name: test # @@ -39,8 +39,8 @@ auth: # For more documentation on available fields, see: # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances head_node: - InstanceType: c4.4xlarge - ImageId: ami-0b489700e7f810707 # Flow AMI (Ubuntu) + InstanceType: c4.8xlarge + ImageId: ami-0c047f3ddd3939b30 # Flow AMI (Ubuntu) InstanceMarketOptions: MarketType: spot #Additional options can be found in the boto docs, e.g. @@ -54,10 +54,10 @@ head_node: # For more documentation on available fields, see: # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances worker_nodes: - InstanceType: c4.4xlarge - ImageId: ami-0b489700e7f810707 # Flow AMI (Ubuntu) + InstanceType: c4.8xlarge + ImageId: ami-0c047f3ddd3939b30 # Flow AMI (Ubuntu) - #Run workers on spot by default. Comment this out to use on-demand. + #Run workers on spot by default. Comment this out to use on-demand. InstanceMarketOptions: MarketType: spot # Additional options can be found in the boto docs, e.g. @@ -67,7 +67,8 @@ worker_nodes: # Additional options in the boto docs. setup_commands: - - cd flow && git fetch && git checkout origin/i210_dev + - cd flow && git fetch && git checkout origin/flow_maddpg + - flow/scripts/setup_sumo_ubuntu1604.sh - pip install ray==0.8.0 - pip install tabulate - pip install boto3==1.10.45 # 1.4.8 adds InstanceMarketOptions @@ -79,7 +80,6 @@ setup_commands: - pip install lz4 - pip install dm-tree - pip install numpy==1.18.4 - - ./flow/scripts/setup_sumo_ubuntu1604.sh head_setup_commands: []
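To make the edge-start bookkeeping in the _highway() change of flow/visualize/time_space_diagram.py above concrete, here is a small worked example. The network parameters (length = 2000 m, num_edges = 2, use_ghost_edge = True) are illustrative values, not taken from the patch.

    # Sketch of the edge_starts layout built by _highway() when a ghost edge is used.
    junction_length = 0.1
    length, num_edges = 2000, 2

    edge_starts = {
        "highway_{}".format(i): i * (length / num_edges + junction_length)
        for i in range(num_edges)
    }
    # the downstream ghost edge sits after all main segments and junctions
    edge_starts["highway_end"] = length + num_edges * junction_length

    # internal (junction) edges between and after the main segments
    edge_starts.update({
        ":edge_{}".format(i + 1): (i + 1) * length / num_edges + i * junction_length
        for i in range(num_edges - 1)
    })
    edge_starts[":edge_{}".format(num_edges)] = length + (num_edges - 1) * junction_length

    # result:
    # {'highway_0': 0.0, 'highway_1': 1000.1, 'highway_end': 2000.2,
    #  ':edge_1': 1000.0, ':edge_2': 2000.1}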