Skip to content

Commit

Permalink
adding tworr with PID and SAC environment, dev, openai#1
Browse files Browse the repository at this point in the history
  • Loading branch information
mahdinobar committed Mar 22, 2024
1 parent 98838dc commit 6bdb437
Showing 1 changed file with 235 additions and 72 deletions.
307 changes: 235 additions & 72 deletions spinup/backup_gym/gym/envs/classic_control/tworr.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,41 @@ class TworrEnv(core.Env):
}

def __init__(self):
self.dt = 0.01 # [s]
self.LINK_LENGTH_1 = 1. # [m]
self.LINK_LENGTH_2 = 1. # [m]
self.LINK_MASS_1 = 1. #: [kg] mass of link 1
self.LINK_MASS_2 = 1. #: [kg] mass of link 2
self.LINK_COM_POS_1 = 0.5 #: [m] position of the center of mass of link 1
self.LINK_COM_POS_2 = 0.5 #: [m] position of the center of mass of link 2
self.LINK_MASS_2 = 1. #: [kg] imperfect mass of link 2
self.LINK_MASS_2_TRUE = 1.05 #: [kg] TRUE mass of link 2
self.LINK_COM_POS_1 = 0.5 #r1: [m] distance of the center of mass of link 1 wrt joint
self.LINK_COM_POS_2 = 0.5 #r2: [m] distance of the center of mass of link 2 wrt joint
self.torque_noise_max = 0. # TODO
self.MAX_TIMESTEPS = 100 # maximum timesteps per episode
self.viewer = None
self.state = None
self.state_buffer= None
self.t = 0
self.seed()
self.xd_init=1.2
self.yd_init = 1.0
self.xd = np.linspace(self.xd_init, self.xd_init+0.1, self.MAX_TIMESTEPS, endpoint=True)
self.yd = np.linspace(self.yd_init, self.yd_init+0.8, self.MAX_TIMESTEPS, endpoint=True)
# states=(xd,yd, q1,q2, dq1,dq2, qd1,qd2, dqd1,dqd2, tau1_hat,tau2_hat)
high_s = np.array([self.xd_init+1, self.yd_init+1, 1.4, 1.4, 0.2, 0.2, 1.4, 1.4, 0.2, 0.2, 1, 1])
low_s = np.array([self.xd_init-1, self.yd_init-1, -1.4, -1.4, -0.2, -0.2, -1.4, -1.4, -0.2, -0.2, -1, -1])

self.xd_init = 1.5
self.yd_init = 0.4
self.dt = 1 / 10 # sec
self.MAX_TIMESTEPS = 100 # maximum timesteps per episode
self.vxd = -0.01 # m/s
self.vyd = 0.05 # m/s
deltax = vxd * dt * MAX_TIMESTEPS
deltay = vyd * dt * MAX_TIMESTEPS
self.xd = np.linspace(self.xd_init, self.xd_init + deltax, MAX_TIMESTEPS + 1, endpoint=True)
self.yd = np.linspace(self.yd_init, self.yd_init + deltay, MAX_TIMESTEPS + 1, endpoint=True)

# TODO CHECK
high_s = np.array([self.xd_init+1, self.yd_init+1, 1.4, 1.4, 0.2, 0.2, 1.4, 1.4, 0.2, 0.2, 1, 1])
low_s = np.array([self.xd_init-1, self.yd_init-1, -1.4, -1.4, -0.2, -0.2, -1.4, -1.4, -0.2, -0.2, -1, -1])
self.observation_space = spaces.Box(low=low_s, high=high_s, dtype=np.float32)
high_a = np.array([0.1, 0.1])
low_a = np.array([-0.1, -0.1])
self.action_space = spaces.Box(low=low_a, high=high_a, dtype=np.float32)



