Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1155 AMQP ack refactor #1174

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flow_amqp_consumer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
fail-fast: false
matrix:
which_test: [ static_flow, no_mirror, flakey_broker, dynamic_flow, restart_server ]
osver: [ "ubuntu-20.04", "ubuntu-22.04" ]
osver: [ "ubuntu-20.04", "ubuntu-22.04", "ubuntu-24.04" ]

runs-on: ${{ matrix.osver }}

Expand Down
3 changes: 2 additions & 1 deletion sarracenia/moth/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,12 @@ def __init__(self, props=None, is_subscriber=True) -> None:
logging.basicConfig(format=self.o['logFormat'],
level=getattr(logging, self.o['logLevel'].upper()))

def ack(self, message: sarracenia.Message ) -> None:
def ack(self, message: sarracenia.Message ) -> bool:
"""
tell broker that a given message has been received.

ack uses the 'ack_id' property to send an acknowledgement back to the broker.
If there's no 'ack_id' in the message, you should return True.
"""
logger.error("ack unimplemented")

Expand Down
75 changes: 53 additions & 22 deletions sarracenia/moth/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import amqp
import copy
import json
import uuid

import logging

Expand Down Expand Up @@ -142,12 +143,17 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
msg.deriveSource( self.o )
msg.deriveTopics( self.o, topic )

msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
# keep track of which connection and channel the msg came from
msg['ack_id'] = { 'delivery_tag': raw_msg.delivery_info['delivery_tag'],
'channel_id': raw_msg.channel.channel_id,
'connection_id': self.connection_id,
'broker': self.broker,
}
msg['local_offset'] = 0
msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
if not msg.validate():
if hasattr(self,'channel'):
self.channel.basic_ack(msg['ack_id'])
self.channel.basic_ack(msg['ack_id']['delivery_tag'])
logger.error('message acknowledged and discarded: %s' % msg)
msg = None
else:
Expand Down Expand Up @@ -188,6 +194,9 @@ def __init__(self, props, is_subscriber) -> None:
logger.setLevel(self.o['logLevel'].upper())

self.connection = None
self.connection_id = None
self.broker = None

def __connect(self, broker) -> bool:
"""
connect to broker.
Expand Down Expand Up @@ -223,6 +232,8 @@ def __connect(self, broker) -> bool:
login_method=broker.login_method,
virtual_host=vhost,
ssl=(broker.url.scheme[-1] == 's'))
self.connection_id = str(uuid.uuid4()) + ("_sub" if self.is_subscriber else "_pub")
self.broker = host + '/' + vhost
if hasattr(self.connection, 'connect'):
# check for amqp 1.3.3 and 1.4.9 because connect doesn't exist in those older versions
self.connection.connect()
Expand Down Expand Up @@ -315,7 +326,6 @@ def getSetup(self) -> None:
if message_strategy is stubborn, will loop here forever.
connect, declare queue, apply bindings.
"""

ebo = 1
original_sigint = signal.getsignal(signal.SIGINT)
original_sigterm = signal.getsignal(signal.SIGINT)
Expand Down Expand Up @@ -574,41 +584,60 @@ def getNewMessage(self) -> sarracenia.Message:
def ack(self, m: sarracenia.Message) -> None:
"""
do what you need to acknowledge that processing of a message is done.
NOTE: AMQP delivery tags (we call them ack_id) are scoped per channel. "Deliveries must be
acknowledged on the same channel they were received on. Acknowledging on a different channel
will result in an "unknown delivery tag" protocol exception and close the channel."
"""
if not self.is_subscriber: #build_consumer
logger.error("getting from a publisher")
return
return False


# silent success. retry messages will not have an ack_id, and so will not require acknowledgement.
if not 'ack_id' in m:
#logger.warning( f"no ackid present" )
return

return True

# when the connection/channel/broker doesn't match the current one, don't attempt to ack: it would fail
if (m['ack_id']['connection_id'] != self.connection_id
or m['ack_id']['channel_id'] != self.channel.channel_id
or m['ack_id']['broker'] != self.broker):
logger.warning(f"failed for {m['ack_id']}. Does not match the current connection {self.connection_id}," +
f" channel {self.channel.channel_id} or broker {self.broker}")
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False

#logger.info( f"acknowledging {m['ack_id']}" )
ebo = 1
while True:
try:
if hasattr(self, 'channel'):
self.channel.basic_ack(m['ack_id'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
# Break loop if no exceptions encountered
return

self.channel.basic_ack(m['ack_id']['delivery_tag'])
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return True
else:
logger.warning(f"Can't ack {m['ack_id']}, don't have a channel")
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False

except Exception as err:
logger.warning("failed for tag: %s: %s" % (m['ack_id'], err))
logger.debug('Exception details: ', exc_info=True)

# Cleanly close partially broken connection and re-establish
self.close()
self.getSetup()

if ebo < 60: ebo *= 2

logger.info(
"Sleeping {} seconds before re-trying ack...".format(ebo))
if type(err) == BrokenPipeError or type(err) == ConnectionResetError:
# Cleanly close partially broken connection
self.close()
# No point in trying to ack again if the connection is broken
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return False

if ebo < 60:
ebo *= 2
logger.info("Sleeping {} seconds before re-trying ack...".format(ebo))
time.sleep(ebo)
# TODO maybe implement message strategy stubborn here and give up after retrying?

def putNewMessage(self,
message: sarracenia.Message,
Expand Down Expand Up @@ -770,3 +799,5 @@ def close(self) -> None:
# FIXME toclose not useful as we don't close channels anymore
self.metricsDisconnect()
self.connection = None
self.connection_id = None
self.broker = None
1 change: 1 addition & 0 deletions sarracenia/moth/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ def ack(self, m: sarracenia.Message ) -> None:
self.client.ack( m['ack_id'], m['qos'] )
del m['ack_id']
m['_deleteOnPost'].remove('ack_id')
return True

def putNewMessage(self,
message: sarracenia.Message,
Expand Down
Loading