-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcx.py
executable file
·211 lines (172 loc) · 6.74 KB
/
cx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/usr/bin/env python
"""\
Execute API requests.
Usage: cx.py <api path> <request object>
Authentication is done by generating the appropriate header after reading the
line 'authentication <username> <api key>' from ~/.cxrc.
JSON will be serialized according to the default encoding, or ascii safe if
in doubt.
Stdin will be used if request object is "-". If no request object, the
empty object "{}" is assumed.
Wiki references and documentation:
API authentication: https://wiki.cxense.com/display/cust/API+authentication
Requests and responses: https://wiki.cxense.com/display/cust/Requests+and+responses
Examples (for bash, please check the wiki pages for tips on how to run this on Windows)
Version:
$ cx.py --version
VERSION cx.py : <version>
Absolute path:
$ cx.py https://api.cxense.com/public/date
{
"date": "2013-04-22T15:06:20.252Z"
}
Relative path, defaults to https://api.cxense.com unless apiserver is set in ~/.cxrc
$ cx.py /public/date
{
"date": "2013-04-22T15:06:20.252Z"
}
POST request with non-empty request object:
$ cx.py /site '{"siteId":"9222300742735526873"}'
{
"sites": [
{
"id": "9222300742735526873",
"name": "Example site",
"url": "http://www.example.com",
"country": "US",
"timeZone": "America/Los_Angeles"
}
]
}
GET request with json parameter:
$ cx.py /profile/content/fetch?json=%7B%22url%22%3A%22http%3A%2F%2Fwww.example.com%22%7D
{
"url": "http://www.example.com",
"id": "0caaf24ab1a0c33440c06afe99df986365b0781f"
}
"""
import sys
def isPython2():
    """Return True when the running interpreter's major version is 2."""
    major_version = sys.version_info.major
    return major_version == 2
if isPython2():
import httplib
import urlparse
else:
import http.client as httplib
import urllib.parse as urlparse
import os
import hmac
import json
import locale
import hashlib
import datetime
import traceback
import collections
#
# please update the version
#
VERSION_TIMESTAMP = '2017-06-09'

# Default configuration.
username = None
secret = None
apiserver = 'https://api.cxense.com'

# Locate and autoload configuration from ~/.cxrc
# Recognised lines (whitespace separated):
#   authentication <username> <api key>
#   apiserver <base url>
# Any other line is ignored.
rc = os.path.join(os.path.expanduser('~'), '.cxrc')
if os.path.exists(rc):
    # A context manager closes the file handle as soon as parsing is done.
    with open(rc) as rc_file:
        for line in rc_file:
            fields = line.split()
            if not fields:
                # Blank lines split to an empty list; skip them instead of
                # failing with IndexError on fields[0].
                continue
            if fields[0] == 'authentication' and len(fields) == 3:
                username = fields[1]
                secret = fields[2]
            elif fields[0] == 'apiserver' and len(fields) == 2:
                apiserver = fields[1]
def getDate(connection):
    """Return the timestamp string used to sign an API request.

    The date is fetched from the server's ``/public/date`` endpoint over the
    given connection. If that fails for any ordinary reason, the local UTC
    time in ISO format with a trailing "Z" is returned instead.

    connection -- an object offering ``request(method, path)`` and
                  ``getresponse()``; the response must be readable by
                  ``json.load`` and contain a ``date`` key.
    """
    # If the computer's time can be trusted, the below condition can be changed to False
    if True:
        try:
            connection.request("GET", "/public/date")
            return json.load(connection.getresponse())['date']
        except Exception:
            # Best effort: fall back to the local clock below. Only ordinary
            # errors are swallowed, so SystemExit and KeyboardInterrupt
            # still propagate to the caller.
            pass
    return datetime.datetime.utcnow().isoformat() + "Z"
