Skip to content

Commit

Permalink
readded urc scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
davidcalderon03 committed Oct 4, 2024
1 parent a4cba9f commit 2da9e23
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 0 deletions.
18 changes: 18 additions & 0 deletions urc_scripts/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>urc_scripts</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="[email protected]">keseterg</maintainer>
<license>TODO: License declaration</license>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions urc_scripts/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/urc_scripts
[install]
install_scripts=$base/lib/urc_scripts
26 changes: 26 additions & 0 deletions urc_scripts/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from setuptools import find_packages, setup

package_name = "urc_scripts"

setup(
name=package_name,
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="keseterg",
maintainer_email="[email protected]",
description="TODO: Package description",
license="TODO: License declaration",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"test_joint = urc_scripts.test_joint:main",
"joy_drive = urc_scripts.joy_drive:main",
],
},
)
27 changes: 27 additions & 0 deletions urc_scripts/test/test_copyright.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_copyright.main import main
import pytest


# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(
reason="No copyright header has been placed in the generated source file."
)
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=[".", "test"])
assert rc == 0, "Found errors"
25 changes: 25 additions & 0 deletions urc_scripts/test/test_flake8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ament_flake8.main import main_with_errors
import pytest


@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, "Found %d code style errors / warnings:\n" % len(
errors
) + "\n".join(errors)
Empty file.
145 changes: 145 additions & 0 deletions urc_scripts/urc_scripts/joy_drive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import rclpy
from rclpy.publisher import Publisher
from rclpy.subscription import Subscription
from rclpy.node import Node
from rclpy.parameter import Parameter
from rcl_interfaces.msg import SetParametersResult
from geometry_msgs.msg import TwistStamped
from sensor_msgs.msg import Joy


class JoyDrive(Node):
"""
Use joystick to command the drivetrain.
Args:
Node (_type_): node from ros2.
"""

def __init__(self):
super().__init__("joy_drive")

# declare parameters
self.declare_parameter("max_linear_velocity_ms", 0.0)
self.declare_parameter("max_angular_velocity_radians", 0.0)
self.declare_parameter("joy_command_topic", "/joy")
self.declare_parameter("cmd_vel_topic", "/cmd_vel")
self.declare_parameter("target_axes", [1, 3])
self.declare_parameter("invert_linear_velocity", False)
self.declare_parameter("invert_angular_velocity", False)

# get parameters
self.max_linear = (
self.get_parameter("max_linear_velocity_ms")
.get_parameter_value()
.double_value
)
self.max_angular = (
self.get_parameter("max_angular_velocity_radians")
.get_parameter_value()
.double_value
)
self.joy_command_topic = (
self.get_parameter("joy_command_topic").get_parameter_value().string_value
)
self.cmd_vel_topic = (
self.get_parameter("cmd_vel_topic").get_parameter_value().string_value
)
self.op_axes = (
self.get_parameter("target_axes").get_parameter_value().integer_array_value
)
self.inv_linvel = (
self.get_parameter("invert_linear_velocity")
.get_parameter_value()
.double_value
)
self.inv_angvel = (
self.get_parameter("invert_angular_velocity")
.get_parameter_value()
.double_value
)

# register dynamic parameter callbacks
self.add_on_set_parameters_callback(self.on_param_update)

self.curr_twist = TwistStamped()
self.joy_msg_subscriber: Subscription = self.create_subscription(
Joy, self.joy_command_topic, self.update_vel_target, 10
)
self.cmd_vel_publisher: Publisher = self.create_publisher(
TwistStamped, self.cmd_vel_topic, 10
)

self.get_logger().info("Joy Drive Started.")
self.get_logger().info(
f"Default Settings: [linear velocity {self.max_linear} m/s],"
+ f"[angular velocity {self.max_angular} rad/s]"
)

def update_vel_target(self, msg: Joy):
"""Update velocity target for the drivetrain.
Args:
msg (Joy): joy message from the controller.
"""
self.curr_twist.twist.linear.x = (
msg.axes[self.op_axes[0]] * self.max_linear * (-1 if self.inv_linvel else 1)
)
self.curr_twist.twist.angular.z = (
msg.axes[self.op_axes[1]]
* self.max_angular
* (-1 if self.inv_angvel else 1)
)
self.curr_twist.header.stamp = self.get_clock().now().to_msg()

self.cmd_vel_publisher.publish(self.curr_twist)

def on_param_update(self, params: list[Parameter]) -> SetParametersResult:
"""When parameter is updated.
Args:
params (list[Parameter]): list of updated parameters
Returns:
SetParametersResult: result for setting the parameters.
"""
result = SetParametersResult()

