-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathBaseXClient.py
312 lines (247 loc) · 9.98 KB
/
BaseXClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# -*- coding: utf-8 -*-
"""
Python 2.7.3 and 3.x client for BaseX.
Works with BaseX 7.0 and later
Requires Python 3.x or Python 2.x having some backports like bytearray.
(I've tested Python 3.2.3, and Python 2.7.3 on Fedora 16 linux x86_64.)
LIMITATIONS:
* binary content may be corrupted when it is sent (this was not tested).
* extracting stored binary content may also fail (this was not tested).
(neither this code nor the original client handles the escaped 0xff byte.)
Documentation: http://docs.basex.org/wiki/Clients
(C) 2012, Hiroaki Itoh. BSD License
updated 2014 by Marc van Grootel
"""
import hashlib
import socket
import threading
# ---------------------------------
#
class SocketWrapper(object):
    """Buffered reader/writer around an already created socket object.

    Incoming bytes are fetched into a fixed internal buffer with
    ``recv_into`` and handed out either one byte at a time or as
    terminator-delimited strings.  Outgoing text is encoded with
    ``send_bytes_encoding`` before it is passed to the socket.

    Any attribute that is not defined on the wrapper is forwarded to the
    wrapped socket as a method call (see ``__getattr__``).
    """

    def __init__(self, sock,
                 receive_bytes_encoding='utf-8',
                 send_bytes_encoding='utf-8'):
        """Wrap ``sock``.

        sock -- object providing ``recv_into`` and ``sendall``.
        receive_bytes_encoding -- encoding used to decode received strings
            (it is also used to build the terminator bytes).
        send_bytes_encoding -- encoding applied to text given to
            ``sendall``.
        """
        self.receive_bytes_encoding = receive_bytes_encoding
        self.send_bytes_encoding = send_bytes_encoding
        # Encoded form of chr(0); this marks the end of every string.
        self.terminator = bytearray(chr(0), self.receive_bytes_encoding)
        self.__s = sock
        # Reusable receive buffer, refilled in place by recv_into().
        self.__buf = bytearray(chr(0) * 0x1000, self.receive_bytes_encoding)
        self.__bpos = 0   # index of the next unread byte in the buffer
        self.__bsize = 0  # number of valid bytes currently in the buffer

    def clear_buffer(self):
        """Forget any buffered bytes so the next read fetches fresh data.

        Affects both ``recv_until_terminator()`` and ``recv_single_byte()``.
        """
        self.__bpos = 0
        self.__bsize = 0

    def __fill_buffer(self):
        """Fetch more bytes from the socket when the buffer is exhausted.

        Raises IOError when the socket reports 0 received bytes (the peer
        closed the connection).  Without this check the readers would
        either loop forever or hand out stale buffer content.
        """
        if self.__bpos < self.__bsize:
            return
        received = self.__s.recv_into(self.__buf)
        self.__bpos = 0
        self.__bsize = received
        if received == 0:
            raise IOError('Connection closed while waiting for data.')

    def recv_single_byte(self):
        """Return the next received byte as an integer.

        Raises IOError if the connection was closed before a byte arrived.
        """
        self.__fill_buffer()
        result_byte = self.__buf[self.__bpos]
        self.__bpos += 1
        return result_byte

    def recv_until_terminator(self):
        """Return the next terminator-delimited string, decoded.

        Bytes are collected (across as many buffer refills as needed) up
        to, but not including, ``self.terminator``; the terminator itself
        is consumed.  The collected bytes are decoded with
        ``receive_bytes_encoding``.

        Raises IOError if the connection is closed before the terminator
        is seen.
        """
        result_bytes = bytearray()
        while True:
            self.__fill_buffer()
            pos = self.__buf.find(self.terminator, self.__bpos, self.__bsize)
            if pos >= 0:
                result_bytes.extend(self.__buf[self.__bpos:pos])
                # Skip past the terminator so the next read starts after it.
                self.__bpos = pos + len(self.terminator)
                break
            # No terminator yet: keep everything and fetch the next chunk.
            result_bytes.extend(self.__buf[self.__bpos:self.__bsize])
            self.__bpos = self.__bsize
        return result_bytes.decode(self.receive_bytes_encoding)

    def sendall(self, data):
        """Send ``data`` through the wrapped socket.

        ``bytearray``/``bytes`` values are passed to the socket unchanged;
        anything else is first converted with
        ``bytearray(data, send_bytes_encoding)``.
        """
        if isinstance(data, (bytearray, bytes)):
            return self.__s.sendall(data)
        return self.__s.sendall(bytearray(data, self.send_bytes_encoding))

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped socket as method calls.

        The lookup on the socket is deferred until the returned callable
        is invoked, so only callable socket attributes (``connect``,
        ``close``, ...) are usable through this path.
        """
        return lambda *arg, **kw: getattr(self.__s, name)(*arg, **kw)
# ---------------------------------
#
class Session(object):
    """Client session talking to a BaseX server.

    see http://docs.basex.org/wiki/Server_Protocol

    A session can be used as a context manager; leaving the ``with`` block
    calls ``close()``.
    """

    def __init__(self, host, port, user, password,
                 receive_bytes_encoding='utf-8',
                 send_bytes_encoding='utf-8'):
        """Connect to ``host``:``port`` and log in as ``user``.

        Raises IOError('Access Denied.') when the server rejects the
        login.  If connecting or logging in fails for any reason, the
        socket is closed before the exception is re-raised, so a failed
        constructor does not leak the connection.
        """
        self.__info = None
        # create server connection
        self.__swrapper = SocketWrapper(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM),
            receive_bytes_encoding=receive_bytes_encoding,
            send_bytes_encoding=send_bytes_encoding)
        try:
            self.__swrapper.connect((host, port))
            self.__authenticate(user, password)
        except Exception:
            self.__swrapper.close()
            raise

    def __enter__(self):
        """Return the session itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session; exceptions from the block are not suppressed."""
        self.close()
        return False

    def __authenticate(self, user, password):
        """internal. Run the login handshake on the connected socket."""
        # The first server message is either "<realm>:<nonce>" or just a
        # bare timestamp/nonce; the two forms hash the password differently.
        response = self.recv_c_str().split(':')
        if len(response) > 1:
            code = "%s:%s:%s" % (user, response[0], password)
            nonce = response[1]
        else:
            code = password
            nonce = response[0]
        # send username and hashed password/timestamp:
        # md5( hex(md5(code)) + nonce )
        hfun = hashlib.md5()
        hfun.update(
            hashlib.md5(code.encode('us-ascii')).hexdigest().encode('us-ascii'))
        hfun.update(nonce.encode('us-ascii'))
        self.send(user + chr(0) + hfun.hexdigest())
        # evaluate success flag
        if not self.server_response_success():
            raise IOError('Access Denied.')

    def execute(self, com):
        """Execute a command and return the result.

        The info string sent by the server is stored and available through
        ``info()``.  Raises IOError carrying that info string when the
        server reports a failure.
        """
        # send command to server
        self.send(com)
        # receive result, then the info string, then the status byte
        result = self.receive()
        self.__info = self.recv_c_str()
        if not self.server_response_success():
            raise IOError(self.__info)
        return result

    def query(self, querytxt):
        """Creates a new query instance (having id returned from server)."""
        return Query(self, querytxt)

    def create(self, name, content):
        """Creates a new database with the specified input (may be empty)."""
        self.__send_input(8, name, content)

    def add(self, path, content):
        """Adds a new resource to the opened database."""
        self.__send_input(9, path, content)

    def replace(self, path, content):
        """Replaces a resource with the specified input."""
        self.__send_input(12, path, content)

    def store(self, path, content):
        """Stores a binary resource in the opened database.

        ``content`` must be ``bytes`` or ``bytearray`` (ValueError
        otherwise).  The api won't escape 0x00, 0xff automatically, so you
        must do it yourself explicitly.
        """
        # chr(13) + path + chr(0) + content + chr(0)
        self.__send_binary_input(13, path, content)

    def info(self):
        """Return the info string stored by the most recent command."""
        return self.__info

    def close(self):
        """Send ``exit`` to the server and close the socket.

        The socket is closed even when sending ``exit`` fails (for example
        because the connection is already broken); the send error is still
        propagated to the caller.
        """
        try:
            self.send('exit')
        finally:
            self.__swrapper.close()

    def recv_c_str(self):
        """Retrieve one terminator-delimited string from the socket."""
        return self.__swrapper.recv_until_terminator()

    def send(self, value):
        """Send the string ``value`` followed by a chr(0) terminator."""
        self.__swrapper.sendall(value + chr(0))

    def __read_status(self):
        """internal. Store the server's info string, raise IOError on failure."""
        self.__info = self.recv_c_str()
        if not self.server_response_success():
            raise IOError(self.info())

    def __send_input(self, code, arg, content):
        """internal. Send a text command: code, arg, 0, content, 0."""
        self.__swrapper.sendall(chr(code) + arg + chr(0) + content + chr(0))
        self.__read_status()

    def __send_binary_input(self, code, path, content):
        """internal. Send a binary command: code, path, 0, content, 0."""
        # at this time, we can't use __send_input itself because of encoding
        # problem. we have to build bytearray directly.
        if not isinstance(content, (bytearray, bytes)):
            raise ValueError("Sorry, content must be bytearray or bytes, not " +
                             str(type(content)))
        # chr(code) + path + chr(0) + content + chr(0)
        data = bytearray([code])
        if isinstance(path, (bytearray, bytes)):
            # Already raw bytes (this includes Python 2 ``str``).
            data.extend(path)
        else:
            # Text path: encode explicitly instead of relying on a
            # catch-all ``except`` around ``bytearray.extend``.
            data.extend(path.encode('utf-8'))
        data.extend([0])
        data.extend(content)
        data.extend([0])
        self.__swrapper.sendall(data)
        self.__read_status()

    def server_response_success(self):
        """Read one status byte; return True when it is 0."""
        return self.__swrapper.recv_single_byte() == 0

    def receive(self):
        """Discard buffered bytes, then return the next received string."""
        self.__swrapper.clear_buffer()
        return self.recv_c_str()

    def iter_receive(self):
        """iter_receive() -> (typecode, item)

        iterate while the query returns items.
        typecode list is in http://docs.basex.org/wiki/Server_Protocol:_Types

        Iteration stops at the first type byte that is 0.  The status byte
        that follows is then checked, and IOError is raised with the
        server's message when it signals a failure.
        """
        self.__swrapper.clear_buffer()
        typecode = self.__swrapper.recv_single_byte()
        while typecode > 0:
            string = self.recv_c_str()
            yield (typecode, string)
            typecode = self.__swrapper.recv_single_byte()
        if not self.server_response_success():
            raise IOError(self.recv_c_str())