def two_link_forward_kinematics(self,q):
"""Compute the forward kinematics. Returns the base-coordinate Cartesian
position of End-effector for a given joint angle vector. Optional
Expand Down Expand Up @@ -112,18 +121,18 @@ def two_link_forward_dynamics(self, tau1, tau2, s_init):
:param dth:
:param ddth:
:param self.LINK_MASS_1:
:param self.LINK_MASS_2:
:param self.LINK_MASS_2_TRUE:
:param self.LINK_LENGTH_1:
:param self.LINK_LENGTH_2:
:return: th1, dth1, th2, dth2
:return: th1, dth1, th2, dth2 (second column)
"""
# Define derivative function
def f(t, s):
Iz1 = self.LINK_MASS_1 * self.LINK_LENGTH_1 ** 2 / 12
Iz2 = self.LINK_MASS_2 * self.LINK_LENGTH_2 ** 2 / 12
alpha = Iz1 + Iz2 + self.LINK_MASS_1 * self.LINK_COM_POS_1 ** 2 + self.LINK_MASS_2 * (self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2)
beta = self.LINK_MASS_2 * self.LINK_LENGTH_1 * self.LINK_COM_POS_2
sigma = Iz2 + self.LINK_MASS_2 * self.LINK_COM_POS_2 ** 2
Iz2 = self.LINK_MASS_2_TRUE * self.LINK_LENGTH_2 ** 2 / 12
alpha = Iz1 + Iz2 + self.LINK_MASS_1 * self.LINK_COM_POS_1 ** 2 + self.LINK_MASS_2_TRUE * (self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2)
beta = self.LINK_MASS_2_TRUE * self.LINK_LENGTH_1 * self.LINK_COM_POS_2
sigma = Iz2 + self.LINK_MASS_2_TRUE * self.LINK_COM_POS_2 ** 2
d11 = alpha + 2 * beta * np.cos(s[2])
d21 = sigma + beta * np.cos(s[2])
d12 = d21
Expand All @@ -141,79 +150,233 @@ def f(t, s):
# Solve differential equation
sol = solve_ivp(lambda t, s: f(t, s),
[tspan[0], tspan[-1]], s_init, t_eval=tspan, rtol=1e-5)
return sol.y
return sol.y[:,1]
def two_link_jacobian(self, q, ld=0.1):
J=np.array([[-self.LINK_LENGTH_1*np.sin(q[0])-self.LINK_LENGTH_2*np.sin(q[0]+q[1]), -self.LINK_LENGTH_2*np.sin(q[0]+q[1])],
[self.LINK_LENGTH_1*np.cos(q[0])+self.LINK_LENGTH_2*np.cos(q[0]+q[1]), +self.LINK_LENGTH_2*np.cos(q[0]+q[1])]])
def pseudoInverseMat(A, ld):
# Input: Any m-by-n matrix, and a damping factor.
# Output: An n-by-m pseudo-inverse of the input according to the Moore-Penrose formula

# Get the number of rows (m) and columns (n) of A
[m, n] = np.shape(A)

# Compute the pseudo inverse for both left and right cases
if (m > n):
# Compute the left pseudoinverse.
pinvA = np.linalg.lstsq((A.T * A + ld * ld * np.eye(n, n)), A.T)[0]
elif (m <= n):
# Compute the right pseudoinverse.
pinvA = np.linalg.lstsq((np.matmul(A, A.T) + ld * ld * np.eye(m, m)).T, A)[0].T
return pinvA
return pseudoInverseMat(J,ld)

def q_command(r_ee, v_ee, Jpinv, rd, vd, e, dt):
"""
PID Traj Tracking Feedback Controller
Inputs:
r_ee : current end effector position
rd : desired end effector position
vd : desired end effector velocity
Jpinv : current pseudo inverse jacobian matrix
Output: joint-space velocity command of the robot.
"""
# TODO: User defined linear position gain
K_p = 40
K_i = 10
K_d = 10
# TODO: User defined pseudo-inverse damping coefficient
# ld = 0.1
# Jpinv=two_link_jacobian(q_hat_soln1, ld)
e_t = (rd - r_ee)
e = np.vstack((e, e_t.reshape(1, 2)))
v_command = vd + K_p * e_t + K_i * np.sum(e[1:, :], 0) * dt + K_d * (vd - v_ee)
qc = np.dot(Jpinv, v_command)
return qc, e

def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]

