Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AcCCS usable with different hardware setups #2

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 59 additions & 26 deletions EVSE.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class EVSE:

def __init__(self, args):
self.mode = RunMode(args.mode[0]) if args.mode else RunMode.FULL
self.skip_slac = True if args.skip_slac else False
self.disable_i2c = True if args.disable_i2c else False
self.iface = args.interface[0] if args.interface else "eth1"
self.sourceMAC = args.source_mac[0] if args.source_mac else "00:1e:c0:f2:6c:a0"
self.sourceIP = args.source_ip[0] if args.source_ip else "fe80::21e:c0ff:fef2:6ca0"
Expand Down Expand Up @@ -59,10 +61,12 @@ def __init__(self, args):
self.exi = EXIProcessor(self.protocol)

self.slac = _SLACHandler(self)
self.sdp = _SDPHandler(self)
self.tcp = _TCPHandler(self)

# I2C bus for relays
self.bus = SMBus(1)
if not self.disable_i2c:
self.bus = SMBus(1)

# Constants for i2c controlled relays
self.I2C_ADDR = 0x20
Expand All @@ -74,10 +78,13 @@ def __init__(self, args):
# Start the emulator
def start(self):
# Initialize the I2C bus for wwrite
self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00)
if not self.disable_i2c:
self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00)

self.toggleProximity()
self.doSLAC()
if not self.skip_slac:
self.doSLAC()
self.doSDP()
self.doTCP()
# If NMAP is not done, restart connection
if not self.tcp.finishedNMAP:
Expand All @@ -86,6 +93,9 @@ def start(self):

# Close the circuit for the proximity pins
def closeProximity(self):
if self.disable_i2c:
return

if self.modified_cordset:
print("INFO (EVSE): Closing CP/PP relay connections")
self.bus.write_byte_data(self.I2C_ADDR, self.CONTROL_REG, self.EVSE_PP | self.EVSE_CP)
Expand All @@ -95,6 +105,9 @@ def closeProximity(self):

# Close the circuit for the proximity pins
def openProximity(self):
if self.disable_i2c:
return

print("INFO (EVSE): Opening CP/PP relay connections")
self.bus.write_byte_data(self.I2C_ADDR, self.CONTROL_REG, self.ALL_OFF)

Expand All @@ -109,6 +122,10 @@ def doTCP(self):
self.tcp.start()
print("INFO (EVSE): Done TCP")

def doSDP(self):
self.sdp.start()
print("INFO (EVSE): Done SDP")

# Starts SLAC thread that handles layer 2 comms
def doSLAC(self):
self.slac.start()
Expand Down Expand Up @@ -155,22 +172,13 @@ def startSniff(self):
sniff(iface=self.iface, prn=self.handlePacket, stop_filter=self.stopSniff)

def stopSniff(self, pkt):
if pkt.haslayer("SECC_RequestMessage"):
print("INDO (EVSE): Recieved SECC_RequestMessage")
# self.evse.destinationMAC = pkt[Ether].src
# use this to send 3 secc responses incase car doesnt see one
self.destinationIP = pkt[IPv6].src
self.destinationPort = pkt[UDP].sport
Thread(target=self.sendSECCResponse).start()
if pkt.haslayer("CM_SLAC_MATCH_REQ"):
print("INFO (EVSE): Recieved SLAC_MATCH_REQ")
print("INFO (EVSE): Sending SLAC_MATCH_CNF")
sendp(self.buildSlacMatchCnf(), iface=self.iface, verbose=0)
self.stop = True
return self.stop

def sendSECCResponse(self):
time.sleep(0.2)
for i in range(3):
print("INFO (EVSE): Sending SECC_ResponseMessage")
sendp(self.buildSECCResponse(), iface=self.iface, verbose=0)

def handlePacket(self, pkt):
if pkt[Ether].type != 0x88E1 or pkt[Ether].src == self.sourceMAC:
return
Expand All @@ -189,11 +197,6 @@ def handlePacket(self, pkt):
print("INFO (EVSE): Sending ATTEN_CHAR_IND")
sendp(self.buildAttenCharInd(), iface=self.iface, verbose=0)