for param in params:
if param.name == "max_linear_velocity_ms":
self.max_linear = param.get_parameter_value().double_value
self.get_logger().info(f"Max linvel updated to {self.max_linear} m/s.")
if param.name == "max_angular_velocity_radians":
self.max_angular = param.get_parameter_value().double_value
self.get_logger().info(
f"Max angvel updated to {self.max_angular} rad/s."
)
if param.name == "target_axes":
self.op_axes = param.get_parameter_value().integer_array_value
self.get_logger().info(f"Axies has be configured to {self.op_axes}.")
if param.name == "invert_linear_vel":
self.inv_linvel = param.get_parameter_value().bool_value
self.get_logger().info(f"Linear velocity inversion: {self.inv_linvel}")
if param.name == "invert_angular_vel":
self.inv_angvel = param.get_parameter_value().bool_value
self.get_logger().info(f"Angular velocity inversion: {self.inv_angvel}")
result.successful = True
return result


def main(args=None):
"""Entry function
Args:
args (_type_, optional): Arguments. Defaults to None.
"""
rclpy.init(args=args)
node = JoyDrive()
while rclpy.ok():
rclpy.spin_once(node)
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
112 changes: 112 additions & 0 deletions urc_scripts/urc_scripts/test_joint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import rclpy
import numpy as np
from rclpy.publisher import Publisher
from rclpy.subscription import Subscription
from rclpy.client import Client
from rclpy.action import ActionClient
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Joy
from std_srvs.srv import Trigger
from control_msgs.action import GripperCommand


def rpy_to_quaternion(roll, pitch, yaw):
"""
Convert Roll-Pitch-Yaw (RPY) angles to quaternion.
Parameters:
roll (float): Roll angle in radians.
pitch (float): Pitch angle in radians.
yaw (float): Yaw angle in radians.
Returns:
numpy.array: Quaternion in the form [w, x, y, z].
"""
cy = np.cos(yaw * 0.5)
sy = np.sin(yaw * 0.5)
cr = np.cos(roll * 0.5)
sr = np.sin(roll * 0.5)
cp = np.cos(pitch * 0.5)
sp = np.sin(pitch * 0.5)

qw = cy * cr * cp + sy * sr * sp
qx = cy * sr * cp - sy * cr * sp
qy = cy * cr * sp + sy * sr * cp
qz = sy * cr * cp - cy * sr * sp

return qw, qx, qy, qz


# Example usage


class TestJoint(Node):
def __init__(self):
super().__init__("ff")
self.joy_subscriber: Subscription = self.create_subscription(
Joy, "/joy", self.pub, 10
)
self.cmd_twist_publisher: Publisher = self.create_publisher(
Twist, "/cmd_twist", 10
)
self.start_servo_service_request: Client = self.create_client(
Trigger, "/arm_rt_pinocchio/start_servo"
)
self.stop_servo_service_request: Client = self.create_client(
Trigger, "/arm_rt_pinocchio/stop_servo"
)
self.left_gripper_action_client: ActionClient = ActionClient(
self, GripperCommand, "/gripper_controller_left/gripper_cmd"
)
self.right_gripper_action_client: ActionClient = ActionClient(
self, GripperCommand, "/gripper_controller_right/gripper_cmd"
)
self.previous_open = False

def pub(self, msg: Joy):
t = Twist()
t.linear.x = msg.axes[1] * 0.30
t.linear.y = msg.axes[0] * 0.30

if msg.buttons[4] == 1:
t.linear.z = 0.10
elif msg.buttons[5] == 1:
t.linear.z = -0.10
else:
t.linear.z = 0.0

# t.angular.z = msg.axes[2] * 0.5
t.angular.y = msg.axes[3] * 0.2

self.cmd_twist_publisher.publish(t)

if msg.buttons[1] == 1:
self.start_servo_service_request.call_async(Trigger.Request())
if msg.buttons[2] == 1:
self.stop_servo_service_request.call_async(Trigger.Request())

if msg.buttons[3] == 1 and not self.previous_open:
target = GripperCommand.Goal()
target.command.position = 1.0
self.previous_open = True
self.left_gripper_action_client.send_goal_async(target)
self.right_gripper_action_client.send_goal_async(target)
elif msg.buttons[3] == 0 and self.previous_open:
target = GripperCommand.Goal()
target.command.position = -0.40
self.previous_open = False
self.left_gripper_action_client.send_goal_async(target)
self.right_gripper_action_client.send_goal_async(target)


def main(args=None):
rclpy.init(args=args)
node = TestJoint()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()

0 comments on commit 2da9e23

Please sign in to comment.