Skip to content

Commit

Permalink
WIP for Matthew
Browse files Browse the repository at this point in the history
  • Loading branch information
Mokaz committed Dec 7, 2024
1 parent 9a5065e commit 3599364
Show file tree
Hide file tree
Showing 15 changed files with 578 additions and 470 deletions.
26 changes: 19 additions & 7 deletions TidySpot.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from grasping.GraspSelector import GraspSelector
from grasping.PointCloudCropper import PointCloudCropper

from manipulation.meshcat_utils import AddMeshcatTriad

import os
import sys
import torch
Expand All @@ -49,10 +51,10 @@ def run_TidySpot(args):
device = args.device
scenario_path = args.scenario

# use_anygrasp = True
# use_grounded_sam = True
# device = "cuda"
# scenario_path = "objects/simple_cracker_box_detection_test.yaml"
use_anygrasp = True
use_grounded_sam = True
device = "cuda"
scenario_path = "objects/simple_cracker_box_infront_grasp.yaml"

try:
### Start the visualizer ###
Expand Down Expand Up @@ -128,7 +130,7 @@ def run_TidySpot(args):

# Add IK controller for to solve arm positions for grasping
spot_plant = station.GetSubsystemByName("spot.controller").get_multibody_plant_for_control()
spot_arm_ik_controller = builder.AddSystem(SpotArmIKController(spot_plant))
spot_arm_ik_controller = builder.AddSystem(SpotArmIKController(plant, spot_plant, meshcat=meshcat))
spot_arm_ik_controller.set_name("spot_arm_ik_controller")
spot_arm_ik_controller.connect_components(builder, station, navigator, grasp_selector)

Expand All @@ -142,7 +144,7 @@ def run_TidySpot(args):
# Add Finite State Machine = TidySpotFSM
tidy_spot_planner = builder.AddSystem(TidySpotFSM(plant, bin_location))
tidy_spot_planner.set_name("tidy_spot_fsm")
tidy_spot_planner.connect_components(builder, spot_arm_ik_controller, point_cloud_mapper, navigator, station)
tidy_spot_planner.connect_components(builder, object_detector, spot_arm_ik_controller, point_cloud_mapper, navigator, station)

# Last component, add state interpolator which converts desired state to desired state and velocity
state_interpolator = builder.AddSystem(StateInterpolatorWithDiscreteDerivative(10, 0.1, suppress_initial_transient=True))
Expand Down Expand Up @@ -208,13 +210,23 @@ def PrintStates(context):
print(f"Goal: {goal[:3]}")
print("---")

def visualize_controller_poses(context):
gripper_pose = plant.EvalBodyPoseInWorld(plant_context, plant.GetBodyByName("arm_link_fngr"))
AddMeshcatTriad(meshcat, "arm_link_fngr", length=0.1, radius=0.006, X_PT=gripper_pose)
if spot_arm_ik_controller.prepick_pose is not None:
AddMeshcatTriad(meshcat, "prepick_pose", length=0.1, radius=0.006, X_PT=spot_arm_ik_controller.prepick_pose)
if spot_arm_ik_controller.desired_gripper_pose is not None:
AddMeshcatTriad(meshcat, "desired_gripper_pose", length=0.1, radius=0.006, X_PT=spot_arm_ik_controller.desired_gripper_pose)



# simulator.set_monitor(PrintStates)
simulator.set_monitor(visualize_controller_poses)

meshcat.Flush() # Wait for the large object meshes to get to meshcat.

meshcat.StartRecording()
simulator.AdvanceTo(60.0)
simulator.AdvanceTo(5)

################
### TESTZONE ###
Expand Down
48 changes: 26 additions & 22 deletions TidySpotFSM.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ class TidySpotFSM(LeafSystem):
def __init__(self, plant, bin_location=[0, 0, 0]):
LeafSystem.__init__(self)

# Spot's internal planning states
# Internal states
self._state_index = self.DeclareAbstractState(AbstractValue.Make(SpotState.IDLE))
self.navigator_state_commanded = self.DeclareDiscreteState(1)

self._times_index = self.DeclareAbstractState(
AbstractValue.Make({"initial": 0.0})
)
self._times_index = self.DeclareAbstractState(AbstractValue.Make({"initial": 0.0}))
self._attempts_index = self.DeclareDiscreteState(1)

