-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbench.py
295 lines (255 loc) · 11.8 KB
/
bench.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
#!/usr/bin/env python
# encoding: utf-8
"""
pure python http benchmark client
@author:nikan([email protected])
@file: bench.py
@time: 2018/7/31 上午10:11
"""
# Standard library.
import argparse
import sys
import time
from itertools import cycle
from typing import Text
from urllib.parse import urlparse
# gevent: greenlet pool used to run the requests concurrently.
import gevent
import gevent.pool
from gevent.lock import Semaphore
from gevent.monkey import patch_all
# gevent's monkey patching is applied here, *before* `requests` is imported
# below. Keep this ordering when touching the import block.
patch_all()
import requests
from requests.auth import HTTPDigestAuth, HTTPBasicAuth
from requests.exceptions import ConnectionError
import urllib3
# Turn off urllib3 warnings (URLContainer.request defaults to verify=False).
urllib3.disable_warnings()
# Maps the upper-cased scheme name of the --auth option to a requests auth class.
auth_class = {'DIGEST': HTTPDigestAuth, 'BASIC': HTTPBasicAuth}
def mean(numbers: list):
    """Return the arithmetic mean of ``numbers`` as a float.

    An empty list yields ``0.0`` because the divisor is never allowed to
    drop below 1.
    """
    count = len(numbers)
    divisor = count if count >= 1 else 1
    return float(sum(numbers)) / divisor
class URLContainer:
    """One request to ``url`` together with its timestamps and outcome.

    Three durations are derived from the recorded timestamps:
    ``connect_cost_time``, ``process_cost_time`` and ``total_cost_time``.
    """

    # Session shared by every request issued with keep_alive=True.
    keep_alive_session = requests.session()

    def __init__(self, url):
        self.url = url
        # time.time() stamps filled in by request().
        self.start_time = None    # just before the request is sent
        self.connect_time = None  # right after session.request() returned
        self.read_time = None     # just before the body is read
        self.end_time = None      # right after the body has been read
        self.content_length = None  # len() of the downloaded body
        self.status_code = None
        self.error = False  # set when request() hits a non-connection exception

    @property
    def total_cost_time(self):
        """Seconds between sending the request and finishing the body read."""
        return self.end_time - self.start_time

    @property
    def connect_cost_time(self):
        """Seconds between sending the request and session.request() returning."""
        return self.connect_time - self.start_time

    @property
    def process_cost_time(self):
        """Seconds spent reading the response body."""
        return self.end_time - self.read_time

    def request(self, *, method='get', timeout=None, params=None, data=None, json=None, files=None, proxies=None,
                auth=None, verify=False, keep_alive=False, headers=None, cookies=None):
        """
        Perform the request and record its timestamps on this container.

        A request is divided into two parts:
            One for setting up the connection and receiving the status line/headers.
            Another for receiving the response body.

        :param method: HTTP method, e.g. get, post, put, delete.
        :param timeout: forwarded to requests; None means no timeout.
        :param params: query-string parameters.
        :param data: request body (Content-Type: application/x-www-form-urlencoded).
        :param json: request body (Content-Type: application/json).
        :param files: request body (Content-Type: multipart/form-data).
        :param proxies: proxies mapping forwarded to requests.
        :param auth: auth object forwarded unchanged to ``session.request``.
        :param verify: whether to verify TLS certificates. Default False.
        :param keep_alive: use the shared class-level session instead of a
            fresh one-off session.
        :param headers: extra request headers.
        :param cookies: cookies to send.
        :return: None. Results are stored on the instance.
        """
        session = None
        try:
            session = self.keep_alive_session if keep_alive else requests.session()
            self.start_time = time.time()
            # stream=True returns as soon as the headers arrive, so the body
            # download can be timed separately below.
            response = session.request(method=method, url=self.url, params=params, data=data, files=files,
                                       proxies=proxies, json=json, stream=True, timeout=timeout, verify=verify,
                                       auth=auth, headers=headers, cookies=cookies)
            self.status_code = response.status_code
            self.connect_time = time.time()
            # Intended to keep the timing accurate when the content is large.
            # NOTE(review): a new Semaphore is created on every call, so it is
            # never shared between requests; kept as-is to preserve behavior.
            with Semaphore(1):
                self.read_time = time.time()
                body = response.content
                self.end_time = time.time()
            self.content_length = len(body)
        except ConnectionError:
            print('网络连接失败, 请检查网络是否连通,或者网址是否有效')
            # NOTE(review): terminates the whole run with exit status 0.
            exit(0)
        except Exception:
            self.error = True
            # Fill in every stamp that was not reached so the *_cost_time
            # properties can still be computed for failed requests.
            if self.start_time is None:
                self.start_time = time.time()
            if self.connect_time is None:
                self.connect_time = time.time()
            if self.read_time is None:
                self.read_time = time.time()
            if self.end_time is None:
                self.end_time = time.time()
        finally:
            # A one-off session is owned by this call: close it so its
            # connection pool is released. The shared session stays open.
            if session is not None and not keep_alive:
                session.close()