def execute(url, content, username=username, secret=secret):
    """Send one signed API request and return its result.

    url      -- parsed URL (as produced by urlparse); its scheme selects plain
                HTTP or HTTPS, and its netloc, path and query form the request.
    content  -- request body; None sends a GET, anything else is POSTed.
    username -- user name placed in the authentication header.
    secret   -- API key used as the HMAC-SHA256 key over the request date.

    Returns a tuple (HTTP status, Content-Type header value or '', raw body).
    The connection is always closed before returning or raising.
    """
    if url.scheme == 'http':
        connection_class = httplib.HTTPConnection
    else:
        connection_class = httplib.HTTPSConnection
    connection = connection_class(url.netloc)
    try:
        timestamp = getDate(connection)
        digest = hmac.new(secret.encode('utf-8'), timestamp.encode('utf-8'), digestmod=hashlib.sha256)
        signature = digest.hexdigest()
        headers = {
            "X-cXense-Authentication": "username=%s date=%s hmac-sha256-hex=%s" % (username, timestamp, signature),
            "Content-Type": "application/json; charset=utf-8",
        }
        method = "POST" if content is not None else "GET"
        # Re-attach the query string only when one was given.
        target = url.path
        if url.query:
            target = target + "?" + url.query
        connection.request(method, target, content, headers)
        response = connection.getresponse()
        return response.status, response.getheader('Content-Type', ''), response.read()
    finally:
        connection.close()
if __name__ == "__main__":
    # Command-line entry point: validate the arguments and credentials, send
    # the request, print the response, and exit 0 only for an HTTP 200 reply.

    # output the version number of this script
    if ('-v' in sys.argv) or ('--version' in sys.argv):
        print('VERSION cx.py : %r\n' % VERSION_TIMESTAMP)
        sys.exit()

    if len(sys.argv) < 2 or '--help' in sys.argv:
        print('VERSION cx.py: %r\n' % VERSION_TIMESTAMP)
        print(__doc__)
        sys.exit(1)
    elif len(sys.argv) > 3:
        print("Too many arguments. Remember to quote the JSON.")
        sys.exit(1)

    if username is None or secret is None:
        print("Please add the line 'authentication <username> <api key>' to %s" % rc)
        sys.exit(3)

    if '@' not in username:
        print("Username is not an email address: %s" % username)
        sys.exit(4)

    if not secret.startswith('api&'):
        print("Invalid API key: %s" % secret)
        sys.exit(5)

    # Load data from argument or stdin, hopefully with correct encoding.
    argument = sys.argv[2] if len(sys.argv) > 2 else None
    if argument is None:
        content = None
    elif argument == '-':
        content = sys.stdin.read()
        if not isPython2():
            # On Python 3 stdin yields text, and http.client would encode a
            # str body as ISO-8859-1 although the request declares UTF-8.
            # Encode explicitly, exactly like the command-line argument below.
            content = content.encode('UTF-8')
    else:
        if isPython2():
            content = unicode(argument, sys.stdin.encoding or locale.getpreferredencoding()).encode('UTF-8')
        else:
            content = argument.encode('UTF-8')

    if len(sys.argv) == 2:
        try:
            # GET request: early integrity check of optional json query parameter
            # if no json parameter exists (like for /public/date), make the test pass with "{}" as dummy instead
            json.loads(urlparse.parse_qs(urlparse.urlparse(sys.argv[-1]).query).get('json',["{}"])[-1])
        except ValueError as e:
            print("Invalid JSON in \"json\" parameter: %s" % e)
            sys.exit(3)

    # Make sure piping works, which can have a undefined encoding.
    # The encoding name may be None (piped output) and its spelling varies
    # ('UTF-8', 'utf-8', 'utf8'), so normalise before comparing.
    stdout_encoding = (sys.stdout.encoding or '').upper()
    ensure_ascii = stdout_encoding not in ('UTF-8', 'UTF8')

    # Default to apiserver, unless a full URL was given.
    path = sys.argv[1]
    if path.startswith('http'):
        url = urlparse.urlparse(path)
    else:
        url = urlparse.urlparse(urlparse.urljoin(apiserver, path))

    # Execute the API request and exit 0 only if it was successful.
    try:
        status, contentType, response = execute(url, content, username, secret)
    except Exception:
        # Report ordinary failures; Ctrl-C and SystemExit are left alone so
        # they are not mislabelled as a failed HTTP request.
        print("HTTP request to %s failed" % path)
        traceback.print_exc()
        sys.exit(6)

    if contentType.startswith('application/json'):
        # Re-serialise for pretty printing while keeping the server's key order.
        print(json.dumps(json.loads(response.decode('utf-8'), object_pairs_hook=collections.OrderedDict), indent=2, ensure_ascii=ensure_ascii))
    else:
        if isPython2():
            if sys.platform == "win32":
                # On Windows, stdout is text mode by default. Set binary mode to prevent mangling. Not needed in
                # Python 3, where sys.stdout.buffer.write ensures binary mode.
                import msvcrt
                msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
            sys.stdout.write(response)
        else:
            sys.stdout.buffer.write(response)

    if status != 200:
        sys.exit(1)