# Internal variables
self.bin_location = bin_location
self.path_planning_goal = (0, 0, 0) # Goal for path planning, if self.path_planning_goal[2] == None then Navigator will autogenerate final heading

# Input ports
self._spot_body_state_index = self.DeclareVectorInputPort("body_poses", 20).get_index()
self._path_planning_desired_index = self.DeclareVectorInputPort("path_planning_desired", 3).get_index()
Expand All @@ -46,40 +49,38 @@ def __init__(self, plant, bin_location=[0, 0, 0]):

# Output ports
# Declare output ports for the path planner. The path planner then sends to the actual robot
self.navigator_state_commanded = self.DeclareDiscreteState(1)
self.DeclareStateOutputPort("fsm_state", self._state_index)
self.DeclareStateOutputPort("navigator_state_commanded", self.navigator_state_commanded)
self._path_planning_goal_output = self.DeclareVectorOutputPort("path_planning_goal", 3, self.SetPathPlanningGoal).get_index()
self._path_planning_position_output = self.DeclareVectorOutputPort("path_planning_position", 3, self.SetPathPlanningCurrentPosition).get_index()

# Output ports for various components
self.grasp_requested = self.DeclareDiscreteState(1) # Use a state of 1 to represent a boolean
self.DeclareStateOutputPort("grasp_requested", self.grasp_requested)
self.do_arm_controller_mission = self.DeclareDiscreteState(1) # Use a state of 1 to represent a boolean
self.DeclareStateOutputPort("do_arm_controller_mission", self.do_arm_controller_mission)

self.DeclareInitializationUnrestrictedUpdateEvent(self._initialize_state)
self.DeclarePeriodicUnrestrictedUpdateEvent(0.1, 0.0, self.Update)

# Declare internal variables
self.bin_location = bin_location
self.path_planning_goal = (0, 0, 0) # Goal for path planning, if self.path_planning_goal[2] == None then Navigator will autogenerate final heading


def connect_components(self, builder, grasper, point_cloud_mapper, navigator, station):
def connect_components(self, builder, object_detector, spot_arm_ik_controller, point_cloud_mapper, navigator, station):
builder.Connect(station.GetOutputPort("spot.state_estimated"), self.get_input_port(self._spot_body_state_index))

# Connect the Navigator to the TidySpotFSM
# Connect the FSM to ObjectDetector
builder.Connect(self.GetOutputPort("fsm_state"), object_detector.GetInputPort("fsm_state"))

# Connect the Navigator to the FSM
builder.Connect(self.get_output_port(self._path_planning_goal_output), navigator.GetInputPort("goal"))
builder.Connect(self.get_output_port(self._path_planning_position_output), navigator.GetInputPort("current_position"))
builder.Connect(navigator.GetOutputPort("navigation_complete"), self.GetInputPort("navigation_complete"))
builder.Connect(self.GetOutputPort("navigator_state_commanded"), navigator.GetInputPort("navigator_state"))

# Connect the detection dict to the FSM planner
# Connect the Mapper's detected objects dictionary to the FSM
builder.Connect(point_cloud_mapper.GetOutputPort("object_clusters"), self._object_clusters_input) # TODO: Check that output name is correct
# connect the mappers frontier to the FSM
builder.Connect(point_cloud_mapper.GetOutputPort("frontier"), self.GetInputPort("frontier"))

# Connect the grasper to the FSM planner
builder.Connect(self.GetOutputPort("grasp_requested"), grasper.GetInputPort("do_grasp"))
builder.Connect(grasper.GetOutputPort("done_grasp"), self.GetInputPort("grasp_complete"))
# Connect the grasper to the FSM
builder.Connect(self.GetOutputPort("do_arm_controller_mission"), spot_arm_ik_controller.GetInputPort("do_arm_controller_mission"))
builder.Connect(spot_arm_ik_controller.GetOutputPort("done_grasp"), self.GetInputPort("grasp_complete"))

def get_spot_state_input_port(self):
return self.GetInputPort("body_poses")
Expand All @@ -94,9 +95,9 @@ def _get_navigation_completed(self, context, state):
return navigation_complete and navigator_state_commanded

