forked from xmlsec/python-xmlsec
-
Notifications
You must be signed in to change notification settings - Fork 0
/
base.py
140 lines (110 loc) · 4.19 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import gc
import os
import sys
import unittest
from lxml import etree
import xmlsec
# Python 2 only ships the older ``assertRaisesRegexp`` spelling; alias it so
# the tests can call ``assertRaisesRegex`` on either interpreter.
if sys.version_info[0] < 3:
    unittest.TestCase.assertRaisesRegex = unittest.TestCase.assertRaisesRegexp

# Concrete class of the elements lxml creates; used as the key when
# registering a type-specific equality assertion.
etype = type(etree.Element("test"))

# Prefix -> namespace URI map for the XML signature / encryption namespaces.
ns = {
    'dsig': xmlsec.constants.DSigNs,
    'enc': xmlsec.constants.EncNs,
}
try:
    import resource
except ImportError:
    # The ``resource`` module is not available on every platform; keep the
    # name bound so callers can test ``resource is None``.
    resource = None


def get_memory_usage():
    """Return the peak resident set size of this process.

    The value is ``ru_maxrss`` as reported by ``resource.getrusage`` for the
    current process. When the ``resource`` module could not be imported the
    function returns ``0`` so that callers never see memory growth.
    """
    if resource is None:
        return 0
    usage = resource.getrusage(resource.RUSAGE_SELF)
    return usage.ru_maxrss
def get_iterations():
    """Return how many extra times each test is re-run for leak detection.

    The count is read from the ``PYXMLSEC_TEST_ITERATIONS`` environment
    variable and defaults to 10.

    Returns:
        A non-negative int. ``0`` (leak detection disabled) is returned on
        Windows, when the variable is not a valid integer, and when it is
        negative.
    """
    if sys.platform == 'win32':
        return 0
    try:
        iterations = int(os.getenv("PYXMLSEC_TEST_ITERATIONS", "10"))
    except ValueError:
        return 0
    # A negative count would make the re-run loop empty while the hit
    # threshold (80% of the count) turns negative, so every test would be
    # flagged as leaking. Treat it as "disabled" instead.
    return max(iterations, 0)
