From b2a439f74f8be390c806515f77849e04776f91bb Mon Sep 17 00:00:00 2001 From: Kristian Hartikainen Date: Fri, 1 Feb 2019 22:10:33 -0800 Subject: [PATCH] Refactor gym envs to support dynamic arguments --- gym/envs/mujoco/ant.py | 149 ++++++++++++++++++++++------ gym/envs/mujoco/half_cheetah.py | 84 ++++++++++++---- gym/envs/mujoco/hopper.py | 149 ++++++++++++++++++++++------ gym/envs/mujoco/humanoid.py | 170 +++++++++++++++++++++++++------- gym/envs/mujoco/swimmer.py | 90 ++++++++++++----- gym/envs/mujoco/walker2d.py | 141 ++++++++++++++++++++------ 6 files changed, 621 insertions(+), 162 deletions(-) diff --git a/gym/envs/mujoco/ant.py b/gym/envs/mujoco/ant.py index 550fb645a55..02c521d0844 100644 --- a/gym/envs/mujoco/ant.py +++ b/gym/envs/mujoco/ant.py @@ -2,44 +2,131 @@ from gym import utils from gym.envs.mujoco import mujoco_env + +DEFAULT_CAMERA_CONFIG = { + 'distance': 4.0, +} + + class AntEnv(mujoco_env.MujocoEnv, utils.EzPickle): - def __init__(self): + def __init__(self, + ctrl_cost_weight=0.5, + contact_cost_weight=5e-4, + healthy_reward=1.0, + terminate_when_unhealthy=True, + healthy_z_range=(0.2, 1.0), + contact_force_range=(-1.0, 1.0), + reset_noise_scale=0.1, + exclude_current_positions_from_observation=True): + utils.EzPickle.__init__(**locals()) + + self._ctrl_cost_weight = ctrl_cost_weight + self._contact_cost_weight = contact_cost_weight + + self._healthy_reward = healthy_reward + self._terminate_when_unhealthy = terminate_when_unhealthy + self._healthy_z_range = healthy_z_range + + self._contact_force_range = contact_force_range + + self._reset_noise_scale = reset_noise_scale + + self._exclude_current_positions_from_observation = ( + exclude_current_positions_from_observation) + mujoco_env.MujocoEnv.__init__(self, 'ant.xml', 5) - utils.EzPickle.__init__(self) - - def step(self, a): - xposbefore = self.get_body_com("torso")[0] - self.do_simulation(a, self.frame_skip) - xposafter = self.get_body_com("torso")[0] - forward_reward = (xposafter - xposbefore)/self.dt - ctrl_cost = .5 * np.square(a).sum() - contact_cost = 0.5 * 1e-3 * np.sum( - np.square(np.clip(self.sim.data.cfrc_ext, -1, 1))) - survive_reward = 1.0 - reward = forward_reward - ctrl_cost - contact_cost + survive_reward + + @property + def healthy_reward(self): + return float( + self.is_healthy + or self._terminate_when_unhealthy + ) * self._healthy_reward + + def control_cost(self, action): + control_cost = self._ctrl_cost_weight * np.sum(np.square(action)) + return control_cost + + @property + def contact_forces(self): + raw_contact_forces = self.sim.data.cfrc_ext + min_value, max_value = self._contact_force_range + contact_forces = np.clip(raw_contact_forces, min_value, max_value) + return contact_forces + + @property + def contact_cost(self): + contact_cost = self._contact_cost_weight * np.sum( + np.square(self.contact_forces)) + return contact_cost + + @property + def is_healthy(self): state = self.state_vector() - notdone = np.isfinite(state).all() \ - and state[2] >= 0.2 and state[2] <= 1.0 - done = not notdone - ob = self._get_obs() - return ob, reward, done, dict( - reward_forward=forward_reward, - reward_ctrl=-ctrl_cost, - reward_contact=-contact_cost, - reward_survive=survive_reward) + min_z, max_z = self._healthy_z_range + is_healthy = (np.isfinite(state).all() and min_z <= state[2] <= max_z) + return is_healthy + + @property + def done(self): + done = (not self.is_healthy + if self._terminate_when_unhealthy + else False) + return done + + def step(self, action): + x_position_before = 
self.get_body_com("torso")[0] + self.do_simulation(action, self.frame_skip) + x_position_after = self.get_body_com("torso")[0] + x_velocity = (x_position_after - x_position_before) / self.dt + + ctrl_cost = self.control_cost(action) + contact_cost = self.contact_cost + + forward_reward = x_velocity + healthy_reward = self.healthy_reward + + rewards = forward_reward + healthy_reward + costs = ctrl_cost + contact_cost + + reward = rewards - costs + done = self.done + observation = self._get_obs() + info = { + 'reward_forward': forward_reward, + 'reward_ctrl': -ctrl_cost, + 'reward_contact': -contact_cost, + 'reward_survive': healthy_reward, + } + + return observation, reward, done, info def _get_obs(self): - return np.concatenate([ - self.sim.data.qpos.flat[2:], - self.sim.data.qvel.flat, - np.clip(self.sim.data.cfrc_ext, -1, 1).flat, - ]) + position = self.sim.data.qpos.flat.copy() + velocity = self.sim.data.qvel.flat.copy() + contact_force = self.contact_forces.flat.copy() + + if self._exclude_current_positions_from_observation: + position = position[2:] + + observations = np.concatenate((position, velocity, contact_force)) + + return observations def reset_model(self): - qpos = self.init_qpos + self.np_random.uniform(size=self.model.nq, low=-.1, high=.1) - qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1 + noise_low = -self._reset_noise_scale + noise_high = self._reset_noise_scale + + qpos = self.init_qpos + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nq) + qvel = self.init_qvel + self._reset_noise_scale * self.np_random.randn( + self.model.nv) self.set_state(qpos, qvel) - return self._get_obs() + + observation = self._get_obs() + + return observation def viewer_setup(self): - self.viewer.cam.distance = self.model.stat.extent * 0.5 + for key, value in DEFAULT_CAMERA_CONFIG.items(): + setattr(self.viewer.cam, key, value) diff --git a/gym/envs/mujoco/half_cheetah.py b/gym/envs/mujoco/half_cheetah.py index ea9761c5610..22882ada925 100644 --- a/gym/envs/mujoco/half_cheetah.py +++ b/gym/envs/mujoco/half_cheetah.py @@ -2,33 +2,83 @@ from gym import utils from gym.envs.mujoco import mujoco_env + +DEFAULT_CAMERA_CONFIG = { + 'distance': 4.0, +} + + class HalfCheetahEnv(mujoco_env.MujocoEnv, utils.EzPickle): - def __init__(self): + def __init__(self, + forward_reward_weight=1.0, + ctrl_cost_weight=0.1, + reset_noise_scale=0.1, + exclude_current_positions_from_observation=True): + utils.EzPickle.__init__(**locals()) + + self._forward_reward_weight = forward_reward_weight + + self._ctrl_cost_weight = ctrl_cost_weight + + self._reset_noise_scale = reset_noise_scale + + self._exclude_current_positions_from_observation = ( + exclude_current_positions_from_observation) + mujoco_env.MujocoEnv.__init__(self, 'half_cheetah.xml', 5) - utils.EzPickle.__init__(self) + + def control_cost(self, action): + control_cost = self._ctrl_cost_weight * np.sum(np.square(action)) + return control_cost def step(self, action): - xposbefore = self.sim.data.qpos[0] + x_position_before = self.sim.data.qpos[0] self.do_simulation(action, self.frame_skip) - xposafter = self.sim.data.qpos[0] - ob = self._get_obs() - reward_ctrl = - 0.1 * np.square(action).sum() - reward_run = (xposafter - xposbefore)/self.dt - reward = reward_ctrl + reward_run + x_position_after = self.sim.data.qpos[0] + x_velocity = ((x_position_after - x_position_before) + / self.dt) + + ctrl_cost = self.control_cost(action) + + forward_reward = self._forward_reward_weight * x_velocity + + observation = 
self._get_obs() + reward = forward_reward - ctrl_cost done = False - return ob, reward, done, dict(reward_run=reward_run, reward_ctrl=reward_ctrl) + info = { + 'x_position': x_position_after, + 'x_velocity': x_velocity, + + 'reward_run': forward_reward, + 'reward_ctrl': -ctrl_cost + } + + return observation, reward, done, info def _get_obs(self): - return np.concatenate([ - self.sim.data.qpos.flat[1:], - self.sim.data.qvel.flat, - ]) + position = self.sim.data.qpos.flat.copy() + velocity = self.sim.data.qvel.flat.copy() + + if self._exclude_current_positions_from_observation: + position = position[1:] + + observation = np.concatenate((position, velocity)).ravel() + return observation def reset_model(self): - qpos = self.init_qpos + self.np_random.uniform(low=-.1, high=.1, size=self.model.nq) - qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1 + noise_low = -self._reset_noise_scale + noise_high = self._reset_noise_scale + + qpos = self.init_qpos + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nq) + qvel = self.init_qvel + self._reset_noise_scale * self.np_random.randn( + self.model.nv) + self.set_state(qpos, qvel) - return self._get_obs() + + observation = self._get_obs() + return observation def viewer_setup(self): - self.viewer.cam.distance = self.model.stat.extent * 0.5 + for key, value in DEFAULT_CAMERA_CONFIG.items(): + setattr(self.viewer.cam, key, value) diff --git a/gym/envs/mujoco/hopper.py b/gym/envs/mujoco/hopper.py index be826a409af..4c0bd2433f1 100644 --- a/gym/envs/mujoco/hopper.py +++ b/gym/envs/mujoco/hopper.py @@ -1,40 +1,133 @@ import numpy as np -from gym import utils from gym.envs.mujoco import mujoco_env +from gym import utils + + +DEFAULT_CAMERA_CONFIG = { + 'trackbodyid': 2, + 'distance': 3.0, + 'lookat': np.array((0.0, 0.0, 1.15)), + 'elevation': -20.0, +} + class HopperEnv(mujoco_env.MujocoEnv, utils.EzPickle): - def __init__(self): + def __init__(self, + forward_reward_weight=1.0, + ctrl_cost_weight=1e-3, + healthy_reward=1.0, + terminate_when_unhealthy=True, + healthy_state_range=(-100.0, 100.0), + healthy_z_range=(0.7, float('inf')), + healthy_angle_range=(-0.2, 0.2), + reset_noise_scale=5e-3, + exclude_current_positions_from_observation=True): + utils.EzPickle.__init__(**locals()) + + self._forward_reward_weight = forward_reward_weight + + self._ctrl_cost_weight = ctrl_cost_weight + + self._healthy_reward = healthy_reward + self._terminate_when_unhealthy = terminate_when_unhealthy + + self._healthy_state_range = healthy_state_range + self._healthy_z_range = healthy_z_range + self._healthy_angle_range = healthy_angle_range + + self._reset_noise_scale = reset_noise_scale + + self._exclude_current_positions_from_observation = ( + exclude_current_positions_from_observation) + mujoco_env.MujocoEnv.__init__(self, 'hopper.xml', 4) - utils.EzPickle.__init__(self) - - def step(self, a): - posbefore = self.sim.data.qpos[0] - self.do_simulation(a, self.frame_skip) - posafter, height, ang = self.sim.data.qpos[0:3] - alive_bonus = 1.0 - reward = (posafter - posbefore) / self.dt - reward += alive_bonus - reward -= 1e-3 * np.square(a).sum() - s = self.state_vector() - done = not (np.isfinite(s).all() and (np.abs(s[2:]) < 100).all() and - (height > .7) and (abs(ang) < .2)) - ob = self._get_obs() - return ob, reward, done, {} + + @property + def healthy_reward(self): + return float( + self.is_healthy + or self._terminate_when_unhealthy + ) * self._healthy_reward + + def control_cost(self, action): + control_cost = self._ctrl_cost_weight 
* np.sum(np.square(action)) + return control_cost + + @property + def is_healthy(self): + z, angle = self.sim.data.qpos[1:3] + state = self.state_vector()[2:] + + min_state, max_state = self._healthy_state_range + min_z, max_z = self._healthy_z_range + min_angle, max_angle = self._healthy_angle_range + + healthy_state = np.all( + np.logical_and(min_state < state, state < max_state)) + healthy_z = min_z < z < max_z + healthy_angle = min_angle < angle < max_angle + + is_healthy = all((healthy_state, healthy_z, healthy_angle)) + + return is_healthy + + @property + def done(self): + done = (not self.is_healthy + if self._terminate_when_unhealthy + else False) + return done def _get_obs(self): - return np.concatenate([ - self.sim.data.qpos.flat[1:], - np.clip(self.sim.data.qvel.flat, -10, 10) - ]) + position = self.sim.data.qpos.flat.copy() + velocity = np.clip( + self.sim.data.qvel.flat.copy(), -10, 10) + + if self._exclude_current_positions_from_observation: + position = position[1:] + + observation = np.concatenate((position, velocity)).ravel() + return observation + + def step(self, action): + x_position_before = self.sim.data.qpos[0] + self.do_simulation(action, self.frame_skip) + x_position_after = self.sim.data.qpos[0] + x_velocity = ((x_position_after - x_position_before) + / self.dt) + + ctrl_cost = self.control_cost(action) + + forward_reward = self._forward_reward_weight * x_velocity + healthy_reward = self.healthy_reward + + rewards = forward_reward + healthy_reward + costs = ctrl_cost + + observation = self._get_obs() + reward = rewards - costs + done = self.done + info = { + 'x_position': x_position_after, + 'x_velocity': x_velocity, + } + + return observation, reward, done, info def reset_model(self): - qpos = self.init_qpos + self.np_random.uniform(low=-.005, high=.005, size=self.model.nq) - qvel = self.init_qvel + self.np_random.uniform(low=-.005, high=.005, size=self.model.nv) + noise_low = -self._reset_noise_scale + noise_high = self._reset_noise_scale + + qpos = self.init_qpos + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nq) + qvel = self.init_qvel + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nv) + self.set_state(qpos, qvel) - return self._get_obs() + + observation = self._get_obs() + return observation def viewer_setup(self): - self.viewer.cam.trackbodyid = 2 - self.viewer.cam.distance = self.model.stat.extent * 0.75 - self.viewer.cam.lookat[2] = 1.15 - self.viewer.cam.elevation = -20 + for key, value in DEFAULT_CAMERA_CONFIG.items(): + setattr(self.viewer.cam, key, value) diff --git a/gym/envs/mujoco/humanoid.py b/gym/envs/mujoco/humanoid.py index 021733e89bb..033ab56eb6c 100644 --- a/gym/envs/mujoco/humanoid.py +++ b/gym/envs/mujoco/humanoid.py @@ -2,50 +2,148 @@ from gym.envs.mujoco import mujoco_env from gym import utils + +DEFAULT_CAMERA_CONFIG = { + 'trackbodyid': 1, + 'distance': 4.0, + 'lookat': np.array((0.0, 0.0, 2.0)), + 'elevation': -20.0, +} + + def mass_center(model, sim): - mass = np.expand_dims(model.body_mass, 1) + mass = np.expand_dims(model.body_mass, axis=1) xpos = sim.data.xipos - return (np.sum(mass * xpos, 0) / np.sum(mass))[0] + return (np.sum(mass * xpos, axis=0) / np.sum(mass))[0] + class HumanoidEnv(mujoco_env.MujocoEnv, utils.EzPickle): - def __init__(self): + def __init__(self, + forward_reward_weight=0.25, + ctrl_cost_weight=0.1, + contact_cost_weight=5e-7, + contact_cost_range=(-np.inf, 10.0), + healthy_reward=5.0, + terminate_when_unhealthy=True, + healthy_z_range=(1.0, 2.0), + 
reset_noise_scale=1e-2, + exclude_current_positions_from_observation=True): + utils.EzPickle.__init__(**locals()) + + self._forward_reward_weight = forward_reward_weight + self._ctrl_cost_weight = ctrl_cost_weight + self._contact_cost_weight = contact_cost_weight + self._contact_cost_range = contact_cost_range + self._healthy_reward = healthy_reward + self._terminate_when_unhealthy = terminate_when_unhealthy + self._healthy_z_range = healthy_z_range + + self._reset_noise_scale = reset_noise_scale + + self._exclude_current_positions_from_observation = ( + exclude_current_positions_from_observation) + mujoco_env.MujocoEnv.__init__(self, 'humanoid.xml', 5) - utils.EzPickle.__init__(self) + + @property + def healthy_reward(self): + return float( + self.is_healthy + or self._terminate_when_unhealthy + ) * self._healthy_reward + + def control_cost(self, action): + control_cost = self._ctrl_cost_weight * np.sum( + np.square(self.sim.data.ctrl)) + return control_cost + + @property + def contact_cost(self): + contact_forces = self.sim.data.cfrc_ext + contact_cost = self._contact_cost_weight * np.sum( + np.square(contact_forces)) + min_cost, max_cost = self._contact_cost_range + contact_cost = np.clip(contact_cost, min_cost, max_cost) + return contact_cost + + @property + def is_healthy(self): + min_z, max_z = self._healthy_z_range + is_healthy = min_z < self.sim.data.qpos[2] < max_z + + return is_healthy + + @property + def done(self): + done = ((not self.is_healthy) + if self._terminate_when_unhealthy + else False) + return done def _get_obs(self): - data = self.sim.data - return np.concatenate([data.qpos.flat[2:], - data.qvel.flat, - data.cinert.flat, - data.cvel.flat, - data.qfrc_actuator.flat, - data.cfrc_ext.flat]) - - def step(self, a): - pos_before = mass_center(self.model, self.sim) - self.do_simulation(a, self.frame_skip) - pos_after = mass_center(self.model, self.sim) - alive_bonus = 5.0 - data = self.sim.data - lin_vel_cost = 0.25 * (pos_after - pos_before) / self.model.opt.timestep - quad_ctrl_cost = 0.1 * np.square(data.ctrl).sum() - quad_impact_cost = .5e-6 * np.square(data.cfrc_ext).sum() - quad_impact_cost = min(quad_impact_cost, 10) - reward = lin_vel_cost - quad_ctrl_cost - quad_impact_cost + alive_bonus - qpos = self.sim.data.qpos - done = bool((qpos[2] < 1.0) or (qpos[2] > 2.0)) - return self._get_obs(), reward, done, dict(reward_linvel=lin_vel_cost, reward_quadctrl=-quad_ctrl_cost, reward_alive=alive_bonus, reward_impact=-quad_impact_cost) + position = self.sim.data.qpos.flat.copy() + velocity = self.sim.data.qvel.flat.copy() + + com_inertia = self.sim.data.cinert.flat.copy() + com_velocity = self.sim.data.cvel.flat.copy() + + actuator_forces = self.sim.data.qfrc_actuator.flat.copy() + external_contact_forces = self.sim.data.cfrc_ext.flat.copy() + + if self._exclude_current_positions_from_observation: + position = position[2:] + + return np.concatenate(( + position, + velocity, + com_inertia, + com_velocity, + actuator_forces, + external_contact_forces, + )) + + def step(self, action): + x_position_before = mass_center(self.model, self.sim) + self.do_simulation(action, self.frame_skip) + x_position_after = mass_center(self.model, self.sim) + + x_velocity = ((x_position_after - x_position_before) + / self.model.opt.timestep) + + ctrl_cost = self.control_cost(action) + contact_cost = self.contact_cost + + forward_reward = self._forward_reward_weight * x_velocity + healthy_reward = self.healthy_reward + + rewards = forward_reward + healthy_reward + costs = ctrl_cost + contact_cost 
+ + observation = self._get_obs() + reward = rewards - costs + done = self.done + info = { + 'reward_linvel': forward_reward, + 'reward_quadctrl': -ctrl_cost, + 'reward_alive': healthy_reward, + 'reward_impact': -contact_cost, + } + + return observation, reward, done, info def reset_model(self): - c = 0.01 - self.set_state( - self.init_qpos + self.np_random.uniform(low=-c, high=c, size=self.model.nq), - self.init_qvel + self.np_random.uniform(low=-c, high=c, size=self.model.nv,) - ) - return self._get_obs() + noise_low = -self._reset_noise_scale + noise_high = self._reset_noise_scale + + qpos = self.init_qpos + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nq) + qvel = self.init_qvel + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nv) + self.set_state(qpos, qvel) + + observation = self._get_obs() + return observation def viewer_setup(self): - self.viewer.cam.trackbodyid = 1 - self.viewer.cam.distance = self.model.stat.extent * 1.0 - self.viewer.cam.lookat[2] = 2.0 - self.viewer.cam.elevation = -20 + for key, value in DEFAULT_CAMERA_CONFIG.items(): + setattr(self.viewer.cam, key, value) diff --git a/gym/envs/mujoco/swimmer.py b/gym/envs/mujoco/swimmer.py index 42574fa3507..f20aeb70a12 100644 --- a/gym/envs/mujoco/swimmer.py +++ b/gym/envs/mujoco/swimmer.py @@ -1,31 +1,77 @@ import numpy as np -from gym import utils from gym.envs.mujoco import mujoco_env +from gym import utils + class SwimmerEnv(mujoco_env.MujocoEnv, utils.EzPickle): - def __init__(self): + def __init__(self, + forward_reward_weight=1.0, + ctrl_cost_weight=1e-4, + reset_noise_scale=0.1, + exclude_current_positions_from_observation=True): + utils.EzPickle.__init__(**locals()) + + self._forward_reward_weight = forward_reward_weight + self._ctrl_cost_weight = ctrl_cost_weight + + self._reset_noise_scale = reset_noise_scale + + self._exclude_current_positions_from_observation = ( + exclude_current_positions_from_observation) + mujoco_env.MujocoEnv.__init__(self, 'swimmer.xml', 4) - utils.EzPickle.__init__(self) - - def step(self, a): - ctrl_cost_coeff = 0.0001 - xposbefore = self.sim.data.qpos[0] - self.do_simulation(a, self.frame_skip) - xposafter = self.sim.data.qpos[0] - reward_fwd = (xposafter - xposbefore) / self.dt - reward_ctrl = - ctrl_cost_coeff * np.square(a).sum() - reward = reward_fwd + reward_ctrl - ob = self._get_obs() - return ob, reward, False, dict(reward_fwd=reward_fwd, reward_ctrl=reward_ctrl) + + def control_cost(self, action): + control_cost = self._ctrl_cost_weight * np.sum(np.square(action)) + return control_cost + + def step(self, action): + x_position_before = self.sim.data.qpos[0] + self.do_simulation(action, self.frame_skip) + x_position_after = self.sim.data.qpos[0] + + x_velocity = ((x_position_after - x_position_before) + / self.dt) + forward_reward = self._forward_reward_weight * x_velocity + + ctrl_cost = self.control_cost(action) + + xy_positions = self.sim.data.qpos[0:2] + + observation = self._get_obs() + reward = forward_reward - ctrl_cost + done = False + info = { + 'x_position': xy_positions[0], + 'y_position': xy_positions[1], + 'xy_position': np.linalg.norm(xy_positions, ord=2), + + 'x_velocity': x_velocity, + 'forward_reward': forward_reward, + } + + return observation, reward, done, info def _get_obs(self): - qpos = self.sim.data.qpos - qvel = self.sim.data.qvel - return np.concatenate([qpos.flat[2:], qvel.flat]) + position = self.sim.data.qpos.flat.copy() + velocity = self.sim.data.qvel.flat.copy() + + if 
self._exclude_current_positions_from_observation: + position = position[2:] + + observation = np.concatenate([position, velocity]).ravel() + return observation def reset_model(self): - self.set_state( - self.init_qpos + self.np_random.uniform(low=-.1, high=.1, size=self.model.nq), - self.init_qvel + self.np_random.uniform(low=-.1, high=.1, size=self.model.nv) - ) - return self._get_obs() + noise_low = -self._reset_noise_scale + noise_high = self._reset_noise_scale + + qpos = self.init_qpos + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nq) + qvel = self.init_qvel + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nv) + + self.set_state(qpos, qvel) + + observation = self._get_obs() + return observation diff --git a/gym/envs/mujoco/walker2d.py b/gym/envs/mujoco/walker2d.py index 805f2dd591a..a7f949c7609 100644 --- a/gym/envs/mujoco/walker2d.py +++ b/gym/envs/mujoco/walker2d.py @@ -1,40 +1,125 @@ import numpy as np -from gym import utils from gym.envs.mujoco import mujoco_env +from gym import utils + + +DEFAULT_CAMERA_CONFIG = { + 'trackbodyid': 2, + 'distance': 4.0, + 'lookat': np.array((0.0, 0.0, 1.15)), + 'elevation': -20.0, +} + class Walker2dEnv(mujoco_env.MujocoEnv, utils.EzPickle): + def __init__(self, + forward_reward_weight=1.0, + ctrl_cost_weight=1e-3, + healthy_reward=1.0, + terminate_when_unhealthy=True, + healthy_z_range=(0.8, 2.0), + healthy_angle_range=(-1.0, 1.0), + reset_noise_scale=5e-3, + exclude_current_positions_from_observation=True): + utils.EzPickle.__init__(**locals()) + + self._forward_reward_weight = forward_reward_weight + self._ctrl_cost_weight = ctrl_cost_weight + + self._healthy_reward = healthy_reward + self._terminate_when_unhealthy = terminate_when_unhealthy + + self._healthy_z_range = healthy_z_range + self._healthy_angle_range = healthy_angle_range + + self._reset_noise_scale = reset_noise_scale + + self._exclude_current_positions_from_observation = ( + exclude_current_positions_from_observation) - def __init__(self): mujoco_env.MujocoEnv.__init__(self, "walker2d.xml", 4) - utils.EzPickle.__init__(self) - - def step(self, a): - posbefore = self.sim.data.qpos[0] - self.do_simulation(a, self.frame_skip) - posafter, height, ang = self.sim.data.qpos[0:3] - alive_bonus = 1.0 - reward = ((posafter - posbefore) / self.dt) - reward += alive_bonus - reward -= 1e-3 * np.square(a).sum() - done = not (height > 0.8 and height < 2.0 and - ang > -1.0 and ang < 1.0) - ob = self._get_obs() - return ob, reward, done, {} + + @property + def healthy_reward(self): + return float( + self.is_healthy + or self._terminate_when_unhealthy + ) * self._healthy_reward + + def control_cost(self, action): + control_cost = self._ctrl_cost_weight * np.sum(np.square(action)) + return control_cost + + @property + def is_healthy(self): + z, angle = self.sim.data.qpos[1:3] + + min_z, max_z = self._healthy_z_range + min_angle, max_angle = self._healthy_angle_range + + healthy_z = min_z < z < max_z + healthy_angle = min_angle < angle < max_angle + is_healthy = healthy_z and healthy_angle + + return is_healthy + + @property + def done(self): + done = (not self.is_healthy + if self._terminate_when_unhealthy + else False) + return done def _get_obs(self): - qpos = self.sim.data.qpos - qvel = self.sim.data.qvel - return np.concatenate([qpos[1:], np.clip(qvel, -10, 10)]).ravel() + position = self.sim.data.qpos.flat.copy() + velocity = np.clip( + self.sim.data.qvel.flat.copy(), -10, 10) + + if self._exclude_current_positions_from_observation: + 
position = position[1:] + + observation = np.concatenate((position, velocity)).ravel() + return observation + + def step(self, action): + x_position_before = self.sim.data.qpos[0] + self.do_simulation(action, self.frame_skip) + x_position_after = self.sim.data.qpos[0] + x_velocity = ((x_position_after - x_position_before) + / self.dt) + + ctrl_cost = self.control_cost(action) + + forward_reward = self._forward_reward_weight * x_velocity + healthy_reward = self.healthy_reward + + rewards = forward_reward + healthy_reward + costs = ctrl_cost + + observation = self._get_obs() + reward = rewards - costs + done = self.done + info = { + 'x_position': x_position_after, + 'x_velocity': x_velocity, + } + + return observation, reward, done, info def reset_model(self): - self.set_state( - self.init_qpos + self.np_random.uniform(low=-.005, high=.005, size=self.model.nq), - self.init_qvel + self.np_random.uniform(low=-.005, high=.005, size=self.model.nv) - ) - return self._get_obs() + noise_low = -self._reset_noise_scale + noise_high = self._reset_noise_scale + + qpos = self.init_qpos + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nq) + qvel = self.init_qvel + self.np_random.uniform( + low=noise_low, high=noise_high, size=self.model.nv) + + self.set_state(qpos, qvel) + + observation = self._get_obs() + return observation def viewer_setup(self): - self.viewer.cam.trackbodyid = 2 - self.viewer.cam.distance = self.model.stat.extent * 0.5 - self.viewer.cam.lookat[2] = 1.15 - self.viewer.cam.elevation = -20 + for key, value in DEFAULT_CAMERA_CONFIG.items(): + setattr(self.viewer.cam, key, value)
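
The refactor turns the previously hard-coded reward weights, termination thresholds, and reset-noise scales into constructor arguments, so environment variants can be configured at instantiation time instead of by subclassing and overriding step(). The sketch below is not part of the patch itself; it shows one way the new interface can be used. The argument names and info keys are taken directly from the new AntEnv.__init__ and step() in this diff, the concrete values are only illustrative, and wiring these kwargs through the gym registry / gym.make is outside the scope of this change.

    # Minimal usage sketch (illustrative values, not part of the patch).
    # Assumes MuJoCo and mujoco-py are installed so AntEnv can load ant.xml.
    from gym.envs.mujoco.ant import AntEnv

    # Instantiate the environment with non-default reward/termination settings.
    env = AntEnv(
        ctrl_cost_weight=0.25,            # halve the default control penalty
        terminate_when_unhealthy=False,   # keep episodes running when unhealthy
        healthy_z_range=(0.3, 1.2),       # widen the allowed torso height
        reset_noise_scale=0.05,           # smaller perturbations on reset
        exclude_current_positions_from_observation=False,  # keep x/y in the obs
    )

    observation = env.reset()
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)

    # The info dict exposes the individual reward terms defined in the new step().
    print(info['reward_forward'], info['reward_ctrl'],
          info['reward_contact'], info['reward_survive'])

The same pattern applies to the other refactored environments; only the argument names differ (e.g. forward_reward_weight for HalfCheetahEnv, healthy_angle_range for HopperEnv and Walker2dEnv, contact_cost_range for HumanoidEnv), as listed in their respective __init__ signatures above.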