Skip to content

Commit

Permalink
Allow users to (dis)allow collisions with an object (#50)
Browse files Browse the repository at this point in the history
* Added functions to enable/disable collisions with an object vis the allowed collision matrix

* Clarified function name, reduced replicated code

* Made toggling collisions async

* Fix type annotations for consistency

* formatting

* Added tests

* Spelling

* Update docstring

* Fix unnecessary change
  • Loading branch information
amalnanavati authored Mar 30, 2024
1 parent 8bf71ae commit 93ca9aa
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ament_python_install_package(pymoveit2)
# Install examples
set(EXAMPLES_DIR examples)
install(PROGRAMS
${EXAMPLES_DIR}/ex_allow_collisions.py
${EXAMPLES_DIR}/ex_collision_mesh.py
${EXAMPLES_DIR}/ex_collision_primitive.py
${EXAMPLES_DIR}/ex_fk.py
Expand Down
71 changes: 71 additions & 0 deletions examples/ex_allow_collisions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
Example of (dis)allowing collisions between the robot object and an object with
the specified ID.
- ros2 run pymoveit2 ex_allow_collisions.py --ros-args -p id:="sphere" -p allow:=true
- ros2 run pymoveit2 ex_allow_collisions.py --ros-args -p id:="sphere" -p allow:=false
"""

from threading import Thread

import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node

from pymoveit2 import MoveIt2
from pymoveit2.robots import panda


def main():
rclpy.init()

# Create node for this example
node = Node("ex_allow_collisions")

# Declare parameter for joint positions
node.declare_parameter(
"id",
"box",
)
node.declare_parameter(
"allow",
True,
)

# Create callback group that allows execution of callbacks in parallel without restrictions
callback_group = ReentrantCallbackGroup()

# Create MoveIt 2 interface
moveit2 = MoveIt2(
node=node,
joint_names=panda.joint_names(),
base_link_name=panda.base_link_name(),
end_effector_name=panda.end_effector_name(),
group_name=panda.MOVE_GROUP_ARM,
callback_group=callback_group,
)

# Spin the node in background thread(s) and wait a bit for initialization
executor = rclpy.executors.MultiThreadedExecutor(2)
executor.add_node(node)
executor_thread = Thread(target=executor.spin, daemon=True, args=())
executor_thread.start()
node.create_rate(1.0).sleep()

# Get parameters
object_id = node.get_parameter("id").get_parameter_value().string_value
allow = node.get_parameter("allow").get_parameter_value().bool_value

# (Dis)allow collisions
moveit2.allow_collisions(object_id, allow)
node.get_logger().info(
f"{'Allow' if allow else 'Disallow'}ed collisions between all objects and '{object_id}'"
)

rclpy.shutdown()
executor_thread.join()
exit(0)


if __name__ == "__main__":
main()
116 changes: 116 additions & 0 deletions pymoveit2/moveit2.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import threading
from enum import Enum
from typing import List, Optional, Tuple, Union
Expand All @@ -6,6 +7,7 @@
from geometry_msgs.msg import Point, Pose, PoseStamped, Quaternion
from moveit_msgs.action import ExecuteTrajectory, MoveGroup
from moveit_msgs.msg import (
AllowedCollisionEntry,
AttachedCollisionObject,
CollisionObject,
Constraints,
Expand All @@ -15,8 +17,10 @@
PositionConstraint,
)
from moveit_msgs.srv import (
ApplyPlanningScene,
GetCartesianPath,
GetMotionPlan,
GetPlanningScene,
GetPositionFK,
GetPositionIK,
)
Expand Down Expand Up @@ -221,6 +225,34 @@ def __init__(
callback_group=self._callback_group,
)

# Create a service for getting the planning scene
self._get_planning_scene_service = self._node.create_client(
srv_type=GetPlanningScene,
srv_name="get_planning_scene",
qos_profile=QoSProfile(
durability=QoSDurabilityPolicy.VOLATILE,
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
),
callback_group=callback_group,
)
self.__planning_scene = None
self.__old_allowed_collision_matrix = None

# Create a service for applying the planning scene
self._apply_planning_scene_service = self._node.create_client(
srv_type=ApplyPlanningScene,
srv_name="apply_planning_scene",
qos_profile=QoSProfile(
durability=QoSDurabilityPolicy.VOLATILE,
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
),
callback_group=callback_group,
)

self.__collision_object_publisher = self._node.create_publisher(
CollisionObject, "/collision_object", 10
)
Expand Down Expand Up @@ -1691,6 +1723,90 @@ def detach_all_collision_objects(self):
)
self.__attached_collision_object_publisher.publish(msg)

def __update_planning_scene(self) -> bool:
"""
Gets the current planning scene. Returns whether the service call was
successful.
"""

if not self._get_planning_scene_service.service_is_ready():
self._node.get_logger().warn(
f"Service '{self._get_planning_scene_service.srv_name}' is not yet available. Better luck next time!"
)
return False
self.__planning_scene = self._get_planning_scene_service.call(
GetPlanningScene.Request()
).scene
return True

def allow_collisions(self, id: str, allow: bool) -> Optional[Future]:
"""
Takes in the ID of an element in the planning scene. Modifies the allowed
collision matrix to (dis)allow collisions between that object and all other
object.
If `allow` is True, a plan will succeed even if the robot collides with that object.
If `allow` is False, a plan will fail if the robot collides with that object.
Returns whether it successfully updated the allowed collision matrix.
Returns the future of the service call.
"""
# Update the planning scene
if not self.__update_planning_scene():
return None
allowed_collision_matrix = self.__planning_scene.allowed_collision_matrix
self.__old_allowed_collision_matrix = copy.deepcopy(allowed_collision_matrix)

# Get the location in the allowed collision matrix of the object
j = None
if id not in allowed_collision_matrix.entry_names:
allowed_collision_matrix.entry_names.append(id)
else:
j = allowed_collision_matrix.entry_names.index(id)
# For all other objects, (dis)allow collisions with the object with `id`
for i in range(len(allowed_collision_matrix.entry_values)):
if j is None:
allowed_collision_matrix.entry_values[i].enabled.append(allow)
elif i != j:
allowed_collision_matrix.entry_values[i].enabled[j] = allow
# For the object with `id`, (dis)allow collisions with all other objects
allowed_collision_entry = AllowedCollisionEntry(
enabled=[allow for _ in range(len(allowed_collision_matrix.entry_names))]
)
if j is None:
allowed_collision_matrix.entry_values.append(allowed_collision_entry)
else:
allowed_collision_matrix.entry_values[j] = allowed_collision_entry

# Apply the new planning scene
if not self._apply_planning_scene_service.service_is_ready():
self._node.get_logger().warn(
f"Service '{self._apply_planning_scene_service.srv_name}' is not yet available. Better luck next time!"
)
return None
return self._apply_planning_scene_service.call_async(
ApplyPlanningScene.Request(scene=self.__planning_scene)
)

def process_allow_collision_future(self, future: Future) -> bool:
"""
Return whether the allow collision service call is done and has succeeded
or not. If it failed, reset the allowed collision matrix to the old one.
"""
if not future.done():
return False

# Get response
resp = future.result()

# If it failed, restore the old planning scene
if not resp.success:
self.__planning_scene.allowed_collision_matrix = (
self.__old_allowed_collision_matrix
)

return resp.success

def __joint_state_callback(self, msg: JointState):
# Update only if all relevant joints are included in the message
for joint_name in self.joint_names:
Expand Down

0 comments on commit 93ca9aa

Please sign in to comment.