class TestMemoryLeaks(unittest.TestCase):
    """Base test case that re-runs every test to look for leaks.

    Each test is executed once normally and then ``iterations`` more times.
    After every extra run the peak memory usage and the generation-0 count
    from ``gc.get_count()`` are sampled; if either grows on more than 80% of
    the extra runs, an error is recorded on the test result.

    The class also provides helpers for loading fixtures from ``data_dir``
    and a structural XML comparison used for lxml elements.
    """

    maxDiff = None

    # Number of extra runs per test; 0 disables the leak check.
    iterations = get_iterations()

    # Directory holding the fixture files, next to this module.
    data_dir = os.path.join(os.path.dirname(__file__), "data")

    def setUp(self):
        """Prepare the interpreter and the assertion machinery for a test."""
        # Automatic collection is switched off so the gen-0 count sampled in
        # run() is not reset by a collection in the middle of the test.
        gc.disable()
        # assertEqual() on two lxml elements dispatches to assertXmlEqual.
        self.addTypeEqualityFunc(etype, "assertXmlEqual")
        xmlsec.enable_debug_trace(1)

    def run(self, result=None):
        """Run the test once, then repeat it while watching for leaks.

        Args:
            result: the ``unittest.TestResult`` to record into. When omitted
                a default result is created so that leak errors can still be
                recorded.

        Returns:
            The result object that was recorded into.
        """
        if result is None:
            # Without a shared result object every super().run() call would
            # build its own, and there would be nothing to attach a leak
            # error to afterwards.
            result = self.defaultTestResult()

        # run first time
        super(TestMemoryLeaks, self).run(result=result)
        if self.iterations <= 0:
            return result

        # get_memory_usage() yields 0 when the `resource` module is missing,
        # which simply never registers as memory growth.
        m_usage = get_memory_usage()
        o_count = gc.get_count()[0]
        m_hits = 0
        o_hits = 0
        for _ in range(self.iterations):
            super(TestMemoryLeaks, self).run(result=result)

            m_usage_n = get_memory_usage()
            if m_usage_n > m_usage:
                m_usage = m_usage_n
                m_hits += 1

            o_count_n = gc.get_count()[0]
            if o_count_n > o_count:
                o_count = o_count_n
                o_hits += 1

        # Growth on more than 80% of the re-runs is treated as a leak.
        threshold = int(self.iterations * 0.8)
        if m_hits > threshold:
            self._report_leak(result, "memory leak detected")
        if o_hits > threshold:
            self._report_leak(result, "unreferenced objects detected")
        return result

    def _report_leak(self, result, message):
        """Record *message* on *result* as an error for this test."""
        result.buffer = False
        try:
            # Raise for real so sys.exc_info() carries a traceback, which
            # addError() expects.
            raise AssertionError(message)
        except AssertionError:
            result.addError(self, sys.exc_info())

    def path(self, name):
        """Return the full path of the fixture file *name* in ``data_dir``."""
        return os.path.join(self.data_dir, name)

    def load(self, name):
        """Return the raw bytes of the fixture file *name*."""
        with open(self.path(name), "rb") as stream:
            return stream.read()

    def load_xml(self, name, xpath=None):
        """Parse the fixture file *name* and return an lxml element.

        Args:
            name: fixture file name, resolved through ``path()``.
            xpath: optional path handed to ``find()`` on the document root.

        Returns:
            The root element when *xpath* is None, otherwise the result of
            ``root.find(xpath)``.
        """
        root = etree.parse(self.path(name)).getroot()
        if xpath is None:
            return root
        return root.find(xpath)

    def dump(self, root):
        """Print the serialized form of *root* (debugging aid)."""
        print(etree.tostring(root))

    def assertXmlEqual(self, first, second, msg=None):
        """Fail unless the element trees *first* and *second* match.

        Tags, attributes, text and tail are compared on every node (text and
        tail through ``xml_text_compare``). Children are compared pairwise
        after sorting by tag.

        Args:
            first: element to compare.
            second: element to compare against.
            msg: optional text appended to every failure message.
        """
        msg = msg or ''
        if first.tag != second.tag:
            self.fail('Tags do not match: %s and %s. %s' % (first.tag, second.tag, msg))
        for name, value in first.attrib.items():
            if second.attrib.get(name) != value:
                self.fail('Attributes do not match: %s=%r, %s=%r. %s' % (name, value, name, second.attrib.get(name), msg))
        for name in second.attrib:
            if name not in first.attrib:
                self.fail('x2 has an attribute x1 is missing: %s. %s' % (name, msg))
        if not xml_text_compare(first.text, second.text):
            self.fail('text: %r != %r. %s' % (first.text, second.text, msg))
        if not xml_text_compare(first.tail, second.tail):
            self.fail('tail: %r != %r. %s' % (first.tail, second.tail, msg))
        # Iterating an element yields its children (getchildren() is
        # deprecated). Sorting by tag makes the comparison independent of
        # the order of differently-named siblings.
        cl1 = sorted(first, key=lambda x: x.tag)
        cl2 = sorted(second, key=lambda x: x.tag)
        if len(cl1) != len(cl2):
            self.fail('children length differs, %i != %i. %s' % (len(cl1), len(cl2), msg))
        for c1, c2 in zip(cl1, cl2):
            # Pass msg down so nested failures keep the caller's context.
            self.assertXmlEqual(c1, c2, msg)
def xml_text_compare(t1, t2):
    """Return True when two text/tail values count as equal.

    A literal ``'*'`` on either side matches anything. Otherwise the values
    are compared with surrounding whitespace removed, and any falsy value
    (``None`` or an empty string) is treated as empty text.
    """
    if '*' in (t1, t2):
        return True
    left = t1.strip() if t1 else ''
    right = t2.strip() if t2 else ''
    return left == right