def _get_grasping_completed(self, context, state):
grasp_requested = state.get_mutable_discrete_state(self.grasp_requested).get_value()[0]
do_arm_controller_mission = state.get_mutable_discrete_state(self.do_arm_controller_mission).get_value()[0]
grasp_complete = self.GetInputPort("grasp_complete").Eval(context)[0]
return grasp_complete and grasp_requested
return grasp_complete and do_arm_controller_mission

def Update(self, context, state):
current_state = context.get_abstract_state(int(self._state_index)).get_value()
Expand Down Expand Up @@ -156,7 +157,7 @@ def Update(self, context, state):
int(self._state_index)
).set_value(SpotState.GRASP_OBJECT)
# Send the grasp request to the grasper
state.get_mutable_discrete_state(self.grasp_requested).set_value([1])
state.get_mutable_discrete_state(self.do_arm_controller_mission).set_value([1])
print("Grasp requested.")
else:
# print("Approaching object at ", self.current_object_location)
Expand All @@ -175,8 +176,9 @@ def Update(self, context, state):
int(self._state_index)
).set_value(SpotState.TRANSPORT_OBJECT)
else:
print("Currently grasping object")
# print("Currently grasping object")
# Assume there is no failure state here
pass

elif current_state == SpotState.TRANSPORT_OBJECT:
if self._get_navigation_completed(context, state):
Expand Down Expand Up @@ -238,6 +240,8 @@ def set_new_random_exploration_goal(self, context):
# clip the goal to slightly inside the bounds
new_goal = (np.clip(new_goal[0], -4.5, 4.5), np.clip(new_goal[1], -4.5, 4.5))



print(f"Exploring environment, new exploration goal: {new_goal}")

self.path_planning_goal = (new_goal[0], new_goal[1], None)
Expand Down
193 changes: 193 additions & 0 deletions controller/InverseKinematics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import numpy as np
from pydrake.all import (
Context,
InverseKinematics,
MultibodyPlant,
RigidTransform,
RotationMatrix,
Solve,
eq,
)

q_nominal_arm = np.array([0.0, -3.1, 3.1, 0.0, 0.0, 0.0, 0.0])

def solve_ik(
plant: MultibodyPlant,
context: Context,
X_WT: RigidTransform,
# target_frame_name: str = "arm_link_wr1",
target_frame_name: str = "arm_link_fngr",
base_position: np.ndarray = np.zeros(3),
fix_base: bool = True,
rotation_bound: float = 0.01,
position_bound: float = 0.01,
collision_bound: float = 0.001,
error_on_fail: bool = True,
q_current=q_nominal_arm,
max_iter: int = 20,
):
"""Convert the desired pose for Spot to joint angles, subject to constraints.
Args:
plant (MultibodyPlant): The plant that contains the Spot model.
context (Context): The plant context
X_WT (RigidTransform): The target pose in the world frame.
target_frame_name (str, optional): The name of a frame that X_WT should correspond to,
defaults to "arm_link_fngr" (the upper part of the gripper on Spot's arm).
fix_base (bool, optional): If True, then the body of Spot will be fixed to the current
pose. Defaults to True.
rotation_bound (float, optional): The maximum allowed rotation error in radians.
position_bound (float, optional): The maximum allowed position error.
collision_bound (float, optional): The minimum allowed distance between Spot and the other
objects in the scene.
"""
for _ in range(max_iter):
ik = InverseKinematics(plant, context)
q = ik.q() # Get variables for MathematicalProgram
prog = ik.prog() # Get MathematicalProgram

world_frame = plant.world_frame()
target_frame = plant.GetFrameByName(target_frame_name)

# nominal pose
q0 = np.zeros(len(q))
q0[:3] = base_position
q0[3:10] = q_current

# Target position and rotation
p_WT = X_WT.translation()
R_WT = X_WT.rotation()