def reset(self):
# The reset method should return a tuple of the initial observation and some auxiliary information.
# states=(xd,yd,q1,q2,dq1,dq2,qd1,qd2,dqd1,dqd2,tau1_hat,tau2_hat)
self.state = self.np_random.uniform(low=-0.1, high=0.1, size=(12,)) #TODO correct initialization

xd_init_noisy= self.xd_init + np.random.normal(loc=0.0, scale=0.002, size=None)
yd_init_noisy= self.yd_init + np.random.normal(loc=0.0, scale=0.002, size=None)
q_init_noisy= self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0] + [np.random.normal(loc=0.0, scale=0.01, size=None), np.random.normal(loc=0.0, scale=0.01, size=None)]
dq_init=[0,0]
qd_init_noisy=self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0]
dqd_init=[0,0]
ddqd_init = [0,0]
tau1_hat_init_noisy, tau2_hat_init_noisy = self.two_link_inverse_dynamics(qd_init_noisy, dqd_init, ddqd_init)
# reset to initial states
self.state = [xd_init_noisy,
yd_init_noisy,
q_init_noisy[0],
q_init_noisy[1],
dq_init[0],
dq_init[1],
qd_init_noisy[0],
qd_init_noisy[1],
dqd_init[0],
dqd_init[1],
tau1_hat_init_noisy,
tau2_hat_init_noisy]
self.state_buffer=self.state #initialize episode state buffer
# initialize at t=-1
rd_minus1 = np.array([xd[0], yd[0]])
v_minus1 = np.array([vxd, vyd])
ree_minus1 = rd_minus1
q_hat_soln1, q_hat_soln2 = two_link_inverse_kinematics(rd_minus1[0], rd_minus1[1], l1, l2)
q_minus1 = q_hat_soln1
e_minus1 = rd_minus1 - ree_minus1
dq_t_minus1 = two_link_inverse_kinematics_joint_speeds(v_minus1[0], v_minus1[1], q_minus1[0], q_minus1[1],
l1=1.0, l2=1.0)
# starting from t=-1
self.e = e_minus1.reshape(1, 2)
self.q = q_minus1.reshape(1, 2)
self.dq = dq_t_minus1.reshape(1, 2)
self.qc = np.zeros(2).reshape(1, 2) # attention on presumptions on qc
self.dqc = np.zeros(2).reshape(1, 2)
self.ddqc = np.zeros(2).reshape(1, 2)
self.r_hat = rd_minus1.reshape(1, 2) # attention: trivial assumption(?)

self.t = 0

rd_t = np.array([xd[self.t + 1], yd[self.t + 1]]) # attention: index desired starts from t=-1
vd = np.array([vxd, vyd])
# at time t=0
q_t=self.q[-1,:]
dq_t = self.dq[-1, :]
r_hat_t = self.two_link_forward_kinematics(q_t, l1, l2)
Jpinv_t, J_t = two_link_jacobian(q_t, ld=0.01)
v_hat_t = (r_hat_t - self.r_hat[-1, :]) / self.dt
self.r_hat=np.vstack((self.r_hat,r_hat_t))
dqc_t, e=self.q_command(r_ee=r_hat_t, v_ee=v_hat_t, Jpinv=Jpinv_t, rd=rd_t, vd=vd, e=e, dt=dt)
qc_t = dqc_t*self.dt+self.q[-2,:] #TODO is this observer(taking q(t-1) for integration) sufficient?
ddqc_t = (dqc_t-self.dq[-2,:])/self.dt

self.qc = np.vstack((self.qc, qc_t))
self.dqc = np.vstack((self.dqc, dqc_t))
self.ddqc = np.vstack((self.ddqc, ddqc_t))

tau1_hat, tau2_hat = self.two_link_inverse_dynamics(q_t, dqc_t, ddqc_t)

self.q = np.vstack((q, q_t))
self.dq = np.vstack((dq, dq_t))

self.state = [r_hat_t[0]-rd_t[0],
r_hat_t[1]-rd_t[1],
q_t[0],
q_t[1],
dq_t[0],
dq_t[1],
tau1_hat,
tau2_hat]
self.state_buffer = self.state