if pkt.haslayer("CM_SLAC_MATCH_REQ"):
print("INFO (EVSE): Recieved SLAC_MATCH_REQ")
print("INFO (EVSE): Sending SLAC_MATCH_CNF")
sendp(self.buildSlacMatchCnf(), iface=self.iface, verbose=0)

def buildSlacParmCnf(self):
ethLayer = Ether()
ethLayer.src = self.sourceMAC
Expand Down Expand Up @@ -348,13 +351,41 @@ def buildSetKey(self):
responsePacket = ethLayer / homePlugAVLayer / homePlugLayer
return responsePacket

class _SDPHandler:
def __init__(self, evse: EVSE):
self.evse = evse

self.destinationIP = None
self.destinationPort = None

def start(self):
sniff(iface=self.evse.iface, stop_filter=self.stopSniff)

def stopSniff(self, pkt):
if pkt.haslayer("SECC_RequestMessage"):
print("INDO (EVSE): Recieved SECC_RequestMessage")
# self.evse.destinationMAC = pkt[Ether].src
# use this to send 3 secc responses incase car doesnt see one
self.destinationIP = pkt[IPv6].src
self.destinationPort = pkt[UDP].sport
Thread(target=self.sendSECCResponse).start()
return True
return False

def sendSECCResponse(self):
time.sleep(0.2)
for i in range(3):
print("INFO (EVSE): Sending SECC_ResponseMessage")
sendp(self.buildSECCResponse(), iface=self.evse.iface, verbose=0)


def buildSECCResponse(self):
e = Ether()
e.src = self.sourceMAC
e.dst = self.destinationMAC
e.src = self.evse.sourceMAC
e.dst = self.evse.destinationMAC

ip = IPv6()
ip.src = self.sourceIP
ip.src = self.evse.sourceIP
ip.dst = self.destinationIP

udp = UDP()
Expand All @@ -367,8 +398,8 @@ def buildSECCResponse(self):

seccRM = SECC_ResponseMessage()
seccRM.SecurityProtocol = 16
seccRM.TargetPort = self.sourcePort
seccRM.TargetAddress = self.sourceIP # eno1
seccRM.TargetPort = self.evse.sourcePort
seccRM.TargetAddress = self.evse.sourceIP # eno1

responsePacket = e / ip / udp / secc / seccRM
return responsePacket
Expand Down Expand Up @@ -693,6 +724,8 @@ def buildNeighborAdvertisement(self):
parser.add_argument("--nmap-mac", nargs=1, help="The MAC address of the target device to NMAP scan (default: EVCC MAC address)")
parser.add_argument("--nmap-ip", nargs=1, help="The IP address of the target device to NMAP scan (default: EVCC IP address)")
parser.add_argument("--nmap-ports", nargs=1, help="List of ports to scan seperated by commas (ex. 1,2,5-10,19,...) (default: Top 8000 common ports)")
parser.add_argument("--skip-slac", action="store_true", help="Set this option when not using QCA based powerline chip. You will have to handle slac externally. (default: False)")
parser.add_argument("--disable-i2c", action="store_true", help="Set this option when not using the original AcCCS hardware, i.e. no I2C bus is present. (default: False)")
parser.add_argument("--modified-cordset", action="store_true", help="Set this option when using a modified cordset during testing of a target vehicle. The AcCCS system will provide a 150 ohm ground on the proximity line to reset the connection. (default: False)")
args = parser.parse_args()

Expand Down
132 changes: 86 additions & 46 deletions PEV.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class PEV:

def __init__(self, args):
self.mode = RunMode(args.mode[0]) if args.mode else RunMode.FULL
self.skip_slac = True if args.skip_slac else False
self.disable_i2c = True if args.disable_i2c else False
self.iface = args.interface[0] if args.interface else "eth1"
self.sourceMAC = args.source_mac[0] if args.source_mac else "00:1e:c0:f2:6c:a1"
self.sourceIP = args.source_ip[0] if args.source_ip else "fe80::21e:c0ff:fef2:6ca1"
Expand All @@ -56,10 +58,12 @@ def __init__(self, args):
self.exi = EXIProcessor(self.protocol)

