-
Notifications
You must be signed in to change notification settings - Fork 0
/
esmSerial.py
92 lines (78 loc) · 2.53 KB
/
esmSerial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from esmPrint import esmPrintSource as ps
from enum import Enum
import serial
from multiprocessing import Process
from multiprocessing import Queue
import multiprocessing
import queue as Queue2
from time import sleep
class esmSerialPorts(Enum):
    """Identifiers for the serial ports this module knows about.

    The members are used as the "port enum name" element (index 0) of the
    port tuples handed to ``esmSerial`` and therefore as keys of its
    ``ports`` dictionary.
    """

    panelMicro = 0
    electronicLoad = 1
    leds = 2
class esmSerial:
    """Owns a set of serial ports, each serviced by its own receive process.

    ``self.ports`` maps a port identifier to a ``(ser, process, queue)``
    tuple: the serial object, the process running ``rxHandler`` for it, and
    the control queue used to ask that process to stop.
    """

    def write(self, port, msg):
        """Write ``msg`` to ``port`` if the port has a device configured.

        Nothing is written when ``port.port`` is ``None``.  An
        ``AttributeError`` (for example when ``port`` is not a serial
        object at all) is deliberately swallowed so that sending stays
        best-effort.

        Always returns ``None``.
        """
        try:
            if port.port is not None:
                port.write(msg)
        except AttributeError:
            # Best-effort by design: a missing/invalid port is a no-op.
            pass
        return None

    def rxHandler(self, port, callback, queue):
        """Receive loop executed in a worker process for one port.

        While the port is open, pending input is drained one byte at a
        time.  Each byte is put on ``callback`` when ``callback`` is a
        multiprocessing queue; otherwise the byte is read and discarded.
        After every drain the control ``queue`` is polled without blocking
        and the loop ends once the item ``'q'`` is received; otherwise the
        loop sleeps for 0.1 s and repeats.

        Args:
            port: serial object providing ``isOpen``, ``in_waiting`` and
                ``read``.
            callback: multiprocessing queue that receives the bytes, or any
                other value to discard received data.
            queue: control queue; the item ``'q'`` requests termination.
        """
        # Function-scope import so the check does not depend on the
        # ``multiprocessing.queues`` submodule having been loaded elsewhere.
        from multiprocessing.queues import Queue as MPQueue

        # The callback never changes, so decide once instead of per byte.
        # isinstance (rather than an exact type match) also accepts
        # subclasses such as JoinableQueue.
        forward = isinstance(callback, MPQueue)

        while True:
            if port.isOpen():
                while port.in_waiting > 0:
                    data = port.read(1)
                    if forward:
                        callback.put(data)
            # Polled on every pass, even while the port is closed, so that
            # a quit request is always honoured.
            try:
                if queue.get_nowait() == 'q':
                    break
            except Queue2.Empty:
                pass
            sleep(.1)

    def serialTasksClose(self):
        """Stop every receive process and close every port.

        All workers are signalled first so they wind down concurrently;
        each one is then joined and its port closed.  The port is closed
        even if joining the worker raises.
        """
        entries = list(self.ports.values())
        for _, _, control in entries:
            control.put('q')
        for ser, proc, _ in entries:
            try:
                proc.join()
            finally:
                ser.close()

    def __init__(self, Dprint, serPorts):
        """Open the requested ports and start one receive process per port.

        Args:
            Dprint: object whose ``dprint`` callable is used for diagnostics.
            serPorts: iterable of tuples laid out as
                0 - port enum name,
                1 - port location (``None`` or ``'test'`` yields an
                    unopened port),
                2 - port callback queue,
                3 - optional baud rate.

        A location that fails to open is reported through ``dprint`` and
        replaced by an unopened serial object, so a worker process is still
        started for it.
        """
        self.uiProc = None
        self.dprint = Dprint.dprint
        self.ports = {}
        for port in serPorts:
            location = port[1]
            if location is not None and location != 'test':
                try:
                    ser = serial.Serial(location)
                except serial.SerialException:
                    self.dprint(ps.serial, 'Port ' + location + ' unable to open')
                    ser = serial.Serial()
            else:
                ser = serial.Serial()
            if len(port) > 3:
                ser.baudrate = port[3]
            control = Queue()
            worker = Process(target=self.rxHandler, args=(ser, port[2], control))
            worker.start()
            self.ports[port[0]] = (ser, worker, control)

    def close(self):
        """Shut down all receive processes and close all ports."""
        self.serialTasksClose()

    def sendSerial(self, port, msg):
        """Send ``msg`` on the port registered under ``port``.

        Returns:
            ``True`` (after printing a notice) when ``port`` is not a
            registered key; otherwise ``len(msg)``.  The length is returned
            whether or not ``write`` actually transmitted anything.
        """
        if port not in self.ports:
            print("Port not defined")
            return True
        self.write(self.ports[port][0], msg)
        return len(msg)