-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrcon.py
131 lines (114 loc) · 3.95 KB
/
rcon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from socket import *
from struct import pack, unpack
from time import sleep
from pprint import pprint
import re
USER_REGEX = re.compile('^# +([0-9]+) "([^"]+)" +([^ ]+) +([0-9:]+) +([0-9]+) +([0-9]+) ([^ ]+) (.+)$')
USER_KEYS = ('user_id', 'name', 'steam_id', 'connected', 'ping', 'loss', 'state', 'adr')
class RCONClient:
def __init__(self, password, host, port=27016):
self._players = None
self._status = None
self._cvars = None
self._commands = None
self._maplist = None
self.server = (host, port)
self.socket = socket(AF_INET, SOCK_STREAM)
self.socket.connect(self.server)
self.auth(password)
def auth(self, password):
self.send_command(password, is_auth=True)
def read_packet(self):
data_len = unpack('<L', self.socket.recv(4))[0]
data = ''
while len(data) < data_len:
data += self.socket.recv(data_len-len(data))
clen = len(data)-10
keys = ['id','flag','data','null1','null2']
fmt = '<2L%ssBB' % clen
parsed = unpack(fmt, data)
return dict(zip(keys,parsed))
def send_command(self, command, is_auth=False):
command += '\x00'
clen = len(command)
fmt = '<3L%ssB' % clen
auth_flag = 3 if is_auth else 2
packet = pack(fmt, *(clen+9, 0, auth_flag, command, 0))
self.socket.send(packet)
packet = self.read_packet()
if not packet['data']:
" this was an auth attempt "
packet = self.read_packet()
if packet['flag'] != 2:
raise Exception('Expected flag value of 2, got %s' \
% packet['flag'])
if packet['id'] == 0xffffffff:
raise Exception('Authentication Failed')
return packet
def get_status(self):
packet = self.send_command('status')
status = {}
players = []
for line in packet['data'].split('\n'):
if not line.strip():
continue
if line[0] == '#':
m = USER_REGEX.match(line)
if m:
players.append(dict(zip(USER_KEYS, m.groups())))
else:
k,v = line.strip().split(':',1)
status[k.strip()] = v.strip()
return status, players
def get_cvars(self):
packet = self.send_command('cvarlist')
data = packet['data']
while True:
packet = self.read_packet()
data += packet['data']
if data.endswith('total convars/concommands\n'):
break
cvars = []
cmds = []
for line in data.split('\n'):
if line.find(':') == -1:
continue
cvar = [x.strip() for x in line.split(':',3)]
if cvar[1] == 'cmd':
cmds.append(cvar)
else:
cvars.append(cvar)
return (cvars, cmds)
def get_maplist(self):
packet = self.send_command('maps *')
maps = []
data = packet['data']
for line in data.split('\n'):
if line.startswith('PENDING: (fs)'):
maps.append(line.split(' ')[-1])
return maps
@property
def players(self):
if self._players is None:
self._status, self._players = self.get_status()
return self._players
@property
def status(self):
if self._status is None:
self._status, self._players = self.get_status()
return self._status
@property
def cvars(self):
if self._cvars is None:
self._cvars, self._commands = self.get_cvars()
return self._cvars
@property
def commands(self):
if self._commands is None:
self._cvars, self._commands = self.get_cvars()
return self._commands
@property
def maps(self):
if self._maplist is None:
self._maplist = self.get_maplist()
return self._maplist