self.slac = _SLACHandler(self)
self.sdp = _SDPHandler(self)
self.tcp = _TCPHandler(self)

# I2C bus for relays
self.bus = SMBus(1)
if not self.disable_i2c:
self.bus = SMBus(1)

# Constants for i2c controlled relays
self.I2C_ADDR = 0x20
Expand All @@ -71,10 +75,13 @@ def __init__(self, args):

def start(self):
# Initialize the smbus for I2C commands
self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00)
if not self.disable_i2c:
self.bus.write_byte_data(self.I2C_ADDR, 0x00, 0x00)

self.toggleProximity()
self.doSLAC()
if not self.skip_slac:
self.doSLAC()
self.doSDP()
self.doTCP()
# If NMAP is not done, restart connection
if not self.tcp.finishedNMAP:
Expand All @@ -85,6 +92,12 @@ def doTCP(self):
self.tcp.start()
print("INFO (PEV) : Done TCP")

def doSDP(self):
print("INFO (PEV) : Starting SDP")
self.sdp.start()
self.sdp.sniffThread.join()
print("INFO (PEV) : Done SDP")

def doSLAC(self):
print("INFO (PEV) : Starting SLAC")
self.slac.start()
Expand All @@ -98,6 +111,10 @@ def openProximity(self):
self.setState(PEVState.A)

def setState(self, state: PEVState):
if self.disable_i2c:
print(f"INFO (PEV) : State transition to {state}")
return

if state == PEVState.A:
print("INFO (PEV) : Going to state A")
self.bus.write_byte_data(self.I2C_ADDR, self.CONTROL_REG, self.ALL_OFF)
Expand Down Expand Up @@ -160,11 +177,18 @@ def startSniff(self):

# Stop the thread when the slac match is done
def stopSniff(self, pkt):
if pkt.haslayer("SECC_ResponseMessage"):
self.pev.destinationIP = pkt[SECC_ResponseMessage].TargetAddress
self.pev.destinationPort = pkt[SECC_ResponseMessage].TargetPort
if pkt.haslayer("CM_SLAC_MATCH_CNF"):
print("INFO (PEV) : Recieved SLAC_MATCH_CNF")
self.NID = pkt[CM_SLAC_MATCH_CNF].VariableField.NetworkID
self.NMK = pkt[CM_SLAC_MATCH_CNF].VariableField.NMK
print("INFO (PEV) : Sending SET_KEY_REQ")
sendp(self.buildSetKeyReq(), iface=self.iface, verbose=0)
time.sleep(3) # give modem some time to reboot

if self.neighborSolicitationThread.running:
self.neighborSolicitationThread.stop()

self.stop = True
return True
return False

Expand Down Expand Up @@ -202,21 +226,6 @@ def handlePacket(self, pkt):
self.timeSinceLastPkt = time.time()
return

if pkt.haslayer("CM_SLAC_MATCH_CNF"):
print("INFO (PEV) : Recieved SLAC_MATCH_CNF")
self.NID = pkt[CM_SLAC_MATCH_CNF].VariableField.NetworkID
self.NMK = pkt[CM_SLAC_MATCH_CNF].VariableField.NMK
print("INFO (PEV) : Sending SET_KEY_REQ")
sendp(self.buildSetKeyReq(), iface=self.iface, verbose=0)
self.stop = True
Thread(target=self.sendSECCRequest).start()
return

def sendSECCRequest(self):
time.sleep(3)
print("INFO (PEV) : Sending 3 SECC_RequestMessage")
for i in range(1):
sendp(self.buildSECCRequest(), iface=self.iface, verbose=0)

def sendSounds(self):
self.numRemainingSounds = self.numSounds
Expand Down Expand Up @@ -344,31 +353,6 @@ def buildSetKeyReq(self):
responsePacket = ethLayer / homePlugAVLayer / homePlugLayer
return responsePacket

