forked from jasonacox/tuyapower
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scan.py
155 lines (132 loc) · 4.96 KB
/
scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/python
#
# TuyaScan - Scan for tuya devices on the network broadcasting on UDP port 6666 and 6667
#
# *****************************************************
# ** Deprecated: Use "python -m tuyapower" instead.  **
# *****************************************************
#
# Author: Jason A. Cox
# For more information see https://github.com/jasonacox/tuyapower
from __future__ import print_function
import json
import socket
from hashlib import md5
from Crypto.Cipher import AES # PyCrypto
# Scan pacing
TIMEOUT = 3.0       # Seconds to wait for a broadcast
MAXCOUNT = 10       # How many tries before stopping
DEBUG = False       # Additional details beyond verbose

# Tuya broadcast ports
UDPPORT = 6666      # Tuya 3.1 UDP Port
UDPPORTS = 6667     # Tuya 3.3 encrypted UDP Port
# Clamp to zero: positive values pass through, everything else becomes 0
def floor(x):
    """Return x when it is greater than zero, otherwise return 0."""
    return x if x > 0 else 0
# UDP packet payload encryption/decryption - credit to tuya-convert
def pad(s):
    """Pad s so that its length becomes a multiple of 16.

    Between 1 and 16 padding elements are appended, each holding the
    padding length. Accepts text or bytes and returns the same type.
    """
    fill = 16 - len(s) % 16
    if isinstance(s, bytes):
        # bytes(bytearray(...)) yields a byte string on both Python 2 and 3
        return s + bytes(bytearray([fill]) * fill)
    return s + fill * chr(fill)


def unpad(s):
    """Remove padding added by pad(); the last element gives its length."""
    return s[:-ord(s[-1:])]


def encrypt(msg, key):
    """Return msg encrypted with AES in ECB mode under key.

    Text is encoded BEFORE padding so the pad length is derived from the
    byte length; padding by character count leaves non-ASCII text
    unaligned to the 16-byte block size.
    """
    data = msg if isinstance(msg, bytes) else msg.encode()
    return AES.new(key, AES.MODE_ECB).encrypt(pad(data))


def decrypt(msg, key):
    """Return the text obtained by AES-ECB decrypting msg under key."""
    return unpad(AES.new(key, AES.MODE_ECB).decrypt(msg)).decode()


# Key used for UDP broadcast payloads: MD5 digest of a fixed byte string
udpkey = md5(b"yGAdlopoPVldABfn").digest()


def decrypt_udp(msg):
    """Decrypt a UDP broadcast payload using the fixed udpkey."""
    return decrypt(msg, udpkey)
# Keep discovered devices in an in-memory table keyed by IP address
def appenddevice(newdevice, devices):
    """Add newdevice to devices unless its IP is already present.

    Returns True when the IP was already in devices (nothing is stored),
    and False when the device was newly added.
    """
    address = newdevice['ip']
    already_known = address in devices
    if not already_known:
        devices[address] = newdevice
    return already_known
# Open a UDP socket bound to `port` on all interfaces, with the scan timeout
def _open_listener(port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind(("", port))
        sock.settimeout(TIMEOUT)
    except Exception:
        # Do not leak the descriptor when setup fails (e.g. port in use)
        sock.close()
        raise
    return sock


# Scan function
def deviceScan(verbose = False):
    """Scan the network for devices broadcasting on UDP ports 6666 and 6667.

        devices = deviceScan(verbose)

    Parameters:
        verbose = True or False, print formatted output to stdout

    Response:
        devices = Dictionary keyed by IP address. Each value is the decoded
                  broadcast payload (a dict), or {"ip": <address>} when the
                  payload could not be decoded.

    To unpack data, you can do something like this:

        devices = deviceScan()
        for ip in devices:
            id = devices[ip].get('gwId')
            key = devices[ip].get('productKey')
            vers = devices[ip].get('version')

    Socket setup errors (for example a port already in use) propagate to
    the caller. Both sockets are always closed before returning.
    """
    client = clients = None
    try:
        # Listen for broadcasts on UDP port 6666 - 3.1 Devices
        client = _open_listener(UDPPORT)
        # Listen for broadcasts on encrypted UDP port 6667 - 3.3 Devices
        clients = _open_listener(UDPPORTS)

        if verbose:
            print("Scanning on UDP ports %d and %d for devices...\n"%(UDPPORT,UDPPORTS))

        devices = {}
        count = 0       # tries charged to the 6666 listener
        counts = 0      # tries charged to the 6667 listener
        spinnerx = 0
        spinner = "|/-\\"

        # loop to find devices
        while (count + counts) <= MAXCOUNT:
            if verbose:
                print("Scanning... %s\r"%(spinner[spinnerx]), end = '')
                spinnerx = (spinnerx + 1) % len(spinner)

            # alternate between 6666 and 6667 ports
            use_plain = count <= counts
            try:
                if use_plain:
                    data, addr = client.recvfrom(4048)
                else:
                    data, addr = clients.recvfrom(4048)
            except socket.error:
                # Timeout (socket.timeout is a socket.error) or other socket
                # failure: charge one try to the listener that was polled.
                if use_plain:
                    count = count + 1
                else:
                    counts = counts + 1
                continue

            if DEBUG:
                print("* Message from [%s]: %s\n"%(addr[0], data))

            ip = addr[0]
            gwId = productKey = version = ""
            result = data
            try:
                # Drop the first 20 and last 8 bytes wrapping the payload
                # (presumably message framing - TODO confirm against protocol)
                result = data[20:-8]
                try:
                    result = decrypt_udp(result)    # 3.3
                except Exception:
                    result = result.decode()        # 3.1
                result = json.loads(result)
                note = 'Valid'
                ip = result['ip']
                gwId = result['gwId']
                productKey = result['productKey']
                version = result['version']
            except Exception:
                if DEBUG:
                    print("* Unexpected payload=%r\n" % (result,))
                result = {"ip": ip}
                note = "Unknown"

            # check to see if we have seen this device before and add to devices
            if not appenddevice(result, devices):
                # new device found - back off count if we keep getting new devices
                if version == '3.1':
                    count = floor(count - 1)
                else:
                    counts = floor(counts - 1)
                if verbose:
                    print("FOUND Device [%s payload]: %s\n ID = %s, productKey = %s, Version = %s" % (note,ip,gwId,productKey,version))
            else:
                # no new devices found
                if version == '3.1':
                    count = count + 1
                else:
                    counts = counts + 1

        if verbose:
            print(" \nScan Complete! Found %s devices.\n"%len(devices))

        return devices
    finally:
        # Release both UDP ports even on error or Ctrl-C
        for sock in (client, clients):
            if sock is not None:
                sock.close()
# Run a verbose scan as soon as this file is executed; result is kept in `d`
d = deviceScan(verbose=True)