Refactor gym envs to support dynamic arguments
hartikainen committed Feb 9, 2019
1 parent 07e0c98 commit b2a439f
Showing 6 changed files with 621 additions and 162 deletions.
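The commit replaces hard-coded reward weights, termination thresholds, and reset noise in the MuJoCo environments with constructor arguments. A minimal sketch of how a custom variant could then be registered; the 'AntCustom-v0' id, the episode length, and the weight values below are hypothetical, only the keyword-argument names come from this commit:

from gym.envs.registration import register

# Hypothetical variant: the same AntEnv class, but with a cheaper control
# cost and no early termination on unhealthy states.
register(
    id='AntCustom-v0',
    entry_point='gym.envs.mujoco.ant:AntEnv',
    max_episode_steps=1000,
    kwargs={
        'ctrl_cost_weight': 0.25,
        'terminate_when_unhealthy': False,
    },
)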
149 changes: 118 additions & 31 deletions gym/envs/mujoco/ant.py
@@ -2,44 +2,131 @@
 from gym import utils
 from gym.envs.mujoco import mujoco_env
 
+
+DEFAULT_CAMERA_CONFIG = {
+    'distance': 4.0,
+}
+
+
 class AntEnv(mujoco_env.MujocoEnv, utils.EzPickle):
-    def __init__(self):
+    def __init__(self,
+                 ctrl_cost_weight=0.5,
+                 contact_cost_weight=5e-4,
+                 healthy_reward=1.0,
+                 terminate_when_unhealthy=True,
+                 healthy_z_range=(0.2, 1.0),
+                 contact_force_range=(-1.0, 1.0),
+                 reset_noise_scale=0.1,
+                 exclude_current_positions_from_observation=True):
+        utils.EzPickle.__init__(**locals())
+
+        self._ctrl_cost_weight = ctrl_cost_weight
+        self._contact_cost_weight = contact_cost_weight
+
+        self._healthy_reward = healthy_reward
+        self._terminate_when_unhealthy = terminate_when_unhealthy
+        self._healthy_z_range = healthy_z_range
+
+        self._contact_force_range = contact_force_range
+
+        self._reset_noise_scale = reset_noise_scale
+
+        self._exclude_current_positions_from_observation = (
+            exclude_current_positions_from_observation)
+
         mujoco_env.MujocoEnv.__init__(self, 'ant.xml', 5)
-        utils.EzPickle.__init__(self)
 
-    def step(self, a):
-        xposbefore = self.get_body_com("torso")[0]
-        self.do_simulation(a, self.frame_skip)
-        xposafter = self.get_body_com("torso")[0]
-        forward_reward = (xposafter - xposbefore)/self.dt
-        ctrl_cost = .5 * np.square(a).sum()
-        contact_cost = 0.5 * 1e-3 * np.sum(
-            np.square(np.clip(self.sim.data.cfrc_ext, -1, 1)))
-        survive_reward = 1.0
-        reward = forward_reward - ctrl_cost - contact_cost + survive_reward
+    @property
+    def healthy_reward(self):
+        return float(
+            self.is_healthy
+            or self._terminate_when_unhealthy
+        ) * self._healthy_reward
+
+    def control_cost(self, action):
+        control_cost = self._ctrl_cost_weight * np.sum(np.square(action))
+        return control_cost
+
+    @property
+    def contact_forces(self):
+        raw_contact_forces = self.sim.data.cfrc_ext
+        min_value, max_value = self._contact_force_range
+        contact_forces = np.clip(raw_contact_forces, min_value, max_value)
+        return contact_forces
+
+    @property
+    def contact_cost(self):
+        contact_cost = self._contact_cost_weight * np.sum(
+            np.square(self.contact_forces))
+        return contact_cost
+
+    @property
+    def is_healthy(self):
         state = self.state_vector()
-        notdone = np.isfinite(state).all() \
-            and state[2] >= 0.2 and state[2] <= 1.0
-        done = not notdone
-        ob = self._get_obs()
-        return ob, reward, done, dict(
-            reward_forward=forward_reward,
-            reward_ctrl=-ctrl_cost,
-            reward_contact=-contact_cost,
-            reward_survive=survive_reward)
+        min_z, max_z = self._healthy_z_range
+        is_healthy = (np.isfinite(state).all() and min_z <= state[2] <= max_z)
+        return is_healthy
+
+    @property
+    def done(self):
+        done = (not self.is_healthy
+                if self._terminate_when_unhealthy
+                else False)
+        return done
+
+    def step(self, action):
+        x_position_before = self.get_body_com("torso")[0]
+        self.do_simulation(action, self.frame_skip)
+        x_position_after = self.get_body_com("torso")[0]
+        x_velocity = (x_position_after - x_position_before) / self.dt
+
+        ctrl_cost = self.control_cost(action)
+        contact_cost = self.contact_cost
+
+        forward_reward = x_velocity
+        healthy_reward = self.healthy_reward
+
+        rewards = forward_reward + healthy_reward
+        costs = ctrl_cost + contact_cost
+
+        reward = rewards - costs
+        done = self.done
+        observation = self._get_obs()
+        info = {
+            'reward_forward': forward_reward,
+            'reward_ctrl': -ctrl_cost,
+            'reward_contact': -contact_cost,
+            'reward_survive': healthy_reward,
+        }
+
+        return observation, reward, done, info
 
     def _get_obs(self):
-        return np.concatenate([
-            self.sim.data.qpos.flat[2:],
-            self.sim.data.qvel.flat,
-            np.clip(self.sim.data.cfrc_ext, -1, 1).flat,
-        ])
+        position = self.sim.data.qpos.flat.copy()
+        velocity = self.sim.data.qvel.flat.copy()
+        contact_force = self.contact_forces.flat.copy()
+
+        if self._exclude_current_positions_from_observation:
+            position = position[2:]
+
+        observations = np.concatenate((position, velocity, contact_force))
+
+        return observations
 
     def reset_model(self):
-        qpos = self.init_qpos + self.np_random.uniform(size=self.model.nq, low=-.1, high=.1)
-        qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1
+        noise_low = -self._reset_noise_scale
+        noise_high = self._reset_noise_scale
+
+        qpos = self.init_qpos + self.np_random.uniform(
+            low=noise_low, high=noise_high, size=self.model.nq)
+        qvel = self.init_qvel + self._reset_noise_scale * self.np_random.randn(
+            self.model.nv)
         self.set_state(qpos, qvel)
-        return self._get_obs()
+
+        observation = self._get_obs()
+
+        return observation
 
     def viewer_setup(self):
-        self.viewer.cam.distance = self.model.stat.extent * 0.5
+        for key, value in DEFAULT_CAMERA_CONFIG.items():
+            setattr(self.viewer.cam, key, value)
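
A rough usage sketch of the refactored AntEnv constructor; the specific values below are illustrative, not defaults taken from the commit:

from gym.envs.mujoco.ant import AntEnv

# Reward shaping, health thresholds, and reset noise are now plain
# constructor arguments, as defined in the new __init__ above.
env = AntEnv(
    ctrl_cost_weight=0.25,
    healthy_reward=0.5,
    healthy_z_range=(0.3, 1.0),
    reset_noise_scale=0.05,
)

observation = env.reset()
for _ in range(10):
    action = env.action_space.sample()
    observation, reward, done, info = env.step(action)
    if done:
        observation = env.reset()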
84 changes: 67 additions & 17 deletions gym/envs/mujoco/half_cheetah.py
@@ -2,33 +2,83 @@
 from gym import utils
 from gym.envs.mujoco import mujoco_env
 
+
+DEFAULT_CAMERA_CONFIG = {
+    'distance': 4.0,
+}
+
+
 class HalfCheetahEnv(mujoco_env.MujocoEnv, utils.EzPickle):
-    def __init__(self):
+    def __init__(self,
+                 forward_reward_weight=1.0,
+                 ctrl_cost_weight=0.1,
+                 reset_noise_scale=0.1,
+                 exclude_current_positions_from_observation=True):
+        utils.EzPickle.__init__(**locals())
+
+        self._forward_reward_weight = forward_reward_weight
+
+        self._ctrl_cost_weight = ctrl_cost_weight
+
+        self._reset_noise_scale = reset_noise_scale
+
+        self._exclude_current_positions_from_observation = (
+            exclude_current_positions_from_observation)
+
         mujoco_env.MujocoEnv.__init__(self, 'half_cheetah.xml', 5)
-        utils.EzPickle.__init__(self)
+
+    def control_cost(self, action):
+        control_cost = self._ctrl_cost_weight * np.sum(np.square(action))
+        return control_cost
 
     def step(self, action):
-        xposbefore = self.sim.data.qpos[0]
+        x_position_before = self.sim.data.qpos[0]
         self.do_simulation(action, self.frame_skip)
-        xposafter = self.sim.data.qpos[0]
-        ob = self._get_obs()
-        reward_ctrl = - 0.1 * np.square(action).sum()
-        reward_run = (xposafter - xposbefore)/self.dt
-        reward = reward_ctrl + reward_run
+        x_position_after = self.sim.data.qpos[0]
+        x_velocity = ((x_position_after - x_position_before)
+                      / self.dt)
+
+        ctrl_cost = self.control_cost(action)
+
+        forward_reward = self._forward_reward_weight * x_velocity
+
+        observation = self._get_obs()
+        reward = forward_reward - ctrl_cost
         done = False
-        return ob, reward, done, dict(reward_run=reward_run, reward_ctrl=reward_ctrl)
+        info = {
+            'x_position': x_position_after,
+            'x_velocity': x_velocity,
+
+            'reward_run': forward_reward,
+            'reward_ctrl': -ctrl_cost
+        }
+
+        return observation, reward, done, info
 
     def _get_obs(self):
-        return np.concatenate([
-            self.sim.data.qpos.flat[1:],
-            self.sim.data.qvel.flat,
-        ])
+        position = self.sim.data.qpos.flat.copy()
+        velocity = self.sim.data.qvel.flat.copy()
+
+        if self._exclude_current_positions_from_observation:
+            position = position[1:]
+
+        observation = np.concatenate((position, velocity)).ravel()
+        return observation
 
     def reset_model(self):
-        qpos = self.init_qpos + self.np_random.uniform(low=-.1, high=.1, size=self.model.nq)
-        qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1
+        noise_low = -self._reset_noise_scale
+        noise_high = self._reset_noise_scale
+
+        qpos = self.init_qpos + self.np_random.uniform(
+            low=noise_low, high=noise_high, size=self.model.nq)
+        qvel = self.init_qvel + self._reset_noise_scale * self.np_random.randn(
+            self.model.nv)
+
+        self.set_state(qpos, qvel)
-        self.set_state(qpos, qvel)
-        return self._get_obs()
+
+        observation = self._get_obs()
+        return observation
 
     def viewer_setup(self):
-        self.viewer.cam.distance = self.model.stat.extent * 0.5
+        for key, value in DEFAULT_CAMERA_CONFIG.items():
+            setattr(self.viewer.cam, key, value)
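
Because each refactored __init__ calls utils.EzPickle.__init__(**locals()), the constructor arguments are recorded and the environment is rebuilt from them when unpickled. A minimal sketch assuming the standard gym.utils.EzPickle round-trip behavior; the weight values are illustrative:

import pickle

from gym.envs.mujoco.half_cheetah import HalfCheetahEnv

env = HalfCheetahEnv(ctrl_cost_weight=0.05, reset_noise_scale=0.2)

# EzPickle stores the keyword arguments, so unpickling calls the constructor
# again with the same settings instead of serializing the MuJoCo simulation.
env_copy = pickle.loads(pickle.dumps(env))
assert env_copy._ctrl_cost_weight == env._ctrl_cost_weight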
(The remaining 4 changed files are not shown.)
