diff --git a/spinup/backup_gym/gym/envs/classic_control/tworr.py b/spinup/backup_gym/gym/envs/classic_control/tworr.py index 65e14aee2..fc936981d 100644 --- a/spinup/backup_gym/gym/envs/classic_control/tworr.py +++ b/spinup/backup_gym/gym/envs/classic_control/tworr.py @@ -21,32 +21,41 @@ class TworrEnv(core.Env): } def __init__(self): - self.dt = 0.01 # [s] self.LINK_LENGTH_1 = 1. # [m] self.LINK_LENGTH_2 = 1. # [m] self.LINK_MASS_1 = 1. #: [kg] mass of link 1 - self.LINK_MASS_2 = 1. #: [kg] mass of link 2 - self.LINK_COM_POS_1 = 0.5 #: [m] position of the center of mass of link 1 - self.LINK_COM_POS_2 = 0.5 #: [m] position of the center of mass of link 2 + self.LINK_MASS_2 = 1. #: [kg] imperfect mass of link 2 + self.LINK_MASS_2_TRUE = 1.05 #: [kg] TRUE mass of link 2 + self.LINK_COM_POS_1 = 0.5 #r1: [m] distance of the center of mass of link 1 wrt joint + self.LINK_COM_POS_2 = 0.5 #r2: [m] distance of the center of mass of link 2 wrt joint self.torque_noise_max = 0. # TODO - self.MAX_TIMESTEPS = 100 # maximum timesteps per episode self.viewer = None self.state = None self.state_buffer= None self.t = 0 self.seed() - self.xd_init=1.2 - self.yd_init = 1.0 - self.xd = np.linspace(self.xd_init, self.xd_init+0.1, self.MAX_TIMESTEPS, endpoint=True) - self.yd = np.linspace(self.yd_init, self.yd_init+0.8, self.MAX_TIMESTEPS, endpoint=True) - # states=(xd,yd, q1,q2, dq1,dq2, qd1,qd2, dqd1,dqd2, tau1_hat,tau2_hat) - high_s = np.array([self.xd_init+1, self.yd_init+1, 1.4, 1.4, 0.2, 0.2, 1.4, 1.4, 0.2, 0.2, 1, 1]) - low_s = np.array([self.xd_init-1, self.yd_init-1, -1.4, -1.4, -0.2, -0.2, -1.4, -1.4, -0.2, -0.2, -1, -1]) + + self.xd_init = 1.5 + self.yd_init = 0.4 + self.dt = 1 / 10 # sec + self.MAX_TIMESTEPS = 100 # maximum timesteps per episode + self.vxd = -0.01 # m/s + self.vyd = 0.05 # m/s + deltax = vxd * dt * MAX_TIMESTEPS + deltay = vyd * dt * MAX_TIMESTEPS + self.xd = np.linspace(self.xd_init, self.xd_init + deltax, MAX_TIMESTEPS + 1, endpoint=True) + self.yd = np.linspace(self.yd_init, self.yd_init + deltay, MAX_TIMESTEPS + 1, endpoint=True) + + # TODO CHECK + high_s = np.array([self.xd_init+1, self.yd_init+1, 1.4, 1.4, 0.2, 0.2, 1.4, 1.4, 0.2, 0.2, 1, 1]) + low_s = np.array([self.xd_init-1, self.yd_init-1, -1.4, -1.4, -0.2, -0.2, -1.4, -1.4, -0.2, -0.2, -1, -1]) self.observation_space = spaces.Box(low=low_s, high=high_s, dtype=np.float32) high_a = np.array([0.1, 0.1]) low_a = np.array([-0.1, -0.1]) self.action_space = spaces.Box(low=low_a, high=high_a, dtype=np.float32) + + def two_link_forward_kinematics(self,q): """Compute the forward kinematics. Returns the base-coordinate Cartesian position of End-effector for a given joint angle vector. Optional @@ -112,18 +121,18 @@ def two_link_forward_dynamics(self, tau1, tau2, s_init): :param dth: :param ddth: :param self.LINK_MASS_1: - :param self.LINK_MASS_2: + :param self.LINK_MASS_2_TRUE: :param self.LINK_LENGTH_1: :param self.LINK_LENGTH_2: - :return: th1, dth1, th2, dth2 + :return: th1, dth1, th2, dth2 (second column) """ # Define derivative function def f(t, s): Iz1 = self.LINK_MASS_1 * self.LINK_LENGTH_1 ** 2 / 12 - Iz2 = self.LINK_MASS_2 * self.LINK_LENGTH_2 ** 2 / 12 - alpha = Iz1 + Iz2 + self.LINK_MASS_1 * self.LINK_COM_POS_1 ** 2 + self.LINK_MASS_2 * (self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2) - beta = self.LINK_MASS_2 * self.LINK_LENGTH_1 * self.LINK_COM_POS_2 - sigma = Iz2 + self.LINK_MASS_2 * self.LINK_COM_POS_2 ** 2 + Iz2 = self.LINK_MASS_2_TRUE * self.LINK_LENGTH_2 ** 2 / 12 + alpha = Iz1 + Iz2 + self.LINK_MASS_1 * self.LINK_COM_POS_1 ** 2 + self.LINK_MASS_2_TRUE * (self.LINK_LENGTH_1 ** 2 + self.LINK_LENGTH_2 ** 2) + beta = self.LINK_MASS_2_TRUE * self.LINK_LENGTH_1 * self.LINK_COM_POS_2 + sigma = Iz2 + self.LINK_MASS_2_TRUE * self.LINK_COM_POS_2 ** 2 d11 = alpha + 2 * beta * np.cos(s[2]) d21 = sigma + beta * np.cos(s[2]) d12 = d21 @@ -141,79 +150,233 @@ def f(t, s): # Solve differential equation sol = solve_ivp(lambda t, s: f(t, s), [tspan[0], tspan[-1]], s_init, t_eval=tspan, rtol=1e-5) - return sol.y + return sol.y[:,1] + def two_link_jacobian(self, q, ld=0.1): + J=np.array([[-self.LINK_LENGTH_1*np.sin(q[0])-self.LINK_LENGTH_2*np.sin(q[0]+q[1]), -self.LINK_LENGTH_2*np.sin(q[0]+q[1])], + [self.LINK_LENGTH_1*np.cos(q[0])+self.LINK_LENGTH_2*np.cos(q[0]+q[1]), +self.LINK_LENGTH_2*np.cos(q[0]+q[1])]]) + def pseudoInverseMat(A, ld): + # Input: Any m-by-n matrix, and a damping factor. + # Output: An n-by-m pseudo-inverse of the input according to the Moore-Penrose formula + + # Get the number of rows (m) and columns (n) of A + [m, n] = np.shape(A) + + # Compute the pseudo inverse for both left and right cases + if (m > n): + # Compute the left pseudoinverse. + pinvA = np.linalg.lstsq((A.T * A + ld * ld * np.eye(n, n)), A.T)[0] + elif (m <= n): + # Compute the right pseudoinverse. + pinvA = np.linalg.lstsq((np.matmul(A, A.T) + ld * ld * np.eye(m, m)).T, A)[0].T + return pinvA + return pseudoInverseMat(J,ld) + + def q_command(r_ee, v_ee, Jpinv, rd, vd, e, dt): + """ + PID Traj Tracking Feedback Controller + Inputs: + r_ee : current end effector position + rd : desired end effector position + vd : desired end effector velocity + Jpinv : current pseudo inverse jacobian matrix + Output: joint-space velocity command of the robot. + """ + # TODO: User defined linear position gain + K_p = 40 + K_i = 10 + K_d = 10 + # TODO: User defined pseudo-inverse damping coefficient + # ld = 0.1 + # Jpinv=two_link_jacobian(q_hat_soln1, ld) + e_t = (rd - r_ee) + e = np.vstack((e, e_t.reshape(1, 2))) + v_command = vd + K_p * e_t + K_i * np.sum(e[1:, :], 0) * dt + K_d * (vd - v_ee) + qc = np.dot(Jpinv, v_command) + return qc, e def seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed] def reset(self): - # The reset method should return a tuple of the initial observation and some auxiliary information. - # states=(xd,yd,q1,q2,dq1,dq2,qd1,qd2,dqd1,dqd2,tau1_hat,tau2_hat) - self.state = self.np_random.uniform(low=-0.1, high=0.1, size=(12,)) #TODO correct initialization - - xd_init_noisy= self.xd_init + np.random.normal(loc=0.0, scale=0.002, size=None) - yd_init_noisy= self.yd_init + np.random.normal(loc=0.0, scale=0.002, size=None) - q_init_noisy= self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0] + [np.random.normal(loc=0.0, scale=0.01, size=None), np.random.normal(loc=0.0, scale=0.01, size=None)] - dq_init=[0,0] - qd_init_noisy=self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0] - dqd_init=[0,0] - ddqd_init = [0,0] - tau1_hat_init_noisy, tau2_hat_init_noisy = self.two_link_inverse_dynamics(qd_init_noisy, dqd_init, ddqd_init) - # reset to initial states - self.state = [xd_init_noisy, - yd_init_noisy, - q_init_noisy[0], - q_init_noisy[1], - dq_init[0], - dq_init[1], - qd_init_noisy[0], - qd_init_noisy[1], - dqd_init[0], - dqd_init[1], - tau1_hat_init_noisy, - tau2_hat_init_noisy] - self.state_buffer=self.state #initialize episode state buffer + # initialize at t=-1 + rd_minus1 = np.array([xd[0], yd[0]]) + v_minus1 = np.array([vxd, vyd]) + ree_minus1 = rd_minus1 + q_hat_soln1, q_hat_soln2 = two_link_inverse_kinematics(rd_minus1[0], rd_minus1[1], l1, l2) + q_minus1 = q_hat_soln1 + e_minus1 = rd_minus1 - ree_minus1 + dq_t_minus1 = two_link_inverse_kinematics_joint_speeds(v_minus1[0], v_minus1[1], q_minus1[0], q_minus1[1], + l1=1.0, l2=1.0) + # starting from t=-1 + self.e = e_minus1.reshape(1, 2) + self.q = q_minus1.reshape(1, 2) + self.dq = dq_t_minus1.reshape(1, 2) + self.qc = np.zeros(2).reshape(1, 2) # attention on presumptions on qc + self.dqc = np.zeros(2).reshape(1, 2) + self.ddqc = np.zeros(2).reshape(1, 2) + self.r_hat = rd_minus1.reshape(1, 2) # attention: trivial assumption(?) + self.t = 0 + + rd_t = np.array([xd[self.t + 1], yd[self.t + 1]]) # attention: index desired starts from t=-1 + vd = np.array([vxd, vyd]) + # at time t=0 + q_t=self.q[-1,:] + dq_t = self.dq[-1, :] + r_hat_t = self.two_link_forward_kinematics(q_t, l1, l2) + Jpinv_t, J_t = two_link_jacobian(q_t, ld=0.01) + v_hat_t = (r_hat_t - self.r_hat[-1, :]) / self.dt + self.r_hat=np.vstack((self.r_hat,r_hat_t)) + dqc_t, e=self.q_command(r_ee=r_hat_t, v_ee=v_hat_t, Jpinv=Jpinv_t, rd=rd_t, vd=vd, e=e, dt=dt) + qc_t = dqc_t*self.dt+self.q[-2,:] #TODO is this observer(taking q(t-1) for integration) sufficient? + ddqc_t = (dqc_t-self.dq[-2,:])/self.dt + + self.qc = np.vstack((self.qc, qc_t)) + self.dqc = np.vstack((self.dqc, dqc_t)) + self.ddqc = np.vstack((self.ddqc, ddqc_t)) + + tau1_hat, tau2_hat = self.two_link_inverse_dynamics(q_t, dqc_t, ddqc_t) + + self.q = np.vstack((q, q_t)) + self.dq = np.vstack((dq, dq_t)) + + self.state = [r_hat_t[0]-rd_t[0], + r_hat_t[1]-rd_t[1], + q_t[0], + q_t[1], + dq_t[0], + dq_t[1], + tau1_hat, + tau2_hat] + self.state_buffer = self.state + + # + # + # + # + # # The reset method should return a tuple of the initial observation and some auxiliary information. + # # states=(xd,yd,q1,q2,dq1,dq2,qd1,qd2,dqd1,dqd2,tau1_hat,tau2_hat) + # self.state = self.np_random.uniform(low=-0.1, high=0.1, size=(12,)) #TODO correct initialization + # xd_init_noisy= self.xd_init + np.random.normal(loc=0.0, scale=0.002, size=None) + # yd_init_noisy= self.yd_init + np.random.normal(loc=0.0, scale=0.002, size=None) + # q_init_noisy= self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0] + [np.random.normal(loc=0.0, scale=0.01, size=None), np.random.normal(loc=0.0, scale=0.01, size=None)] + # dq_init=[0,0] + # qd_init_noisy=self.two_link_inverse_kinematics(xd_init_noisy, yd_init_noisy)[0] + # dqd_init=[0,0] + # ddqd_init = [0,0] + # tau1_hat_init_noisy, tau2_hat_init_noisy = self.two_link_inverse_dynamics(qd_init_noisy, dqd_init, ddqd_init) + # # reset to initial states + # self.state = [xd_init_noisy, + # yd_init_noisy, + # q_init_noisy[0], + # q_init_noisy[1], + # dq_init[0], + # dq_init[1], + # qd_init_noisy[0], + # qd_init_noisy[1], + # dqd_init[0], + # dqd_init[1], + # tau1_hat_init_noisy, + # tau2_hat_init_noisy] + # self.state_buffer=self.state #initialize episode state buffer return self._get_ob() def step(self,a): - if self.t==0: - s_t = self.state_buffer[:] # states at t - s_tm1 = s_t # TODO check. states at t-1 - else: - s_t = self.state_buffer[-1, :] # states at t - s_tm1 = self.state_buffer[-2, :] #states at t-1 - # TOODO - # Add noise to the force action - if self.torque_noise_max > 0: - torque += self.np_random.uniform(-self.torque_noise_max, self.torque_noise_max) - # choose a sample target desired position to test IK - xd = self.xd[self.t] - yd = self.yd[self.t] - qd = self.two_link_inverse_kinematics(xd, yd)[0] #attention: only return and use first solution of IK - # TODO correct for observation: - dqd_t = np.array([(qd[0]-s_tm1[6])/self.dt, (qd[1]-s_tm1[7])/self.dt]) - ddqd_t = np.array([(dqd_t[0]-s_tm1[8])/self.dt, (dqd_t[1]-s_tm1[9])/self.dt]) - tau1_hat, tau2_hat = self.two_link_inverse_dynamics(qd, dqd_t, ddqd_t) - s_init = [s_t[2], s_t[4], s_t[3], s_t[5]] #TODO check concept - q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1], s_init) + # update time index + self.t += 1 #Attention doublecheck + rd_t = np.array([xd[self.t + 1], yd[self.t + 1]]) # attention: index desired starts from t=-1 + vd = np.array([vxd, vyd]) + q_t = self.q[-1, :] + dq_t = self.dq[-1, :] + r_hat_t = self.two_link_forward_kinematics(q_t, l1, l2) + Jpinv_t, J_t = self.two_link_jacobian(q_t, ld=0.01) + v_hat_t = (r_hat_t - r_hat[-1, :]) / self.dt + self.r_hat = np.vstack((self.r_hat, r_hat_t)) + dqc_t, e = self.q_command(r_ee=r_hat_t, v_ee=v_hat_t, Jpinv=Jpinv_t, rd=rd_t, vd=vd, e=e, dt=dt) + qc_t = dqc_t * dt + self.q[-2, :] # TODO is this observer(taking q(t-1) for integration) sufficient? + ddqc_t = (dqc_t - self.dq[-2, :]) / self.dt + + self.qc = np.vstack((self.qc, qc_t)) + self.dqc = np.vstack((self.dqc, dqc_t)) + self.ddqc = np.vstack((self.ddqc, ddqc_t)) + + tau1_hat, tau2_hat = self.two_link_inverse_dynamics(q_t, dqc_t, ddqc_t) + + # s_init=[th1_init,dth1_init,th2_init,dth2_init] + s_init = np.array([q_t[0], dq_t[0], q_t[1], dq_t[1]]) + t = time.time() + # TODO HERE WHY TAKES LONG?? + q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1], + s_init) # attention: forward dynamics robot has correct m2 value + print("FD elapsed time=", time.time() - t) + self.q = np.vstack((self.q, np.array([q_FD[0], q_FD[2]]))) + self.dq = np.vstack((self.dq, np.array([q_FD[1], q_FD[3]]))) + # collect observations - obs=np.array([xd,yd,q_FD[0,1],q_FD[2,1],q_FD[1,1],q_FD[3,1],qd[0],qd[1],dqd_t[0],dqd_t[1],tau1_hat,tau2_hat]) #TODO + obs = np.array([r_hat_t[0]-rd_t[0], + r_hat_t[1]-rd_t[1], + q_t[0], + q_t[1], + dq_t[0], + dq_t[1], + tau1_hat, + tau2_hat]) # update states - self.state=obs - self.state_buffer=np.vstack((self.state_buffer,self.state)) - # update time index - self.t += 1 + self.state = obs + self.state_buffer = np.vstack((self.state_buffer, self.state)) # check done episode terminal = self._terminal() # calculate reward - x_FK, y_FK = self.two_link_forward_kinematics(np.array([q_FD[0,1],q_FD[2,1]])) - reward = 1. if np.sqrt((x_FK-xd)**2+(y_FK-yd)**2)<0.001 else 0. + x_FK, y_FK = self.two_link_forward_kinematics(np.array([q_FD[0], q_FD[2]])) + reward = 1. if np.sqrt((x_FK - xd) ** 2 + (y_FK - yd) ** 2) < 0.01 else 0. # given action it returns 4-tuple (observation, reward, done, info) return (self._get_ob(), reward, terminal, {}) + + + # + # + # if self.t==0: + # s_t = self.state_buffer[:] # states at t + # s_tm1 = s_t # TODO check. states at t-1 + # else: + # s_t = self.state_buffer[-1, :] # states at t + # s_tm1 = self.state_buffer[-2, :] #states at t-1 + # # TOODO + # # Add noise to the force action + # if self.torque_noise_max > 0: + # torque += self.np_random.uniform(-self.torque_noise_max, self.torque_noise_max) + # # choose a sample target desired position to test IK + # xd = self.xd[self.t] + # yd = self.yd[self.t] + # qd = self.two_link_inverse_kinematics(xd, yd)[0] #attention: only return and use first solution of IK + # + # self.two_link_jacobian(self.q) + # # TODO correct for observation: + # dqd_t = np.array([(qd[0]-s_tm1[6])/self.dt, (qd[1]-s_tm1[7])/self.dt]) + # ddqd_t = np.array([(dqd_t[0]-s_tm1[8])/self.dt, (dqd_t[1]-s_tm1[9])/self.dt]) + # tau1_hat, tau2_hat = self.two_link_inverse_dynamics(qd, dqd_t, ddqd_t) + # s_init = [s_t[2], s_t[4], s_t[3], s_t[5]] #TODO check concept + # q_FD = self.two_link_forward_dynamics(tau1_hat+a[0], tau2_hat+a[1], s_init) + # + # # collect observations + # obs=np.array([xd,yd,q_FD[0,1],q_FD[2,1],q_FD[1,1],q_FD[3,1],qd[0],qd[1],dqd_t[0],dqd_t[1],tau1_hat,tau2_hat]) #TODO + # # update states + # self.state=obs + # self.state_buffer=np.vstack((self.state_buffer,self.state)) + # # update time index + # self.t += 1 + # # check done episode + # terminal = self._terminal() + # + # # calculate reward + # x_FK, y_FK = self.two_link_forward_kinematics(np.array([q_FD[0,1],q_FD[2,1]])) + # reward = 1. if np.sqrt((x_FK-xd)**2+(y_FK-yd)**2)<0.001 else 0. + # # given action it returns 4-tuple (observation, reward, done, info) + # return (self._get_ob(), reward, terminal, {}) + def _get_ob(self): #TODO is state=observation a reasonable assumption? s = self.state return s