diff --git a/sarracenia/config.py b/sarracenia/config.py
index ab9a62837..b4ec9126b 100755
--- a/sarracenia/config.py
+++ b/sarracenia/config.py
@@ -1360,7 +1360,7 @@ def _parse_binding(self, subtopic_string):
                 subtopic = subtopic_string.split('/')
 
         if hasattr(self, 'exchange') and hasattr(self, 'topicPrefix'):
-            self.bindings.append((self.exchange, self.topicPrefix, subtopic))
+            self.bindings.append((self.broker, self.exchange, self.topicPrefix, subtopic))
 
     def _parse_v2plugin(self, entryPoint, value):
         """
@@ -1998,7 +1998,7 @@ def finalize(self, component=None, config=None):
 
 
         if (self.bindings == [] and hasattr(self, 'exchange')):
-            self.bindings = [(self.exchange, self.topicPrefix, [ '#' ])]
+            self.bindings = [(self.broker, self.exchange, self.topicPrefix, [ '#' ])]
 
         if hasattr(self, 'documentRoot') and (self.documentRoot is not None):
             path = os.path.expanduser(os.path.abspath(self.documentRoot))
@@ -2433,7 +2433,7 @@ def __call__(self, parser, namespace, values, option_string):
                     topicPrefix = namespace.topicPrefix.split('/')
 
             namespace.bindings.append(
-                (namespace.exchange, topicPrefix, values))
+                (namespace.broker, namespace.exchange, topicPrefix, values))
 
     def parse_args(self, isPost=False):
         """
diff --git a/sarracenia/credentials.py b/sarracenia/credentials.py
index e51f402a7..140fc46b5 100755
--- a/sarracenia/credentials.py
+++ b/sarracenia/credentials.py
@@ -104,25 +104,10 @@ def __init__(self, urlstr=None):
         self.azure_credentials = None
         self.implicit_ftps = False
 
-    def __str__(self):
+    def __str__(self) -> str:
         """Returns attributes of the Credential object as a readable string.
         """
-
-        s = ''
-        if False:
-            s += self.url.geturl()
-        else:
-            s += self.url.scheme + '://'
-            if self.url.username:
-                s += self.url.username
-            #if self.url.password:
-            #    s += ':' + self.url.password
-            if self.url.hostname:
-                s += '@' + self.url.hostname
-            if self.url.port:
-                s += ':' + str(self.url.port)
-            if self.url.path:
-                s += self.url.path
+        s = self.geturl()
 
         s += " %s" % self.ssh_keyfile
         s += " %s" % self.passive
@@ -139,6 +124,25 @@ def __str__(self):
 
         return s
 
+    def geturl(self) -> str:
+        """Return the URL as text with the password left out ('' when no URL is set)."""
+        if getattr(self, 'url', None) is None:
+            return ''
+
+        s=''
+        s += self.url.scheme + '://'
+        if self.url.username:
+            s += self.url.username
+        #if self.url.password:
+        #    s += ':' + self.url.password
+        if self.url.hostname:
+            s += '@' + self.url.hostname
+        if self.url.port:
+            s += ':' + str(self.url.port)
+        if self.url.path:
+            s += self.url.path
+        return s
+
 
 class CredentialDB:
     """Parses, stores and manages Credential objects.
diff --git a/sarracenia/examples/flow_api_consumer.py b/sarracenia/examples/flow_api_consumer.py
index 107443b23..9e7c1794b 100644
--- a/sarracenia/examples/flow_api_consumer.py
+++ b/sarracenia/examples/flow_api_consumer.py
@@ -12,9 +12,10 @@
 cfg.component = 'subscribe'
 cfg.config = 'flow_demo'
 cfg.action = 'hoho'
-cfg.bindings = [('xpublic', ['v02', 'post'],
+cfg.bindings = [( 'xpublic', ['v02', 'post'],
                  ['*', 'WXO-DD', 'observations', 'swob-ml', '#'])]
 cfg.queueName = 'q_anonymous.subscriber_test2'
+#cfg.debug = True
 cfg.download = True
 cfg.batch = 1
 cfg.messageCountMax = 5
diff --git a/sarracenia/flowcb/gather/message.py b/sarracenia/flowcb/gather/message.py
index f6fcea4bb..38b94cfa8 100755
--- a/sarracenia/flowcb/gather/message.py
+++ b/sarracenia/flowcb/gather/message.py
@@ -23,10 +23,26 @@ def __init__(self, options) -> None:
         self.od = sarracenia.moth.default_options
         self.od.update(self.o.dictify())
 
-        if hasattr(self.o, 'broker') and self.o.broker:
-            self.consumer = sarracenia.moth.Moth.subFactory(self.od)
-        else:
+        if not hasattr(self.o, 'broker') or not self.o.broker:
             logger.critical('missing required broker specification')
+            return
+
+        if not hasattr(self.o, 'bindings') or not self.o.bindings:
+            logger.critical('missing required bindings (exchange,subtopic) for broker')
+            return
+
+        self.brokers=[]
+        for binding in self.o.bindings:
+            if len(binding) >= 4 and binding[0] not in self.brokers:
+                self.brokers.append(binding[0])
+
+        if len(self.brokers) == 0:
+            self.brokers=[ self.o.broker ]
+
+        self.consumers={}
+        for broker in self.brokers:
+            self.od['broker']=broker
+            self.consumers[broker] = sarracenia.moth.Moth.subFactory(self.od)
 
     def gather(self, messageCountMax) -> list:
         """