#
#
#
#
# # The reset method should return a tuple of the initial observation and some auxiliary information.
# # states=(xd,yd,q1,q2,dq1,dq2,qd1,qd2,dqd1,dqd2,tau1_hat,tau2_hat)
# self.state = self.np_random.uniform(low=-0.1, high=0.1, size=(12,)) #TODO correct initialization
# xd_init_noisy= self.xd_init + np.random.normal(loc=0.0, scale=0.002, size=None)
# yd_init_noisy= self.yd_init + np.random.normal(loc=0.0, scale=0.002, size=None)
# q_init_noisy= self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0] + [np.random.normal(loc=0.0, scale=0.01, size=None), np.random.normal(loc=0.0, scale=0.01, size=None)]
# dq_init=[0,0]
# qd_init_noisy=self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0]
# dqd_init=[0,0]
# ddqd_init = [0,0]
# tau1_hat_init_noisy, tau2_hat_init_noisy = self.two_link_inverse_dynamics(qd_init_noisy, dqd_init, ddqd_init)
# # reset to initial states
# self.state = [xd_init_noisy,
# yd_init_noisy,
# q_init_noisy[0],
# q_init_noisy[1],
# dq_init[0],
# dq_init[1],
# qd_init_noisy[0],
# qd_init_noisy[1],
# dqd_init[0],
# dqd_init[1],
# tau1_hat_init_noisy,
# tau2_hat_init_noisy]
# self.state_buffer=self.state #initialize episode state buffer
return self._get_ob()

def step(self,a):
if self.t==0:
s_t = self.state_buffer[:] # states at t
s_tm1 = s_t # TODO check. states at t-1
else:
s_t = self.state_buffer[-1, :] # states at t
s_tm1 = self.state_buffer[-2, :] #states at t-1
# TOODO
# Add noise to the force action
if self.torque_noise_max > 0:
torque += self.np_random.uniform(-self.torque_noise_max, self.torque_noise_max)
# choose a sample target desired position to test IK
xd = self.xd[self.t]
yd = self.yd[self.t]
qd = self.two_link_inverse_kinematics(xd, yd)[0] #attention: only return and use first solution of IK
# TODO correct for observation:
dqd_t = np.array([(qd[0]-s_tm1[6])/self.dt, (qd[1]-s_tm1[7])/self.dt])
ddqd_t = np.array([(dqd_t[0]-s_tm1[8])/self.dt, (dqd_t[1]-s_tm1[9])/self.dt])
tau1_hat, tau2_hat = self.two_link_inverse_dynamics(qd, dqd_t, ddqd_t)
s_init = [s_t[2], s_t[4], s_t[3], s_t[5]] #TODO check concept
q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1], s_init)
# update time index
self.t += 1 #Attention doublecheck
rd_t = np.array([xd[self.t + 1], yd[self.t + 1]]) # attention: index desired starts from t=-1
vd = np.array([vxd, vyd])
q_t = self.q[-1, :]
dq_t = self.dq[-1, :]
r_hat_t = self.two_link_forward_kinematics(q_t, l1, l2)
Jpinv_t, J_t = self.two_link_jacobian(q_t, ld=0.01)
v_hat_t = (r_hat_t - r_hat[-1, :]) / self.dt
self.r_hat = np.vstack((self.r_hat, r_hat_t))
dqc_t, e = self.q_command(r_ee=r_hat_t, v_ee=v_hat_t, Jpinv=Jpinv_t, rd=rd_t, vd=vd, e=e, dt=dt)
qc_t = dqc_t * dt + self.q[-2, :] # TODO is this observer(taking q(t-1) for integration) sufficient?
ddqc_t = (dqc_t - self.dq[-2, :]) / self.dt

self.qc = np.vstack((self.qc, qc_t))
self.dqc = np.vstack((self.dqc, dqc_t))
self.ddqc = np.vstack((self.ddqc, ddqc_t))

tau1_hat, tau2_hat = self.two_link_inverse_dynamics(q_t, dqc_t, ddqc_t)

