Issue339 making a new feature branch that will contain stuff as it matures... #1153

Merged: 9 commits, Aug 7, 2024
6 changes: 3 additions & 3 deletions sarracenia/config.py
@@ -1360,7 +1360,7 @@ def _parse_binding(self, subtopic_string):
subtopic = subtopic_string.split('/')

if hasattr(self, 'exchange') and hasattr(self, 'topicPrefix'):
self.bindings.append((self.exchange, self.topicPrefix, subtopic))
self.bindings.append((self.broker, self.exchange, self.topicPrefix, subtopic))

def _parse_v2plugin(self, entryPoint, value):
"""
@@ -1998,7 +1998,7 @@ def finalize(self, component=None, config=None):


if (self.bindings == [] and hasattr(self, 'exchange')):
self.bindings = [(self.exchange, self.topicPrefix, [ '#' ])]
self.bindings = [(self.broker, self.exchange, self.topicPrefix, [ '#' ])]

if hasattr(self, 'documentRoot') and (self.documentRoot is not None):
path = os.path.expanduser(os.path.abspath(self.documentRoot))
@@ -2433,7 +2433,7 @@ def __call__(self, parser, namespace, values, option_string):
topicPrefix = namespace.topicPrefix.split('/')

namespace.bindings.append(
(namespace.exchange, topicPrefix, values))
(namespace.broker, namespace.exchange, topicPrefix, values))

def parse_args(self, isPost=False):
"""
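The config.py changes above widen each binding from (exchange, topicPrefix, subtopic) to (broker, exchange, topicPrefix, subtopic). Below is a minimal standalone sketch of that shape, not the sarracenia code itself; `default_broker`, `add_binding` and the broker URL are illustrative assumptions.

```python
from urllib.parse import urlparse

# All names below are illustrative stand-ins, not sarracenia's own API.
default_broker = urlparse('amqps://anonymous@hpfx.collab.science.gc.ca')
exchange = 'xpublic'
topicPrefix = ['v02', 'post']
bindings = []

def add_binding(subtopic_string, broker=None):
    """Append a (broker, exchange, topicPrefix, subtopic) binding,
    falling back to the default broker when none is given."""
    subtopic = subtopic_string.split('/')
    bindings.append((broker or default_broker, exchange, topicPrefix, subtopic))

add_binding('*/WXO-DD/observations/#')
print(bindings[0][0].hostname, bindings[0][3])
# hpfx.collab.science.gc.ca ['*', 'WXO-DD', 'observations', '#']
```

Carrying the broker inside the binding lets a single configuration subscribe to several brokers at once, which is what the gather callback below takes advantage of.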
39 changes: 22 additions & 17 deletions sarracenia/credentials.py
@@ -104,25 +104,10 @@ def __init__(self, urlstr=None):
self.azure_credentials = None
self.implicit_ftps = False

def __str__(self):
def __str__(self) -> str:
"""Returns attributes of the Credential object as a readable string.
"""

s = ''
if False:
s += self.url.geturl()
else:
s += self.url.scheme + '://'
if self.url.username:
s += self.url.username
#if self.url.password:
# s += ':' + self.url.password
if self.url.hostname:
s += '@' + self.url.hostname
if self.url.port:
s += ':' + str(self.url.port)
if self.url.path:
s += self.url.path
s = self.geturl()

s += " %s" % self.ssh_keyfile
s += " %s" % self.passive
@@ -139,6 +124,24 @@ def __str__(self):

return s

def geturl(self) -> str:
if not hasattr(self,'url'):
return ''

s=''
s += self.url.scheme + '://'
if self.url.username:
s += self.url.username
#if self.url.password:
# s += ':' + self.url.password
if self.url.hostname:
s += '@' + self.url.hostname
if self.url.port:
s += ':' + str(self.url.port)
if self.url.path:
s += self.url.path
return s


class CredentialDB:
"""Parses, stores and manages Credential objects.
@@ -242,6 +245,8 @@ def get(self, urlstr):
k=urlstr
return False, self.credentials[k]



def has(self, urlstr):
"""Return ``True`` if the Credential matching the urlstr is already in the CredentialDB.

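In credentials.py, `__str__` now delegates URL rendering to the new `geturl()` helper, which rebuilds the URL while skipping the password. Here is a self-contained sketch of the same idea using urllib.parse alone; `url_without_password` and the example URL are made up for illustration.

```python
from urllib.parse import urlparse

def url_without_password(urlstr: str) -> str:
    """Rebuild a URL string, omitting the password so it is safe to log."""
    u = urlparse(urlstr)
    s = u.scheme + '://'
    if u.username:
        s += u.username
    if u.hostname:
        s += '@' + u.hostname
    if u.port:
        s += ':' + str(u.port)
    if u.path:
        s += u.path
    return s

print(url_without_password('amqps://alice:secret@broker.example.com:5671/vhost'))
# amqps://alice@broker.example.com:5671/vhost
```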
3 changes: 2 additions & 1 deletion sarracenia/examples/flow_api_consumer.py
@@ -12,9 +12,10 @@
cfg.component = 'subscribe'
cfg.config = 'flow_demo'
cfg.action = 'hoho'
cfg.bindings = [('xpublic', ['v02', 'post'],
cfg.bindings = [( 'xpublic', ['v02', 'post'],
['*', 'WXO-DD', 'observations', 'swob-ml', '#'])]
cfg.queueName = 'q_anonymous.subscriber_test2'
#cfg.debug = True
cfg.download = True
cfg.batch = 1
cfg.messageCountMax = 5
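The example above still passes a 3-element binding; given the moth changes further down, the broker could instead travel inside each binding. A speculative sketch of what a 4-element binding might look like, with types.SimpleNamespace standing in for the real config object and the broker URL chosen only as an example:

```python
from types import SimpleNamespace
from urllib.parse import urlparse

# SimpleNamespace stands in for the sarracenia config object used in the example above.
cfg = SimpleNamespace()
cfg.broker = urlparse('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
# 4-element form: (broker, exchange, topicPrefix, subtopic)
cfg.bindings = [(cfg.broker, 'xpublic', ['v02', 'post'],
                 ['*', 'WXO-DD', 'observations', 'swob-ml', '#'])]
cfg.queueName = 'q_anonymous.subscriber_test2'

print(cfg.bindings[0][0].hostname)   # hpfx.collab.science.gc.ca
```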
88 changes: 65 additions & 23 deletions sarracenia/flowcb/gather/message.py
@@ -23,54 +23,96 @@ def __init__(self, options) -> None:
self.od = sarracenia.moth.default_options
self.od.update(self.o.dictify())

if hasattr(self.o, 'broker') and self.o.broker:
self.consumer = sarracenia.moth.Moth.subFactory(self.od)
else:
if not hasattr(self.o, 'broker') or not self.o.broker:
logger.critical('missing required broker specification')
return

if not hasattr(self.o, 'bindings') or not self.o.bindings:
logger.critical('missing required bindings (exchange,subtopic) for broker')
return

self.brokers=[]
for binding in self.o.bindings:
if len(binding) >= 4 and binding[0] not in self.brokers:
self.brokers.append(binding[0])

if len(self.brokers) == 0:
self.brokers=[ self.o.broker ]

self.consumers={}
for broker in self.brokers:
self.od['broker']=broker
self.consumers[broker] = sarracenia.moth.Moth.subFactory(self.od)

def gather(self, messageCountMax) -> list:
"""
return:
True ... you can gather from other sources. and:
a list of messages obtained from this source.
"""
if hasattr(self,'consumer') and hasattr(self.consumer,'newMessages'):
return (True, self.consumer.newMessages())
else:
logger.warning( f'not connected. Trying to connect to {self.o.broker}')
self.consumer = sarracenia.moth.Moth.subFactory(self.od)
new_messages=[]
if not hasattr(self,'consumers'):
return (True, [])

for broker in self.brokers:
if broker in self.consumers and hasattr(self.consumers[broker],'newMessages'):
new_messages.extend(self.consumers[broker].newMessages())
else:
logger.warning( f'not connected. Trying to connect to {broker}')
self.od['broker']=broker
self.consumers[broker] = sarracenia.moth.Moth.subFactory(self.od)

return (True, new_messages)

def ack(self, mlist) -> None:

if not hasattr(self,'consumer'):
if not hasattr(self,'consumers'):
return

for m in mlist:
if not 'broker' in m:
logger.error( f"cannot ack, missing broker in {m}" )
continue

if not m['broker'] in self.consumers:
logger.error( f"cannot ack, no consumer for {m['broker'].geturl()}" )
continue

# messages being re-downloaded should not be re-acked, but they won't have an ack_id (see issue #466)
self.consumer.ack(m)
if hasattr(self.consumers[m['broker']],'ack'):
self.consumers[m['broker']].ack(m)
else:
logger.error( f"cannot ack" )

def metricsReport(self) -> dict:
if hasattr(self,'consumer') and hasattr(self.consumer,'metricsReport'):
return self.consumer.metricsReport()
else:

if not hasattr(self,'consumers'):
return {}

metrics={}
for broker in self.brokers:
if hasattr(self.consumers[broker],'metricsReport'):
metrics[broker.geturl()] = self.consumers[broker].metricsReport()
return metrics

def on_housekeeping(self) -> None:

if not hasattr(self,'consumer'):
if not hasattr(self,'consumers'):
return

if hasattr(self.consumer, 'metricsReport'):
m = self.consumer.metricsReport()
average = (m['rxByteCount'] /
m['rxGoodCount'] if m['rxGoodCount'] != 0 else 0)
logger.info( f"messages: good: {m['rxGoodCount']} bad: {m['rxBadCount']} " +\
f"bytes: {naturalSize(m['rxByteCount'])} " +\
m = self.metricsReport()
for broker in self.brokers:
burl=broker.geturl()
average = (m[burl]['rxByteCount'] /
m[burl]['rxGoodCount'] if m[burl]['rxGoodCount'] != 0 else 0)
logger.info( f"{burl} messages: good: {m[burl]['rxGoodCount']} bad: {m[burl]['rxBadCount']} " +\
f"bytes: {naturalSize(m[burl]['rxByteCount'])} " +\
f"average: {naturalSize(average)}" )
self.consumer.metricsReset()
self.consumers[broker].metricsReset()

def on_stop(self) -> None:
if hasattr(self,'consumer') and hasattr(self.consumer, 'close'):
self.consumer.close()
if hasattr(self,'consumers'):
for broker in self.brokers:
if hasattr(self.consumers[broker], 'close'):
self.consumers[broker].close()
logger.info('closing')
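gather/message.py replaces the single self.consumer with a dict of consumers keyed by broker: distinct brokers are collected from 4-element bindings (falling back to the configured broker), one subscriber is built per broker, and gather() concatenates their new messages. A simplified sketch of that grouping pattern follows, with DummyConsumer standing in for sarracenia.moth.Moth.subFactory; all names and URLs here are illustrative.

```python
class DummyConsumer:
    """Stand-in for a Moth subscriber; the real code calls sarracenia.moth.Moth.subFactory(options)."""
    def __init__(self, broker):
        self.broker = broker
    def newMessages(self):
        return [f'message from {self.broker}']

def build_consumers(bindings, default_broker):
    """Collect the distinct brokers named in 4-element bindings, fall back to the
    default broker when none are present, and build one consumer per broker."""
    brokers = []
    for binding in bindings:
        if len(binding) >= 4 and binding[0] not in brokers:
            brokers.append(binding[0])
    if not brokers:
        brokers = [default_broker]
    return {b: DummyConsumer(b) for b in brokers}

def gather(consumers):
    """Concatenate whatever every per-broker consumer has received."""
    new_messages = []
    for consumer in consumers.values():
        new_messages.extend(consumer.newMessages())
    return new_messages

consumers = build_consumers(
    [('amqps://a.example.com', 'xpublic', ['v02', 'post'], ['#']),
     ('amqps://b.example.com', 'xpublic', ['v02', 'post'], ['#'])],
    default_broker='amqps://a.example.com')
print(gather(consumers))
# ['message from amqps://a.example.com', 'message from amqps://b.example.com']
```

Keying the consumers by broker is also what lets ack(), metricsReport() and on_housekeeping() route each message or metric back to the connection it came from.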
11 changes: 11 additions & 0 deletions sarracenia/flowcb/retry.py
@@ -113,6 +113,15 @@ def after_accept(self, worklist) -> None:
if len(mlist) > 0:
worklist.incoming.extend(mlist)

def clean_messages(self,worklist):
for m in worklist.failed:
# json cannot serialize Credential... so just remove it prior to storing it.
if 'broker' in m:
del m['broker']
if 'ack_id' in m:
logger.error( f"putting unacked message {m.getIDStr()} in retry, will never be acked." )
del m['ack_id']

def after_work(self, worklist) -> None:
"""
Messages in `worklist.failed` should be put in the download retry queue. If there are only a few new
@@ -123,6 +132,7 @@ def after_work(self, worklist) -> None:

if len(worklist.failed) != 0:
logger.debug( f"putting {len(worklist.failed)} messages into {self.download_retry_name}" )
self.clean_messages(worklist)
self.download_retry.put(worklist.failed)
worklist.failed = []

@@ -151,6 +161,7 @@ def after_post(self, worklist) -> None:
if not features['retry']['present'] :
return

self.clean_messages(worklist)
self.post_retry.put(worklist.failed)
worklist.failed=[]

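In retry.py, clean_messages strips fields that should not follow a message into the retry queue: the broker entry (a Credential object the JSON serializer cannot handle) and ack_id (a delivery tag that is stale once the message is re-read from disk). A minimal sketch of that idea against a plain dict; clean_message and the sample message are illustrative only.

```python
import json

def clean_message(m: dict) -> dict:
    """Strip fields that cannot be JSON-serialized or would be stale after requeueing."""
    m = dict(m)            # work on a copy
    m.pop('broker', None)  # e.g. a Credential/ParseResult object that json.dumps cannot handle
    m.pop('ack_id', None)  # the delivery tag means nothing once the message is re-read from disk
    return m

msg = {'relPath': 'observations/x.xml', 'broker': object(), 'ack_id': 42}
print(json.dumps(clean_message(msg)))   # {"relPath": "observations/x.xml"}
```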
17 changes: 13 additions & 4 deletions sarracenia/moth/amqp.py
@@ -112,8 +112,6 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
logger.info('had no delivery info')
logger.info('raw message end')



if type(body) is bytes:
try:
body = raw_msg.body.decode("utf8")
@@ -142,9 +140,10 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
msg.deriveSource( self.o )
msg.deriveTopics( self.o, topic )

msg['broker'] = self.o['broker']
msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
msg['local_offset'] = 0
msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
msg['_deleteOnPost'] |= set( ['ack_id', 'broker', 'exchange', 'local_offset', 'subtopic'])
if not msg.validate():
if hasattr(self,'channel'):
self.channel.basic_ack(msg['ack_id'])
@@ -359,7 +358,17 @@ def getSetup(self) -> None:

if self.o['queueBind'] and self.o['queueName']:
for tup in self.o['bindings']:
exchange, prefix, subtopic = tup
if len(tup) == 4:
broker, exchange, prefix, subtopic = tup
elif len(tup) == 3:
exchange, prefix, subtopic = tup
broker = self.o['broker']
else:
logger.critical( f"binding \"{tup}\" should be a list of tuples ( broker, exchange, prefix, subtopic )" )
continue
logger.critical( f"{broker=} {self.o['broker']=} ")
if broker != self.o['broker']:
continue
topic = '.'.join(prefix + subtopic)
if self.o['dry_run']:
logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
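In amqp.py, getSetup now unpacks either the old 3-element or the new 4-element binding, treats 3-element bindings as belonging to the configured broker, and skips bindings aimed at a different broker. A hedged sketch of that unpack-and-filter logic in isolation; bindings_for_broker and the example URLs are assumptions, not sarracenia API.

```python
def bindings_for_broker(bindings, my_broker, default_broker):
    """Yield (exchange, topic) pairs for bindings that target my_broker.
    3-element bindings are treated as belonging to the default broker."""
    for tup in bindings:
        if len(tup) == 4:
            broker, exchange, prefix, subtopic = tup
        elif len(tup) == 3:
            exchange, prefix, subtopic = tup
            broker = default_broker
        else:
            continue  # malformed binding, skip it
        if broker != my_broker:
            continue
        yield exchange, '.'.join(prefix + subtopic)

bindings = [('amqps://a.example.com', 'xpublic', ['v02', 'post'], ['*', 'WXO-DD', '#']),
            ('xcustom', ['v02', 'post'], ['#'])]
print(list(bindings_for_broker(bindings, 'amqps://a.example.com', 'amqps://a.example.com')))
# [('xpublic', 'v02.post.*.WXO-DD.#'), ('xcustom', 'v02.post.#')]
```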
15 changes: 12 additions & 3 deletions sarracenia/moth/mqtt.py
@@ -213,8 +213,16 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None):
if 'topic' in userdata.o:
subj=userdata.o['topic']
else:
exchange, prefix, subtopic = binding_tuple
logger.info( f"tuple: {exchange} {prefix} {subtopic}")
if len(binding_tuple) == 4:
broker, exchange, prefix, subtopic = binding_tuple
elif len(binding_tuple) == 3:
exchange, prefix, subtopic = binding_tuple
broker = userdata.o['broker']
else:
logger.critical( f"invalid binding: \"{binding_tuple}\" should be a tuple containing ( broker, exchange, topicPrefix, subtopic )" )
continue

logger.info( f"tuple: {broker} {exchange} {prefix} {subtopic}")

subj = '/'.join(['$share', userdata.o['queueName'], exchange] +
prefix + subtopic)
@@ -590,10 +598,11 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message:
message.deriveSource( self.o )
message.deriveTopics( self.o, topic=mqttMessage.topic, separator='/' )

message['broker'] = self.o['broker']
message['ack_id'] = mqttMessage.mid
message['qos'] = mqttMessage.qos
message['local_offset'] = 0
message['_deleteOnPost'] |= set( ['exchange', 'local_offset', 'ack_id', 'qos' ])
message['_deleteOnPost'] |= set( ['ack_id', 'broker', 'exchange', 'local_offset', 'qos' ])

self.metrics['rxLast'] = sarracenia.nowstr()
if message.validate():
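In mqtt.py, __sub_on_connect builds an MQTT v5 shared-subscription filter from each binding, so that clients subscribed under the same queue group split the message stream between them. A small sketch of that topic construction; shared_subscription_topic is an illustrative helper, not part of sarracenia.

```python
def shared_subscription_topic(queue_name, exchange, prefix, subtopic):
    """Build an MQTT v5 shared-subscription filter, mirroring the pattern above:
    $share/<group>/<exchange>/<topicPrefix...>/<subtopic...>"""
    return '/'.join(['$share', queue_name, exchange] + prefix + subtopic)

print(shared_subscription_topic('q_anonymous.subscriber_test2', 'xpublic',
                                ['v02', 'post'], ['*', 'WXO-DD', 'observations', '#']))
# $share/q_anonymous.subscriber_test2/xpublic/v02/post/*/WXO-DD/observations/#
```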