-
Notifications
You must be signed in to change notification settings - Fork 168
/
Copy pathmisc_test.py
134 lines (111 loc) · 4.53 KB
/
misc_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import importlib
import platform
import traceback
import unittest
from unittest.mock import MagicMock
import xml.dom.minidom
import stomp
from stomp.exception import *
from stomp.listener import *
from stomp.test.testutils import *
class TransformationListener(TestListener):
    """
    Test listener that decodes ``jms-map-xml`` message bodies into a dict.

    The body passed to the most recent ``on_message`` call is kept in
    ``self.message`` so a test can inspect it.
    """

    def __init__(self, receipt):
        """
        :param receipt: forwarded unchanged to ``TestListener.__init__``
        """
        TestListener.__init__(self, receipt)
        self.message = None

    def on_before_message(self, headers, body):
        """
        Turn a ``jms-map-xml`` body into a dict of its ``<entry>`` pairs.

        Every ``<entry>`` element must hold exactly two child elements; the
        text of the first is used as the key and the text of the second as
        the value.

        :param headers: message headers; only the ``'transformation'`` key is examined
        :param body: message body (XML text when the transformation applies)
        :return: ``(headers, entries)`` when the body was converted, otherwise
            ``(headers, body)`` with the body untouched
        """
        # Always hand back a (headers, body) pair, exactly like the error
        # path below does, so the caller sees one consistent return shape
        # whether or not a transformation was requested.
        if headers.get('transformation') != 'jms-map-xml':
            return (headers, body)

        try:
            entries = {}
            doc = xml.dom.minidom.parseString(body)
            for entry_elem in doc.documentElement.getElementsByTagName("entry"):
                # Whitespace between tags shows up as text nodes; keep only
                # the real child elements of the entry.
                pair = [
                    node.firstChild.nodeValue
                    for node in entry_elem.childNodes
                    if isinstance(node, xml.dom.minidom.Element)
                ]
                # Explicit check instead of ``assert`` so it still runs
                # when Python is started with -O.
                if len(pair) != 2:
                    raise ValueError(
                        'expected 2 child elements in <entry>, found %d' % len(pair))
                key, value = pair
                entries[key] = value
            return (headers, entries)
        except Exception:
            #
            # unable to parse message. return original
            #
            traceback.print_exc()
            return (headers, body)

    def on_message(self, headers, body):
        """
        Delegate to ``TestListener.on_message`` and remember the body.

        :param headers: message headers
        :param body: message body (a dict if ``on_before_message`` converted it)
        """
        TestListener.on_message(self, headers, body)
        self.message = body
class TestMessageTransform(unittest.TestCase):
    """
    Sends a ``jms-map-xml`` message through a broker and checks that
    ``TransformationListener`` delivers it as a dict.
    """

    def setUp(self):
        """Connect to the host given by ``get_rabbitmq_host()`` with a TransformationListener attached."""
        conn = stomp.Connection(get_rabbitmq_host())
        listener = TransformationListener('123')
        conn.set_listener('', listener)
        conn.connect(get_rabbitmq_user(), get_rabbitmq_password(), wait=True)
        self.conn = conn
        self.listener = listener
        # Used to build a queue name that differs from run to run.
        self.timestamp = time.strftime('%Y%m%d%H%M%S')

    def tearDown(self):
        """Disconnect the connection opened in ``setUp``."""
        if self.conn:
            self.conn.disconnect(receipt=None)

    def test_transform(self):
        """A map with two entries must arrive as ``{'name': 'Dejan', 'city': 'Belgrade'}``."""
        queuename = '/queue/testtransform-%s' % self.timestamp
        self.conn.subscribe(destination=queuename, id=1, ack='auto')

        xml_body = '''<map>
<entry>
<string>name</string>
<string>Dejan</string>
</entry>
<entry>
<string>city</string>
<string>Belgrade</string>
</entry>
</map>'''
        self.conn.send(body=xml_body, destination=queuename,
                       headers={'transformation': 'jms-map-xml'}, receipt='123')

        self.listener.wait_on_receipt()
        self.listener.wait_for_message()

        message = self.listener.message
        # Dedicated assert methods report the offending value on failure,
        # which a bare assertTrue(<expression>) cannot do.
        self.assertIsNotNone(message, 'Did not receive a message')
        self.assertIsInstance(
            message, dict,
            'Message type should be dict after transformation, was %s' % message.__class__)
        # .get() turns a missing key into a normal test failure carrying the
        # message below, rather than an unrelated KeyError.
        self.assertEqual(message.get('name'), 'Dejan', 'Missing an expected dict element')
        self.assertEqual(message.get('city'), 'Belgrade', 'Missing an expected dict element')