# ---------------------------------
#
class Query(object):
    """Server-side query registered through a session.

    see http://docs.basex.org/wiki/Server_Protocol

    Every method sends one command byte followed by the query id (and any
    arguments) through the owning session and returns the string the
    server answers with.  A query can be used as a context manager;
    leaving the ``with`` block calls ``close()``.
    """

    def __init__(self, session, querytxt):
        """Register ``querytxt`` through ``session`` and keep the id the
        server returns for it.

        Raises IOError when the server reports a failure.
        """
        self.__session = session
        self.__id = self.__exc(chr(0), querytxt)

    def __enter__(self):
        """Return the query itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the query; exceptions from the block are not suppressed."""
        self.close()
        return False

    def bind(self, name, value, datatype=''):
        """Binds a value to a variable.

        An empty string can be specified as data type.  ``name``,
        ``value`` and ``datatype`` are concatenated with the query id, so
        they must be strings.
        """
        self.__exc(chr(3), self.__id + chr(0) + name + chr(0) + value +
                   chr(0) + datatype)

    def context(self, value, datatype=''):
        """Bind the context item (``value`` and ``datatype`` are strings)."""
        self.__exc(chr(14), self.__id + chr(0) + value + chr(0) + datatype)

    def iter(self):
        """Return the session's iterator over the query's result items.

        The iterate command is sent immediately; the items are read as
        the returned iterator is consumed.
        """
        self.__session.send(chr(4) + self.__id)
        return self.__session.iter_receive()

    def execute(self):
        """Execute the query and return the result"""
        return self.__exc(chr(5), self.__id)

    def info(self):
        """Return query information"""
        return self.__exc(chr(6), self.__id)

    def options(self):
        """Return serialization parameters"""
        return self.__exc(chr(7), self.__id)

    def updating(self):
        """Ask whether the query may perform updates.

        Returns the server's answer as a string, not a Python bool
        (presumably "true" or "false" -- confirm against the protocol
        documentation).
        """
        return self.__exc(chr(30), self.__id)

    def full(self):
        """Returns all resulting items as strings, prefixed by XDM Meta Data."""
        return self.__exc(chr(31), self.__id)

    def close(self):
        """Close the query"""
        self.__exc(chr(2), self.__id)

    def __exc(self, cmd, arg):
        """internal. Send ``cmd + arg``, return the server's reply string.

        Raises IOError with the server's message when the status byte
        signals a failure.
        """
        # should we expose this?
        # (this makes sense only when mismatch between C/S is existing.)
        self.__session.send(cmd + arg)
        result = self.__session.receive()
        if not self.__session.server_response_success():
            raise IOError(self.__session.recv_c_str())
        return result