@@ -34,43 +50,72 @@ def gather(self, messageCountMax) -> list:
               True ... you can gather from other sources. and:
               a list of messages obtained from this source.
         """
-        if hasattr(self,'consumer') and hasattr(self.consumer,'newMessages'):
-            return (True, self.consumer.newMessages())
-        else:
-            logger.warning( f'not connected. Trying to connect to {self.o.broker}')
-            self.consumer = sarracenia.moth.Moth.subFactory(self.od)
+        new_messages=[]
+        if not hasattr(self,'consumers'):
             return (True, [])
 
+        for broker in self.brokers:
+            if broker in self.consumers and hasattr(self.consumers[broker],'newMessages'):
+                new_messages.extend(self.consumers[broker].newMessages())
+            else:
+                logger.warning( f'not connected. Trying to connect to {broker}')
+                self.od['broker']=broker
+                self.consumers[broker] = sarracenia.moth.Moth.subFactory(self.od)
+
+        return (True, new_messages)
+
     def ack(self, mlist) -> None:
 
-        if not hasattr(self,'consumer'):
+        if not hasattr(self,'consumers'):
             return
 
         for m in mlist:
+            if 'broker' not in m:
+                logger.error( f"cannot ack, missing broker in {m}" )
+                continue
+
+            if m['broker'] not in self.consumers:
+                logger.error( f"cannot ack, no consumer for {m['broker'].geturl()}" )
+                continue
+
             # messages being re-downloaded should not be re-acked, but they won't have an ack_id (see issue #466)
-            self.consumer.ack(m)
+            if hasattr(self.consumers[m['broker']],'ack'):
+                self.consumers[m['broker']].ack(m)
+            else:
+                logger.error( f"cannot ack" )
 
     def metricsReport(self) -> dict:
-        if hasattr(self,'consumer') and hasattr(self.consumer,'metricsReport'):
-            return self.consumer.metricsReport()
-        else:
+
+        if not hasattr(self,'consumers'):
             return {}
 
+        metrics={}
+        for broker in self.brokers:
+            if hasattr(self.consumers[broker],'metricsReport'):
+                metrics[broker.geturl()] = self.consumers[broker].metricsReport()
+        return metrics
+
     def on_housekeeping(self) -> None:
 
-        if not hasattr(self,'consumer'):
+        if not hasattr(self,'consumers'):
             return
 
-        if hasattr(self.consumer, 'metricsReport'):
-            m = self.consumer.metricsReport()
-            average = (m['rxByteCount'] /
-                       m['rxGoodCount'] if m['rxGoodCount'] != 0 else 0)
-            logger.info( f"messages: good: {m['rxGoodCount']} bad: {m['rxBadCount']} " +\
-               f"bytes: {naturalSize(m['rxByteCount'])} " +\
+        m = self.metricsReport()
+        for broker in self.brokers:
+            burl=broker.geturl()
+            # metricsReport() has no entry for consumers lacking metricsReport(); skip them.
+            if burl not in m:
+                continue
+            average = (m[burl]['rxByteCount'] /
+                       m[burl]['rxGoodCount'] if m[burl]['rxGoodCount'] != 0 else 0)
+            logger.info( f"{burl} messages: good: {m[burl]['rxGoodCount']} bad: {m[burl]['rxBadCount']} " +\
+               f"bytes: {naturalSize(m[burl]['rxByteCount'])} " +\
                f"average: {naturalSize(average)}" )
-            self.consumer.metricsReset()
+            self.consumers[broker].metricsReset()
 
     def on_stop(self) -> None:
-        if hasattr(self,'consumer') and hasattr(self.consumer, 'close'):
-            self.consumer.close()
+        if hasattr(self,'consumers'):
+            for broker in self.brokers:
+                if hasattr(self.consumers[broker], 'close'):
+                    self.consumers[broker].close()
         logger.info('closing')
diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py
index ca4bef880..a868f5025 100755
--- a/sarracenia/flowcb/retry.py
+++ b/sarracenia/flowcb/retry.py
@@ -113,6 +113,16 @@ def after_accept(self, worklist) -> None:
         if len(mlist) > 0:
             worklist.incoming.extend(mlist)
 
+    def clean_messages(self, worklist) -> None:
+        """Remove the 'broker' and 'ack_id' fields from every message in worklist.failed."""
+        for m in worklist.failed:
+            # json cannot serialize Credential... so just remove it prior to storing it.
+            if 'broker' in m:
+                del m['broker']
+            if 'ack_id' in m:
+                logger.error( f"putting unacked message {m.getIDStr()} in retry, will never be acked." )
+                del m['ack_id']
+
     def after_work(self, worklist) -> None:
         """
         Messages in `worklist.failed` should be put in the download retry queue. If there are only a few new
