Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DMP Trajectory Generation #231

Merged
merged 4 commits into from
Apr 28, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from nav_msgs.msg import Path
import tf
from ros_dmp.roll_dmp import RollDmp
import pydmps

import yaml


class DMPExecutor(object):
Expand Down Expand Up @@ -53,7 +56,7 @@ def move_to(self, goal):
as encoded by self.dmp_name.

Keyword arguments:
goal: np.array -- end effector goal position in the base link frame
goal: numpy.ndarray -- end effector goal position in the base link frame

'''
initial_pos = None
Expand All @@ -69,11 +72,7 @@ def move_to(self, goal):
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
initial_pos = np.zeros(3)

path = self.generate_trajectory(initial_pos, goal)
path_odom = np.array([self.transform_pose(position, self.base_link_frame_name, self.odom_frame_name)
for position in path])
self.publish_path(path_odom)
self.follow_path(path_odom)
self.follow_path(initial_pos, goal)

def generate_trajectory(self, initial_pos, goal):
'''Returns a 2D numpy array representing a path of [x, y, z] points
Expand All @@ -82,8 +81,8 @@ def generate_trajectory(self, initial_pos, goal):
on the path.

Keyword arguments:
initial_pos: np.array -- initial position for the motion
goal: np.array -- goal position
initial_pos: numpy.ndarray -- initial position for the motion
goal: numpy.ndarray -- goal position

'''
initial_pose = np.array([initial_pos[0], initial_pos[1], initial_pos[2], 0., 0., 0.])
Expand All @@ -102,7 +101,7 @@ def generate_trajectory(self, initial_pos, goal):
for state in cartesian_trajectory.cartesian_state])
return path

def follow_path(self, path):
def follow_path(self, initial_pos, goal):
'''Moves a manipulator so that it follows the given path. If whole body
motion is enabled and some points on the path lie outside the reachable
workspace, the base is moved accordingly as well. The path is followed
Expand Down Expand Up @@ -133,18 +132,20 @@ def follow_path(self, path):
minimum sigma value (determined experimentally)

Keyword arguments:
path: np.array -- a 2D array of points (each row represents a point)
path: numpy.ndarray -- a 2D array of points (each row represents a point)

'''
# sanity check - we only consider the path if it contains any points
if path is None or path.shape[0] == 0:
rospy.logerr('[move_arm/dmp/follow_path] No points in path; aborting execution')
return

rospy.loginfo('[move_arm/dmp/follow_path] Executing motion')
trans, _ = self.get_transform(self.odom_frame_name, self.palm_link_name, rospy.Time(0))
current_pos = np.array([trans[0], trans[1], trans[2]])
distance_to_goal = np.linalg.norm((path[-1] - current_pos))
distance_to_goal = np.linalg.norm((goal - current_pos))

initial_pose = np.array([initial_pos[0], initial_pos[1], initial_pos[2], 0., 0., 0.])
goal_pose = np.array([goal[0], goal[1], goal[2], 0., 0., 0.])

current_path = initial_pose[:3]

self.dmp = self.instantiate_dmp(initial_pose, goal_pose)

self.motion_completed = False
self.motion_cancelled = False
Expand All @@ -158,21 +159,16 @@ def follow_path(self, path):

# if the end effector has reached the goal (within the allowed
# tolerance threshold), we stop the motion
distance_to_goal = np.linalg.norm((path[-1] - current_pos))
distance_to_goal = np.linalg.norm((goal - current_pos))
if distance_to_goal <= self.goal_tolerance:
self.motion_completed = True
break

path_point_distances = [np.linalg.norm(p - current_pos) for p in path]
min_dist_idx = np.argmin(path_point_distances)
next_pose, _, _ = self.dmp.step(tau=self.tau)
next_pos = next_pose[:3]
current_path = np.vstack((current_path, next_pos))

next_goal_idx = -1
if min_dist_idx == path.shape[0] - 1:
next_goal_idx = min_dist_idx
else:
next_goal_idx = min_dist_idx + 1

vel = self.feedforward_gain * (path[next_goal_idx] - path[min_dist_idx]) + self.feedback_gain * (path[next_goal_idx] - current_pos)
vel = self.feedback_gain * (next_pos - current_pos)

# we limit the speed if it is above the allowed limit
velocity_norm = np.linalg.norm(vel)
Expand Down Expand Up @@ -219,6 +215,8 @@ def follow_path(self, path):
self.vel_publisher_arm.publish(twist_arm)
cmd_count += 1

self.publish_path(current_path)

# stop arm and base motion after converging
twist_arm = TwistStamped()
twist_arm.header.seq = cmd_count
Expand Down Expand Up @@ -278,7 +276,7 @@ def publish_path(self, path):
'''Publishes the given path to the topic specified by self.dmp_executor_path_topic.

Keyword arguments:
path: np.array -- a 2D array of points in which each row represents a position
path: numpy.ndarray -- a 2D array of points in which each row represents a position

'''
path_msg = Path()
Expand All @@ -301,3 +299,32 @@ def sigma_values_cb(self, msg):

'''
self.min_sigma_value = min(msg.data)

def instantiate_dmp(self, initial_pose, goal_pose):
'''Instantiates a DMP object from learned weights, given
the initial and final poses.

Keyword arguments:
initial_pose: numpy.ndarray -- initial pose coordinates of end-effector
goal_pose: numpy.ndarray -- target pose coordinates of end-effector
'''
with open(self.dmp_name) as f:
dmp_weights_dict = yaml.load(f)

n_dmps, n_bfs = len(dmp_weights_dict), len(dmp_weights_dict['x'])

dmp_weights = np.zeros((n_dmps, n_bfs))
dmp_weights[0, :] = dmp_weights_dict['x']
dmp_weights[1, :] = dmp_weights_dict['y']
dmp_weights[2, :] = dmp_weights_dict['z']
dmp_weights[3, :] = dmp_weights_dict['roll']
dmp_weights[4, :] = dmp_weights_dict['pitch']
dmp_weights[5, :] = dmp_weights_dict['yaw']

return pydmps.dmp_discrete.DMPs_discrete(n_dmps=n_dmps,
n_bfs=n_bfs,
dt=self.time_step,
y0=initial_pose,
goal=goal_pose,
ay=None,
w=dmp_weights)