Skip to content

Commit

Permalink
change notify to pass in the frame
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason R Briggs committed Apr 19, 2020
1 parent 1521e1c commit 6314fae
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
10 changes: 5 additions & 5 deletions stomp/adapter/multicast.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def process_frame(self, f, frame_str):
if frame_type == "message":
if f.headers["destination"] not in self.subscriptions.values():
return
(f.headers, f.body) = self.notify("before_message", f.headers, f.body)
self.notify(frame_type, f.headers, f.body)
(f.headers, f.body) = self.notify("before_message", f)
self.notify(frame_type, f)
if "receipt" in f.headers:
receipt_frame = Frame("RECEIPT", {"receipt-id": f.headers["receipt"]})
lines = convert_frame(receipt_frame)
Expand Down Expand Up @@ -153,13 +153,13 @@ def send_frame(self, cmd, headers=None, body=''):
if cmd == CMD_BEGIN:
trans = headers[HDR_TRANSACTION]
if trans in self.transactions:
self.notify("error", {}, "Transaction %s already started" % trans)
self.notify("error", Frame(None, {}, "Transaction %s already started" % trans))
else:
self.transactions[trans] = []
elif cmd == CMD_COMMIT:
trans = headers[HDR_TRANSACTION]
if trans not in self.transactions:
self.notify("error", {}, "Transaction %s not started" % trans)
self.notify("error", Frame(None, {}, "Transaction %s not started" % trans))
else:
for f in self.transactions[trans]:
self.transport.transmit(f)
Expand All @@ -171,7 +171,7 @@ def send_frame(self, cmd, headers=None, body=''):
if "transaction" in headers:
trans = headers["transaction"]
if trans not in self.transactions:
self.transport.notify("error", {}, "Transaction %s not started" % trans)
self.transport.notify("error", Frame(None, {}, "Transaction %s not started" % trans))
return
else:
self.transactions[trans].append(frame)
Expand Down
10 changes: 7 additions & 3 deletions stomp/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,27 @@ def process_frame(self, f, frame_str):
frame_type = f.cmd.lower()
if frame_type in ["connected", "message", "receipt", "error", "heartbeat"]:
if frame_type == "message":
(f.headers, f.body) = self.notify("before_message", f.headers, f.body)
(f.headers, f.body) = self.notify("before_message", f)
if logging.isEnabledFor(logging.DEBUG):
logging.debug("Received frame: %r, headers=%r, body=%r", f.cmd, f.headers, f.body)
else:
logging.info("Received frame: %r, len(body)=%r", f.cmd, length(f.body))
self.notify(frame_type, f.headers, f.body)
self.notify(frame_type, f)
else:
logging.warning("Unknown response frame type: '%s' (frame length was %d)", frame_type, length(frame_str))

def notify(self, frame_type, headers=None, body=None):
def notify(self, frame_type, frame=None):
"""
Utility function for notifying listeners of incoming and outgoing messages
:param str frame_type: the type of message
:param dict headers: the map of headers associated with the message
:param body: the content of the message
"""
headers, body = (None, None)
if frame is not None:
headers, body = (frame.headers, frame.body)

if frame_type == "receipt":
# logic for wait-on-receipt notification
receipt = headers["receipt-id"]
Expand Down

0 comments on commit 6314fae

Please sign in to comment.