@@ -123,6 +133,7 @@ def after_work(self, worklist) -> None:
 
         if len(worklist.failed) != 0:
             logger.debug( f"putting {len(worklist.failed)} messages into {self.download_retry_name}" )
+            self.clean_messages(worklist)
             self.download_retry.put(worklist.failed)
             worklist.failed = []
 
@@ -151,6 +162,7 @@ def after_post(self, worklist) -> None:
         if not features['retry']['present'] :
             return
 
+        self.clean_messages(worklist)
         self.post_retry.put(worklist.failed)
         worklist.failed=[]
 
diff --git a/sarracenia/moth/amqp.py b/sarracenia/moth/amqp.py
index 2c9429a67..a6b536adc 100755
--- a/sarracenia/moth/amqp.py
+++ b/sarracenia/moth/amqp.py
@@ -112,8 +112,6 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
                     logger.info('had no delivery info')
                 logger.info('raw message end')
 
-
-
             if type(body) is bytes:
                 try:
                     body = raw_msg.body.decode("utf8")
@@ -142,9 +140,10 @@ def _msgRawToDict(self, raw_msg) -> sarracenia.Message:
             msg.deriveSource( self.o )
             msg.deriveTopics( self.o, topic )
 
+            msg['broker'] = self.o['broker']
             msg['ack_id'] = raw_msg.delivery_info['delivery_tag']
             msg['local_offset'] = 0
-            msg['_deleteOnPost'] |= set( ['ack_id', 'exchange', 'local_offset', 'subtopic'])
+            msg['_deleteOnPost'] |= set( ['ack_id', 'broker', 'exchange', 'local_offset', 'subtopic'])
             if not msg.validate():
                 if hasattr(self,'channel'):
                     self.channel.basic_ack(msg['ack_id'])
@@ -359,7 +358,17 @@ def getSetup(self) -> None:
 
                     if self.o['queueBind'] and self.o['queueName']:
                         for tup in self.o['bindings']:
-                            exchange, prefix, subtopic = tup
+                            if len(tup) == 4:
+                                broker, exchange, prefix, subtopic = tup
+                            elif len(tup) == 3:
+                                exchange, prefix, subtopic = tup
+                                broker = self.o['broker']
+                            else:
+                                logger.critical( f"binding \"{tup}\" should be a tuple ( broker, exchange, prefix, subtopic )" )
+                                continue
+                            # every consumer receives the full bindings list; bind only those for this broker.
+                            if broker != self.o['broker']:
+                                continue
                             topic = '.'.join(prefix + subtopic)
                             if self.o['dry_run']:
                                 logger.info('binding (dry run) %s with %s to %s (as: %s)' % \
diff --git a/sarracenia/moth/mqtt.py b/sarracenia/moth/mqtt.py
index d107a46a3..9fe30af19 100755
--- a/sarracenia/moth/mqtt.py
+++ b/sarracenia/moth/mqtt.py
@@ -213,8 +213,16 @@ def __sub_on_connect(client, userdata, flags, reason_code, properties=None):
                 if 'topic' in userdata.o:
                     subj=userdata.o['topic']
                 else:
-                    exchange, prefix, subtopic = binding_tuple
-                    logger.info( f"tuple: {exchange} {prefix} {subtopic}")
+                    if len(binding_tuple) == 4:
+                        broker, exchange, prefix, subtopic = binding_tuple
+                    elif len(binding_tuple) == 3:
+                        exchange, prefix, subtopic = binding_tuple
+                        broker = userdata.o['broker']
+                    else:
+                        logger.critical( f"invalid binding: \"{binding_tuple}\" should be a tuple containing ( broker, exchange, topicPrefix, subtopic )" )
+                        continue
+
+                    logger.info( f"tuple: {broker} {exchange} {prefix} {subtopic}")
 
                     subj = '/'.join(['$share', userdata.o['queueName'], exchange] +
                                     prefix + subtopic)
@@ -590,10 +598,11 @@ def _msgDecode(self, mqttMessage) -> sarracenia.Message:
             message.deriveSource( self.o )
             message.deriveTopics( self.o, topic=mqttMessage.topic, separator='/' )
 
+            message['broker'] = self.o['broker']
             message['ack_id'] = mqttMessage.mid
             message['qos'] = mqttMessage.qos
             message['local_offset'] = 0
-            message['_deleteOnPost'] |= set( ['exchange', 'local_offset', 'ack_id', 'qos' ])
+            message['_deleteOnPost'] |= set( ['ack_id', 'broker', 'exchange', 'local_offset', 'qos' ])
 
             self.metrics['rxLast'] = sarracenia.nowstr()
             if message.validate():