class TestNoResponseConnectionKill(unittest.TestCase):
    """
    Checks that ``connect(wait=True)`` raises ``ConnectFailedException`` when
    the test server is shut down while the client is still waiting.
    """

    def setUp(self):
        """Start the local test server and prepare (but do not start) the shutdown thread."""
        self.server = TestStompServer('127.0.0.1', 60000)
        self.server.start()
        self.timeout_thread = threading.Thread(name='shutdown test server', target=self.timeout_server)

    def tearDown(self):
        """Make sure the server is stopped before the next test runs."""
        if self.timeout_thread.ident is not None:
            # The thread was started: wait for it to finish stopping the
            # server so nothing is left running in the background.
            self.timeout_thread.join()
        else:
            # The test aborted before starting the thread, so nobody else
            # is going to stop the server.
            self.server.running = False
            self.server.stop()

    def timeout_server(self):
        """Thread target: wait three seconds, then stop the test server."""
        time.sleep(3)
        logging.info('Stopping server')
        self.server.running = False
        self.server.stop()

    def test_noresponse(self):
        """The pending connect must fail once the server goes away."""
        conn = stomp.Connection([('127.0.0.1', 60000)], heartbeats=(1000, 1000))
        listener = TestListener(print_to_log=True)
        conn.set_listener('', listener)
        self.timeout_thread.start()

        # assertRaises fails the test if connect() returns normally, and lets
        # any other exception propagate with its real traceback instead of
        # collapsing it into a generic failure message.
        with self.assertRaises(ConnectFailedException, msg="Shouldn't happen"):
            conn.connect(wait=True)
        logging.info('Received connect failed - test success')
class TestMiscellaneousLogic(unittest.TestCase):
    """Small tests that run with ``platform.system()`` mocked to report "Windows"."""

    def setUp(self):
        """Patch ``platform.system`` for one test and arrange for it to be undone."""
        # Cleanups run in reverse order of registration, so this one runs
        # last: once platform.system is real again, re-import stomp.colours
        # so its constants stop reflecting the fake "Windows" platform.
        self.addCleanup(self._reload_colours)

        real_system = platform.system
        platform.system = MagicMock(return_value="Windows")
        # Put the real function back so the mock does not leak into every
        # test that runs after this one.
        self.addCleanup(setattr, platform, 'system', real_system)

    @staticmethod
    def _reload_colours():
        """Re-execute ``stomp.colours`` under whatever ``platform.system`` currently is."""
        import stomp.colours
        importlib.reload(stomp.colours)

    def test_windows_colours(self):
        """With the platform reported as Windows, every colour constant is the empty string."""
        import stomp.colours
        # The module must be re-executed for it to see the patched platform.system.
        importlib.reload(stomp.colours)
        # assertEqual, not assertEquals: the alias was removed in Python 3.12.
        self.assertEqual("", stomp.colours.GREEN)
        self.assertEqual("", stomp.colours.RED)
        self.assertEqual("", stomp.colours.BOLD)
        self.assertEqual("", stomp.colours.NO_COLOUR)

    # just here for coverage
    def test_publisher(self):
        """After setting and then removing a listener, looking it up yields ``None``."""
        p = Publisher()
        p.set_listener('test', None)
        p.remove_listener('test')
        self.assertIsNone(p.get_listener('test'))