class Benchmark:
    """Issue ``total_requests`` requests over ``urls`` through a gevent pool
    of size ``concurrency`` and print a summary of the results."""

    def __init__(self, *, concurrency: int, total_requests: int, urls: list, timeout: int = None, method: Text = 'get',
                 keep_alive: bool = False, auth: Text = None, data: Text = None, json: dict = None,
                 headers: dict = None, cookies: dict = None):
        self.concurrency = concurrency
        self.total_requests = total_requests
        self.timeout = timeout
        self.method = method
        self.keep_alive = keep_alive
        self.auth = auth
        self.urls = urls
        self.data = data
        self.json = json
        self.headers = headers
        self.cookies = cookies
        # One URLContainer per issued request, in spawn order.
        self.url_containers = []
        self.pool = gevent.pool.Pool(self.concurrency)

    def start(self):
        """Spawn one request per container, cycling through ``urls`` until
        ``total_requests`` have been issued, then wait for the pool to drain."""
        request_number = 0
        for url in cycle(self.urls):
            if request_number >= self.total_requests:
                break
            container = URLContainer(url)
            self.url_containers.append(container)
            self.pool.spawn(container.request, method=self.method, timeout=self.timeout, keep_alive=self.keep_alive,
                            auth=self.auth, data=self.data, json=self.json, headers=self.headers,
                            cookies=self.cookies)
            request_number += 1
        self.pool.join(raise_error=False)

    # TODO: nikan([email protected])
    # Report min / mean / median / max for each of connect time, process time
    # and total time. Possibly not a necessary metric, although it would show
    # where the latency bottleneck is; it can also be derived externally.

    def get_request_time_distribution(self, total_times: list):
        """Return a multi-line string with the request time at several percentiles.

        :param total_times: per-request durations in seconds, in any order.
        :return: a header line followed by the 0/10/50/90/95/100 percent rows;
            only the header line when ``total_times`` is empty.
        """
        header = '请求时间分布(秒)'
        sorted_times = sorted(total_times)
        if not sorted_times:
            # Nothing was measured: avoid indexing into an empty list.
            return header
        count = len(sorted_times)

        def at(fraction):
            # Clamp at 0: for small samples int(count * fraction) is 0, and an
            # index of -1 would wrongly select the slowest request.
            return sorted_times[max(int(count * fraction) - 1, 0)]

        rows = [
            ('0%(最快)', sorted_times[0]),
            ('10%', at(0.1)),
            ('50%', at(0.5)),
            ('90%', at(0.9)),
            ('95%', at(0.95)),
            ('100%(最慢)', sorted_times[-1]),
        ]
        return '\n'.join([header] + ['{:5}{:.3f}'.format(label, value) for label, value in rows])

    def print_result(self):
        """Print the summary counters and the request time distribution."""
        print('压测结果========================')
        total_times = [container.total_cost_time for container in self.url_containers]
        # A container whose status_code is still None also counts here,
        # because None != 200.
        non_200_responses = sum(1 for container in self.url_containers if container.status_code != 200)
        failed_responses = sum(1 for container in self.url_containers if container.error)
        total_time_mean = mean(total_times)
        formatted_string_one = '\n'.join(['{:20s}{}'.format('并发数:', self.concurrency),
                                          '{:20s}{}'.format('请求数:', self.total_requests),
                                          '{:20s}{}'.format('失败数:', failed_responses),
                                          '{:19s}{}'.format('非200响应数:', non_200_responses),
                                          '{:14s}{:.3f}'.format('平均请求时长(秒):', total_time_mean),
                                          ])
        request_time_distribution_string = self.get_request_time_distribution(total_times)
        print(formatted_string_one)
        print('============================')
        print(request_time_distribution_string)
def start_bench(concurrency, total_requests, urls, timeout, method, keep_alive, auth, data, json, headers, cookies):
    """Build a Benchmark from the given settings, run it, then print its report."""
    settings = {
        'concurrency': concurrency,
        'total_requests': total_requests,
        'urls': urls,
        'timeout': timeout,
        'method': method,
        'keep_alive': keep_alive,
        'auth': auth,
        'data': data,
        'json': json,
        'headers': headers,
        'cookies': cookies,
    }
    runner = Benchmark(**settings)
    runner.start()
    runner.print_result()
