-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathneato.py
386 lines (331 loc) · 12.9 KB
/
neato.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
#!/usr/bin/env python
# Generic driver for the Neato XV-11 Robot Vacuum
# Copyright (c) 2010 University at Albany. All right reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the University at Albany nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL VANADIUM LABS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
neato_driver.py is a generic driver for the Neato XV-11 Robotic Vacuum.
ROS Bindings can be found in the neato_node package.
"""
__author__ = "[email protected] (Michael Ferguson)"
import os
import usb.core as uc
import usb.util as uu
import time
# to find libusb-1.0.dll
dirWithDll = os.path.abspath(os.path.dirname(__file__))
if dirWithDll not in os.environ['PATH']:
print 'Going to search for libusb-1.0.dll in ' + dirWithDll
os.environ['PATH'] = os.environ['PATH'] + ';' + os.path.abspath(os.path.dirname(__file__))
BASE_WIDTH = 248 # millimeters
MAX_SPEED = 300 # millimeters/second
xv11_analog_sensors = [ "WallSensorInMM",
"BatteryVoltageInmV",
"LeftDropInMM",
"RightDropInMM",
"RightMagSensor",
"LeftMagSensor",
"XTemp0InC",
"XTemp1InC",
"VacuumCurrentInmA",
"ChargeVoltInmV",
"NotConnected1",
"BatteryTemp1InC",
"NotConnected2",
"CurrentInmA",
"NotConnected3",
"BatteryTemp0InC" ]
xv11_digital_sensors = [ "SNSR_DC_JACK_CONNECT",
"SNSR_DUSTBIN_IS_IN",
"SNSR_LEFT_WHEEL_EXTENDED",
"SNSR_RIGHT_WHEEL_EXTENDED",
"LSIDEBIT",
"LFRONTBIT",
"RSIDEBIT",
"RFRONTBIT" ]
xv11_motor_info = [ "Brush_MaxPWM",
"Brush_PWM",
"Brush_mVolts",
"Brush_Encoder",
"Brush_RPM",
"Vacuum_MaxPWM",
"Vacuum_PWM",
"Vacuum_CurrentInMA",
"Vacuum_Encoder",
"Vacuum_RPM",
"LeftWheel_MaxPWM",
"LeftWheel_PWM",
"LeftWheel_mVolts",
"LeftWheel_Encoder",
"LeftWheel_PositionInMM",
"LeftWheel_RPM",
"RightWheel_MaxPWM",
"RightWheel_PWM",
"RightWheel_mVolts",
"RightWheel_Encoder",
"RightWheel_PositionInMM",
"RightWheel_RPM",
"Laser_MaxPWM",
"Laser_PWM",
"Laser_mVolts",
"Laser_Encoder",
"Laser_RPM",
"Charger_MaxPWM",
"Charger_PWM",
"Charger_mAH" ]
xv11_charger_info = [ "FuelPercent",
"BatteryOverTemp",
"ChargingActive",
"ChargingEnabled",
"ConfidentOnFuel",
"OnReservedFuel",
"EmptyFuel",
"BatteryFailure",
"ExtPwrPresent",
"ThermistorPresent[0]",
"ThermistorPresent[1]",
"BattTempCAvg[0]",
"BattTempCAvg[1]",
"VBattV",
"VExtV",
"Charger_mAH",
"MaxPWM" ]
NEATO_DEVICE = None
INTERVAL = 0.05
def getAllUSBDevices():
return [i for i in uc.find(find_all=True)]
def getNeatoDevice():
global NEATO_DEVICE
if NEATO_DEVICE is None:
idVendor=0x2108
idProduct=0x780b
NEATO_DEVICE = uc.find(idVendor=idVendor, idProduct=idProduct)
if NEATO_DEVICE is None:
err = "could not find your device using vendor {} and product {}, please check that it is configured and installed correctly.".format(hex(idVendor), hex(idProduct))
print err
raise Exception('no neato found: ' + err)
else:
print 'found neato, product/manufacturer:{}, {}'.format(NEATO_DEVICE.product, NEATO_DEVICE.manufacturer)
return NEATO_DEVICE
class NeatoUSB():
def __init__(self):
self.device = getNeatoDevice()
self.d = self.device
self.d.set_configuration()
def write(self, data):
self.device.write(1, data)
time.sleep(INTERVAL)
def read(self, count=1):
return ''.join([chr(x) for x in self.device.read(0x81, count)])
def readline(self):
chars = []
chars.append(self.read(1))
while chars[-1] != '\n':
chars.append(self.read(1))
return ''.join(chars)
def flushInput(self):
l = self.readline()
while l:
try:
l = self.readline()
except:
return
class xv11():
def __init__(self):
self.port = NeatoUSB()
# Storage for motor and sensor information
self.state = {"LeftWheel_PositionInMM": 0, "RightWheel_PositionInMM": 0}
self.stop_state = True
# turn things on
self.setTestMode("on")
self.setLDS("on")
# show off a bit
distance = 25
self.setMotors(distance, -distance, 200)
self.setMotors(-distance, distance, 200)
# some alerts
self.sound(3)
self.sound(6)
def exit(self):
self.setLDS("off")
self.setTestMode("off")
def playSound(self, number):
return self.command('playsound {}'.format(number))
def sound(self, number):
return self.playSound(number)
def setTestMode(self, value="on"):
""" Turn test mode on/off. """
return self.command("testmode " + value)
@property
def LDS(self):
return self.noLDS()
def commands(self):
return self.command('help')
@LDS.setter
def LDS(self, a):
return self.setLDS("on" if a else "off")
def noLDS(self):
return self.setLDS("off")
def setLDS(self, value="on"):
return self.command("setldsrotation " + value)
def requestScan(self):
""" Ask neato for an array of scan reads. """
self.port.flushInput()
return self.command("getldsscan")
def getScanRanges(self):
""" Read values of a scan -- call requestScan first! """
ranges = list()
angle = 0
try:
line = self.port.readline()
except:
return []
while line.split(",")[0] != "AngleInDegrees":
try:
line = self.port.readline()
except:
return []
while angle < 360:
try:
vals = self.port.readline()
except:
pass
vals = vals.split(",")
#print angle, vals
try:
a = int(vals[0])
r = int(vals[1])
ranges.append(r/1000.0)
except:
ranges.append(0)
angle += 1
return ranges
def turnRight(self, degrees, speed=200):
ratio = 390.0 / 180
return self.setMotors(int(degrees*ratio), int(-degrees*ratio), speed)
def turnLeft(self, degrees, speed=200):
return self.turnRight(-degrees, speed)
def turnAndBack(self, degrees, speed=200):
a = self.turnRight(degrees, speed)
time.sleep(3.0)
b = self.turnLeft(degrees, speed)
time.sleep(3.0)
def setMotors(self, l, r, s):
""" Set motors, distance left & right + speed """
#This is a work-around for a bug in the Neato API. The bug is that the
#robot won't stop instantly if a 0-velocity command is sent - the robot
#could continue moving for up to a second. To work around this bug, the
#first time a 0-velocity is sent in, a velocity of 1,1,1 is sent. Then,
#the zero is sent. This effectively causes the robot to stop instantly.
if (int(l) == 0 and int(r) == 0 and int(s) == 0):
if (not self.stop_state):
self.stop_state = True
l = 1
r = 1
s = 1
else:
self.stop_state = False
return self.command("setmotor "+str(int(l))+" "+str(int(r))+" "+str(int(s)))
def getMotors(self):
""" Update values for motors in the self.state dictionary.
Returns current left, right encoder values. """
self.port.flushInput()
return self.command("getmotors")
line = self.port.readline()
while line.split(",")[0] != "Parameter":
try:
line = self.port.readline()
except:
return [0,0]
for i in range(len(xv11_motor_info)):
try:
values = self.port.readline().split(",")
self.state[values[0]] = int(values[1])
except:
pass
return [self.state["LeftWheel_PositionInMM"],self.state["RightWheel_PositionInMM"]]
def getAnalogSensors(self):
""" Update values for analog sensors in the self.state dictionary. """
lines = self.command("getanalogsensors").splitlines()
for index, line in enumerate(lines):
if "SensorName" in line:
break
for line in lines[index+1:]:
values = line.split(',')
self.state[values[0]] = int(values[1])
def getDigitalSensors(self):
""" Update values for digital sensors in the self.state dictionary. """
lines = self.command("getdigitalsensors").splitlines()
for index, line in enumerate(lines):
if "Digital Sensor Name" in line:
break
for line in lines[index+1:]:
values = line.split(",")
self.state[values[0]] = int(values[1])
def command(self, command):
self.port.write(command + '\n')
lines = []
l = self.port.readline()
lines.append(l)
while l:
try:
l = self.port.readline()
lines.append(l)
except:
return ''.join(lines)
return ''.join(lines)
def howto(self, command):
return self.command('help ' + command)
def getCharger(self):
""" Update values for charger/battery related info in self.state dictionary. """
return self.command("getcharger")
line = self.port.readline()
while line.split(",")[0] != "Label":
line = self.port.readline()
for i in range(len(xv11_charger_info)):
values = self.port.readline().split(",")
try:
self.state[values[0]] = int(values[1])
except:
pass
def setBacklight(self, value=1):
if value > 0:
return self.command("setled backlighton")
else:
return self.command("setled backlightoff")
def shell(self):
command = raw_input('NEATO->>:').lower().encode('utf-8')
print repr(command)
while command not in ['q', 'quit']:
print self.command(command)
command = raw_input('NTO->>:').lower().encode('utf-8')
#SetLED - Sets the specified LED to on,off,blink, or dim. (TestMode Only)
#BacklightOn - LCD Backlight On (mutually exclusive of BacklightOff)
#BacklightOff - LCD Backlight Off (mutually exclusive of BacklightOn)
#ButtonAmber - Start Button Amber (mutually exclusive of other Button options)
#ButtonGreen - Start Button Green (mutually exclusive of other Button options)
#LEDRed - Start Red LED (mutually exclusive of other Button options)
#LEDGreen - Start Green LED (mutually exclusive of other Button options)
#ButtonAmberDim - Start Button Amber Dim (mutually exclusive of other Button options)
#ButtonGreenDim - Start Button Green Dim (mutually exclusive of other Button options)
#ButtonOff - Start Button Off