-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrobotiq_gripper_service.py
171 lines (129 loc) · 5.74 KB
/
robotiq_gripper_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python
import roslib
import rospy
import threading
from robotiq_2f_gripper_control.msg import Robotiq2FGripper_robot_output as outputMsg
from robotiq_2f_gripper_control.msg import Robotiq2FGripper_robot_input as inputMsg
from time import sleep
from assembly_ros.srv import (RobotiqGripperActivate,
RobotiqGripperActivateRequest,
RobotiqGripperActivateResponse,
RobotiqGripperSetPosition,
RobotiqGripperSetPositionResponse,
RobotiqGripperSetSpeed,
RobotiqGripperSetSpeedResponse,
RobotiqGripperSetForce,
RobotiqGripperSetForceResponse)
class RobotiqService():
def __init__(self):
gripper_name = rospy.get_param('~gripper_name', 'robotiq_gripper')
# Services to control the gripper's activation, position, speed, and force
self.activate_command = rospy.Service('~activate_gripper', RobotiqGripperActivate, self._activate_gripper)
self.pos_command = rospy.Service('~set_gripper_position', RobotiqGripperSetPosition, self._set_gripper_position)
self.speed_command = rospy.Service('~set_gripper_speed', RobotiqGripperSetSpeed, self._set_gripper_speed)
self.force_command = rospy.Service('~set_gripper_force', RobotiqGripperSetForce, self._set_gripper_force)
# A publisher to send commands to the gripper
self._command_pub = rospy.Publisher(gripper_name + '/output', outputMsg, queue_size=1)
self._curr_command = outputMsg()
self._command_lock = threading.Lock()
# A subscriber to check the current status of the gripper
self._state_sub= rospy.Subscriber(gripper_name + '/input', inputMsg, self._set_curr_status)
self._curr_status= inputMsg()
self._status_lock = threading.Lock()
def is_activated(self):
status = self._get_curr_status()
return status.gSTA == 3 and status.gACT == 1
def is_reset(self):
status = self._get_curr_status()
return status.gSTA == 0 and status.gACT == 0
def is_moving(self):
status = self._get_curr_status()
return status.gGTO == 1 and status.gOBJ == 0
def is_stopped(self):
status = self._get_curr_status()
return status.gOBJ != 0
def is_faulted(self):
status = self._get_curr_status()
# Check if there is a fault
return status.gFLT != 0 and status.gFLT !=5
def object_detected(self):
status = self._get_curr_status()
return status.gOBJ == 1 or status.gOBJ == 2
# Wait until given status function is true, if timeout is negative, wait forever
def _wait_until_status(self, status, timeout=5):
r = rospy.Rate(30)
start_time = rospy.get_time()
while not rospy.is_shutdown():
if (timeout >= 0. and rospy.get_time() - start_time > timeout) or self.is_faulted():
return False
if status():
return True
r.sleep()
return False
def _set_curr_status(self, status):
with self._status_lock:
self._curr_status = status
def _get_curr_status(self):
with self._status_lock:
status = self._curr_status
return status
def _set_curr_command(self, command):
with self._command_lock:
self._curr_command = command
def _get_curr_command(self):
with self._command_lock:
command = self._curr_command
return command
def _publisher(self, command):
self._set_curr_command(command)
self._command_pub.publish(command)
def _activate_gripper(self, request = RobotiqGripperActivateRequest()):
# Reset gripper before activating
command = outputMsg()
command.rACT = 0
self._publisher(command)
if not self._wait_until_status(self.is_reset):
return RobotiqGripperActivateResponse(False)
# Activate gripper, set default speed and force
command.rACT = 1
command.rGTO = 1
command.rSP = 255
command.rFR = 150
self._publisher(command)
return RobotiqGripperActivateResponse(self._wait_until_status(self.is_activated))
def _set_gripper_position(self, request):
command = self._get_curr_command()
response = RobotiqGripperSetPositionResponse(True)
# Activate gripper if it isn't activated
if not self.is_activated():
self._activate_gripper()
command = self._get_curr_command()
command.rPR = request.position
self._publisher(command)
if not self._wait_until_status(self.is_moving):
response.success = False
return response
response.success = self._wait_until_status(self.is_stopped)
return response
def _set_gripper_speed(self, request):
command = self._get_curr_command()
# Activate gripper if it isn't activated
if not self.is_activated():
self._activate_gripper()
command = self._get_curr_command()
command.rSP = request.speed
self._publisher(command)
return RobotiqGripperSetSpeedResponse(not self.is_faulted())
def _set_gripper_force(self, request):
command = self._get_curr_command()
# Activate gripper if it isn't activated
if not self.is_activated():
self._activate_gripper()
command = self._get_curr_command()
command.rFR = request.force
self._publisher(command)
return RobotiqGripperSetForceResponse(not self.is_faulted())
if __name__ == '__main__':
rospy.init_node("robotiq_gripper_service")
gripper = RobotiqService()
rospy.spin()