diff --git a/CMakeLists.txt b/CMakeLists.txt index 43d5fc3..8e195a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -11,6 +11,7 @@ ament_python_install_package(pymoveit2) # Install examples set(EXAMPLES_DIR examples) install(PROGRAMS + ${EXAMPLES_DIR}/ex_allow_collisions.py ${EXAMPLES_DIR}/ex_collision_mesh.py ${EXAMPLES_DIR}/ex_collision_primitive.py ${EXAMPLES_DIR}/ex_fk.py diff --git a/examples/ex_allow_collisions.py b/examples/ex_allow_collisions.py new file mode 100755 index 0000000..d6519a0 --- /dev/null +++ b/examples/ex_allow_collisions.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python3 +""" +Example of (dis)allowing collisions between the robot object and an object with +the specified ID. +- ros2 run pymoveit2 ex_allow_collisions.py --ros-args -p id:="sphere" -p allow:=true +- ros2 run pymoveit2 ex_allow_collisions.py --ros-args -p id:="sphere" -p allow:=false +""" + +from threading import Thread + +import rclpy +from rclpy.callback_groups import ReentrantCallbackGroup +from rclpy.node import Node + +from pymoveit2 import MoveIt2 +from pymoveit2.robots import panda + + +def main(): + rclpy.init() + + # Create node for this example + node = Node("ex_allow_collisions") + + # Declare parameter for joint positions + node.declare_parameter( + "id", + "box", + ) + node.declare_parameter( + "allow", + True, + ) + + # Create callback group that allows execution of callbacks in parallel without restrictions + callback_group = ReentrantCallbackGroup() + + # Create MoveIt 2 interface + moveit2 = MoveIt2( + node=node, + joint_names=panda.joint_names(), + base_link_name=panda.base_link_name(), + end_effector_name=panda.end_effector_name(), + group_name=panda.MOVE_GROUP_ARM, + callback_group=callback_group, + ) + + # Spin the node in background thread(s) and wait a bit for initialization + executor = rclpy.executors.MultiThreadedExecutor(2) + executor.add_node(node) + executor_thread = Thread(target=executor.spin, daemon=True, args=()) + executor_thread.start() + node.create_rate(1.0).sleep() + + # Get parameters + object_id = node.get_parameter("id").get_parameter_value().string_value + allow = node.get_parameter("allow").get_parameter_value().bool_value + + # (Dis)allow collisions + moveit2.allow_collisions(object_id, allow) + node.get_logger().info( + f"{'Allow' if allow else 'Disallow'}ed collisions between all objects and '{object_id}'" + ) + + rclpy.shutdown() + executor_thread.join() + exit(0) + + +if __name__ == "__main__": + main() diff --git a/pymoveit2/moveit2.py b/pymoveit2/moveit2.py index 6d7684e..38566cd 100644 --- a/pymoveit2/moveit2.py +++ b/pymoveit2/moveit2.py @@ -1,3 +1,4 @@ +import copy import threading from enum import Enum from typing import List, Optional, Tuple, Union @@ -6,6 +7,7 @@ from geometry_msgs.msg import Point, Pose, PoseStamped, Quaternion from moveit_msgs.action import ExecuteTrajectory, MoveGroup from moveit_msgs.msg import ( + AllowedCollisionEntry, AttachedCollisionObject, CollisionObject, Constraints, @@ -15,8 +17,10 @@ PositionConstraint, ) from moveit_msgs.srv import ( + ApplyPlanningScene, GetCartesianPath, GetMotionPlan, + GetPlanningScene, GetPositionFK, GetPositionIK, ) @@ -221,6 +225,34 @@ def __init__( callback_group=self._callback_group, ) + # Create a service for getting the planning scene + self._get_planning_scene_service = self._node.create_client( + srv_type=GetPlanningScene, + srv_name="get_planning_scene", + qos_profile=QoSProfile( + durability=QoSDurabilityPolicy.VOLATILE, + reliability=QoSReliabilityPolicy.RELIABLE, + history=QoSHistoryPolicy.KEEP_LAST, + depth=1, + ), + callback_group=callback_group, + ) + self.__planning_scene = None + self.__old_allowed_collision_matrix = None + + # Create a service for applying the planning scene + self._apply_planning_scene_service = self._node.create_client( + srv_type=ApplyPlanningScene, + srv_name="apply_planning_scene", + qos_profile=QoSProfile( + durability=QoSDurabilityPolicy.VOLATILE, + reliability=QoSReliabilityPolicy.RELIABLE, + history=QoSHistoryPolicy.KEEP_LAST, + depth=1, + ), + callback_group=callback_group, + ) + self.__collision_object_publisher = self._node.create_publisher( CollisionObject, "/collision_object", 10 ) @@ -1691,6 +1723,90 @@ def detach_all_collision_objects(self): ) self.__attached_collision_object_publisher.publish(msg) + def __update_planning_scene(self) -> bool: + """ + Gets the current planning scene. Returns whether the service call was + successful. + """ + + if not self._get_planning_scene_service.service_is_ready(): + self._node.get_logger().warn( + f"Service '{self._get_planning_scene_service.srv_name}' is not yet available. Better luck next time!" + ) + return False + self.__planning_scene = self._get_planning_scene_service.call( + GetPlanningScene.Request() + ).scene + return True + + def allow_collisions(self, id: str, allow: bool) -> Optional[Future]: + """ + Takes in the ID of an element in the planning scene. Modifies the allowed + collision matrix to (dis)allow collisions between that object and all other + object. + + If `allow` is True, a plan will succeed even if the robot collides with that object. + If `allow` is False, a plan will fail if the robot collides with that object. + Returns whether it successfully updated the allowed collision matrix. + + Returns the future of the service call. + """ + # Update the planning scene + if not self.__update_planning_scene(): + return None + allowed_collision_matrix = self.__planning_scene.allowed_collision_matrix + self.__old_allowed_collision_matrix = copy.deepcopy(allowed_collision_matrix) + + # Get the location in the allowed collision matrix of the object + j = None + if id not in allowed_collision_matrix.entry_names: + allowed_collision_matrix.entry_names.append(id) + else: + j = allowed_collision_matrix.entry_names.index(id) + # For all other objects, (dis)allow collisions with the object with `id` + for i in range(len(allowed_collision_matrix.entry_values)): + if j is None: + allowed_collision_matrix.entry_values[i].enabled.append(allow) + elif i != j: + allowed_collision_matrix.entry_values[i].enabled[j] = allow + # For the object with `id`, (dis)allow collisions with all other objects + allowed_collision_entry = AllowedCollisionEntry( + enabled=[allow for _ in range(len(allowed_collision_matrix.entry_names))] + ) + if j is None: + allowed_collision_matrix.entry_values.append(allowed_collision_entry) + else: + allowed_collision_matrix.entry_values[j] = allowed_collision_entry + + # Apply the new planning scene + if not self._apply_planning_scene_service.service_is_ready(): + self._node.get_logger().warn( + f"Service '{self._apply_planning_scene_service.srv_name}' is not yet available. Better luck next time!" + ) + return None + return self._apply_planning_scene_service.call_async( + ApplyPlanningScene.Request(scene=self.__planning_scene) + ) + + def process_allow_collision_future(self, future: Future) -> bool: + """ + Return whether the allow collision service call is done and has succeeded + or not. If it failed, reset the allowed collision matrix to the old one. + """ + if not future.done(): + return False + + # Get response + resp = future.result() + + # If it failed, restore the old planning scene + if not resp.success: + self.__planning_scene.allowed_collision_matrix = ( + self.__old_allowed_collision_matrix + ) + + return resp.success + def __joint_state_callback(self, msg: JointState): # Update only if all relevant joints are included in the message for joint_name in self.joint_names: