forked from qvr/unifi-gateway
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtools.py
114 lines (103 loc) · 4.23 KB
/
tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
import re
import time
from random import randint
def mac_string_2_array(mac):
    """Convert a colon-separated MAC address string into a list of ints.

    Every ``:``-delimited field is parsed as a base-16 number, so
    ``'00:1a:ff'`` becomes ``[0, 26, 255]``.  A field that is not valid
    hexadecimal raises ``ValueError``.
    """
    octets = []
    for field in mac.split(':'):
        octets.append(int(field, 16))
    return octets
def ip_string_2_array(mac):
    """Convert a dotted-decimal address string into a list of ints.

    Despite its name, the ``mac`` parameter holds the dotted string to
    split (for example ``'192.168.1.1'`` gives ``[192, 168, 1, 1]``).
    A field that is not a valid decimal integer raises ``ValueError``.
    """
    dotted_fields = mac.split('.')
    return list(map(int, dotted_fields))
def netmask_to_cidr(netmask):
    """Return the number of set bits in a dotted-decimal netmask.

    Each ``.``-separated field is parsed as a decimal integer and its
    binary ``1`` digits are counted; the per-field counts are summed, so
    ``'255.255.255.0'`` yields ``24``.
    """
    prefix_length = 0
    for octet in netmask.split('.'):
        prefix_length += bin(int(octet)).count('1')
    return prefix_length
def uptime():
    """Return the system uptime as a whole number of seconds.

    Reads the first line of ``/proc/uptime``, takes its first
    whitespace-separated field, and truncates that value to an ``int``.
    """
    with open('/proc/uptime', 'r') as proc_file:
        first_line = proc_file.readline()
    seconds_field = first_line.split()[0]
    return int(float(seconds_field))
def get_if_table(data, ports):
    """Build the interface table for the configured ports.

    ``ports`` is a list of dicts, each with an ``'ifname'`` and a ``'name'``
    key.  ``data`` is a dict; the keys read here are ``'ifstat'``, ``'ip'``
    and ``'macs'`` (each keyed by interface name) plus, for the WAN
    interface only, ``'nameservers'``, ``'latency'`` and ``'speedtest'``.

    An interface is reported only when it appears in ``data['ifstat']``, is
    one of the configured ports, and has an entry in ``data['ip']``.  The
    port whose name is "wan" (case-insensitive) additionally receives
    gateway, nameserver, latency and speedtest fields.  When no port is
    named "wan", no entry gets those extras.

    Returns a list with one dict per reported interface.
    """
    known_ifaces = set(port['ifname'] for port in ports)
    # First port named "wan"; None when there is no WAN port, so that a
    # LAN-only port list yields plain entries instead of an IndexError.
    wan_if = next(
        (port['ifname'] for port in ports if port['name'].lower() == 'wan'),
        None
    )

    reportable = [
        (iface, info)
        for (iface, info) in data['ifstat'].items()
        if iface in known_ifaces and iface in data['ip']
    ]
    if not reportable:
        return []

    # Read the uptime once so every entry carries the same value and
    # /proc/uptime is not re-read for each interface.
    system_uptime = uptime()

    if_table = []
    for (iface, info) in reportable:
        ip_info = data['ip'][iface]
        if_entry = {
            'drops': int(info["rx_dropped"]) + int(info["tx_dropped"]),
            # Link properties below are fixed values, not measured ones.
            'enable': True,
            'full_duplex': True,
            'ip': ip_info['address'],
            'mac': data['macs'][iface],
            'name': iface,
            'netmask': ip_info['netmask'],
            'num_port': 1,
            'rx_bytes': info["rx_bytes"],
            'rx_dropped': info["rx_dropped"],
            'rx_errors': info["rx_errors"],
            'rx_multicast': info["rx_multicast"],
            'rx_packets': info["rx_packets"],
            'speed': 1000,
            'tx_bytes': info["tx_bytes"],
            'tx_dropped': info["tx_dropped"],
            'tx_errors': info["tx_errors"],
            'tx_packets': info["tx_packets"],
            'up': True,
            'uptime': system_uptime
        }
        if iface == wan_if:
            # The gateway is optional; the other WAN fields are always set.
            if 'gateway' in ip_info:
                if_entry['gateways'] = [ip_info['gateway']]
            speedtest = data['speedtest']
            if_entry['nameservers'] = data['nameservers']
            if_entry['latency'] = data['latency']
            if_entry['speedtest_lastrun'] = speedtest['lastrun']
            if_entry['speedtest_ping'] = speedtest['ping']
            if_entry['speedtest_status'] = 'Idle'
            if_entry['xput_down'] = speedtest['download']
            if_entry['xput_up'] = speedtest['upload']
        if_table.append(if_entry)
    return if_table
def get_network_table(data, ports):
    """Build the network table for the configured ports.

    ``ports`` is a list of dicts, each with an ``'ifname'`` and a ``'name'``
    key.  ``data`` is a dict; the keys read here are ``'ip'`` and ``'macs'``
    (keyed by interface name), ``'host_table'`` for the LAN interface and
    ``'nameservers'`` for the WAN interface.  Per-interface counters are
    obtained from ``get_net_stats``.

    Ports without an entry in ``data['ip']`` are skipped.  The port named
    "lan" (case-insensitive) receives the host table; the port named "wan"
    receives the nameservers and, when one is present, the gateway.  When
    no port carries one of those names, no entry gets those extras.

    Returns a list with one dict per reported port, in ``ports`` order.
    """
    # First port with each role; None when the role is not configured, so
    # a missing LAN or WAN port does not raise IndexError.
    lan_if = next(
        (port['ifname'] for port in ports if port['name'].lower() == 'lan'),
        None
    )
    wan_if = next(
        (port['ifname'] for port in ports if port['name'].lower() == 'wan'),
        None
    )

    network_table = []
    for port in ports:
        iface = port['ifname']
        if iface not in data['ip']:
            continue
        ip_info = data['ip'][iface]
        # Same "address/prefix" string is used for both address fields.
        cidr_address = '%s/%s' % (
            ip_info['address'], netmask_to_cidr(ip_info['netmask'])
        )
        net_entry = {
            'address': cidr_address,
            'addresses': [cidr_address],
            # Link properties below are fixed strings, not measured values.
            'autoneg': 'true',
            'duplex': 'full',
            'l1up': 'true',
            'mac': data['macs'][iface],
            'mtu': '1500',
            'name': iface,
            'speed': '1000',
            'stats': get_net_stats(data, iface),
            'up': 'true'
        }
        if iface == lan_if:
            net_entry['host_table'] = data['host_table']
        elif iface == wan_if:
            # The gateway is optional (matching get_if_table); a WAN
            # without one must not raise KeyError.
            if 'gateway' in ip_info:
                net_entry['gateways'] = [ip_info['gateway']]
            net_entry['nameservers'] = data['nameservers']
        network_table.append(net_entry)
    return network_table
def get_net_stats(data, iface):
    """Return the traffic counters of ``iface`` as a flat dict.

    The counters are copied from ``data['ifstat'][iface]``.  The
    ``'multicast'`` field mirrors ``'rx_multicast'``.  Rate fields
    (``rx_bps`` / ``tx_bps``) are not reported.
    """
    counters = data['ifstat'][iface]
    stats = {'multicast': counters['rx_multicast']}
    copied_keys = (
        'rx_bytes',
        'rx_dropped',
        'rx_errors',
        'rx_multicast',
        'rx_packets',
        'tx_bytes',
        'tx_dropped',
        'tx_errors',
        'tx_packets',
    )
    for key in copied_keys:
        stats[key] = counters[key]
    return stats