From 6314faebee343e48b064472b375819c0c225e254 Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sun, 19 Apr 2020 15:49:31 +0100 Subject: [PATCH 01/10] change notify to pass in the frame --- stomp/adapter/multicast.py | 10 +++++----- stomp/transport.py | 10 +++++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/stomp/adapter/multicast.py b/stomp/adapter/multicast.py index 26ceea55..4aa13bc3 100644 --- a/stomp/adapter/multicast.py +++ b/stomp/adapter/multicast.py @@ -76,8 +76,8 @@ def process_frame(self, f, frame_str): if frame_type == "message": if f.headers["destination"] not in self.subscriptions.values(): return - (f.headers, f.body) = self.notify("before_message", f.headers, f.body) - self.notify(frame_type, f.headers, f.body) + (f.headers, f.body) = self.notify("before_message", f) + self.notify(frame_type, f) if "receipt" in f.headers: receipt_frame = Frame("RECEIPT", {"receipt-id": f.headers["receipt"]}) lines = convert_frame(receipt_frame) @@ -153,13 +153,13 @@ def send_frame(self, cmd, headers=None, body=''): if cmd == CMD_BEGIN: trans = headers[HDR_TRANSACTION] if trans in self.transactions: - self.notify("error", {}, "Transaction %s already started" % trans) + self.notify("error", Frame(None, {}, "Transaction %s already started" % trans)) else: self.transactions[trans] = [] elif cmd == CMD_COMMIT: trans = headers[HDR_TRANSACTION] if trans not in self.transactions: - self.notify("error", {}, "Transaction %s not started" % trans) + self.notify("error", Frame(None, {}, "Transaction %s not started" % trans)) else: for f in self.transactions[trans]: self.transport.transmit(f) @@ -171,7 +171,7 @@ def send_frame(self, cmd, headers=None, body=''): if "transaction" in headers: trans = headers["transaction"] if trans not in self.transactions: - self.transport.notify("error", {}, "Transaction %s not started" % trans) + self.transport.notify("error", Frame(None, {}, "Transaction %s not started" % trans)) return else: self.transactions[trans].append(frame) diff --git a/stomp/transport.py b/stomp/transport.py index 428617a6..704000f3 100644 --- a/stomp/transport.py +++ b/stomp/transport.py @@ -180,16 +180,16 @@ def process_frame(self, f, frame_str): frame_type = f.cmd.lower() if frame_type in ["connected", "message", "receipt", "error", "heartbeat"]: if frame_type == "message": - (f.headers, f.body) = self.notify("before_message", f.headers, f.body) + (f.headers, f.body) = self.notify("before_message", f) if logging.isEnabledFor(logging.DEBUG): logging.debug("Received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body) else: logging.info("Received frame: %r, len(body)=%r", f.cmd, length(f.body)) - self.notify(frame_type, f.headers, f.body) + self.notify(frame_type, f) else: logging.warning("Unknown response frame type: '%s' (frame length was %d)", frame_type, length(frame_str)) - def notify(self, frame_type, headers=None, body=None): + def notify(self, frame_type, frame=None): """ Utility function for notifying listeners of incoming and outgoing messages @@ -197,6 +197,10 @@ def notify(self, frame_type, headers=None, body=None): :param dict headers: the map of headers associated with the message :param body: the content of the message """ + headers, body = (None, None) + if frame is not None: + headers, body = (frame.headers, frame.body) + if frame_type == "receipt": # logic for wait-on-receipt notification receipt = headers["receipt-id"] From eba178b5dda5cab873a4b45ac040587a9f55b7f3 Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sun, 19 Apr 2020 17:13:31 +0100 Subject: [PATCH 02/10] pass frame as argument to listener funcs --- stomp/__main__.py | 43 ++++++++++--------- stomp/listener.py | 104 ++++++++++++++++++++++----------------------- stomp/transport.py | 2 +- tests/test_misc.py | 22 +++++----- 4 files changed, 86 insertions(+), 85 deletions(-) diff --git a/stomp/__main__.py b/stomp/__main__.py index 56d1e19c..9bdc215c 100644 --- a/stomp/__main__.py +++ b/stomp/__main__.py @@ -101,7 +101,7 @@ def __init__(self, host='localhost', port=61613, user='', passcode='', ver='1.1' self.__subscriptions = {} self.__subscription_id = 1 - def __print_async(self, frame_type, headers, body): + def __print_async(self, frame_type, frame): """ Utility function to print a message and setup the command prompt for the next input @@ -110,16 +110,16 @@ def __print_async(self, frame_type, headers, body): return if self.verbose: self.__sysout(frame_type) - for k, v in headers.items(): + for k, v in frame.headers.items(): self.__sysout("%s: %s" % (k, v)) else: - if "message-id" in headers: - self.__sysout("message-id: %s" % headers["message-id"]) - if "subscription" in headers: - self.__sysout("subscription: %s" % headers["subscription"]) + if "message-id" in frame.headers: + self.__sysout("message-id: %s" % frame.headers["message-id"]) + if "subscription" in frame.headers: + self.__sysout("subscription: %s" % frame.headers["subscription"]) if self.prompt != '': self.__sysout('') - self.__sysout(body) + self.__sysout(frame.body) if not self.__start: self.__sysout(self.prompt, end='') else: @@ -147,7 +147,7 @@ def on_disconnected(self): if not self.__quit: self.__error("lost connection") - def on_message(self, headers, body): + def on_message(self, frame): """ See :py:meth:`ConnectionListener.on_message` @@ -155,35 +155,36 @@ def on_message(self, headers, body): as a file """ self.__sysout('') - if "filename" in headers: - content = base64.b64decode(body.encode()) - if os.path.exists(headers["filename"]): - fname = "%s.%s" % (headers["filename"], int(time.time())) + if "filename" in frame.headers: + content = base64.b64decode(frame.body.encode()) + if os.path.exists(frame.headers["filename"]): + fname = "%s.%s" % (frame.headers["filename"], int(time.time())) else: - fname = headers["filename"] + fname = frame.headers["filename"] with open(fname, 'wb') as f: f.write(content) - self.__print_async("MESSAGE", headers, "Saved file: %s" % fname) + frame.body = "Saved file: %s" % fname + self.__print_async("MESSAGE", frame) else: - self.__print_async("MESSAGE", headers, body) + self.__print_async("MESSAGE", frame) - def on_error(self, headers, body): + def on_error(self, frame): """ See :py:meth:`ConnectionListener.on_error` """ - self.__print_async("ERROR", headers, body) + self.__print_async("ERROR", frame) - def on_receipt(self, headers, body): + def on_receipt(self, frame): """ See :py:meth:`ConnectionListener.on_receipt` """ - self.__print_async("RECEIPT", headers, body) + self.__print_async("RECEIPT", frame) - def on_connected(self, headers, body): + def on_connected(self, frame): """ See :py:meth:`ConnectionListener.on_connected` """ - self.__print_async("CONNECTED", headers, body) + self.__print_async("CONNECTED", frame) def on_send(self, frame): if self.verbose: diff --git a/stomp/listener.py b/stomp/listener.py index 260d5328..82be2b92 100644 --- a/stomp/listener.py +++ b/stomp/listener.py @@ -64,7 +64,7 @@ def on_connecting(self, host_and_port): """ pass - def on_connected(self, headers, body): + def on_connected(self, frame): """ Called by the STOMP connection when a CONNECTED frame is received (after a connection has been established or @@ -90,7 +90,7 @@ def on_heartbeat_timeout(self): """ pass - def on_before_message(self, headers, body): + def on_before_message(self, frame): """ Called by the STOMP connection before a message is returned to the client app. Returns a tuple containing the headers and body (so that implementing listeners can pre-process the content). @@ -98,9 +98,9 @@ def on_before_message(self, headers, body): :param dict headers: the message headers :param body: the message body """ - return headers, body + return frame.headers, frame.body - def on_message(self, headers, body): + def on_message(self, frame): """ Called by the STOMP connection when a MESSAGE frame is received. @@ -109,7 +109,7 @@ def on_message(self, headers, body): """ pass - def on_receipt(self, headers, body): + def on_receipt(self, frame): """ Called by the STOMP connection when a RECEIPT frame is received, sent by the server if requested by the client using @@ -120,7 +120,7 @@ def on_receipt(self, headers, body): """ pass - def on_error(self, headers, body): + def on_error(self, frame): """ Called by the STOMP connection when an ERROR frame is received. @@ -143,7 +143,7 @@ def on_heartbeat(self): """ pass - def on_receiver_loop_completed(self, headers, body): + def on_receiver_loop_completed(self, frame): """ Called when the connection receiver_loop has finished. """ @@ -164,7 +164,7 @@ def __init__(self, transport, heartbeats, heart_beat_receive_scale=1.5): self.heart_beat_receive_scale = heart_beat_receive_scale self.heartbeat_terminate_event = threading.Event() - def on_connected(self, headers, body): + def on_connected(self, frame): """ Once the connection is established, and 'heart-beat' is found in the headers, we calculate the real heartbeat numbers (based on what the server sent and what was specified by the client) - if the heartbeats @@ -173,9 +173,9 @@ def on_connected(self, headers, body): :param dict headers: headers in the connection message :param body: the message body """ - if "heart-beat" in headers: + if "heart-beat" in frame.headers: self.heartbeats = utils.calculate_heartbeats( - headers["heart-beat"].replace(' ', '').split(','), self.heartbeats) + frame.headers["heart-beat"].replace(' ', '').split(','), self.heartbeats) logging.debug("Heartbeats calculated %s", str(self.heartbeats)) if self.heartbeats != (0, 0): self.send_sleep = self.heartbeats[0] / 1000 @@ -200,7 +200,7 @@ def on_disconnected(self): self.running = False self.heartbeat_terminate_event.set() - def on_message(self, headers, body): + def on_message(self, frame): """ Reset the last received time whenever a message is received. @@ -326,14 +326,14 @@ def __init__(self, receipt): self.received = False self.disconnected = False - def on_receipt(self, headers, body): + def on_receipt(self, frame): """ If the receipt id can be found in the headers, then notify the waiting thread. :param dict headers: headers in the message :param body: the message content """ - if "receipt-id" in headers and headers["receipt-id"] == self.receipt: + if "receipt-id" in frame.headers and frame.headers["receipt-id"] == self.receipt: with self.receipt_condition: self.received = True self.receipt_condition.notify() @@ -387,7 +387,7 @@ def on_disconnected(self): self.disconnects += 1 logging.info("disconnected (x %s)", self.disconnects) - def on_error(self, headers, body): + def on_error(self, frame): """ Increment the error count. See :py:meth:`ConnectionListener.on_error` @@ -395,9 +395,9 @@ def on_error(self, headers, body): :param body: the message content """ if logging.isEnabledFor(logging.DEBUG): - logging.debug("received an error %s [%s]", body, headers) + logging.debug("received an error %s [%s]", frame.body, frame.headers) else: - logging.info("received an error %s", body) + logging.info("received an error %s", frame.body) self.errors += 1 def on_connecting(self, host_and_port): @@ -409,7 +409,7 @@ def on_connecting(self, host_and_port): logging.info("connecting %s %s (x %s)", host_and_port[0], host_and_port[1], self.connections) self.connections += 1 - def on_message(self, headers, body): + def on_message(self, frame): """ Increment the message received count. See :py:meth:`ConnectionListener.on_message` @@ -468,12 +468,12 @@ def on_connecting(self, host_and_port): """ self.__print("on_connecting %s %s", *host_and_port) - def on_connected(self, headers, body): + def on_connected(self, frame): """ :param dict headers: :param body: """ - self.__print("on_connected %s %s", headers, body) + self.__print("on_connected %s %s", frame.headers, frame.body) def on_disconnected(self): self.__print("on_disconnected") @@ -481,34 +481,34 @@ def on_disconnected(self): def on_heartbeat_timeout(self): self.__print("on_heartbeat_timeout") - def on_before_message(self, headers, body): + def on_before_message(self, frame): """ :param dict headers: :param body: """ - self.__print("on_before_message %s %s", headers, body) - return headers, body + self.__print("on_before_message %s %s", frame.headers, frame.body) + return frame.headers, frame.body - def on_message(self, headers, body): + def on_message(self, frame): """ :param dict headers: :param body: """ - self.__print("on_message %s %s", headers, body) + self.__print("on_message %s %s", frame.headers, frame.body) - def on_receipt(self, headers, body): + def on_receipt(self, frame): """ :param dict headers: :param body: """ - self.__print("on_receipt %s %s", headers, body) + self.__print("on_receipt %s %s", frame.headers, frame.body) - def on_error(self, headers, body): + def on_error(self, frame): """ :param dict headers: :param body: """ - self.__print("on_error %s %s", headers, body) + self.__print("on_error %s %s", frame.headers, frame.body) def on_send(self, frame): """ @@ -559,10 +559,10 @@ def on_connecting(self, host_and_port): PrintingListener.on_connecting(self, host_and_port) WaitingListener.on_connecting(self, host_and_port) - def on_connected(self, headers, body): - StatsListener.on_connected(self, headers, body) - PrintingListener.on_connected(self, headers, body) - WaitingListener.on_connected(self, headers, body) + def on_connected(self, frame): + StatsListener.on_connected(self, frame) + PrintingListener.on_connected(self, frame) + WaitingListener.on_connected(self, frame) def on_disconnected(self): StatsListener.on_disconnected(self) @@ -574,32 +574,32 @@ def on_heartbeat_timeout(self): PrintingListener.on_heartbeat_timeout(self) WaitingListener.on_heartbeat_timeout(self) - def on_before_message(self, headers, body): - StatsListener.on_before_message(self, headers, body) - PrintingListener.on_before_message(self, headers, body) - WaitingListener.on_before_message(self, headers, body) + def on_before_message(self, frame): + StatsListener.on_before_message(self, frame) + PrintingListener.on_before_message(self, frame) + WaitingListener.on_before_message(self, frame) - def on_message(self, headers, message): + def on_message(self, frame): """ :param dict headers: :param message: """ - StatsListener.on_message(self, headers, message) - PrintingListener.on_message(self, headers, message) - self.message_list.append((headers, message)) + StatsListener.on_message(self, frame) + PrintingListener.on_message(self, frame) + self.message_list.append((frame.headers, frame.body)) with self.message_condition: self.message_received = True self.message_condition.notify() - def on_receipt(self, headers, body): - StatsListener.on_receipt(self, headers, body) - PrintingListener.on_receipt(self, headers, body) - WaitingListener.on_receipt(self, headers, body) + def on_receipt(self, frame): + StatsListener.on_receipt(self, frame) + PrintingListener.on_receipt(self, frame) + WaitingListener.on_receipt(self, frame) - def on_error(self, headers, body): - StatsListener.on_error(self, headers, body) - PrintingListener.on_error(self, headers, body) - WaitingListener.on_error(self, headers, body) + def on_error(self, frame): + StatsListener.on_error(self, frame) + PrintingListener.on_error(self, frame) + WaitingListener.on_error(self, frame) def on_send(self, frame): StatsListener.on_send(self, frame) @@ -613,7 +613,7 @@ def on_heartbeat(self): self.heartbeat_received = True self.heartbeat_condition.notify() - def on_receiver_loop_completed(self, headers, body): - StatsListener.on_receiver_loop_completed(self, headers, body) - PrintingListener.on_receiver_loop_completed(self, headers, body) - WaitingListener.on_receiver_loop_completed(self, headers, body) + def on_receiver_loop_completed(self, frame): + StatsListener.on_receiver_loop_completed(self, frame) + PrintingListener.on_receiver_loop_completed(self, frame) + WaitingListener.on_receiver_loop_completed(self, frame) diff --git a/stomp/transport.py b/stomp/transport.py index 704000f3..72baf536 100644 --- a/stomp/transport.py +++ b/stomp/transport.py @@ -243,7 +243,7 @@ def notify(self, frame_type, frame=None): self.connection_error = True self.__connect_wait_condition.notify() - rtn = notify_func(headers, body) + rtn = notify_func(frame) if rtn: (headers, body) = rtn return (headers, body) diff --git a/tests/test_misc.py b/tests/test_misc.py index 817d7453..a697cc2d 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -18,15 +18,15 @@ def __init__(self, receipt, print_to_log): TestListener.__init__(self, receipt, print_to_log) self.message = None - def on_before_message(self, headers, body): - if "transformation" in headers: - trans_type = headers["transformation"] + def on_before_message(self, frame): + if "transformation" in frame.headers: + trans_type = frame.headers["transformation"] if trans_type != "jms-map-xml": - return body + return frame.body try: entries = {} - doc = xml.dom.minidom.parseString(body) + doc = xml.dom.minidom.parseString(frame.body) rootElem = doc.documentElement for entryElem in rootElem.getElementsByTagName("entry"): pair = [] @@ -36,17 +36,17 @@ def on_before_message(self, headers, body): pair.append(node.firstChild.nodeValue) assert len(pair) == 2 entries[pair[0]] = pair[1] - return (headers, entries) + return (frame.headers, entries) except Exception: # # unable to parse message. return original # traceback.print_exc() - return (headers, body) + return (frame.headers, frame.body) - def on_message(self, headers, body): - TestListener.on_message(self, headers, body) - self.message = body + def on_message(self, frame): + TestListener.on_message(self, frame) + self.message = frame.body @pytest.fixture() @@ -149,7 +149,7 @@ def test_on_heartbeat(self): def test_heartbeatlistener(self, mocker): transport = mocker.MagicMock() hl = HeartbeatListener(transport, (10000,20000)) - hl.on_connected({"heart-beat": "10000,20000"}, '') + hl.on_connected(Frame('heartbeat', {"heart-beat": "10000,20000"}, '')) time.sleep(1) hl.on_message From 24819247acede4e3429f0d7746b0d1133a96081f Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sun, 19 Apr 2020 22:43:01 +0100 Subject: [PATCH 03/10] change notify func to pass the frame rather than headers, body --- stomp/adapter/multicast.py | 4 ++-- stomp/listener.py | 5 +---- stomp/transport.py | 13 +++---------- tests/test_misc.py | 19 +++++++++---------- 4 files changed, 15 insertions(+), 26 deletions(-) diff --git a/stomp/adapter/multicast.py b/stomp/adapter/multicast.py index 4aa13bc3..3de43a46 100644 --- a/stomp/adapter/multicast.py +++ b/stomp/adapter/multicast.py @@ -76,7 +76,7 @@ def process_frame(self, f, frame_str): if frame_type == "message": if f.headers["destination"] not in self.subscriptions.values(): return - (f.headers, f.body) = self.notify("before_message", f) + self.notify("before_message", f) self.notify(frame_type, f) if "receipt" in f.headers: receipt_frame = Frame("RECEIPT", {"receipt-id": f.headers["receipt"]}) @@ -148,7 +148,7 @@ def send_frame(self, cmd, headers=None, body=''): """ if headers is None: headers = {} - frame = utils.Frame(cmd, headers, body) + frame = Frame(cmd, headers, body) if cmd == CMD_BEGIN: trans = headers[HDR_TRANSACTION] diff --git a/stomp/listener.py b/stomp/listener.py index 82be2b92..4f916e27 100644 --- a/stomp/listener.py +++ b/stomp/listener.py @@ -73,7 +73,6 @@ def on_connected(self, frame): :param dict headers: a dictionary containing all headers sent by the server as key/value pairs. :param body: the frame's payload. This is usually empty for CONNECTED frames. """ - pass def on_disconnected(self): """ @@ -98,7 +97,7 @@ def on_before_message(self, frame): :param dict headers: the message headers :param body: the message body """ - return frame.headers, frame.body + pass def on_message(self, frame): """ @@ -487,7 +486,6 @@ def on_before_message(self, frame): :param body: """ self.__print("on_before_message %s %s", frame.headers, frame.body) - return frame.headers, frame.body def on_message(self, frame): """ @@ -553,7 +551,6 @@ def wait_for_heartbeat(self): self.heartbeat_condition.wait() self.heartbeat_received = False - def on_connecting(self, host_and_port): StatsListener.on_connecting(self, host_and_port) PrintingListener.on_connecting(self, host_and_port) diff --git a/stomp/transport.py b/stomp/transport.py index 72baf536..db0840fe 100644 --- a/stomp/transport.py +++ b/stomp/transport.py @@ -180,7 +180,7 @@ def process_frame(self, f, frame_str): frame_type = f.cmd.lower() if frame_type in ["connected", "message", "receipt", "error", "heartbeat"]: if frame_type == "message": - (f.headers, f.body) = self.notify("before_message", f) + self.notify("before_message", f) if logging.isEnabledFor(logging.DEBUG): logging.debug("Received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body) else: @@ -197,13 +197,9 @@ def notify(self, frame_type, frame=None): :param dict headers: the map of headers associated with the message :param body: the content of the message """ - headers, body = (None, None) - if frame is not None: - headers, body = (frame.headers, frame.body) - if frame_type == "receipt": # logic for wait-on-receipt notification - receipt = headers["receipt-id"] + receipt = frame.headers["receipt-id"] receipt_value = self.__receipts.get(receipt) with self.__send_wait_condition: self.set_receipt(receipt, None) @@ -243,10 +239,7 @@ def notify(self, frame_type, frame=None): self.connection_error = True self.__connect_wait_condition.notify() - rtn = notify_func(frame) - if rtn: - (headers, body) = rtn - return (headers, body) + notify_func(frame) def transmit(self, frame): """ diff --git a/tests/test_misc.py b/tests/test_misc.py index a697cc2d..85f6c69b 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -22,7 +22,7 @@ def on_before_message(self, frame): if "transformation" in frame.headers: trans_type = frame.headers["transformation"] if trans_type != "jms-map-xml": - return frame.body + return try: entries = {} @@ -36,17 +36,16 @@ def on_before_message(self, frame): pair.append(node.firstChild.nodeValue) assert len(pair) == 2 entries[pair[0]] = pair[1] - return (frame.headers, entries) + frame.body = entries except Exception: # # unable to parse message. return original # traceback.print_exc() - return (frame.headers, frame.body) def on_message(self, frame): TestListener.on_message(self, frame) - self.message = frame.body + self.message = frame @pytest.fixture() @@ -96,14 +95,14 @@ def test_transform(self, conn): ''', destination=queuename, headers={"transformation": "jms-map-xml"}, receipt="123") - listener.wait_on_receipt() listener.wait_for_message() - assert listener.message is not None, "Did not receive a message" - assert listener.message.__class__ == dict, "Message type should be dict after transformation, was %s" % listener.message.__class__ - assert listener.message["name"] == "Dejan", "Missing an expected dict element" - assert listener.message["city"] == "Belgrade", "Missing an expected dict element" + message = listener.message.body + assert message is not None, "Did not receive a message" + assert type(message) == dict, "Message type should be dict after transformation, was %s" % message.__class__ + assert message["name"] == "Dejan", "Missing an expected dict element" + assert message["city"] == "Belgrade", "Missing an expected dict element" class TestNoResponseConnectionKill(object): @@ -154,4 +153,4 @@ def test_heartbeatlistener(self, mocker): hl.on_message # just check if there was a received heartbeat calculated - assert hl.received_heartbeat > 0 \ No newline at end of file + assert hl.received_heartbeat > 0 From 39d417e58ed3289573a955d26e340482417e602a Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Tue, 21 Apr 2020 21:21:51 +0100 Subject: [PATCH 04/10] final fix to get original headers working --- stomp/utils.py | 13 +++++++------ tests/test_misc.py | 32 +++++++++++++++++++++++++++++++- tests/test_utils.py | 4 ++-- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/stomp/utils.py b/stomp/utils.py index 5bf4a794..7ad805ad 100644 --- a/stomp/utils.py +++ b/stomp/utils.py @@ -2,6 +2,7 @@ """ import copy +import logging import os import re import socket @@ -190,7 +191,6 @@ def parse_frame(frame): :rtype: Frame """ - f = Frame() mat = PREAMBLE_END_RE.search(frame) if mat: preamble_end = mat.start() @@ -201,7 +201,7 @@ def parse_frame(frame): preamble = decode(frame[0:preamble_end]) preamble_lines = LINE_END_RE.split(preamble) preamble_len = len(preamble_lines) - f.body = frame[body_start:] + body = frame[body_start:] # Skip any leading newlines first_line = 0 @@ -212,12 +212,12 @@ def parse_frame(frame): return None # Extract frame type/command - f.cmd = preamble_lines[first_line] + cmd = preamble_lines[first_line] # Put headers into a key/value map - f.headers = parse_headers(preamble_lines, first_line + 1) + headers = parse_headers(preamble_lines, first_line + 1) - return f + return Frame(cmd, headers, body) def merge_headers(header_map_list): @@ -326,9 +326,10 @@ class Frame(object): :param dict headers: a map of headers for the frame :param body: the content of the frame. """ - def __init__(self, cmd=None, headers=None, body=None): + def __init__(self, cmd, headers=None, body=None): self.cmd = cmd self.headers = headers if headers is not None else {} + self.original_headers = copy.copy(self.headers) self.body = body def __str__(self): diff --git a/tests/test_misc.py b/tests/test_misc.py index 85f6c69b..bf2b3cb5 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -48,6 +48,15 @@ def on_message(self, frame): self.message = frame +class HeaderModListener(TestListener): + def on_before_message(self, frame): + frame.headers["testheader"] = "modifiedheader" + + def on_message(self, frame): + TestListener.on_message(self, frame) + self.message = frame + + @pytest.fixture() def conn(): conn = stomp.Connection11(get_rabbitmq_host()) @@ -57,6 +66,15 @@ def conn(): conn.disconnect(receipt=None) +@pytest.fixture() +def conn2(): + conn2 = stomp.Connection11(get_rabbitmq_host()) + conn2.set_listener("testlistener", HeaderModListener("123", print_to_log=True)) + conn2.connect(get_rabbitmq_user(), get_rabbitmq_password(), wait=True) + yield conn2 + conn2.disconnect(receipt=None) + + def timeout_server(svr): time.sleep(3) logging.info("Stopping server %s" % svr) @@ -78,7 +96,6 @@ def timeout_thread(miscserver): class TestMessageTransform(object): - def test_transform(self, conn): listener = conn.get_listener("testlistener") queuename = "/queue/testtransform-%s" % listener.timestamp @@ -154,3 +171,16 @@ def test_heartbeatlistener(self, mocker): # just check if there was a received heartbeat calculated assert hl.received_heartbeat > 0 + + def test_original_headers(self, conn2): + listener = conn2.get_listener("testlistener") + queuename = "/queue/testheadermod-%s" % listener.timestamp + conn2.subscribe(destination=queuename, id=1, ack="auto") + + conn2.send(body="test message", destination=queuename, headers={"testheader": "originalheader"}, receipt="123") + + listener.wait_on_receipt() + listener.wait_for_message() + + assert "modifiedheader" == listener.message.headers["testheader"] + assert "originalheader" == listener.message.original_headers["testheader"] diff --git a/tests/test_utils.py b/tests/test_utils.py index 1a9226b8..b6154a16 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -76,8 +76,8 @@ def test_parse_frame(self): assert str(f) == str(Frame("MESSAGE", {"content-type": "text/plain"}, b"hello world!")) def test_clean_default_headers(self): - Frame().headers["foo"] = "bar" - assert Frame().headers == {} + Frame('test').headers["foo"] = "bar" + assert Frame('test').headers == {} def test_join(self): str = stomp.utils.join((b'a', b'b', b'c')) From 4d631264f701ecd593b49271b1f3ca72655dbeb4 Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Wed, 22 Apr 2020 19:06:32 +0100 Subject: [PATCH 05/10] minor makefile change --- Makefile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Makefile b/Makefile index fea1d0bc..18ba035a 100644 --- a/Makefile +++ b/Makefile @@ -60,3 +60,6 @@ run-docker: remove-docker: docker stop stomppy docker rm stomppy + + +docker: remove-docker docker-image run-docker \ No newline at end of file From ff3852f2b94e85e60e648a35c44b9142fdc42d0f Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sat, 25 Apr 2020 17:19:32 +0100 Subject: [PATCH 06/10] update version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 55184434..3d0c8fa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "stomp.py" -version = "6.1.0" +version = "7.0.0" description = "Python STOMP client, supporting versions 1.0, 1.1 and 1.2 of the protocol" authors = ["Jason R Briggs "] license = "Apache-2.0" From 4d9c0204d3ac396acdc62dfed57b5817481d9793 Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sat, 25 Apr 2020 17:24:57 +0100 Subject: [PATCH 07/10] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3de8c554..cf8b649c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * Add support for \r\n EOL handling (as per [stomp protocol v1.2](http://stomp.github.io/stomp-specification-1.2.html#Augmented_BNF)) * Remove heartbeat loop sleep (issue https://github.com/jasonrbriggs/stomp.py/issues/297, https://github.com/jasonrbriggs/stomp.py/pull/298) * Update version number using the makefile and the poetry version command + * Add `original_headers` access to the Frame so that you can get the original value of a header even if a listener modifies it (issue https://github.com/jasonrbriggs/stomp.py/issues/300, PR https://github.com/jasonrbriggs/stomp.py/pull/309) ## Version 6.0.0 - Feb 2020 From 2ab7609b27a62362b1038266ea86571003e9afd9 Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sat, 25 Apr 2020 17:25:07 +0100 Subject: [PATCH 08/10] minor code tidy up --- stomp/__main__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/stomp/__main__.py b/stomp/__main__.py index 9bdc215c..9ec0c73f 100644 --- a/stomp/__main__.py +++ b/stomp/__main__.py @@ -164,9 +164,7 @@ def on_message(self, frame): with open(fname, 'wb') as f: f.write(content) frame.body = "Saved file: %s" % fname - self.__print_async("MESSAGE", frame) - else: - self.__print_async("MESSAGE", frame) + self.__print_async("MESSAGE", frame) def on_error(self, frame): """ From 998f5c79228c60f078280ccd0b6ab690776bc90d Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sat, 25 Apr 2020 17:29:14 +0100 Subject: [PATCH 09/10] update readme --- README.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.rst b/README.rst index 9dd57855..d5c3b3d7 100644 --- a/README.rst +++ b/README.rst @@ -51,6 +51,13 @@ The current version of stomp.py supports: There is also legacy 3.1.7 version using the old 3-series code (see `3.1.7 on PyPi`_ and `3.1.7 on GitHub`_). This is no longer supported, but (at least as of 2018) there were still a couple of reports of this version still being used in the wild. +Note: stomp.py now follows [semantic versioning](https://semver.org/): + +- MAJOR version for incompatible API changes, +- MINOR version for functionality added in a backwards compatible manner, and +- PATCH version for backwards compatible bug fixes. + + Testing ======= From 03420b52c2cb0a9a797680853ca46c0410e3dad3 Mon Sep 17 00:00:00 2001 From: Jason R Briggs Date: Sun, 26 Apr 2020 10:15:28 +0100 Subject: [PATCH 10/10] minor fix for readme --- README.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index d5c3b3d7..d35f4b61 100644 --- a/README.rst +++ b/README.rst @@ -51,7 +51,7 @@ The current version of stomp.py supports: There is also legacy 3.1.7 version using the old 3-series code (see `3.1.7 on PyPi`_ and `3.1.7 on GitHub`_). This is no longer supported, but (at least as of 2018) there were still a couple of reports of this version still being used in the wild. -Note: stomp.py now follows [semantic versioning](https://semver.org/): +Note: stomp.py now follows `semantic versioning`_: - MAJOR version for incompatible API changes, - MINOR version for functionality added in a backwards compatible manner, and @@ -105,3 +105,5 @@ For testing locally, you'll need to install docker. Once installed: .. _`stompserver`: http://stompserver.rubyforge.org .. _`buy me a coffee`: https://www.paypal.me/jasonrbriggs + +.. _`semantic versioning`: https://semver.org/ \ No newline at end of file