def buildSECCRequest(self):
ethLayer = Ether()
ethLayer.src = self.sourceMAC
ethLayer.dst = "33:33:00:00:00:01"

ipLayer = IPv6()
ipLayer.src = self.sourceIP
ipLayer.dst = "ff02::1"
ipLayer.hlim = 255

udpLayer = UDP()
udpLayer.sport = self.pev.sourcePort
udpLayer.dport = 15118

seccLayer = SECC()
seccLayer.SECCType = 0x9000
seccLayer.PayloadLen = 2

seccRequestLayer = SECC_RequestMessage()
seccRequestLayer.SecurityProtocol = 16
seccRequestLayer.TransportProtocol = 0

responsePacket = ethLayer / ipLayer / udpLayer / seccLayer / seccRequestLayer
return responsePacket

def buildNeighborAdvertisement(self):
ethLayer = Ether()
ethLayer.src = self.sourceMAC
Expand Down Expand Up @@ -402,6 +386,60 @@ def sendNeighborSoliciation(self, pkt):
# print("INFO (EVSE): Sending Neighor Advertisement")
sendp(self.buildNeighborAdvertisement(), iface=self.iface, verbose=0)

class _SDPHandler:
def __init__(self, pev: PEV):
self.pev = pev

# This method starts the slac process and will stop
def start(self):
self.runID = os.urandom(8)
self.stop = False
# Thread for sniffing packets and handling responses
# self.sniffThread = Thread(target=self.startSniff)
# self.sniffThread.start()

self.sniffThread = AsyncSniffer(iface=self.pev.iface, stop_filter=self.stopSniff)
self.sniffThread.start()

self.sendSECCRequest()

# Stop the thread when the slac match is done
def stopSniff(self, pkt):
if pkt.haslayer("SECC_ResponseMessage"):
self.pev.destinationIP = pkt[SECC_ResponseMessage].TargetAddress
self.pev.destinationPort = pkt[SECC_ResponseMessage].TargetPort
return True
return False

def sendSECCRequest(self):
print("INFO (PEV) : Sending 3 SECC_RequestMessage")
for i in range(1):
sendp(self.buildSECCRequest(), iface=self.pev.iface, verbose=0)

def buildSECCRequest(self):
ethLayer = Ether()
ethLayer.src = self.pev.sourceMAC
ethLayer.dst = "33:33:00:00:00:01"

ipLayer = IPv6()
ipLayer.src = self.pev.sourceIP
ipLayer.dst = "ff02::1"
ipLayer.hlim = 255

udpLayer = UDP()
udpLayer.sport = self.pev.sourcePort
udpLayer.dport = 15118

seccLayer = SECC()
seccLayer.SECCType = 0x9000
seccLayer.PayloadLen = 2

seccRequestLayer = SECC_RequestMessage()
seccRequestLayer.SecurityProtocol = 16
seccRequestLayer.TransportProtocol = 0

responsePacket = ethLayer / ipLayer / udpLayer / seccLayer / seccRequestLayer
return responsePacket

class _TCPHandler:
def __init__(self, pev: PEV):
Expand Down Expand Up @@ -778,6 +816,8 @@ def buildNeighborAdvertisement(self):
parser.add_argument("--nmap-mac", nargs=1, help="The MAC address of the target device to NMAP scan (default: SECC MAC address)")
parser.add_argument("--nmap-ip", nargs=1, help="The IP address of the target device to NMAP scan (default: SECC IP address)")
parser.add_argument("--nmap-ports", nargs=1, help="List of ports to scan seperated by commas (ex. 1,2,5-10,19,...) (default: Top 8000 common ports)")
parser.add_argument("--skip-slac", action="store_true", help="Set this option when not using QCA based powerline chip. You will have to handle slac externally. (default: False)")
parser.add_argument("--disable-i2c", action="store_true", help="Set this option when not using the original AcCCS hardware, i.e. no I2C bus is present. (default: False)")
args = parser.parse_args()

pev = PEV(args)
Expand Down