def parse_args(shell_args):
    """Parse the command line into the keyword arguments for ``start_bench``.

    :param shell_args: list of argument strings (without the program name);
        ``None`` means ``sys.argv[1:]``.
    :return: dict with the keys concurrency, total_requests, urls, timeout,
        method, keep_alive, auth, data, json, headers and cookies; ``None``
        when no arguments were given (help is printed) or when URL validation
        fails (a message is printed).
    """
    parser = argparse.ArgumentParser(prog='webenchmark', description='HTTP压测小工具🎂 Author: Ni Kan([email protected])')
    parser.add_argument('-c', '--concurrency', dest='concurrency', type=int, default=1, help='并发数')
    parser.add_argument('-n', '--number', dest='total_requests', type=int, help='请求数')
    parser.add_argument('-m', '--method', dest='method', default='get', help='请求方式{GET,POST,DELETE,PUT,HEAD,OPTIONS}')
    parser.add_argument('-f', '--file', dest='file_path', help='文件路径')
    parser.add_argument('-d', '--data', dest='data', help='post/put 数据')
    parser.add_argument('-j', '--json', dest='json', help='post/put json 数据')
    parser.add_argument('-t', '--timeout', dest='timeout', type=int, help='超时时间')
    parser.add_argument('-k', '--keep-alive', dest='keep_alive', default=False, action='store_true', help='是否启用长连接')
    parser.add_argument('-a', '--auth', dest='auth', help='身份认证 eg. basic:user:password')
    parser.add_argument('-H', '--headers', dest='headers', help='请求头')
    parser.add_argument('-C', '--cookies', dest='cookies', type=str, help='请求cookies')
    parser.add_argument('urls', nargs='*', help='请求URL(一个或多个)')
    parser.add_argument('--version', action='version', version='%(prog)s {}'.format('1.0'), help="当前版本")

    # Decide on the arguments actually being parsed, not on sys.argv, so that
    # explicitly passed shell_args are honoured regardless of how the process
    # itself was started.
    if shell_args is None:
        shell_args = sys.argv[1:]
    if not shell_args:
        parser.print_help()
        return None
    args = parser.parse_args(shell_args)

    urls = args.urls
    if args.file_path:
        # URLs read from the file replace any given on the command line;
        # blank lines are skipped.
        with open(args.file_path) as open_file:
            urls = [line.strip() for line in open_file if line.strip()]

    def uri_validator(x):
        """True when ``x`` parses to a URL with both a scheme and a netloc."""
        try:
            result = urlparse(x)
        except ValueError:
            return False
        return bool(result.scheme and result.netloc)

    # An empty URL list is rejected as well: there would be nothing to request.
    if not urls or not all(uri_validator(url) for url in urls):
        print('URL校验失败,请检查你的URL是否有效')
        return None

    auth = None
    if args.auth:
        # Format is "method:user:password"; split at most twice so that the
        # password itself may contain ':'.
        auth_method, auth_user, auth_password = args.auth.split(':', 2)
        auth = auth_class[auth_method.upper()](auth_user, auth_password)

    # SECURITY NOTE(review): eval() executes arbitrary Python taken from the
    # command line. Left unchanged to keep accepting the same inputs; consider
    # ast.literal_eval or json.loads if these values can come from an
    # untrusted source.
    _json = eval(args.json) if args.json else None
    headers = eval(args.headers) if args.headers else None
    cookies = eval(args.cookies) if args.cookies else None

    return {
        'concurrency': args.concurrency,
        # Without -n, every URL is requested exactly once.
        'total_requests': args.total_requests or len(urls),
        'urls': urls,
        'timeout': args.timeout,
        'method': args.method.upper(),
        'keep_alive': args.keep_alive,
        'auth': auth,
        'data': args.data,
        'json': _json,
        'headers': headers,
        'cookies': cookies
    }
def run():
    """Command-line entry point: parse ``sys.argv`` and run the benchmark.

    Nothing is run when ``parse_args`` returns a falsy value.
    """
    parsed = parse_args(sys.argv[1:])
    if not parsed:
        return
    print('正在进行压测.....')
    option_names = ('concurrency', 'total_requests', 'urls', 'timeout', 'method', 'keep_alive', 'auth',
                    'data', 'json', 'headers', 'cookies')
    start_bench(**{name: parsed[name] for name in option_names})


# Only run the benchmark when executed as a script, not when imported.
if __name__ == '__main__':
    run()