Skip to content

Commit

Permalink
dev Tworr env, openai#3
Browse files Browse the repository at this point in the history
  • Loading branch information
mahdinobar committed Mar 14, 2024
1 parent bf27a1e commit 886ce69
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 52 deletions.
91 changes: 46 additions & 45 deletions spinup/backup_gym/gym/envs/classic_control/tworr.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
__credits__ = ["Mahdi Nobar"]
__author__ = "Mahdi Nobar ETH Zurich <[email protected]>"


class TworrEnv(core.Env):
# TODO
"""
Two-link planar arm with two revolut joints (based on simplified models at book "A Mathematical Introduction to
Robotic Manipulation" by Murry et al.
Expand All @@ -19,17 +19,18 @@ class TworrEnv(core.Env):
'render.modes': ['human', 'rgb_array'],
'video.frames_per_second' : 15
}
dt = 0.01 # [s]
LINK_LENGTH_1 = 1. # [m]
LINK_LENGTH_2 = 1. # [m]
LINK_MASS_1 = 1. #: [kg] mass of link 1
LINK_MASS_2 = 1. #: [kg] mass of link 2
LINK_COM_POS_1 = 0.5 #: [m] position of the center of mass of link 1
LINK_COM_POS_2 = 0.5 #: [m] position of the center of mass of link 2
torque_noise_max = 0. #TODO
MAX_TIMESTEPS=100 #maximum timesteps per episode

def __init__(self):
self.dt = 0.01 # [s]
self.LINK_LENGTH_1 = 1. # [m]
self.LINK_LENGTH_2 = 1. # [m]
self.LINK_MASS_1 = 1. #: [kg] mass of link 1
self.LINK_MASS_2 = 1. #: [kg] mass of link 2
self.LINK_COM_POS_1 = 0.5 #: [m] position of the center of mass of link 1
self.LINK_COM_POS_2 = 0.5 #: [m] position of the center of mass of link 2
self.torque_noise_max = 0. # TODO
self.MAX_TIMESTEPS = 100 # maximum timesteps per episode

self.viewer = None
# states=[xt, yt, th1, th2, dth1, dth2, th1_hat, th2_hat, dth1_hat, dth2_hat]
high_s = np.array([1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0])
Expand All @@ -42,22 +43,22 @@ def __init__(self):
self.state_buffer= None
self.t = 0
self.seed()
self.xd = np.linspace(1.2,1.3, MAX_TIMESTEPS, endpoint=True)
self.yd = np.linspace(1, 1.8, MAX_TIMESTEPS, endpoint=True)
self.xd = np.linspace(1.2,1.3, self.MAX_TIMESTEPS, endpoint=True)
self.yd = np.linspace(1, 1.8, self.MAX_TIMESTEPS, endpoint=True)

def two_link_forward_kinematics(self,q):
"""Compute the forward kinematics. Returns the base-coordinate Cartesian
position of End-effector for a given joint angle vector. Optional
parameters LINK_LENGTH_1 and LINK_LENGTH_2 are the link lengths. The base is assumed to be at the
parameters self.LINK_LENGTH_1 and self.LINK_LENGTH_2 are the link lengths. The base is assumed to be at the
origin.
:param q: two-element list or ndarray with [q1, q2] joint angles
:param LINK_LENGTH_1: length of link 1
:param LINK_LENGTH_2: length of link 2
:param self.LINK_LENGTH_1: length of link 1
:param self.LINK_LENGTH_2: length of link 2
:return: two-element ndarrays with [x,y] locations of End-Effector
"""
x = LINK_LENGTH_1 * np.cos(q[0]) + LINK_LENGTH_2 * np.cos(q[0] + q[1])
y = LINK_LENGTH_1 * np.sin(q[0]) + LINK_LENGTH_2 * np.sin(q[0] + q[1])
x = self.LINK_LENGTH_1 * np.cos(q[0]) + self.LINK_LENGTH_2 * np.cos(q[0] + q[1])
y = self.LINK_LENGTH_1 * np.sin(q[0]) + self.LINK_LENGTH_2 * np.sin(q[0] + q[1])
return x, y

def two_link_inverse_kinematics(self,x, y):
Expand All @@ -67,16 +68,16 @@ def two_link_inverse_kinematics(self,x, y):
If the target is out of reach, returns the closest pose.
:param target: two-element list or ndarray with [x1, y] target position
:param LINK_LENGTH_1: optional proximal link length
:param LINK_LENGTH_2: optional distal link length
:param self.LINK_LENGTH_1: optional proximal link length
:param self.LINK_LENGTH_2: optional distal link length
:return: tuple (solution1, solution2) of two-element ndarrays with q1, q2 angles
"""
# find the position of the point in polar coordinates
r = np.sqrt(x ** 2 + y ** 2)
# phi is the angle of target point w.r.t. -Y axis, same origin as arm
phi = np.arctan2(y, x)
alpha = np.arccos((LINK_LENGTH_1 ** 2 + LINK_LENGTH_2 ** 2 - r ** 2) / (2 * LINK_LENGTH_1 * LINK_LENGTH_2))
beta = np.arccos((r ** 2 + LINK_LENGTH_1 ** 2 - LINK_LENGTH_2 ** 2) / (2 * LINK_LENGTH_1 * r))
alpha = np.arccos((self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2 - r ** 2) / (2 * self.LINK_LENGTH_1 * self.LINK_LENGTH_2))
beta = np.arccos((r ** 2 + self.LINK_LENGTH_1 ** 2 - self.LINK_LENGTH_2 ** 2) / (2 * self.LINK_LENGTH_1 * r))
soln1 = np.array((phi - beta, np.pi - alpha))
soln2 = np.array((phi + beta, np.pi + alpha))
return soln1, soln2
Expand All @@ -88,35 +89,35 @@ def two_link_inverse_dynamics(self, th, dth, ddth):
:param ddth:
:return: tau1, tau2
"""
Iz1 = LINK_MASS_1 * LINK_LENGTH_1 ** 2 / 12
Iz2 = LINK_MASS_2 * LINK_LENGTH_2 ** 2 / 12
alpha = Iz1 + Iz2 + LINK_MASS_1 * LINK_COM_POS_1 ** 2 + LINK_MASS_2 * (LINK_LENGTH_1 ** 2 + LINK_LENGTH_2 ** 2)
beta = LINK_MASS_2 * LINK_LENGTH_1 * LINK_COM_POS_2
sigma = Iz2 + LINK_MASS_2 * LINK_COM_POS_2 ** 2
Iz1 = self.LINK_MASS_1 * self.LINK_LENGTH_1 ** 2 / 12
Iz2 = self.LINK_MASS_2 * self.LINK_LENGTH_2 ** 2 / 12
alpha = Iz1 + Iz2 + self.LINK_MASS_1 * self.LINK_COM_POS_1 ** 2 + self.LINK_MASS_2 * (self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2)
beta = self.LINK_MASS_2 * self.LINK_LENGTH_1 * self.LINK_COM_POS_2
sigma = Iz2 + self.LINK_MASS_2 * self.LINK_COM_POS_2 ** 2
tau1 = (alpha + 2 * beta * np.cos(th[1])) * ddth[0] + (sigma + beta * np.cos(th[1])) * ddth[1] + (
-beta * np.sin(th[1]) * dth[1]) * dth[1] + (-beta * np.sin(th[1]) * (dth[0] + dth[1]) * dth[1])
tau2 = (sigma + beta * np.cos(th[1])) * ddth[0] + sigma * ddth[1] + (beta * np.sin(th[1]) * dth[0] * dth[0])
return tau1, tau2

def two_link_forward_dynamics(self, tau1, tau2, s_init, LINK_MASS_1=1, LINK_MASS_2=1, LINK_LENGTH_1=1, LINK_LENGTH_2=1):
def two_link_forward_dynamics(self, tau1, tau2, s_init, self.LINK_MASS_1=1, self.LINK_MASS_2=1, self.LINK_LENGTH_1=1, self.LINK_LENGTH_2=1):
"""Compute two inverse dynamics solutions for a target end position.
:param s_init: [th1_init,dth1_init,th2_init,dth2_init]
:param th:
:param dth:
:param ddth:
:param LINK_MASS_1:
:param LINK_MASS_2:
:param LINK_LENGTH_1:
:param LINK_LENGTH_2:
:param self.LINK_MASS_1:
:param self.LINK_MASS_2:
:param self.LINK_LENGTH_1:
:param self.LINK_LENGTH_2:
:return: th1, dth1, th2, dth2
"""
# Define derivative function
def f(t, s):
Iz1 = LINK_MASS_1 * LINK_LENGTH_1 ** 2 / 12
Iz2 = LINK_MASS_2 * LINK_LENGTH_2 ** 2 / 12
alpha = Iz1 + Iz2 + LINK_MASS_1 * LINK_COM_POS_1 ** 2 + LINK_MASS_2 * (LINK_LENGTH_1 ** 2 + LINK_LENGTH_2 ** 2)
beta = LINK_MASS_2 * LINK_LENGTH_1 * LINK_COM_POS_2
sigma = Iz2 + LINK_MASS_2 * LINK_COM_POS_2 ** 2
Iz1 = self.LINK_MASS_1 * self.LINK_LENGTH_1 ** 2 / 12
Iz2 = self.LINK_MASS_2 * self.LINK_LENGTH_2 ** 2 / 12
alpha = Iz1 + Iz2 + self.LINK_MASS_1 * self.LINK_COM_POS_1 ** 2 + self.LINK_MASS_2 * (self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2)
beta = self.LINK_MASS_2 * self.LINK_LENGTH_1 * self.LINK_COM_POS_2
sigma = Iz2 + self.LINK_MASS_2 * self.LINK_COM_POS_2 ** 2
d11 = alpha + 2 * beta * np.cos(s[2])
d21 = sigma + beta * np.cos(s[2])
d12 = d21
Expand Down Expand Up @@ -156,15 +157,15 @@ def step(self, a):
s_tm1 = self.state[-2, :] #states at t-1
# TOODO
# Add noise to the force action
if self.torque_noise_max > 0:
torque += self.np_random.uniform(-self.torque_noise_max, self.torque_noise_max)
if self.self.torque_noise_max > 0:
torque += self.np_random.uniform(-self.self.torque_noise_max, self.self.torque_noise_max)
# choose a sample target desired position to test IK
xd = self.xd[self.t]
yd = self.yd[self.t]
q_d = self.two_link_inverse_kinematics(xd, yd)[0] #attention: only return and use first solution of IK
# TODO correct for observation:
dqd_t = np.array([(q_d[0]-s_tm1[6])/dt, (q_d[1]-s_tm1[7])/dt])
ddqd_t = np.array([(dqd_t[0]-s_tm1[8])/dt, (dqd_t[1]-s_tm1[9])/dt])
dqd_t = np.array([(q_d[0]-s_tm1[6])/self.dt, (q_d[1]-s_tm1[7])/self.dt])
ddqd_t = np.array([(dqd_t[0]-s_tm1[8])/self.dt, (dqd_t[1]-s_tm1[9])/self.dt])
tau1_hat, tau2_hat = two_link_inverse_dynamics(q_d, dqd_t, ddqd_t)
s_init = [s[2], s[4], s[3], s[5]]
q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1], s_init)
Expand All @@ -188,7 +189,7 @@ def _get_ob(self): #TODO is state=observation a reasonable assumption?
return s

def _terminal(self):
return bool(self.t >= MAX_TIMESTEPS)
return bool(self.t >= self.MAX_TIMESTEPS)

def render(self, mode='human'):
from gym.envs.classic_control import rendering
Expand All @@ -197,16 +198,16 @@ def render(self, mode='human'):

if self.viewer is None:
self.viewer = rendering.Viewer(500,500)
bound = self.LINK_LENGTH_1 + self.LINK_LENGTH_2 + 0.2 # 2.2 for default
bound = self.self.LINK_LENGTH_1 + self.self.LINK_LENGTH_2 + 0.2 # 2.2 for default
self.viewer.set_bounds(-bound,bound,-bound,bound)

if s is None: return None
p1 = [LINK_LENGTH_1 * np.cos(s[2]), LINK_LENGTH_1 * np.sin(s[2])]
p2 = [p1[0] + LINK_LENGTH_2 * np.cos(s[2] + s[3]), p1[1] + LINK_LENGTH_2 * np.sin(s[2] + q[3])]
p1 = [self.LINK_LENGTH_1 * np.cos(s[2]), self.LINK_LENGTH_1 * np.sin(s[2])]
p2 = [p1[0] + self.LINK_LENGTH_2 * np.cos(s[2] + s[3]), p1[1] + self.LINK_LENGTH_2 * np.sin(s[2] + q[3])]

xys = np.array([[0,0], p1, p2])[:,::-1]
thetas = [s[2], s[3]] # TODO check compatible with rendering
link_lengths = [self.LINK_LENGTH_1, self.LINK_LENGTH_2]
link_lengths = [self.self.LINK_LENGTH_1, self.self.LINK_LENGTH_2]

self.viewer.draw_line((-2.2, 1), (2.2, 1))
for ((x,y),th,llen) in zip(xys, thetas, link_lengths):
Expand Down
14 changes: 7 additions & 7 deletions spinup/examples/pytorch/inverted_pendulum_SAC.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@
from spinup.utils import plot
import os

TRAIN=False
DEBUG=True
env_fn = lambda: gym.make('Pendulum-v0')
TRAIN=True
DEBUG=False
env_fn = lambda: gym.make('Tworr-v0')
exp_name = "Tworrv0_0"
if __name__ == '__main__':
if TRAIN:
# train
exp_name="logs"
output_dir='/home/mahdi/ETHZ/codes/spinningup/spinup/examples/pytorch/'+exp_name
output_dir='/home/mahdi/ETHZ/codes/spinningup/spinup/examples/pytorch/logs/'+exp_name
if not os.path.exists(output_dir):
os.makedirs(output_dir)
logger_kwargs = dict(output_dir=output_dir, exp_name=exp_name)
sac(env_fn, ac_kwargs={}, seed=0, steps_per_epoch=1000, epochs=2, replay_size=100000, gamma=0.99, polyak=0.995, lr=0.001, alpha=0.2, batch_size=100, start_steps=1000, update_after=200, update_every=50, num_test_episodes=10, max_ep_len=1000, logger_kwargs=logger_kwargs, save_freq=1)
sac(env_fn, ac_kwargs={}, seed=0, steps_per_epoch=500, epochs=2, replay_size=10000, gamma=0.99, polyak=0.995, lr=0.001, alpha=0.2, batch_size=100, start_steps=1000, update_after=200, update_every=50, num_test_episodes=10, max_ep_len=100, logger_kwargs=logger_kwargs, save_freq=1)
if DEBUG:
env_fn = gym.make('Pendulum-v0')
# from gym.wrappers.monitoring.video_recorder import VideoRecorder
# VideoRecorder(env_fn,'/home/mahdi/ETHZ/codes/spinningup/spinup/examples/pytorch/logs/video.mp4', enabled=True)
# visualize output
_, get_action = load_policy_and_env('/home/mahdi/ETHZ/codes/spinningup/spinup/examples/pytorch/logs')
_, get_action = load_policy_and_env('/home/mahdi/ETHZ/codes/spinningup/spinup/examples/pytorch/logs/'+exp_name)
run_policy(env_fn, get_action,num_episodes=3)

0 comments on commit 886ce69

Please sign in to comment.