# s_init=[th1_init,dth1_init,th2_init,dth2_init]
s_init = np.array([q_t[0], dq_t[0], q_t[1], dq_t[1]])
t = time.time()
# TODO HERE WHY TAKES LONG??
q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1],
s_init) # attention: forward dynamics robot has correct m2 value
print("FD elapsed time=", time.time() - t)
self.q = np.vstack((self.q, np.array([q_FD[0], q_FD[2]])))
self.dq = np.vstack((self.dq, np.array([q_FD[1], q_FD[3]])))

# collect observations
obs=np.array([xd,yd,q_FD[0,1],q_FD[2,1],q_FD[1,1],q_FD[3,1],qd[0],qd[1],dqd_t[0],dqd_t[1],tau1_hat,tau2_hat]) #TODO
obs = np.array([r_hat_t[0]-rd_t[0],
r_hat_t[1]-rd_t[1],
q_t[0],
q_t[1],
dq_t[0],
dq_t[1],
tau1_hat,
tau2_hat])
# update states
self.state=obs
self.state_buffer=np.vstack((self.state_buffer,self.state))
# update time index
self.t += 1
self.state = obs
self.state_buffer = np.vstack((self.state_buffer, self.state))
# check done episode
terminal = self._terminal()

# calculate reward
x_FK, y_FK = self.two_link_forward_kinematics(np.array([q_FD[0,1],q_FD[2,1]]))
reward = 1. if np.sqrt((x_FK-xd)**2+(y_FK-yd)**2)<0.001 else 0.
x_FK, y_FK = self.two_link_forward_kinematics(np.array([q_FD[0], q_FD[2]]))
reward = 1. if np.sqrt((x_FK - xd) ** 2 + (y_FK - yd) ** 2) < 0.01 else 0.
# given action it returns 4-tuple (observation, reward, done, info)
return (self._get_ob(), reward, terminal, {})



#
#
# if self.t==0:
# s_t = self.state_buffer[:] # states at t
# s_tm1 = s_t # TODO check. states at t-1
# else:
# s_t = self.state_buffer[-1, :] # states at t
# s_tm1 = self.state_buffer[-2, :] #states at t-1
# # TOODO
# # Add noise to the force action
# if self.torque_noise_max > 0:
# torque += self.np_random.uniform(-self.torque_noise_max, self.torque_noise_max)
# # choose a sample target desired position to test IK
# xd = self.xd[self.t]
# yd = self.yd[self.t]
# qd = self.two_link_inverse_kinematics(xd, yd)[0] #attention: only return and use first solution of IK
#
# self.two_link_jacobian(self.q)
# # TODO correct for observation:
# dqd_t = np.array([(qd[0]-s_tm1[6])/self.dt, (qd[1]-s_tm1[7])/self.dt])
# ddqd_t = np.array([(dqd_t[0]-s_tm1[8])/self.dt, (dqd_t[1]-s_tm1[9])/self.dt])
# tau1_hat, tau2_hat = self.two_link_inverse_dynamics(qd, dqd_t, ddqd_t)
# s_init = [s_t[2], s_t[4], s_t[3], s_t[5]] #TODO check concept
# q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1], s_init)
#
# # collect observations
# obs=np.array([xd,yd,q_FD[0,1],q_FD[2,1],q_FD[1,1],q_FD[3,1],qd[0],qd[1],dqd_t[0],dqd_t[1],tau1_hat,tau2_hat]) #TODO
# # update states
# self.state=obs
# self.state_buffer=np.vstack((self.state_buffer,self.state))
# # update time index
# self.t += 1
# # check done episode
# terminal = self._terminal()
#
# # calculate reward
# x_FK, y_FK = self.two_link_forward_kinematics(np.array([q_FD[0,1],q_FD[2,1]]))
# reward = 1. if np.sqrt((x_FK-xd)**2+(y_FK-yd)**2)<0.001 else 0.
# # given action it returns 4-tuple (observation, reward, done, info)
# return (self._get_ob(), reward, terminal, {})

def _get_ob(self): #TODO is state=observation a reasonable assumption?
s = self.state
return s
Expand Down

0 comments on commit 6bdb437

Please sign in to comment.