-
Notifications
You must be signed in to change notification settings - Fork 2
/
elm327.py
175 lines (133 loc) · 4.38 KB
/
elm327.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import serial
import sys
import time
import re
# One monitored frame as text: a three-hex-digit message ID, optional
# separator characters, then the payload hex digits (separators allowed).
CAN_RE = re.compile(rb'([0-9A-F]{3})\W*([0-9A-F\W]+)')

# Human-readable table of protocol selectors, keyed by the single character
# passed as ``protocol`` (taken from the ELM327 datasheet).
PROTOCOLS = """ 0 Autodetect (everything except User1 and User2)
1 SAE J1850 PWM (41.6 kbaud)
2 SAE J1850 VPW (10.4 kbaud)
3 ISO 9141-2 (5 baud init)
4 ISO 14230-4 KWP (5 baud init)
5 ISO 14230-4 KWP (fast init)
6 ISO 15765-4 CAN (11 bit ID, 500 kbaud)
7 ISO 15765-4 CAN (29 bit ID, 500 kbaud)
8 ISO 15765-4 CAN (11 bit ID, 250 kbaud)
9 ISO 15765-4 CAN (29 bit ID, 250 kbaud)
A SAE J1939 CAN (29 bit ID, 250* kbaud)
B User1 CAN (11* bit ID, 125* kbaud)
C User2 CAN (11* bit ID, 50* kbaud)
(Values marked as * can be set as programmable parameters.)
"""
class ELM327:
    """Thin wrapper around an ELM327 adapter attached to a serial port.

    Commands are written as bytes terminated by a carriage return, and
    responses are read byte-by-byte until the adapter's ``>`` prompt (or a
    carriage return, for line-oriented reads) is seen.

    The object can be used as a context manager; the serial port is closed
    when the ``with`` block exits.
    """

    def __init__(self, device='/dev/ttyUSB0', baud_rate='500000', protocol='0'):
        """Open the serial port.

        Args:
            device: Serial device path handed to ``serial.Serial``.
            baud_rate: Baud rate handed to ``serial.Serial``.
            protocol: Protocol selector sent with ``AT SP`` during
                :meth:`reset` (see the module-level ``PROTOCOLS`` table).
        """
        self.elm = serial.Serial(device, baud_rate)
        self.protocol = protocol

    def close(self):
        """Close the underlying serial port."""
        self.elm.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the ``with`` block.
        return False

    def send(self, cmd):
        """Discard any buffered serial data, then write *cmd* followed by CR.

        Args:
            cmd: Command as a bytes-like object, without the terminator.
        """
        self.elm.reset_input_buffer()
        self.elm.reset_output_buffer()
        # Single write so the command and its terminator leave together.
        self.elm.write(cmd + b'\r')

    def _read_byte(self):
        """Read exactly one byte, raising ``TimeoutError`` on an empty read."""
        b = self.elm.read(1)
        if not b:
            # An empty read means the port's read timeout expired (or the
            # stream ended); surface that instead of an IndexError.
            raise TimeoutError('no data from ELM327 (serial read returned nothing)')
        return b

    def recv(self):
        """Read until the ``>`` prompt and return everything before it.

        The collected bytes are also printed.

        Returns:
            bytearray: The response, excluding the prompt character.

        Raises:
            TimeoutError: If a read returns no data before the prompt.
        """
        output = bytearray()
        while True:
            b = self._read_byte()
            if b == b'>':
                break
            output += b
        print(output)
        return output

    def recv_line(self):
        """Read one carriage-return-terminated line.

        Returns:
            bytearray: The line, excluding the terminating CR.

        Raises:
            EOFError: If the ``>`` prompt is seen before a CR.
            TimeoutError: If a read returns no data before the line ends.
        """
        output = bytearray()
        while True:
            b = self._read_byte()
            if b == b'\r':
                break
            if b == b'>':
                raise EOFError
            output += b
        return output

    def recv_can(self):
        """Read one line and parse it as a CAN frame.

        Returns:
            tuple: ``(message_id, payload_bytes)`` when the line parses as a
            frame, otherwise ``(-1, raw_line)``. A line that matches the
            frame pattern but whose payload is not valid hex (for example a
            truncated frame with an odd number of digits) is also returned
            as ``(-1, raw_line)``.

        Raises:
            EOFError: If the line starts with ``SEARCHING`` or the prompt is
                seen (propagated from :meth:`recv_line`).
        """
        msg_raw = self.recv_line()
        msg_m = CAN_RE.match(msg_raw)
        if msg_m:
            try:
                msg_id = int(msg_m.group(1), 16)
                msg_b = bytes.fromhex(msg_m.group(2).decode('ascii'))
            except ValueError:
                # Covers odd-length/garbled hex and non-ASCII line noise
                # (UnicodeDecodeError is a ValueError).
                return (-1, msg_raw)
            return (msg_id, msg_b)
        if msg_raw.startswith(b'SEARCHING'):
            raise EOFError
        return (-1, msg_raw)

    def get_prompt(self):
        """Send a throwaway string and wait for the prompt that follows."""
        self.send(b'this will reset the prompt')
        self.recv()

    def _command(self, cmd):
        """Send *cmd* and return the response read up to the next prompt."""
        self.send(cmd)
        return self.recv()

    def reset(self):
        """Reset the adapter and configure it for raw CAN monitoring.

        Raises:
            AssertionError: If the response to ``AT I`` does not start with
                ``ELM``.
        """
        # reset interface
        self.get_prompt()
        self._command(b'AT Z')
        # return to defaults
        self._command(b'AT D')
        # turn echo off
        self._command(b'AT E0')
        # check that information string works
        ack = self._command(b'AT I')
        print(ack)
        # Raised explicitly (not via ``assert``) so the check survives
        # ``python -O``; the exception type is unchanged for callers.
        if not ack.startswith(b'ELM'):
            raise AssertionError(
                'unexpected response to AT I: {!r}'.format(bytes(ack)))
        # set the CANbus protocol used by the car
        self._command('AT SP {}'.format(self.protocol).encode('ascii'))
        # initialize the CANbus interface
        print(self._command(b'0100'))
        # get protocol
        print(self._command(b'AT DPN'))
        # enable messages longer than 7 bytes (standards are for chumps!)
        self._command(b'AT AL')
        # turn on headers
        self._command(b'AT H1')
        # disable formatting
        self._command(b'AT CAF0')
        # disable extra whitespace (~30% extra buffer space!)
        self._command(b'AT S0')
        # set huuuuge timeout
        self._command(b'AT STFF')
        print('-- device ready')

    def start_can(self):
        """Send ``AT MA``; the response is left unread for the caller.

        Lines can then be consumed with :meth:`recv_can` / :meth:`recv_line`.
        """
        self.get_prompt()
        self.send(b'AT MA')

    def set_can_filter(self, filter):
        """Send ``AT CF`` with *filter* formatted as three hex digits.

        Args:
            filter: Integer filter value. (The name shadows the builtin but
                is kept for keyword-argument compatibility.)
        """
        self.get_prompt()
        self._command('AT CF {0:03x}'.format(filter).encode('ascii'))

    def set_can_mask(self, mask):
        """Send ``AT CM`` with *mask* formatted as three hex digits.

        Args:
            mask: Integer mask value.
        """
        self.get_prompt()
        self._command('AT CM {0:03x}'.format(mask).encode('ascii'))

    def set_can_and(self, mask):
        """Program the filter to *mask* and the mask to its 11-bit complement.

        NOTE(review): assuming the adapter compares only bits set in the
        mask, this accepts IDs with no bits set outside *mask* -- confirm
        against the datasheet.
        """
        cf = mask
        cm = 0x7ff ^ mask
        self.set_can_filter(cf)
        self.set_can_mask(cm)

    def set_can_whitelist(self, message_ids=None):
        """Program a filter/mask pair derived from *message_ids*.

        The mask keeps only the bits on which every listed 11-bit ID agrees
        with the first one, so the result may also admit unlisted IDs that
        share those bits. With no IDs, filter and mask are both set to 0.

        Args:
            message_ids: Iterable of integer message IDs, or ``None``.
        """
        ids = [] if message_ids is None else list(message_ids)
        cf = 0
        cm = 0
        if ids:
            # generate a CAN ID filter/mask: keep only the bits that
            # *don't change* across all of the 11-bit message IDs
            cf = ids[0]
            cm = 0x7ff
            for m in ids[1:]:
                cm &= (cf ^ m) ^ 0x7ff
        self.set_can_filter(cf)
        self.set_can_mask(cm)