# Constraints
ik.AddPositionConstraint(
frameA=world_frame,
frameB=target_frame,
p_BQ=np.zeros(3),
p_AQ_lower=p_WT - position_bound,
p_AQ_upper=p_WT + position_bound,
)
ik.AddOrientationConstraint(
frameAbar=world_frame,
R_AbarA=R_WT,
frameBbar=target_frame,
R_BbarB=RotationMatrix(),
theta_bound=rotation_bound,
)
# # This is currently failing for some reason
# # collision constraint
# ik.AddMinimumDistanceLowerBoundConstraint(
# collision_bound, influence_distance_offset=0.001
# )

if fix_base:
prog.AddConstraint(eq(q[:3], base_position))

q_start = q0.copy()
q_start[3:10] = q_current
prog.AddQuadraticErrorCost(np.identity(len(q)), q0, q)
prog.SetInitialGuess(q, q_start)

result = Solve(ik.prog())
if result.is_success():
print("IK successfully solved")
return result.GetSolution(q)
if error_on_fail:
print("could not be solved")
raise AssertionError("IK failed :(")
return None

def solve_ik_downwards_only(
plant: MultibodyPlant,
context: Context,
X_WT: RigidTransform,
target_frame_name: str = "arm_link_fngr",
base_position: np.ndarray = np.zeros(3),
fix_base: bool = True,
rotation_bound: float = 0.01,
position_bound: float = 0.01,
collision_bound: float = 0.001,
error_on_fail: bool = True,
q_current=q_nominal_arm,
max_iter: int = 20,
):
"""Convert the desired pose for Spot to joint angles, subject to constraints.
This version constrains movement so the target frame only moves straight downward.
Args:
plant (MultibodyPlant): The plant that contains the Spot model.
context (Context): The plant context
X_WT (RigidTransform): The target pose in the world frame.
target_frame_name (str, optional): The name of the frame for the IK target.
fix_base (bool, optional): If True, the body of Spot is fixed at the current pose.
rotation_bound (float, optional): Max allowed rotation error in radians.
position_bound (float, optional): Max allowed position variation downward.
collision_bound (float, optional): Min allowed distance between Spot and other objects.
error_on_fail (bool, optional): Raise AssertionError if solving fails.
q_current (np.ndarray, optional): Current joint configuration for initial guess.
max_iter (int, optional): Maximum number of solve attempts.
Returns:
np.ndarray: The solved joint configuration if successful, else None.
"""
for _ in range(max_iter):
ik = InverseKinematics(plant, context)
q = ik.q() # Get variables for MathematicalProgram
prog = ik.prog() # Get MathematicalProgram

world_frame = plant.world_frame()
target_frame = plant.GetFrameByName(target_frame_name)

# Nominal pose
q0 = np.zeros(len(q))
q0[:3] = base_position
q0[3:10] = q_current

# Target position and rotation
p_WT = X_WT.translation()
R_WT = X_WT.rotation()

# Position constraint:
# Keep x and y fixed at p_WT[0], p_WT[1],
# allow z to vary only downward by position_bound.
p_AQ_lower = np.array([p_WT[0], p_WT[1], p_WT[2] - position_bound])
p_AQ_upper = np.array([p_WT[0], p_WT[1], p_WT[2]])

ik.AddPositionConstraint(
frameA=world_frame,
frameB=target_frame,
p_BQ=np.zeros(3),
p_AQ_lower=p_AQ_lower,
p_AQ_upper=p_AQ_upper,
)

# Orientation constraint remains the same
ik.AddOrientationConstraint(
frameAbar=world_frame,
R_AbarA=R_WT,
frameBbar=target_frame,
R_BbarB=RotationMatrix(),
theta_bound=rotation_bound,
)

# If you want collision constraints, you can uncomment this
# ik.AddMinimumDistanceLowerBoundConstraint(
# collision_bound, influence_distance_offset=0.001
# )

if fix_base:
prog.AddConstraint(eq(q[:3], base_position))

q_start = q0.copy()
q_start[3:10] = q_current
prog.AddQuadraticErrorCost(np.identity(len(q)), q0, q)
prog.SetInitialGuess(q, q_start)

result = Solve(ik.prog())
if result.is_success():
print("IK successfully solved with downward-only movement.")
return result.GetSolution(q)

if error_on_fail:
print("IK with downwards only movement could not be solved")
raise AssertionError("IK failed :(")
return None
Loading

0 comments on commit 3599364

Please sign in to comment.