-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopenvpn_status_ws_parser.py
169 lines (130 loc) · 5.63 KB
/
openvpn_status_ws_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""File that contains OpenvpnStatusWsParser class."""
import time
from datetime import datetime
from humanize import naturalsize
class OpenvpnStatusWsParser():
    """Class that handles parsing OpenVPN's status log.

    The log file is read once when the instance is created; call
    fetch_raw_data() and fetch_log_type() again to pick up a newer version
    of the file. The parsed result is exposed through the ``data`` property.
    """

    # Path of the status log given to the constructor.
    log_path = None
    # 'subnet' or 'ptp' once detected, None when the log is not recognised.
    log_type = None
    # Whole content of the status log with surrounding whitespace removed.
    raw_data = None

    def __init__(self, status_log_path):
        """Read the status log at *status_log_path* and detect its type."""
        self.log_path = status_log_path
        self.fetch_raw_data()
        self.fetch_log_type()

    def fetch_raw_data(self):
        """Fetches raw data from file and assigns it to raw_data property."""
        with open(self.log_path, 'r') as file_handle:
            self.raw_data = file_handle.read().strip()

    def fetch_log_type(self):
        """Fetches log type from raw_data and assigns it to log_type property.

        The first line decides the type: 'OpenVPN CLIENT LIST' maps to
        'subnet' and 'OpenVPN STATISTICS' maps to 'ptp'. Any other first line
        resets log_type to None so that a previously detected type does not
        survive a re-read of an unrecognised file.
        """
        first_line = self.raw_data.split("\n")[0].strip()
        if first_line == 'OpenVPN CLIENT LIST':
            self.log_type = 'subnet'
        elif first_line == 'OpenVPN STATISTICS':
            self.log_type = 'ptp'
        else:
            self.log_type = None

    @staticmethod
    def get_parsed_datetime(raw_value):
        """Returns iso8601 and epoch formatted datetime of specified raw value.

        Two layouts are accepted: 'Thu Jun 18 08:12:15 2015' and
        '2015-06-18 08:12:15'. The second one is presumably what newer
        OpenVPN releases write; confirm against the deployed version.

        Returns a dict with 'iso8601' (naive ISO 8601 string) and 'ts'
        (seconds since the epoch from time.mktime, so the value is
        interpreted as local time).

        Raises ValueError when the value matches neither layout.
        """
        for layout in ('%a %b %d %H:%M:%S %Y', '%Y-%m-%d %H:%M:%S'):
            try:
                datet = datetime.strptime(raw_value, layout)
            except ValueError:
                continue
            return {
                'iso8601': datet.isoformat(),
                'ts': time.mktime(datet.timetuple()),
            }
        raise ValueError(
            'unsupported datetime format: {!r}'.format(raw_value))

    @staticmethod
    def get_timezone():
        """Returns system's timezone as a dict with 'name' and 'offset'."""
        # time.tzname is (standard name, DST name); pick the entry that is in
        # effect right now so the name agrees with the '%z' offset below.
        dst_active = time.localtime().tm_isdst > 0
        return {
            'name': time.tzname[1 if dst_active else 0],
            'offset': time.strftime('%z'),
        }

    @staticmethod
    def get_real_address(raw_value):
        """Returns dict that contains parsed IP address and port.

        The value is split on its last colon, so 'host:port' and bracketed
        IPv6 '[addr]:port' both work; surrounding brackets are removed from
        the address.

        Raises ValueError when there is no colon or the port is not an
        integer.
        """
        ip_address, port = raw_value.rsplit(':', 1)
        if ip_address.startswith('[') and ip_address.endswith(']'):
            ip_address = ip_address[1:-1]
        return {'real': {'ip': ip_address, 'port': int(port)}}

    @staticmethod
    def get_traffic(byte_stats):
        """Returns dict that contains parsed traffic statistics.

        *byte_stats* maps 'bytes_<group>' keys (normally 'bytes_received' and
        'bytes_sent') to raw counter strings. Each group gets a 'bytes'
        integer and a human readable 'rounded' string; both are None when the
        raw counter is not a valid integer.
        """
        traffic = {'received': {}, 'sent': {}}
        for key, value in byte_stats.items():
            group = key.replace('bytes_', '')
            # setdefault keeps unexpected group names from raising KeyError.
            stats = traffic.setdefault(group, {})
            try:
                amount = int(value)
            except (TypeError, ValueError):
                stats['bytes'] = None
                stats['rounded'] = None
                continue
            stats['bytes'] = amount
            stats['rounded'] = naturalsize(amount)
        return traffic

    def get_updated_at(self):
        """Returns iso8601 and epoch formatted datetime of last log update.

        Returns None when the log has no second line or that line does not
        start with 'Updated,'.
        """
        lines = self.raw_data.split("\n")
        if len(lines) < 2:
            return None
        second_line = lines[1].strip()
        if not second_line.startswith('Updated,'):
            return None
        return self.get_parsed_datetime(second_line.split(',')[1])

    def get_clients(self):
        """Finds client rows from raw data and returns dict of client info.

        The result is keyed by common name. Every client carries
        'common_name', 'address' (with 'real' and, when a routing row exists,
        'virtual'), 'connected_since', 'traffic' and, when a routing row
        exists, 'last_reference'.

        Raises ValueError when one of the expected section headers is missing
        and IndexError when a row has too few columns.
        """
        # Strip every line up front so stray whitespace cannot hide a header.
        lines = [line.strip() for line in self.raw_data.split("\n")]

        start = lines.index(
            'Common Name,Real Address,Bytes Received,Bytes Sent,Connected Since')
        end = lines.index('ROUTING TABLE')
        clients = {}
        for line in lines[start + 1:end]:
            if not line:
                continue
            row = line.split(',')
            client = {
                'common_name': row[0],
                'address': self.get_real_address(row[1]),
                'connected_since': self.get_parsed_datetime(row[4]),
                'traffic': self.get_traffic(
                    {'bytes_received': row[2], 'bytes_sent': row[3]}),
            }
            clients[client['common_name']] = client

        start = lines.index('Virtual Address,Common Name,Real Address,Last Ref')
        end = lines.index('GLOBAL STATS')
        for line in lines[start + 1:end]:
            if not line:
                continue
            row = line.split(',')
            client = clients.get(row[1])
            if client is None:
                # Routing row without a matching client row: nothing to
                # attach the virtual address to, so leave it out.
                continue
            client['address']['virtual'] = {'ip': row[0]}
            client['last_reference'] = self.get_parsed_datetime(row[3])
        return clients

    def get_clients_connected(self):
        """Returns amount of clients currently connected."""
        return len(self.get_clients())

    @property
    def data(self):
        """Returns final output of parsed data.

        For a 'subnet' log the dict holds 'topology', 'updated_at',
        'timezone', 'clients' and 'clients_connected'; for a 'ptp' log only
        'topology'; for an unrecognised log it is empty.
        """
        data = {}
        if self.log_type == 'subnet':
            # Parse the client list once and reuse it for the counter.
            clients = self.get_clients()
            data['topology'] = self.log_type
            data['updated_at'] = self.get_updated_at()
            data['timezone'] = self.get_timezone()
            data['clients'] = clients
            data['clients_connected'] = len(clients)
        elif self.log_type == 'ptp':
            data['topology'] = self.log_type
        return data