From deab96ac56194be859afa33eb47906424f17b942 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 09:01:17 -0500 Subject: [PATCH 01/30] increment version away from previous release --- debian/changelog | 6 ++++++ sarracenia/_version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/debian/changelog b/debian/changelog index b178c3c2b..a1854f05c 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +metpx-sr3 (3.00.51rc8) UNRELEASED; urgency=medium + + * + + -- peter Fri, 02 Feb 2024 09:00:18 -0500 + metpx-sr3 (3.00.51rc7) unstable; urgency=medium * PR #910 detect v02 messages without content_type header. diff --git a/sarracenia/_version.py b/sarracenia/_version.py index 20a58355e..194546e4e 100755 --- a/sarracenia/_version.py +++ b/sarracenia/_version.py @@ -1 +1 @@ -__version__ = "3.00.51rc7" +__version__ = "3.00.51rc8" From 0c3b3914abdeff1e2240d6675f342905efa4f29a Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 14:49:47 +0000 Subject: [PATCH 02/30] fix #917 document inlineEncoding --- docs/source/Reference/sr3_options.7.rst | 12 +++++++++++- docs/source/fr/Reference/sr3_options.7.rst | 12 ++++++++++++ sarracenia/config.py | 7 ++++++- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 0520c2cd4..d6a403943 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -941,7 +941,17 @@ will not be posted. inlineByteMax -------------------- -the maximums size of messages to inline. +The maximum size of messages to inline. + +inlineEncoding text|binary|guess (default: guess) +_________________________________________________ + +when inlining file content, what sort of encoding should be done? Three choices: + + * text: the file content is assumed to be utf-8 text and encoded as such. + * binary: the file content is unconditionally converted to base64 binary encoding. 
+ * guess: try making text, if that fails fall back to binary. + inlineOnly ---------- diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index e5ae1dd68..6fe44b5d5 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -926,6 +926,18 @@ inlineByteMax la taille maximale des fichiers dont le contenu est à inclure dans un messages d'annonce (envoyé inline.) + + +inlineEncoding text|binary|guess (défaut: guess) +_________________________________________________ + +Quand on inclut les données dans le message on l'encode dans un format choisi: + + * text: le fichier doit être du utf-8 valide (ou erreur.) + * binary: le fichier peut être encodé n'importe comment, il sera en format base64. + * guess: essaie le format text en premier, utilise binary en cas d'erreur. + + inlineOnly ---------- ignorer les messages d´annonce si les données ne sont pas inline. diff --git a/sarracenia/config.py b/sarracenia/config.py index b26973392..1b2019dba 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -163,7 +163,8 @@ def __repr__(self) -> str: str_options = [ 'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange', - 'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'identity', 'logLevel', + 'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'identity', + 'inlineEncoding', 'logLevel', 'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange', 'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'sendTo', 'rename', 'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver', @@ -1911,6 +1912,10 @@ def finalize(self, component=None, config=None): self._resolve_exchange() self._resolveQueueName(component,cfg) + valid_inlineEncodings = [ 'guess', 'text', 'binary' ] + if hasattr(self, 'inlineEncoding') and self.inlineEncoding not in valid_inlineEncodings: +
logger.error( f"invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" ) + if hasattr(self, 'no'): if self.statehost: hostdir = self.hostdir From e51cb1790131dd3d493c6eaa2bd29d458dd5b39b Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 15:16:03 +0000 Subject: [PATCH 03/30] updating documentation to reflect iso-8859-1 might show up in inline content --- docs/source/Reference/sr_post.7.rst | 7 +++++-- docs/source/fr/Reference/sr_post.7.rst | 13 +++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/docs/source/Reference/sr_post.7.rst b/docs/source/Reference/sr_post.7.rst index acf41a2c1..17f70c52f 100644 --- a/docs/source/Reference/sr_post.7.rst +++ b/docs/source/Reference/sr_post.7.rst @@ -129,9 +129,12 @@ The headers are an array of name:value pairs:: "content" - for smaller files, the content may be embedded. { - "encoding" : "utf-8" | "base64" , + "encoding" : "utf-8" | "base64" | "iso-8859-1" , "value" " "encoded file content" - } + } + Note that the iso-8859-1 encoding is only an allowance for legacy data flows. + Should normally not be used. + "contentType" : "string" - MIME-type information referring to the data. For "v03.report" topic notification messages the following addtional diff --git a/docs/source/fr/Reference/sr_post.7.rst b/docs/source/fr/Reference/sr_post.7.rst index 390d4209e..9bbe52ffc 100644 --- a/docs/source/fr/Reference/sr_post.7.rst +++ b/docs/source/fr/Reference/sr_post.7.rst @@ -94,7 +94,6 @@ Les en-têtes sont un tableau de paires nom:valeur:: "type": "Feature" "geometry": RFC 7946 (geoJSON) spécification géographique compatible. - "size" - le nombre d’octets annoncés. "blocks" - si le fichier publié est partitionné, alors : { @@ -117,9 +116,12 @@ Les en-têtes sont un tableau de paires nom:valeur:: "content" - pour les fichiers plus petits, le contenu peut être incorporé. 
{ - "encoding" : "utf-8" | "base64" , + "encoding" : "utf-8" | "base64" | "iso-8859-1" , "value" " "contenu de fichier encodé" - } + } + Noter que iso-8859-1 est là pour des raisons de transmission inaltéré de formats obsolètes. + + "contentType" : "chaine mime-type" - indique le format des données. Pour le messages de thème "v03.report", les en-têtes additionnelles qui suivent seront présents: @@ -128,9 +130,8 @@ Les en-têtes sont un tableau de paires nom:valeur:: "message" : - message de rapport d’état documenté dans `Report Messages`_ } - "contentType" : "chaine mime-type" - indique le format des données. - "type": "Feature" - utilisé pour la compatibilité geoJSON. - "geometry" : ... selon la compatibilité GoJSON RFC7946. + Autres champs optionnels: + des paires supplémentaires nom:valeur définies par l’utilisateur sont autorisées. From 4df6a7547c8777eb56afe74f1e7d26959770e17a Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 16:41:54 +0000 Subject: [PATCH 04/30] document bufsize setting. --- docs/source/Reference/sr3_options.7.rst | 6 ++++++ docs/source/fr/Reference/sr3_options.7.rst | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index d6a403943..f4fed04d5 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -503,6 +503,12 @@ Once connected to an AMQP broker, the user needs to bind a queue to exchanges and topics to determine the notification messages of interest. +bufsize (default: 1MB) +----------------------------- + +Files will be copied in *bufsize*-byte blocks, for use by transfer protocols.
+ + byteRateMax (default: 0) -------------------------------- diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 6fe44b5d5..2dc539fdf 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -491,6 +491,10 @@ L’option broker indique à chaque composant quel courtier contacter. Une fois connecté à un courtier AMQP, l’utilisateur doit lier une fil d’attente aux échanges et aux thèmes pour déterminer le messages d'annonce en question. +bufsize (défaut: 1m) +--------------------------- + +Les fichiers seront copiés en tranches de *bufsize* octets. Utilisé par les protocoles de transfert. byteRateMax (défaut: 0) ------------------------------ From f74419efd558ceebc82e6a353a7742eaef38151c Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 17:20:31 +0000 Subject: [PATCH 05/30] documenting timezone --- docs/source/Reference/sr3_options.7.rst | 7 +++++++ docs/source/fr/Reference/sr3_options.7.rst | 8 ++++++++ 2 files changed, 15 insertions(+) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index f4fed04d5..487090880 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1775,6 +1775,13 @@ The **timeout** option, sets the number of seconds to wait before aborting a connection or download transfer (applied per buffer during transfer). +timezone (default: utc) +-------------------------------- + +Interpret listings from an FTP server as being in the given timezone as per `pytz `_ +Has no effect other than in when polling an FTP server. 
+ + tlsRigour (default: medium) --------------------------- diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 2dc539fdf..3fea3cb6e 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1739,6 +1739,14 @@ timeout (défaut: 0) L’option **timeout** définit le nombre de secondes à attendre avant d’interrompre un transfert de connexion ou de téléchargement (appliqué pendant le transfert). +timezone (défaut: utc) +-------------------------------- + +Établir le fuseau horaire pour les dates afficher pour les fichiers sur un serveur FTP. +La valeur est tel que décrit timezone as per `pytz `_ +Seulement actif dans le contexte de sondage de serveur FTP. + + tlsRigour (défaut: medium) -------------------------- From ac6a3a763ea6317d70f75d73afcc2023180860d3 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 17:34:10 +0000 Subject: [PATCH 06/30] adding string examples to timezone doc --- docs/source/Reference/sr3_options.7.rst | 3 ++- docs/source/fr/Reference/sr3_options.7.rst | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 487090880..4357e8588 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1775,10 +1775,11 @@ The **timeout** option, sets the number of seconds to wait before aborting a connection or download transfer (applied per buffer during transfer). -timezone (default: utc) +timezone (default: UTC) -------------------------------- Interpret listings from an FTP server as being in the given timezone as per `pytz `_ +Examples: Canada/Pacific, Pacific/Nauru, Canada/Eastern, Europe/Paris Has no effect other than in when polling an FTP server. 
diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 3fea3cb6e..102326f77 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1739,11 +1739,12 @@ timeout (défaut: 0) L’option **timeout** définit le nombre de secondes à attendre avant d’interrompre un transfert de connexion ou de téléchargement (appliqué pendant le transfert). -timezone (défaut: utc) +timezone (défaut: UTC) -------------------------------- Établir le fuseau horaire pour les dates afficher pour les fichiers sur un serveur FTP. La valeur est tel que décrit timezone as per `pytz `_ +exemples: Canada/Pacific, Pacific/Nauru, Europe/Paris Seulement actif dans le contexte de sondage de serveur FTP. From d2d5284ed84a38b8e08b7691e0ef59b4735b0e57 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 15:00:07 -0500 Subject: [PATCH 07/30] correct work description for different running components --- sarracenia/flowcb/log.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sarracenia/flowcb/log.py b/sarracenia/flowcb/log.py index 3b134e28a..7350f2f42 100755 --- a/sarracenia/flowcb/log.py +++ b/sarracenia/flowcb/log.py @@ -32,14 +32,14 @@ def __init__(self, options): logger.info(f'{self.o.component} initialized with: logEvents: {self.o.logEvents}, logMessageDump: {self.o.logMessageDump}') if self.o.component in ['sender']: self.action_verb = 'sent' + elif self.o.component in ['subscribe', 'sarra' ]: + self.action_verb = 'downloaded' elif self.o.component in ['post', 'poll', 'watch']: self.action_verb = 'noticed' - elif self.o.component in ['shovel']: - self.action_verb = 'filtered' - elif self.o.component in ['flow']: - self.action_verb = 'flowed' + elif self.o.component in [ 'flow', 'shovel', 'winnow']: + self.action_verb = self.o.component + 'ed' else: - self.action_verb = 'downloaded' + self.action_verb = 'done' self.started = nowflt() self.rxTopicSeparator='.' 
From fb404603355d7c10cea83ee46a958b1020b5001e Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Fri, 2 Feb 2024 22:18:11 -0500 Subject: [PATCH 08/30] fix #922 avoid mixing up post_exchange with exchange in display --- sarracenia/sr.py | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 70feae07d..83af8b5e3 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -749,6 +749,37 @@ def __resolved_exchanges(self, c, cfg, o): exl.append(x) return exl + def __resolved_post_exchanges(self, c, cfg, o): + """ + Guess the name of an exchange. looking at either a direct setting, + or an existing queue state file, or lastly just guess based on conventions. + """ + exl = [] + #if hasattr(o,'declared_exchanges'): + # exl.extend(o.declared_exchanges) + + if hasattr(o, 'post_exchange'): + if type(o.post_exchange) == list: + exl.extend(o.post_exchange) + else: + exl.append(o.post_exchange) + return exl + + x = 'xs_%s' % o.post_broker.url.username + + if hasattr(o, 'post_exchangeSuffix'): + x += '_%s' % o.post_exchangeSuffix + + if hasattr(o, 'post_exchangeSplit'): + l = [] + for i in range(0, o.instances): + y = x + '%02d' % i + l.append(y) + return l + else: + exl.append(x) + return exl + def __guess_queueName(self, c, cfg, o): """ Guess the name of a queue. 
looking at either a direct setting, @@ -832,17 +863,8 @@ def _resolve_brokers(self): if hasattr(o, 'post_broker') and o.post_broker is not None and o.post_broker.url is not None: host = self._init_broker_host(o.post_broker.url.netloc) - #o.broker = o.post_broker - if hasattr(o, 'post_exchange'): - o.exchange = o.post_exchange - if hasattr(o, 'post_exchangeSplit'): - o.exchangeSplit = o.post_exchangeSplit - if hasattr(o, 'post_exchangeSuffix'): - o.exchangeSuffix = o.post_exchangeSuffix - - xl = self.__resolved_exchanges(c, cfg, o) - - self.configs[c][cfg]['options'].resolved_exchanges = xl + self.configs[c][cfg]['options'].resolved_exchanges = \ + self.__resolved_post_exchanges(c, cfg, o) if hasattr(o, 'post_exchange'): self.brokers[host]['exchange'] = o.post_exchange From c8e566a7dec22754566103cf506b65070f738d16 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 5 Feb 2024 12:16:20 -0500 Subject: [PATCH 09/30] changing defaults and documenting the consideration --- docs/source/Explanation/CommandLineGuide.rst | 4 +++- docs/source/Explanation/DuplicateSuppression.rst | 5 +++++ docs/source/Reference/sr3_options.7.rst | 12 ++++++++++-- docs/source/fr/Explication/GuideLigneDeCommande.rst | 4 +++- docs/source/fr/Explication/SupprimerLesDoublons.rst | 5 +++++ docs/source/fr/Reference/sr3_options.7.rst | 4 ++++ sarracenia/flow/poll.py | 7 +++++-- 7 files changed, 35 insertions(+), 6 deletions(-) diff --git a/docs/source/Explanation/CommandLineGuide.rst b/docs/source/Explanation/CommandLineGuide.rst index 727812c36..da2abf798 100644 --- a/docs/source/Explanation/CommandLineGuide.rst +++ b/docs/source/Explanation/CommandLineGuide.rst @@ -954,8 +954,10 @@ These options set what files the user wants to be notified for and where - **accept [rename=] (must be set)** - **reject (optional)** - **permDefault (default: 0o400)** -- **nodupe_fileAgeMax (default 30d)** +- **nodupe_fileAgeMax (default: 7h)** +nodupe_fileAgeMax should be less than nodupe_ttl when using duplicate 
suppression, +to avoid re-ingesting of files that have aged out of the nodupe cache. The option *filename* can be used to set a global rename to the products. Ex.: diff --git a/docs/source/Explanation/DuplicateSuppression.rst b/docs/source/Explanation/DuplicateSuppression.rst index a74d4ebe1..142536204 100644 --- a/docs/source/Explanation/DuplicateSuppression.rst +++ b/docs/source/Explanation/DuplicateSuppression.rst @@ -177,6 +177,11 @@ Files which are older than 600 seconds (10 minutes) will not be considerred for This is usually used with polls that have very long lasting directories on a remote server. example: a remote server has a permanent database of remote files. +It is often the case that nodupe_ttl should be greater than nodupe_fileAgeMax to prevent +files from aging out of the cache before they are considered "too old" and then being +(erroneously) re-ingested. A poll emits a warning message on startup when nodupe_ttl is +less than nodupe_fileAgeMax. + Roll Your Own ------------- diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 4357e8588..53fbd2828 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1169,12 +1169,17 @@ or: More information: `Duplicate Suppresion <../Explanation/DuplicateSuppression.html>`_ -nodupe_fileAgeMax +nodupe_fileAgeMax ----------------- -If files are older than this setting (default: 30d in poll, 0 in other components), +If files are older than this setting (default: 7h in poll, 0 in other components), then ignore them, they are too old to post. 0 deactivates the setting. +In a Poll: + * default is 7 hours. should be less than nodupe_ttl to prevent re-ingest of duplicate data. + (full discussion here: https://github.com/MetPX/sarracenia/issues/904 ) + + nodupe_fileAgeMin ----------------- @@ -1198,6 +1203,9 @@ dropped, so duplicates separated by a large enough interval will get through. A value of 1d (day) or 1w (week) can be appropriate.
Setting the option without specifying a time will result in 300 seconds (or 5 minutes) being the expiry interval. +Default value in a Poll is 8 hours, should be longer than nodupe_fileAgeMax to prevent +re-ingesting files that have aged out of the duplicate suppression cache. + **Use of the cache is incompatible with the default *parts 0* strategy**, one must specify an alternate strategy. One must use either a fixed blocksize, or always never partition files. One must avoid the dynamic algorithm that will change the partition size used as a file grows. diff --git a/docs/source/fr/Explication/GuideLigneDeCommande.rst b/docs/source/fr/Explication/GuideLigneDeCommande.rst index 7121902b5..07abf2d91 100644 --- a/docs/source/fr/Explication/GuideLigneDeCommande.rst +++ b/docs/source/fr/Explication/GuideLigneDeCommande.rst @@ -960,8 +960,10 @@ Ces options définissent les fichiers pour lesquels l’utilisateur souhaite êt - **accept [rename=] (doit être défini)** - **reject (facultatif)** - **permDefault (par défaut: 0o400)** -- **nodupe_fileAgeMax (par défaut 30d)** +- **nodupe_fileAgeMax (par défaut 7h)** +Nodupe_fileAgeMax doit être inférieur à nodupe_ttl lors de l'utilisation de la suppression des doublons, +pour éviter la réingestion de fichiers obsolètes une fois partie du cache nodupe. L’option *filename* peut être utilisée pour définir un changement de nom global pour les produits. Ex.: diff --git a/docs/source/fr/Explication/SupprimerLesDoublons.rst b/docs/source/fr/Explication/SupprimerLesDoublons.rst index 62866a4d8..3bfe95ffc 100644 --- a/docs/source/fr/Explication/SupprimerLesDoublons.rst +++ b/docs/source/fr/Explication/SupprimerLesDoublons.rst @@ -181,6 +181,11 @@ Ceci est généralement utilisé avec des sondages (poll) qui ont des répertoir Exemple : un serveur distant dispose d'une base de données permanente de fichiers distants. ca ne sert à rien de reexaminer de fichiers vieux de deux ans. 
+Il arrive souvent que nodupe_ttl soit supérieur à nodupe_fileAgeMax pour éviter +les fichiers soient oubliés par la cache avant d'être considérés comme "trop vieux" et ensuite d'être +(à tort) ré-ingéré. Un message d'avertissement est émis si c'est le cas dans un sondage +au démarrage. + A votre gout! diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 102326f77..650db0fa0 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1152,6 +1152,10 @@ nodupe_fileAgeMax Si les fichiers sont plus anciens que ce paramètre (défaut: 30d), ignorez-les, ils sont trop ancien pour qu'il puisse être posté. +Dans un Poll : + * La valeur par défaut est 7 heures. doit être inférieur à nodupe_ttl pour empêcher la réabsorption de données en double. + (discussion complète ici (en anglais): https://github.com/MetPX/sarracenia/issues/904) + nodupe_fileAgeMin ----------------- diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index 8f99b17c4..30e2760b0 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -32,8 +32,8 @@ 'randomize': False, 'post_on_start': False, 'sleep': -1, - 'nodupe_ttl': 7 * 60 * 60, - 'nodupe_fileAgeMax': 30 * 24 * 60 * 60, + 'nodupe_ttl': 8 * 60 * 60, + 'nodupe_fileAgeMax': 7 * 60 * 60, } # 'sumflg': 'cod,md5', @@ -74,6 +74,9 @@ def __init__(self, options): self.plugins['load'].insert(0, 'sarracenia.flowcb.post.message.Message') + if self.o.nodupe_ttl < self.o.nodupe_fileAgeMax: + logger.warning( f"nodupe_ttl < nodupe_fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904") + if not features['ftppoll']['present']: if hasattr( self.o, 'pollUrl' ) and ( self.o.pollUrl.startswith('ftp') ): logger.critical( f"attempting to configure an FTP poll pollUrl={self.o.pollUrl}, but missing python modules: {' '.join(features['ftppoll']['modules_needed'])}" ) From 
fd8a5d27c00757d44e716d0302f697a64e4b461b Mon Sep 17 00:00:00 2001 From: petersilva Date: Thu, 8 Feb 2024 14:41:29 -0500 Subject: [PATCH 10/30] incrementing version away from previous stable release. --- debian/changelog | 4 ++-- sarracenia/_version.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/debian/changelog b/debian/changelog index a1854f05c..c05534c6a 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,10 +1,10 @@ -metpx-sr3 (3.00.51rc8) UNRELEASED; urgency=medium +metpx-sr3 (3.00.52rc1) UNRELEASED; urgency=medium * -- peter Fri, 02 Feb 2024 09:00:18 -0500 -metpx-sr3 (3.00.51rc7) unstable; urgency=medium +metpx-sr3 (3.00.51) unstable; urgency=medium * PR #910 detect v02 messages without content_type header. * PR #900 major revamp of AM protocol reception. diff --git a/sarracenia/_version.py b/sarracenia/_version.py index 194546e4e..9335bd111 100755 --- a/sarracenia/_version.py +++ b/sarracenia/_version.py @@ -1 +1 @@ -__version__ = "3.00.51rc8" +__version__ = "3.00.52rc1" From 281bd6415a80fa89761d7807fb291ea6bd7f201e Mon Sep 17 00:00:00 2001 From: petersilva Date: Thu, 8 Feb 2024 18:51:47 -0500 Subject: [PATCH 11/30] fix #907 admin declare exchnge --- sarracenia/sr.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 83af8b5e3..42e35e5d2 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -301,15 +301,6 @@ def _read_configs(self): self.default_cfg = sarracenia.config.default_config() - #self.default_cfg = sarracenia.config.Config() - #if os.path.exists( "default.conf" ): - # self.default_cfg.parse_file("default.conf") - #if os.path.exists( "admin.conf" ): - # self.default_cfg.parse_file("admin.conf") - - #self.admin_cfg = copy.deepcopy( self.default_cfg ) - #if os.path.exists( "admin.conf" ): - # self.admin_cfg.parse_file("admin.conf") os.chdir(self.user_config_dir) for c in self.components: @@ -1361,6 +1352,20 @@ def declare(self): 
self.default_cfg.declared_users[u_url.username], u_url.username, u_url.password, self.options.dry_run ) + # declare admin exchanges. + if hasattr(self,'default_cfg'): + logger.info( f"Declaring exchnges for admin.conf using {self.default_cfg.admin} ") + if hasattr(self.default_cfg, 'declared_exchanges'): + xdc = sarracenia.moth.Moth.pubFactory( + { + 'broker': self.default_cfg.admin, + 'dry_run': self.options.dry_run, + 'exchange': self.default_cfg.declared_exchanges, + 'message_strategy': { 'stubborn':True } + }) + xdc.putSetup() + xdc.close() + # declare exchanges first. for f in self.filtered_configurations: if self.please_stop: @@ -1426,7 +1431,6 @@ def declare(self): flow.runCallbacksTime('on_declare') del flow flow=None - def disable(self): if len(self.filtered_configurations) == 0: From a3e2ff69d8f1ec01ebe84e09943d9b1c653f09ad Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 7 Feb 2024 17:47:13 -0500 Subject: [PATCH 12/30] Update subscriber.rst capitalization in opening paragraph. --- docs/source/How2Guides/subscriber.rst | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/source/How2Guides/subscriber.rst b/docs/source/How2Guides/subscriber.rst index 8170a3d8a..1b4f444f0 100644 --- a/docs/source/How2Guides/subscriber.rst +++ b/docs/source/How2Guides/subscriber.rst @@ -30,10 +30,10 @@ The usual intent is to automatically download the data wanted to a directory on a subscriber machine where other software can process it. Please note: -- the tool is entirely command line driven (there is no GUI) More accurately, it is mostly configuration file driven. - most of the *interface* involves using a text editor to modify configuration files. -- while written to be compatible with other environments, the focus is on Linux usage. -- the tool can be used as either an end-user tool, or a system-wide transfer engine. +- The tool is entirely command line driven (there is no GUI) More accurately, it is mostly configuration file driven. 
+ Most of the *interface* involves using a text editor to modify configuration files. +- While written to be compatible with other environments, the focus is on Linux usage. +- The tool can be used as either an end-user tool, or a system-wide transfer engine. This guide is focused on the end-user case. - All documentation of the package is available at https://metpx.github.io/sarracenia From bac7388da0c89237d87e4d604760c497347b735d Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 10:34:08 -0500 Subject: [PATCH 13/30] Issue920 fix inflight mtime not working. (#926) * fix #920 inflight time was not working * discard events we do not understand. * missed some tuples in return from process_event * replace nowstr() with equivalent but more direct time.time() * changing defaults and documenting the consideration * refactor age settings to put them all in gather, and reduce per message checks. * rename nodupe_fileAge* to remove the nodupe_ prefix... now used in mainline. * incrementing version away from previous stable release. * fix #920 inflight time was not working * discard events we do not understand. * missed some tuples in return from process_event * replace nowstr() with equivalent but more direct time.time() * refactor age settings to put them all in gather, and reduce per message checks. * rename nodupe_fileAge* to remove the nodupe_ prefix... now used in mainline. 
--------- Co-authored-by: petersilva --- docs/source/Explanation/CommandLineGuide.rst | 12 ++-- docs/source/Explanation/DetectFileReady.rst | 4 +- .../Explanation/DuplicateSuppression.rst | 2 +- docs/source/Explanation/FileCompletion.rst | 4 +- docs/source/How2Guides/UPGRADING.rst | 4 +- docs/source/Reference/sr3_options.7.rst | 10 +-- docs/source/fr/CommentFaire/MiseANiveau.rst | 2 +- .../fr/Explication/AssurerLaLivraison.rst | 4 +- .../fr/Explication/GuideLigneDeCommande.rst | 14 ++-- .../fr/Explication/SupprimerLesDoublons.rst | 2 +- docs/source/fr/Reference/sr3_options.7.rst | 9 +-- .../fr/Tutoriel/1_CLI_introduction.ipynb | 2 +- sarracenia/config.py | 15 ++-- sarracenia/flow/poll.py | 4 +- sarracenia/flowcb/gather/file.py | 70 ++++++++----------- sarracenia/flowcb/nodupe/disk.py | 14 ++-- sarracenia/flowcb/nodupe/redis.py | 14 ++-- sarracenia/flowcb/v2wrapper.py | 2 +- 18 files changed, 90 insertions(+), 98 deletions(-) diff --git a/docs/source/Explanation/CommandLineGuide.rst b/docs/source/Explanation/CommandLineGuide.rst index da2abf798..a1fdde4e4 100644 --- a/docs/source/Explanation/CommandLineGuide.rst +++ b/docs/source/Explanation/CommandLineGuide.rst @@ -333,7 +333,7 @@ View all configuration settings (the result of all parsing... what the flow comp 'message_ttl': 0, 'mirror': True, 'msg_total_interval': '0', - 'nodupe_fileAgeMax': 0, + 'fileAgeMax': 0, 'nodupe_ttl': 0, 'overwrite': True, 'permCopy': True, @@ -954,7 +954,7 @@ These options set what files the user wants to be notified for and where - **accept [rename=] (must be set)** - **reject (optional)** - **permDefault (default: 0o400)** -- **nodupe_fileAgeMax (default: 7h)** +- **fileAgeMax (default 30d)** nodupe_fileAgeMax should be less than nodupe_ttl when using duplicate suppression, to avoid re-ingesting of files that have aged out of the nodupe cache. 
@@ -1021,12 +1021,12 @@ other nodes participating in the poll, when they don't have the vip, will subscribe to the output of the poll to keep their duplicate suppression caches current. -files that are more than nodupe_fileAgeMax are ignored. However, this +files that are more than fileAgeMax are ignored. However, this can be modified to any specified time limit in the configurations by using -the option *nodupe_fileAgeMax *. By default in components +the option *fileAgeMax *. By default in components other than poll, it is disabled by being set to zero (0). As it is a duration option, units are in seconds by default, but minutes, hours, -days, and weeks, are available. In the poll component, nodupe_fileAgeMax +days, and weeks, are available. In the poll component, fileAgeMax defaults to 30 days. Advanced Polling @@ -1118,7 +1118,7 @@ The notification protocol is defined here `sr_post(7) <../Reference/sr_post.7.ht **poll** connects to a *broker*. Every *sleep* seconds, it connects to a *pollUrl* (sftp, ftp, ftps). For each of the *directory* defined, it lists the contents. Polling is only intended to be used for recently modified -files. The *nodupe_fileAgeMax* option eliminates files that are too old +files. The *fileAgeMax* option eliminates files that are too old from consideration. When a file is found that matches a pattern given by *accept*, **poll** builds a notification message for that product. diff --git a/docs/source/Explanation/DetectFileReady.rst b/docs/source/Explanation/DetectFileReady.rst index 0a401d074..a17e3eec3 100644 --- a/docs/source/Explanation/DetectFileReady.rst +++ b/docs/source/Explanation/DetectFileReady.rst @@ -88,12 +88,12 @@ File Detection Strategy Table |sr_watch with| | | |inflight |Minimum age (modification time) |Last choice, guarantees delay only if | |number |of the file before it is considered |no other method works. | -|(mtime) |complete. | | +|(mtime) |complete. 
(aka: fileAgeMin) | | | | |Receiving from uncooperative | |Alternate | - Adds delay in every transfer. |sources. | |setting | - Vulnerable to network failures. | | | | - Vulnerable to clock skew. |(ok choice with PDS) | -|nodupe\_\ | | | +| | | | |fileAgeMin | |If a process is re-writing a file | | | |often, can use mtime to smooth out | | | |the i/o pattern, by slowing posts. | diff --git a/docs/source/Explanation/DuplicateSuppression.rst b/docs/source/Explanation/DuplicateSuppression.rst index 142536204..d123b66bf 100644 --- a/docs/source/Explanation/DuplicateSuppression.rst +++ b/docs/source/Explanation/DuplicateSuppression.rst @@ -170,7 +170,7 @@ Files That are Too Old in the configuration file:: - nodupe_fileAgeMax 600 + fileAgeMax 600 Files which are older than 600 seconds (10 minutes) will not be considerred for transfer. diff --git a/docs/source/Explanation/FileCompletion.rst b/docs/source/Explanation/FileCompletion.rst index 2dcb551e6..6b08e4c62 100644 --- a/docs/source/Explanation/FileCompletion.rst +++ b/docs/source/Explanation/FileCompletion.rst @@ -49,7 +49,7 @@ Inflight Table | number |of the file before it is considered |guaranteed delay added | | (mtime) |complete. | | | | |Receiving from uncooperative | -| |Adds delay in every transfer. |sources. | +| fileAgeMin |Adds delay in every transfer. |sources. | | |Vulnerable to network failures. | | | |Vulnerable to clock skew. |(ok choice with PDS) | +-------------+---------------------------------------+--------------------------------------+ @@ -62,7 +62,7 @@ NOTES: On versions of sr_sender prior to 2.18, the default was NONE, but was documented as '.tmp' To ensure compatibility with later versions, it is likely better to explicitly write - the *inflight* setting. + the *inflight* setting. The numeric variant is the same as setting *fileAgeMin* *inflight* was renamed from the old *lock* option in January 2017. For compatibility with older versions, can use *lock*, but name is deprecated. 
diff --git a/docs/source/How2Guides/UPGRADING.rst b/docs/source/How2Guides/UPGRADING.rst index a4311b7e3..4193a44fb 100644 --- a/docs/source/How2Guides/UPGRADING.rst +++ b/docs/source/How2Guides/UPGRADING.rst @@ -309,7 +309,7 @@ V2 to Sr3 all participants in a vip update ls_files nodes subscribe to the output exchange poll builds strings to describe files poll builds stat(2) like paramiko.SftpAttributes() participants rely on their ls_files for state poll uses flowcb.nodupe module like rest of sr3 - file_time_limit to ignore older files nodupe_fileAgeMax + file_time_limit to ignore older files fileAgeMax *destination* gives where to poll *pollUrl* *directory* gives remote directory to list *path* used like in *post* and *watch* need *accept* per *directory* need only one *accept* @@ -367,7 +367,7 @@ V2 to Sr3 e fileEvents events fileEvents exchange_split exchangeSplit - file_time_limit nodupe_fileAgeMax + file_time_limit fileAgeMax hb_memory_baseline_file MemoryBaseLineFile hb_memory_max MemoryMax hb_memory_multiplier MemoryMultiplier diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 53fbd2828..bcadf113d 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -912,6 +912,7 @@ can also be specified as a time interval, for example, 10 for 10 seconds. When set to a time interval, file posting process ensures that it waits until the file has not been modified in that interval. So a file will not be processed until it has stayed the same for at least 10 seconds. +This is the same as setting the **fileAgeMin** setting. 
Lastly, **inflight** can be set to *NONE*, which case the file is written directly with the final name, where the recipient will wait to receive a post notifying it @@ -1169,8 +1170,8 @@ or: More information: `Duplicate Suppresion <../Explanation/DuplicateSuppression.html>`_ -nodupe_fileAgeMax ------------------ +fileAgeMax +---------- If files are older than this setting (default: 7h in poll, 0 in other components), then ignore them, they are too old to post. 0 deactivates the setting. @@ -1179,9 +1180,8 @@ In a Poll: * default is 7 hours. should be less than nodupe_ttl to prevent re-ingest of duplicate data. (full discussion here: https://github.com/MetPX/sarracenia/issues/904 ) - -nodupe_fileAgeMin ------------------ +fileAgeMin +---------- If files are newer than this setting (default: 0), then ignore them, they are too new to post. 0 deactivates the setting. diff --git a/docs/source/fr/CommentFaire/MiseANiveau.rst b/docs/source/fr/CommentFaire/MiseANiveau.rst index e9ce4ba79..15b9c3831 100644 --- a/docs/source/fr/CommentFaire/MiseANiveau.rst +++ b/docs/source/fr/CommentFaire/MiseANiveau.rst @@ -295,7 +295,7 @@ V2 to Sr3 e fileEvents events fileEvents exchange_split exchangeSplit - file_time_limit nodupe_fileAgeMax + file_time_limit fileAgeMax hb_memory_baseline_file MemoryBaseLineFile hb_memory_max MemoryMax hb_memory_multiplier MemoryMultiplier diff --git a/docs/source/fr/Explication/AssurerLaLivraison.rst b/docs/source/fr/Explication/AssurerLaLivraison.rst index b0520e7f6..122ce8502 100644 --- a/docs/source/fr/Explication/AssurerLaLivraison.rst +++ b/docs/source/fr/Explication/AssurerLaLivraison.rst @@ -49,9 +49,9 @@ Tableau de Inflight +-------------+---------------------------------------+--------------------------------------+ | |Âge minimum (temps de modification) |Dernier choix, ne garantit un délai | | entier |du fichier avant que le transfer soit |que si aucun autre moyen peut servir | -| (mtime) |considéré Complèté. 
| | +| (mtime) |considéré Complèté. (fileAgeMin pareil)| | | | |Réception de ceux qui ne coopèrent pas| -| |Retard tous les avis | | +| fileAgeMin |Retarde tous les avis | | | |Vulnérable aux pannes de réseau. | (choix acceptable pour PDS) | | |Vulnérable aux horloges en désaccord | | +-------------+---------------------------------------+--------------------------------------+ diff --git a/docs/source/fr/Explication/GuideLigneDeCommande.rst b/docs/source/fr/Explication/GuideLigneDeCommande.rst index 07abf2d91..683a35e60 100644 --- a/docs/source/fr/Explication/GuideLigneDeCommande.rst +++ b/docs/source/fr/Explication/GuideLigneDeCommande.rst @@ -334,7 +334,7 @@ Afficher tous les paramètres de configuration (le résultat de toutes les analy 'message_ttl': 0, 'mirror': True, 'msg_total_interval': '0', - 'nodupe_fileAgeMax': 0, + 'fileAgeMax': 0, 'nodupe_ttl': 0, 'overwrite': True, 'permCopy': True, @@ -960,9 +960,9 @@ Ces options définissent les fichiers pour lesquels l’utilisateur souhaite êt - **accept [rename=] (doit être défini)** - **reject (facultatif)** - **permDefault (par défaut: 0o400)** -- **nodupe_fileAgeMax (par défaut 7h)** +- **fileAgeMax (par défaut 30d)** -Nodupe_fileAgeMax doit être inférieur à nodupe_ttl lors de l'utilisation de la suppression des doublons, +fileAgeMax doit être inférieur à nodupe_ttl lors de l'utilisation de la suppression des doublons, pour éviter la réingestion de fichiers obsolètes une fois partie du cache nodupe. L’option *filename* peut être utilisée pour définir un changement de nom global pour les produits. @@ -1026,12 +1026,12 @@ Comme pour tous les autres composants, l’option **vip** peut être utilisée p qu’un poll doit être actif sur seulement un seul nœud d’un cluster. Notez que quand d’autres nœuds participant au poll et wu’ils n’ont pas le vip, ils -les fichiers qui sont plus vieux que nodupe_fileAgeMax sont ignorés. Cela +les fichiers qui sont plus vieux que fileAgeMax sont ignorés. 
Cela peut être modifié à n’importe quelle limite de temps spécifiée dans les configurations en utilisant -l’option *nodupe_fileAgeMax *. Par défaut, dans les composants +l’option *fileAgeMax *. Par défaut, dans les composants autre que poll, cette option est désactivé en étant défini à zéro (0). Comme il s’agit d’une option de durée, les unités sont en secondes par défaut, mais il est possible de definir l'option -en utilisant des minutes, heures, jours ou des semaines. Dans la composante de poll, nodupe_fileAgeMax +en utilisant des minutes, heures, jours ou des semaines. Dans la composante de poll, fileAgeMax est défini à 30 jours par défaut. Sondage avancé (Advanced Polling) @@ -1120,7 +1120,7 @@ Le protocle de notification est défini ici `sr3_post(7) <../Reference/sr3_post. **poll** se connecte à un *broker*. À toutes les secondes de *sleep*, il se connecte à une *pollUrl* (sftp, ftp, ftps). Pour chacun des *directory* définis, les contenus sont listés. Le poll est seulement destinée à être utilisée pour les fichiers récemment modifiés. -L’option *nodupe_fileAgeMax* élimine les fichiers trop anciens. Lorsqu’un fichier correspondant +L’option *fileAgeMax* élimine les fichiers trop anciens. Lorsqu’un fichier correspondant à un modèle donné est trouvé by *accept*, **poll** crée un message de notification pour ce produit. Le message est ensuite verifié dans la cache dupliqué (limité en temps par l'option diff --git a/docs/source/fr/Explication/SupprimerLesDoublons.rst b/docs/source/fr/Explication/SupprimerLesDoublons.rst index 3bfe95ffc..316fb55bd 100644 --- a/docs/source/fr/Explication/SupprimerLesDoublons.rst +++ b/docs/source/fr/Explication/SupprimerLesDoublons.rst @@ -172,7 +172,7 @@ Fichiers trop vieux dans les fichiers de configuration:: - nodupe_fileAgeMax 600 + fileAgeMax 600 Les messages notificationsfichiers pour des fichiers qui sont agés de plus que 600 secondes (10 minutes) seront supprimés. 
diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 650db0fa0..80c4dfea9 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -894,6 +894,7 @@ peut également être spécifié comme une intervalle de temps, par exemple, 10 Lorsque l'option est défini sur une intervalle de temps, le processus de publication de fichiers attends jusqu’à ce que le fichier n’ai pas été modifié pendant cet intervalle. Ainsi, un fichier ne peux pas être traité tant qu’il n’est pas resté le même pendant au moins 10 secondes. +C'est un effet pareil à un choix de valeur pou *fileAgeMin*. Enfin, **inflight** peut être réglé a *NONE*. Dans ce cas, le fichier est écrit directement avec le nom final, où le destinataire attendra de recevoir un poste pour notifier l’arrivée du fichier. @@ -1146,8 +1147,8 @@ ou: Pour plus d´information: `Supprimer les doublons <../Explication/SupprimerLesDoublons.html>`_ -nodupe_fileAgeMax ------------------ +fileAgeMax +---------- Si les fichiers sont plus anciens que ce paramètre (défaut: 30d), ignorez-les, ils sont trop ancien pour qu'il puisse être posté. @@ -1156,8 +1157,8 @@ Dans un Poll : * La valeur par défaut est 7 heures. doit être inférieur à nodupe_ttl pour empêcher la réabsorption de données en double. (discussion complète ici (en anglais): https://github.com/MetPX/sarracenia/issues/904) -nodupe_fileAgeMin ------------------ +fileAgeMin +---------- Si les fichiers sont plus neuf que ce paramètre (défaut: 0 ... désactivé), ignorez-les, ils sont trop neufs pour qu'ils puissent être postés. 
diff --git a/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb b/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb index c000d4665..dfe11e6df 100644 --- a/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb +++ b/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb @@ -214,7 +214,7 @@ "messageAgeMax=0, messageCountMax=10, messageDebugDump=False, messageRateMax=0,\n", "messageRateMin=0,\n", "message_strategy=\"...ubborn': True, 'failure_duration': '5m'}\", message_ttl=0,\n", - "mirror=False, no=0, nodupe_fileAgeMax=0, nodupe_ttl=0, overwrite=True,\n", + "mirror=False, no=0, fileAgeMax=0, nodupe_ttl=0, overwrite=True,\n", "permCopy=True, permDefault=0, permDirDefault=509, permLog=384,\n", "pid_filename=\"...e/hpfx_amis//subscribe_hpfx_amis_00.pid'\", plugins_early=[],\n", "plugins_late=['sarracenia.flowcb.log.Log'], post_baseDir=None,\n", diff --git a/sarracenia/config.py b/sarracenia/config.py index 1b2019dba..898462984 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -141,8 +141,8 @@ def __repr__(self) -> str: float_options = [ ] duration_options = [ - 'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'nodupe_fileAgeMax', 'retry_ttl', - 'sanity_log_dead', 'sleep', 'timeout', 'varTimeOffset' + 'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'fileAgeMax', 'fileAgeMin', \ + 'retry_ttl', 'sanity_log_dead', 'sleep', 'timeout', 'varTimeOffset' ] list_options = [ 'path', 'vip' ] @@ -666,7 +666,9 @@ class Config: 'exchange_split': 'exchangeSplit', 'exchange_suffix': 'exchangeSuffix', 'expiry': 'expire', - 'file_time_limit' : 'nodupe_fileAgeMax', + 'file_time_limit' : 'fileAgeMax', + 'nodupe_fileAgeMax' : 'fileAgeMax', + 'nodupe_fileAgeMin' : 'fileAgeMin', 'fp' : 'force_polling', 'fs' : 'follow_symlinks', 'h' : 'help', @@ -768,7 +770,8 @@ def __init__(self, parent=None) -> 'Config': self.bufsize = 1024 * 1024 self.byteRateMax = 0 - self.nodupe_fileAgeMax = 0 # disabled. + self.fileAgeMax = 0 # disabled. + self.fileAgeMin = 0 # disabled. 
self.timezone = 'UTC' self.debug = False self.declared_exchanges = [] @@ -1553,7 +1556,9 @@ def parse_file(self, cfg, component=None): self._parse_v3unplugin(v) elif k in ['inflight', 'lock']: if v[:-1].isnumeric(): - setattr(self, k, durationToSeconds(v)) + vv = durationToSeconds(v) + setattr(self, k, vv) + self.fileAgeMin = vv else: if line[1].lower() in ['none', 'off', 'false']: setattr(self, k, None) diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index 30e2760b0..ab640c0f5 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -32,8 +32,8 @@ 'randomize': False, 'post_on_start': False, 'sleep': -1, - 'nodupe_ttl': 8 * 60 * 60, - 'nodupe_fileAgeMax': 7 * 60 * 60, + 'nodupe_ttl': 7 * 60 * 60, + 'fileAgeMax': 30 * 24 * 60 * 60, } # 'sumflg': 'cod,md5', diff --git a/sarracenia/flowcb/gather/file.py b/sarracenia/flowcb/gather/file.py index e5e157914..0f3421c17 100755 --- a/sarracenia/flowcb/gather/file.py +++ b/sarracenia/flowcb/gather/file.py @@ -51,30 +51,6 @@ def __init__(self, parent): self.on_moved = parent.on_moved super().__init__() -def path_inflight_tooNew(inflight, lstat): - """ - check the inflight, compare fail age against it. - return True if the file is too new to be posted. - """ - - if not type(inflight) in [float, int] : - #logger.debug("ok inflight unused") - return False - - if lstat == None or not hasattr(lstat,'st_mtime'): - #logger.debug("ok lstat None") - return False - - age = nowflt() - lstat.st_mtime - if age < inflight: - logger.debug("%d vs (inflight setting) %d seconds. Too New!" 
% \ - (age,inflight) ) - return True - - return False - - - class File(FlowCB): """ @@ -142,7 +118,6 @@ def __init__(self, options): self.o.create_modify = ('create' in self.o.fileEvents) or ( 'modify' in self.o.fileEvents) - def post_delete(self, path, key=None, value=None,is_directory=False): #logger.debug("post_delete %s (%s,%s)" % (path, key, value)) @@ -439,7 +414,9 @@ def post1move(self, src, dst): def process_event(self, event, src, dst): """ - return a list of messages. + return a tuple: pop? + list of messages. + + """ #logger.debug("process_event %s %s %s " % (event,src,dst) ) @@ -447,19 +424,19 @@ def process_event(self, event, src, dst): if event == 'delete' : if event in self.o.fileEvents: - return self.post1file(src, None) - return [] + return (True, self.post1file(src, None)) + return (True, []) if event == 'rmdir' : if event in self.o.fileEvents: - return self.post1file(src, None, is_directory=True) - return [] + return (True, self.post1file(src, None, is_directory=True)) + return (True, []) # move if event == 'move': if self.o.create_modify: - return self.post1move(src, dst) + return (True, self.post1move(src, dst)) # create or modify @@ -472,27 +449,36 @@ def process_event(self, event, src, dst): if os.path.islink(src): if 'link' in self.o.fileEvents: - return self.post1file(src, None) - return [] + return (True, self.post1file(src, None)) + return (True, []) # file : must exists # (may have been deleted since event caught) - if not os.path.exists(src): return [] + if not os.path.exists(src): return (True, []) # file : must be old enough lstat = sarracenia.stat(src) - if path_inflight_tooNew(self.o.inflight, lstat): return [] + if lstat and hasattr(lstat,'st_mtime'): + age = time.time() - lstat.st_mtime + + if age < self.o.fileAgeMin: + logger.debug( "%d vs (inflight setting) %d seconds. Too New!" 
% (age,self.o.fileAgeMin) ) + return (False, []) + + if self.o.fileAgeMax > 0 and age > self.o.fileAgeMax: + logger.debug("%d vs (fileAgeMax setting) %d seconds. Too Old!" % (age,self.o.fileAgeMax) ) + return (True, []) # post it if event == 'mkdir' and 'mkdir' in self.o.fileEvents: - return self.post1file(src, lstat, is_directory=True) + return (True, self.post1file(src, lstat, is_directory=True)) elif self.o.create_modify: - return self.post1file(src, lstat) - return [] + return (True, self.post1file(src, lstat)) + return (True, []) def set_blocksize(self, bssetting, fsiz): @@ -529,9 +515,11 @@ def wakeup(self): messages = [] for key in self.cur_events: + event_done=False event, src, dst = self.cur_events[key] try: - messages.extend(self.process_event(event, src, dst)) + (event_done, new_messages) = self.process_event(event, src, dst) + messages.extend(new_messages) except OSError as err: """ This message is reduced to debug priority because it often happens when files @@ -542,7 +530,9 @@ def wakeup(self): logger.debug("skipping event that could not be processed: ({}): {}".format( event, err)) logger.debug("Exception details:", exc_info=True) - self.left_events.pop(key) + event_done=True + if event_done: + self.left_events.pop(key) return messages def walk(self, src): diff --git a/sarracenia/flowcb/nodupe/disk.py b/sarracenia/flowcb/nodupe/disk.py index cc31d502d..b900f55c9 100755 --- a/sarracenia/flowcb/nodupe/disk.py +++ b/sarracenia/flowcb/nodupe/disk.py @@ -30,10 +30,10 @@ class Disk(NoDupe): The expiry based on nodupe_ttl is applied every housekeeping interval. - nodupe_fileAgeMax - the oldest file that will be considered for processing. + fileAgeMax - the oldest file that will be considered for processing. files older than this threshold will be rejected. - nodupe_fileAgeMin - the newest file that can be considered for processing. + fileAgeMin - the newest file that can be considered for processing. files newer than this threshold will be rejected. 
if not specified, the value of option *inflight* may be referenced if it is an integer value. @@ -55,8 +55,6 @@ def __init__(self, options): logging.basicConfig(format=self.o.logFormat, level=getattr(logging, self.o.logLevel.upper())) self.o.add_option( 'nodupe_ttl', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMax', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMin', 'duration', 0 ) logger.info('time_to_live=%d, ' % (self.o.nodupe_ttl)) @@ -149,13 +147,13 @@ def check_message(self, msg) -> bool : def after_accept(self, worklist): new_incoming = [] self.now = nowflt() - if self.o.nodupe_fileAgeMax > 0: - min_mtime = self.now - self.o.nodupe_fileAgeMax + if self.o.fileAgeMax > 0: + min_mtime = self.now - self.o.fileAgeMax else: min_mtime = 0 - if self.o.nodupe_fileAgeMin > 0: - max_mtime = self.now - self.o.nodupe_fileAgeMin + if self.o.fileAgeMin > 0: + max_mtime = self.now - self.o.fileAgeMin elif type(self.o.inflight) in [ int, float ] and self.o.inflight > 0: max_mtime = self.now - self.o.inflight else: diff --git a/sarracenia/flowcb/nodupe/redis.py b/sarracenia/flowcb/nodupe/redis.py index 20d92d94a..8602fb6e0 100644 --- a/sarracenia/flowcb/nodupe/redis.py +++ b/sarracenia/flowcb/nodupe/redis.py @@ -32,10 +32,10 @@ class Redis(NoDupe): The expiry based on nodupe_ttl is applied every housekeeping interval. - nodupe_fileAgeMax - the oldest file that will be considered for processing. + fileAgeMax - the oldest file that will be considered for processing. files older than this threshold will be rejected. - nodupe_fileAgeMin - the newest file that can be considered for processing. + fileAgeMin - the newest file that can be considered for processing. files newer than this threshold will be rejected. if not specified, the value of option *inflight* may be referenced if it is an integer value. 
@@ -50,8 +50,6 @@ def __init__(self, options): logging.basicConfig(format=self.o.logFormat, level=getattr(logging, self.o.logLevel.upper())) self.o.add_option( 'nodupe_ttl', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMax', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMin', 'duration', 0 ) logger.info('time_to_live=%d, ' % (self.o.nodupe_ttl)) @@ -145,13 +143,13 @@ def on_housekeeping(self): def after_accept(self, worklist): new_incoming = [] self.now = nowflt() - if self.o.nodupe_fileAgeMax > 0: - min_mtime = self.now - self.o.nodupe_fileAgeMax + if self.o.fileAgeMax > 0: + min_mtime = self.now - self.o.fileAgeMax else: min_mtime = 0 - if self.o.nodupe_fileAgeMin > 0: - max_mtime = self.now - self.o.nodupe_fileAgeMin + if self.o.fileAgeMin > 0: + max_mtime = self.now - self.o.fileAgeMin elif type(self.o.inflight) in [ int, float ] and self.o.inflight > 0: max_mtime = self.now - self.o.inflight else: diff --git a/sarracenia/flowcb/v2wrapper.py b/sarracenia/flowcb/v2wrapper.py index 57e1b244f..82e7b17ab 100755 --- a/sarracenia/flowcb/v2wrapper.py +++ b/sarracenia/flowcb/v2wrapper.py @@ -61,6 +61,7 @@ def sumstrFromMessage( msg ) -> str: if 'fileOp' in msg: if 'rename' in msg['fileOp']: msg['oldname'] = msg['fileOp']['rename'] + if 'link' in msg['fileOp']: hash = sha512() hash.update( bytes( msg['fileOp']['link'], encoding='utf-8' ) ) @@ -77,7 +78,6 @@ def sumstrFromMessage( msg ) -> str: sumstr = 'r,%s' % hash.hexdigest() else: sumstr = 'm,%s' % hash.hexdigest() - else: logger.error('unknown fileOp: %s' % msg['fileOp'] ) return sumstr From 1cec149ed7547bb3d91e5cdfd5f0e3303a46e0df Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 11:00:38 -0500 Subject: [PATCH 14/30] Issue912 retry refilter flag (#928) * #912 add messageCountMax argument to gather * #912 missed some gather api changes * #912 move retry loads from after_accept to gather * #912 adding retry_refilter to permit both retry schemes * documenting retry_refilter option --- 
docs/source/Reference/sr3_options.7.rst | 13 ++++++++ docs/source/fr/Reference/sr3_options.7.rst | 15 +++++++++ sarracenia/config.py | 7 ++++- sarracenia/flow/__init__.py | 9 +++++- sarracenia/flowcb/__init__.py | 4 +-- sarracenia/flowcb/gather/file.py | 2 +- sarracenia/flowcb/gather/message.py | 2 +- sarracenia/flowcb/log.py | 4 +-- sarracenia/flowcb/retry.py | 36 +++++++++++++++++++++- sarracenia/flowcb/run.py | 2 +- sarracenia/flowcb/scheduled/__init__.py | 2 +- sarracenia/flowcb/scheduled/wiski.py | 4 +-- 12 files changed, 87 insertions(+), 13 deletions(-) diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index bcadf113d..3ef81e96e 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -1552,6 +1552,19 @@ retryEmptyBeforeExit: (default: False) Used for sr_insects flow tests. Prevents Sarracenia from exiting while there are messages remaining in the retry queue(s). By default, a post will cleanly exit once it has created and attempted to publish messages for all files in the specified directory. If any messages are not successfully published, they will be saved to disk to retry later. If a post is only run once, as in the flow tests, these messages will never be retried unless retryEmptyBeforeExit is set to True. +retry_refilter (default: False) +----------------------------------------- + +The **retry_refilter** option alters how messages are reloaded when they are retrieved from +a retry queue. The default way (value: False) is to repeat the transfer using exactly +the same message as before. If **retry_refilter** is set (value: True) then all the +message's calculated fields will be discarded, and the processing re-started from the gather +phase (accept/reject processing will be repeated, destinations re-calculated.) 
+ +The normal retry behaviour is used when the remote has had a failure, and the transfer needs to +be re-sent later, while the retry_refilter option is used when recovering from configuration +file errors, where some messages had incorrect selection or destination criteria. + retry_ttl (default: same as expire) ---------------------------------------------- diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index 80c4dfea9..479c170c3 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -1524,6 +1524,21 @@ publiés avec succès, ils seront enregistrés sur le disque pour réessayer ult exécutée qu’une seule fois, comme dans les tests de flux, ces messages d'annonce ne seront jamais réessayés, sauf si retryEmptyBeforeExit est défini à True. +retry_refilter (par défaut : False) +--------------------------------------------- + +L'option **retry_refilter** modifie la façon dont les messages sont rechargés lorsqu'ils sont récupérés à partir +d'une file d'attente pour une nouvelle tentative de transfert (retry). La méthode par défaut (valeur : False) +consiste à répéter le transfert en utilisant exactement le même message que précédemment. Si **retry_refilter** +est défini (valeur : True), alors tous les champs calculés du message + seront supprimés et le +traitement redémarrera à partir de la phase *gather* (le traitement d'acceptation/rejet sera +répété, les destinations recalculées.) 
+ +Le comportement normal de nouvelle tentative (retry) est utilisé lorsque la destination a subi une +panne et que l'envoi doit être refait plus tard, tandis que l'option **retry_refilter** est utilisée pour récupérer +d'erreurs de configuration, où des messages avaient des critères de sélection ou de destination incorrects. + + retry_ttl (défaut: identique à expire) ------------------------------------------------- diff --git a/sarracenia/config.py b/sarracenia/config.py index 898462984..26e0078a7 100755 --- a/sarracenia/config.py +++ b/sarracenia/config.py @@ -113,6 +113,7 @@ def __repr__(self) -> str: 'recursive' : True, 'report': False, 'retryEmptyBeforeExit': False, + 'retry_refilter': False, 'sanity_log_dead': 9999, 'sourceFromExchange': False, 'sundew_compat_regex_first_match_is_zero': False, @@ -134,7 +135,7 @@ def __repr__(self) -> str: 'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logMetrics', 'logStdout', 'logReject', 'restore', \ 'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \ 'permCopy', 'queueBind', 'queueDeclare', 'randomize', 'recursive', 'realpathPost', \ - 'reconnect', 'report', 'reset', 'retryEmptyBeforeExit', 'save', 'sundew_compat_regex_first_match_is_zero', \ + 'reconnect', 'report', 'reset', 'retry_refilter', 'retryEmptyBeforeExit', 'save', 'sundew_compat_regex_first_match_is_zero', \ 'sourceFromExchange', 'statehost', 'users', 'v2compatRenameDoublePost' ] @@ -2521,6 +2522,10 @@ def parse_args(self, isPost=False): help= 'allows simultaneous use of multiple versions and types of messages' ) + parser.add_argument('--retry_refilter', + action='store_true', + default=self.retry_refilter, + help='repeat message processing when retrying transfers (default just resends as previous attempt.)') #FIXME: select/accept/reject in parser not implemented. 
parser.add_argument( '--select', diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 7483dfb78..2c98eff43 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -1070,9 +1070,10 @@ def filter(self) -> None: (len(self.worklist.incoming), len(self.worklist.rejected))) def gather(self) -> None: + so_far=0 for p in self.plugins["gather"]: try: - new_incoming = p() + new_incoming = p(self.o.batch-so_far) except Exception as ex: logger.error( f'flowCallback plugin {p} crashed: {ex}' ) logger.debug( "details:", exc_info=True ) @@ -1080,6 +1081,12 @@ def gather(self) -> None: if len(new_incoming) > 0: self.worklist.incoming.extend(new_incoming) + so_far += len(new_incoming) + + # if we gathered enough with a subset of plugins then return. + if so_far >= self.o.batch: + return + def do(self) -> None: diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index cdd2cd276..5ec2c94de 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -80,9 +80,9 @@ def ack(self,messagelist) -> None:: Task: acknowledge messages from a gather source. - def gather(self) -> list:: + def gather(self, messageCountMax) -> list:: - Task: gather messages from a source... return a list of messages. + Task: gather messages from a source... return a list of messages in a poll, gather is always called, regardless of vip posession. 
in all other components, gather is only called when in posession diff --git a/sarracenia/flowcb/gather/file.py b/sarracenia/flowcb/gather/file.py index 0f3421c17..8238cb4dc 100755 --- a/sarracenia/flowcb/gather/file.py +++ b/sarracenia/flowcb/gather/file.py @@ -670,7 +670,7 @@ def on_start(self): self.queued_messages = [] self.primed = False - def gather(self): + def gather(self, messageCountMax): """ from sr_post.py/run diff --git a/sarracenia/flowcb/gather/message.py b/sarracenia/flowcb/gather/message.py index e45f0bf7f..d155b8795 100755 --- a/sarracenia/flowcb/gather/message.py +++ b/sarracenia/flowcb/gather/message.py @@ -28,7 +28,7 @@ def __init__(self, options) -> None: else: logger.critical('missing required broker specification') - def gather(self) -> list: + def gather(self, messageCountMax) -> list: """ return a current list of messages. """ diff --git a/sarracenia/flowcb/log.py b/sarracenia/flowcb/log.py index 7350f2f42..e49aa8c51 100755 --- a/sarracenia/flowcb/log.py +++ b/sarracenia/flowcb/log.py @@ -60,9 +60,9 @@ def __reset(self): def metricsReport(self): return { 'lagMax': self.lagMax, 'lagTotal':self.lagTotal, 'lagMessageCount':self.msgCount, 'rejectCount':self.rejectCount } - def gather(self): + def gather(self, messageCountMax): if set(['gather']) & self.o.logEvents: - logger.info('') + logger.info( f' messageCountMax: {messageCountMax} ') return [] diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py index 9bedb8371..47c791865 100755 --- a/sarracenia/flowcb/retry.py +++ b/sarracenia/flowcb/retry.py @@ -54,6 +54,11 @@ def __init__(self, options) -> None: return self.o.add_option( 'retry_driver', 'str', 'disk') + + # retry_refilter False -- rety to send with existing processing. + # retry_refilter True -- re-ingest and re-apply processing (if it has changed.) 
+ self.o.add_option( 'retry_refilter', 'flag', False) + #queuedriver = os.getenv('SR3_QUEUEDRIVER', 'disk') if self.o.retry_driver == 'redis': @@ -69,12 +74,41 @@ def __init__(self, options) -> None: logger.debug('logLevel=%s' % self.o.logLevel) + + def gather(self, qty) -> None: + """ + If there are only a few new messages, get some from the download retry queue and put them into + `worklist.incoming`. + + Do this in the gather() entry point if retry_refilter is True. + + """ + if not features['retry']['present'] or not self.o.retry_refilter: + return [] + + if qty <= 0: return [] + + message_list = self.download_retry.get(qty) + + # eliminate calculated values so it is refiltered from scratch. + for m in message_list: + for k in m: + if k in m['_deleteOnPost'] or k.startswith('new_'): + del m[k] + del m['_deleteOnPost'] + + return message_list + + def after_accept(self, worklist) -> None: """ If there are only a few new messages, get some from the download retry queue and put them into `worklist.incoming`. + + Do this in the after_accept() entry point if retry_refilter is False. + """ - if not features['retry']['present'] : + if not features['retry']['present'] or self.o.retry_refilter: return qty = (self.o.batch / 2) - len(worklist.incoming) diff --git a/sarracenia/flowcb/run.py b/sarracenia/flowcb/run.py index 5b9d00e09..c183522ab 100755 --- a/sarracenia/flowcb/run.py +++ b/sarracenia/flowcb/run.py @@ -68,7 +68,7 @@ def run_script(self, script): logger.error("subprocess.run failed err={}".format(err)) logger.debug("Exception details:", exc_info=True) - def gather(self): + def gather(self, messageCountMax): """ FIXME: this does not make sense. need to figure out how to get the messages back from the script, perhaps using a json file reader? 
diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index f2af22e06..dd770e746 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -87,7 +87,7 @@ def __init__(self,options,logger=logger): now=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc) self.update_appointments(now) - def gather(self): + def gather(self,messageCountMax): # for next expected post self.wait_until_next() diff --git a/sarracenia/flowcb/scheduled/wiski.py b/sarracenia/flowcb/scheduled/wiski.py index 9e3ddc4a2..256b036a5 100644 --- a/sarracenia/flowcb/scheduled/wiski.py +++ b/sarracenia/flowcb/scheduled/wiski.py @@ -131,7 +131,7 @@ def submit_tokenization_request(self): return submit_tokenization_request - def gather(self): # placeholder + def gather(self,messageCountMax): # placeholder messages=[] @@ -216,5 +216,5 @@ def gather(self): # placeholder logging.basicConfig(level=logging.DEBUG) me = Wiski(flow.o) - me.gather() + me.gather(flow.o.batch) From dc5a061ddbc7269d54afc4f19fb877ac114ba703 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 20:24:07 -0500 Subject: [PATCH 15/30] upgrading note for 3.0.52 change in signature --- docs/source/How2Guides/UPGRADING.rst | 17 +++++++++++++++++ docs/source/fr/CommentFaire/MiseANiveau.rst | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/docs/source/How2Guides/UPGRADING.rst b/docs/source/How2Guides/UPGRADING.rst index 4193a44fb..3611a1696 100644 --- a/docs/source/How2Guides/UPGRADING.rst +++ b/docs/source/How2Guides/UPGRADING.rst @@ -39,6 +39,23 @@ Installation Instructions git --- +3.0.52 +------ + +*CHANGE*: Additional messageCountMax argument to flowcb.gather() entry point. +When implementing flow callbacks for scheduled flows, or poll overrides, the +gather entry point now takes one additional argument indicating the maximum +number of messages that the routine should return. 
+ +To be compatible with previous versions, one can establish a default value +on the gather:: + + def gather(self, messageCountMax=None): + +With the default value, plugins are downward compatible. (earlier versions +will call with only self as an argument.) + + 3.0.51 ------- diff --git a/docs/source/fr/CommentFaire/MiseANiveau.rst b/docs/source/fr/CommentFaire/MiseANiveau.rst index 15b9c3831..48ee92d4c 100644 --- a/docs/source/fr/CommentFaire/MiseANiveau.rst +++ b/docs/source/fr/CommentFaire/MiseANiveau.rst @@ -38,6 +38,22 @@ Instructions d’installation git --- +3.0.52 +------ + +*CHANGEMENT* : argument messageCountMax supplémentaire au point d'entrée flowcb.gather(). +Lors de la mise en œuvre de rappels de flow pour les flux cédulés ou de remplacements de poll, +le point d'entrée de *gather* prend désormais un argument supplémentaire indiquant le nombre +maximum de messages que la routine doit retourner. + +Pour être compatible avec les versions précédentes, on peut établir une valeur par défaut +sur le rassemblement :: + + def gather(self, messageCountMax=None): + +Avec la valeur par défaut, les plugins sont compatibles avec les versions précédentes. + + 3.0.51 ------ From be5f380cdbd8081e48ab9a4ce5d9a5fa4581af74 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 20:34:11 -0500 Subject: [PATCH 16/30] documenting changes since previous release --- debian/changelog | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/debian/changelog b/debian/changelog index c05534c6a..283b7b861 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,6 +1,11 @@ metpx-sr3 (3.00.52rc1) UNRELEASED; urgency=medium - * + * fix #912 enable v2 style retry processing with new --retry_refilter flag. + * fix #920 fix inflight mtime not working in watch. + * fix #920 replaced nodupe_fileAge(Min and Max) with fileAge(Min and Max) + * fix #920 inflight mtime merged with fileAgeMin + * fix #907 declare exchange in admin.conf wasn't working. 
+ * fix #922 sr3 showing wrong exchange on status/show. -- peter Fri, 02 Feb 2024 09:00:18 -0500 From c7b77188b1187ef7f4badc406178d2ea8d2f7e67 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 21:40:06 -0500 Subject: [PATCH 17/30] update tests for fileAge changes --- tests/sarracenia/flowcb/nodupe/disk_test.py | 4 ++-- tests/sarracenia/flowcb/nodupe/nodupe_test.py | 8 ++++---- tests/sarracenia/flowcb/nodupe/redis_test.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/sarracenia/flowcb/nodupe/disk_test.py b/tests/sarracenia/flowcb/nodupe/disk_test.py index ebefb87e4..273122b9b 100644 --- a/tests/sarracenia/flowcb/nodupe/disk_test.py +++ b/tests/sarracenia/flowcb/nodupe/disk_test.py @@ -557,8 +557,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): nodupe = Disk(BaseOptions) nodupe.o.nodupe_ttl = 100000 - nodupe.o.nodupe_fileAgeMin = 1000 - nodupe.o.nodupe_fileAgeMax = 1000 + nodupe.o.fileAgeMin = 1000 + nodupe.o.fileAgeMax = 1000 nodupe.open() nodupe.now = nowflt() + 10 diff --git a/tests/sarracenia/flowcb/nodupe/nodupe_test.py b/tests/sarracenia/flowcb/nodupe/nodupe_test.py index c2b2fd93f..e976b4d36 100644 --- a/tests/sarracenia/flowcb/nodupe/nodupe_test.py +++ b/tests/sarracenia/flowcb/nodupe/nodupe_test.py @@ -242,8 +242,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): #Disk nodupe_disk = NoDupe_Disk(BaseOptions) nodupe_disk.o.nodupe_ttl = 100000 - nodupe_disk.o.nodupe_fileAgeMin = 1000 - nodupe_disk.o.nodupe_fileAgeMax = 1000 + nodupe_disk.o.fileAgeMin = 1000 + nodupe_disk.o.fileAgeMax = 1000 nodupe_disk.now = message_now nodupe_disk.on_start() @@ -255,8 +255,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): #Redis nodupe_redis = NoDupe_Redis(BaseOptions) nodupe_redis.o.nodupe_ttl = 100000 - nodupe_redis.o.nodupe_fileAgeMin = 1000 - nodupe_redis.o.nodupe_fileAgeMax = 1000 + nodupe_redis.o.fileAgeMin = 1000 + nodupe_redis.o.fileAgeMax = 1000 nodupe_redis.now = message_now 
nodupe_redis.on_start() diff --git a/tests/sarracenia/flowcb/nodupe/redis_test.py b/tests/sarracenia/flowcb/nodupe/redis_test.py index 9bb1a95fb..b4c973e1a 100644 --- a/tests/sarracenia/flowcb/nodupe/redis_test.py +++ b/tests/sarracenia/flowcb/nodupe/redis_test.py @@ -253,8 +253,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): nodupe = Redis(BaseOptions) nodupe.o.nodupe_ttl = 100000 - nodupe.o.nodupe_fileAgeMin = 1000 - nodupe.o.nodupe_fileAgeMax = 1000 + nodupe.o.fileAgeMin = 1000 + nodupe.o.fileAgeMax = 1000 nodupe.now = nowflt() + 10 From de7c617a0a55f8e32fa3f4f4febbe9277003817d Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 19:19:10 -0500 Subject: [PATCH 18/30] somehow missed one text replacement for nodupe_fileAge --- sarracenia/flow/poll.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index ab640c0f5..f6dd49d32 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -74,8 +74,8 @@ def __init__(self, options): self.plugins['load'].insert(0, 'sarracenia.flowcb.post.message.Message') - if self.o.nodupe_ttl < self.o.nodupe_fileAgeMax: - logger.warning( f"nodupe_ttl < nodupe_fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904") + if self.o.nodupe_ttl < self.o.fileAgeMax: + logger.warning( f"nodupe_ttl < fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904") if not features['ftppoll']['present']: if hasattr( self.o, 'pollUrl' ) and ( self.o.pollUrl.startswith('ftp') ): From fd6438854cc36fae5122e049b451f674d4fbd880 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Mon, 12 Feb 2024 19:19:51 -0500 Subject: [PATCH 19/30] fix #920 add _isRetry when retrieving from retry queue --- sarracenia/flowcb/nodupe/disk.py | 4 +--- sarracenia/flowcb/nodupe/redis.py | 2 +- sarracenia/flowcb/retry.py | 4 +++- 3 files 
changed, 5 insertions(+), 5 deletions(-) diff --git a/sarracenia/flowcb/nodupe/disk.py b/sarracenia/flowcb/nodupe/disk.py index b900f55c9..ada9eeba2 100755 --- a/sarracenia/flowcb/nodupe/disk.py +++ b/sarracenia/flowcb/nodupe/disk.py @@ -154,8 +154,6 @@ def after_accept(self, worklist): if self.o.fileAgeMin > 0: max_mtime = self.now - self.o.fileAgeMin - elif type(self.o.inflight) in [ int, float ] and self.o.inflight > 0: - max_mtime = self.now - self.o.inflight else: # FIXME: should we add some time here to allow for different clocks? # 100 seconds in the future? hmm... @@ -177,7 +175,7 @@ def after_accept(self, worklist): worklist.rejected.append(m) continue - if self.check_message(m): + if '_isRetry' in m or self.check_message(m): new_incoming.append(m) else: m['_deleteOnPost'] |= set(['reject']) diff --git a/sarracenia/flowcb/nodupe/redis.py b/sarracenia/flowcb/nodupe/redis.py index 8602fb6e0..c9ce9701b 100644 --- a/sarracenia/flowcb/nodupe/redis.py +++ b/sarracenia/flowcb/nodupe/redis.py @@ -173,7 +173,7 @@ def after_accept(self, worklist): worklist.rejected.append(m) continue - if self._is_new(m): + if '_isRetry' in m or self._is_new(m): new_incoming.append(m) else: m['_deleteOnPost'] |= set(['reject']) diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py index 47c791865..9cf47f04e 100755 --- a/sarracenia/flowcb/retry.py +++ b/sarracenia/flowcb/retry.py @@ -95,7 +95,9 @@ def gather(self, qty) -> None: for k in m: if k in m['_deleteOnPost'] or k.startswith('new_'): del m[k] - del m['_deleteOnPost'] + m['_isRetry'] = True + m['_deleteOnPost'] = set( [ '_isRetry' ] ) + return message_list From b1c9ee9454619251be1b6670af4b3986a1e21532 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 14 Feb 2024 07:59:56 -0500 Subject: [PATCH 20/30] make rename field processing contingent on download option being set --- sarracenia/flow/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sarracenia/flow/__init__.py 
b/sarracenia/flow/__init__.py index 2c98eff43..233301d6c 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -806,7 +806,8 @@ def updateFieldsAccepted(self, msg, urlstr, pattern, maskDir, relPath = u.path[1:] + '/' + relPath # FIXME... why the % ? why not just assign it to copy the value? - if 'rename' in msg: relPath = '%s' % msg['rename'] + if self.o.download and 'rename' in msg: + relPath = '%s' % msg['rename'] token = relPath.split('/') filename = token[-1] From 98710da33538bf51e32c33218c0943bb34537b09 Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Tue, 13 Feb 2024 16:11:46 +0000 Subject: [PATCH 21/30] First attempt at raw bulletin renamer #918 --- sarracenia/flowcb/rename/raw2bulletin.py | 346 +++++++++++++++++++++++ 1 file changed, 346 insertions(+) create mode 100644 sarracenia/flowcb/rename/raw2bulletin.py diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py new file mode 100644 index 000000000..d809fbde6 --- /dev/null +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -0,0 +1,346 @@ +""" +Description: + sr3 equivalent of the V2 configuration cvt_bulletin_filename_from_content + + Works essentially the same way as its v2 counterpart, except it can get the bulletin file contents 2 ways. + 1. By the sr3 message content + 2. By opening and reading the path to the file directly. + + Decoding of the data is done in the same way of the encoder in flowcb/gather/am.py + + The plugin essentially captures what was done on the V2 converter and ties it up with Sundew source code logic to make it more generalized. + What it can do that the V2 plugin cannot: + - Add the station ID in the filename + - Add the BBB in the filename + - Fetch bulletin data multiple ways + +Examples: + + RAW Ninjo file (4 letter station ID) + WACN07 CWAO 082327 + CZEG AIRMET E1 VALID 080105/080505 CWEG- + + Output: WACN07_CWAO_082327_CZEG__00001 + + Another RAW Ninjo file + FTCN32 CWAO 100500 AAM + (...) 
+ + Output: FTCN32_CWAO_100500_AAM__00002 + + A CACN bulletin missing the correct filename + CACN00 CWAO 141600 + PQU + + Output: CACN00_CWAO_141600_PQU__00003 + +Usage: + callback rename.raw2bulletin + +Contributions: + Andre LeBlanc - First author (2024/02) + +Improvements: + Add more Sundew logic if ever some bulletins end up failing when implemented +""" + +from sarracenia.flowcb import FlowCB +import logging +from base64 import b64encode +import time + +logger = logging.getLogger(__name__) + +class Raw2bulletin(FlowCB): + + def __init__(self,options) : + super().__init__(options,logger) + self.seq = 0 + # self.o.add_option('headers2rename', 'list', ['CA', 'MA' , 'RA']) + + # If file was converted, get rid of extensions it had + def after_accept(self,worklist): + + good_msgs = [] + + for msg in worklist.incoming: + + path = msg['new_dir'] + '/' + msg['new_file'] + + filenameFirstChars = msg['new_file'].split('_')[0] + + # AM bulletins that need their filename rewritten with data should only have two chars before the first underscore + # This is concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71 + # These messages are still good, so we will add them to the good_msgs list + if len(filenameFirstChars) != 2: + good_msgs.append(msg) + continue + + data = self.getData(data, path) + + if data == None: + worklist.rejected.append(msg) + continue + + ### Alternative to check for bulletins that need their filename rewritten ### + + # ok = 0 + # for header in self.o.headers2rename: + # _len = len(header) + # # Check if first chars of header match the ones we want to rename + # if data.split(b'\n')[0][0:_len] == header: + # ok = 1 + # break + + # # If nothing has matched, skip to the next iteration + # if ok == 0: + # continue + + lines = data.split(b'\n') + #first_line = lines[0].strip(b'\r') + #first_line = first_line.strip(b' ') + #first_line = first_line.strip(b'\t') + first_line = lines[0].split(b' ') + + ddhhmm = None 
+ + # Build header from bulletin + header = self.buildHeader(first_line) + if header == None: + logger.error("Unable to fetch header contents. Skipping message") + worklist.rejected.append(msg) + continue + + # Get the station timestamp from the file contents + ddhhmm = self.getTime(data) + if ddhhmm == None: + logger.error("Unable to get julian time. Skipping message") + worklist.rejected.append(msg) + continue + + # Get the BBB + BBB = self.getBBB(first_line) + + # Get the station ID + stn_id = self.getStation(data) + + # Get sequence (random ints) + seq = self.getSequence() + + # Rename file with data fetched + try: + new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq + + msg['new_file'] = new_file + new_path = msg['new_dir'] + '/' + msg['new_file'] + + logger.info(f"New filename (with path): {new_path}") + + good_msgs.append(msg) + + except Exception as e: + logger.error(f"Unable to rename filename. Error message: {e}") + worklist.rejected.append(msg) + continue + + worklist.incoming = good_msgs + + + def getData(self, msg, path): + + # Read file data from message or from file path directly if message content not found. + try: + if msg['content']: + data = msg['content'] + else: + self.binary = 0 + + fp = open(path, 'rb') + data = fp.read() + # bulletin = Bulletin(data) + fp.close() + + # Decode data, binary and text. Integrate inputCharset + if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: + self.binary = 1 + + if not self.binary: + data = data.decode(self.o.inputCharset) + else: + data = b64encode(data).decode('ascii') + + return data + + except Exception as e: + logger.error(f"Could not fetch file data of from either message content or {path}. Error details: {e}") + return None + + + def getSequence(self): + """ sequence number to make the file unique... 
+ """ + self.seq = self.seq + 1 + if self.seq > 99999: + self.seq = 1 + return str(self.seq).zfill(5) + + + def getStation(self, data): + """Extracted from Sundew code: https://github.com/MetPX/Sundew/blob/main/lib/bulletin.py#L327-L408 + Get the station ID from the bulletin contents. + Some station ID's are located on different lines (depends on the bulletin) + Use stn_id_loc to determine which line holds the station ID. + Examples: + CACN00 CWAO -> Station ID located on second line. + FTCN32 CWAO -> Station ID located on first line (with header) + """ + + station = '' + data = data.lstrip('\n') + data = data.split('\n') + + try: + premiereLignePleine = "" + deuxiemeLignePleine = "" + + # special case, need to get the next full line. + i = 0 + for ligne in data[1:]: + i += 1 + premiereLignePleine = ligne + if len(premiereLignePleine) > 1: + if len(data) > i+1 : deuxiemeLignePleine = data[i+1] + break + + #print " ********************* header = ", data[0][0:7] + # switch depends on bulletin type. 
+ if data[0][0:2] == "SA": + if data[1].split()[0] in ["METAR","LWIS"]: + station = premiereLignePleine.split()[1] + else: + station = premiereLignePleine.split()[0] + + elif data[0][0:2] == "SP": + station = premiereLignePleine.split()[1] + + elif data[0][0:2] in ["SI","SM"]: + station = premiereLignePleine.split()[0] + if station == "AAXX" : + if deuxiemeLignePleine != "" : + station = deuxiemeLignePleine.split()[0] + else : + station = '' + + elif data[0][0:6] in ["SRCN40","SXCN40","SRMT60","SXAK50", "SRND20", "SRND30"]: + #elif data[0][0:6] in self.wmo_id: + station = premiereLignePleine.split()[0] + + elif data[0][0:2] in ["FC","FT"]: + if premiereLignePleine.split()[1] == "AMD": + station = premiereLignePleine.split()[2] + else: + station = premiereLignePleine.split()[1] + + elif data[0][0:2] in ["UE","UG","UK","UL","UQ","US"]: + parts = premiereLignePleine.split() + if parts[0][:2] in ['EE', 'II', 'QQ', 'UU']: + station = parts[1] + elif parts[0][:2] in ['PP', 'TT']: + station = parts[2] + else: + station = '' + + elif data[0][0:2] in ["RA","MA","CA"]: + station = premiereLignePleine.split()[0].split('/')[0] + + except Exception: + station = '' + + if station != '' : + while len(station) > 1 and station[0] == '?' : + station = station[1:] + if station[0] != '?' : + station = station.split('?')[0] + if station[-1] == '=' : station = station[:-1] + else : + station = '' + + return station + + + def getBBB(self, first_line): + """Get the BBB. If none found, return empty string. + The BBB is the field of the bulletin header that states if it was amended or not. 
+ """ + + if len(first_line) != 4: + BBB = '' + else: + BBB = first_line[3] + + return BBB + + def buildHeader(self, first_line): + """ Build header from file contents + """ + + try: + T1T2A1A2ii = first_line[0] + CCCC = first_line[1] + # YYGGgg = parts[2] + + header = T1T2A1A2ii + "_" + CCCC # + "_" + YYGGgg + + except Exception: + header = None + + return header + + + def getTime(self, data): + """ extract time from the data of the ca station + the data's first line looks like this : x,yyyy,jul,hhmm,... + where x is an integer of no importance, followed by obs'time + yyyy = year + jul = julian day + hhmm = hour and mins + """ + + parts = data.split(b',') + + if len(parts) < 4: return None + + year = parts[1] + jul = parts[2] + hhmm = parts[3] + + # passe-passe pour le jour julien en float parfois ? + f = float(jul) + i = int(f) + jul = '%s' % i + # fin de la passe-passe + + # strange 0 filler + + while len(hhmm) < 4: + hhmm = '0' + hhmm + while len(jul) < 3: + jul = '0' + jul + + # problematic 2400 for 00z + + if hhmm != '2400': + emissionStr = year + jul + hhmm + timeStruct = time.strptime(emissionStr, '%Y%j%H%M') + ddHHMM = time.strftime("%d%H%M", timeStruct) + return ddHHMM + + # sometime hhmm is 2400, to avoid exception + # set time to 00, increase by 24 hr + + jul00 = year + jul + '0000' + timeStruct = time.strptime(jul00, '%Y%j%H%M') + ep_emission = time.mktime(timeStruct) + 24 * 60 * 60 + timeStruct = time.localtime(self.ep_emission) + ddHHMM = time.strftime('%d%H%M', timeStruct) + return ddHHMM From d9fbf2c619ad1a26c3726cf3cdef64eca46589a9 Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Tue, 13 Feb 2024 16:29:22 +0000 Subject: [PATCH 22/30] Fix bugs in code #918 --- sarracenia/flowcb/rename/raw2bulletin.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py index d809fbde6..7b2e33157 100644 --- 
a/sarracenia/flowcb/rename/raw2bulletin.py +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -76,7 +76,7 @@ def after_accept(self,worklist): good_msgs.append(msg) continue - data = self.getData(data, path) + data = self.getData(msg, path) if data == None: worklist.rejected.append(msg) @@ -151,7 +151,7 @@ def getData(self, msg, path): # Read file data from message or from file path directly if message content not found. try: if msg['content']: - data = msg['content'] + data = msg['content']['value'] else: self.binary = 0 @@ -160,14 +160,14 @@ def getData(self, msg, path): # bulletin = Bulletin(data) fp.close() - # Decode data, binary and text. Integrate inputCharset - if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: - self.binary = 1 - - if not self.binary: - data = data.decode(self.o.inputCharset) - else: - data = b64encode(data).decode('ascii') + # Decode data, binary and text. Integrate inputCharset + if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: + self.binary = 1 + + if not self.binary: + data = data.decode(self.o.inputCharset) + else: + data = b64encode(data).decode('ascii') return data From 206194ced1e786440df881b574fa6594ed21e2d2 Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Tue, 13 Feb 2024 19:15:00 +0000 Subject: [PATCH 23/30] More bug fixes, add same default options as gather/am #918 --- sarracenia/flowcb/rename/raw2bulletin.py | 34 +++++++++++++----------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py index 7b2e33157..8e507d112 100644 --- a/sarracenia/flowcb/rename/raw2bulletin.py +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -57,6 +57,9 @@ def __init__(self,options) : super().__init__(options,logger) self.seq = 0 # self.o.add_option('headers2rename', 'list', ['CA', 'MA' , 'RA']) + # Need to redeclare these options to have their default values be initialized. 
+ self.o.add_option('inputCharset', 'str', 'utf-8') + self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG']) # If file was converted, get rid of extensions it had def after_accept(self,worklist): @@ -96,11 +99,11 @@ def after_accept(self,worklist): # if ok == 0: # continue - lines = data.split(b'\n') - #first_line = lines[0].strip(b'\r') - #first_line = first_line.strip(b' ') - #first_line = first_line.strip(b'\t') - first_line = lines[0].split(b' ') + lines = data.split('\n') + #first_line = lines[0].strip('\r') + #first_line = first_line.strip(' ') + #first_line = first_line.strip('\t') + first_line = lines[0].split(' ') ddhhmm = None @@ -150,24 +153,25 @@ def getData(self, msg, path): # Read file data from message or from file path directly if message content not found. try: + + binary = 0 if msg['content']: data = msg['content']['value'] else: - self.binary = 0 fp = open(path, 'rb') data = fp.read() # bulletin = Bulletin(data) fp.close() - # Decode data, binary and text. Integrate inputCharset - if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: - self.binary = 1 - - if not self.binary: - data = data.decode(self.o.inputCharset) - else: - data = b64encode(data).decode('ascii') + # Decode data, binary and text. 
Integrate inputCharset + if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: + binary = 1 + + if not binary: + data = data.decode(self.o.inputCharset) + else: + data = b64encode(data).decode('ascii') return data @@ -306,7 +310,7 @@ def getTime(self, data): hhmm = hour and mins """ - parts = data.split(b',') + parts = data.split(',') if len(parts) < 4: return None From 33fed286b60c33e5e86298c3c2461a8b491302c3 Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Tue, 13 Feb 2024 19:25:18 +0000 Subject: [PATCH 24/30] Update example #918 --- sarracenia/examples/flow/amserver.conf | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sarracenia/examples/flow/amserver.conf b/sarracenia/examples/flow/amserver.conf index 97e1a8035..b141b4e79 100644 --- a/sarracenia/examples/flow/amserver.conf +++ b/sarracenia/examples/flow/amserver.conf @@ -1,13 +1,19 @@ +# For message content purposes +post_topicPrefix v03.post +post_format v03 callback gather.am callback post.message +# Add appropriate data to filename +callback rename.raw2bulletin + post_broker amqp://tsource@localhost post_exchange xs_tsource_am post_baseUrl file:// download on -directory /tmp/andre_am_receiver +directory /tmp/am_receiver accept .* sum sha512 AllowIPs 127.0.0.1 From 48f1066aa92939f3519a3277cde55fd9f432acc0 Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Tue, 13 Feb 2024 19:25:48 +0000 Subject: [PATCH 25/30] Improve comments in plugin #918 --- sarracenia/flowcb/rename/raw2bulletin.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py index 8e507d112..7105d2950 100644 --- a/sarracenia/flowcb/rename/raw2bulletin.py +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -1,38 +1,41 @@ """ Description: sr3 equivalent of the V2 configuration cvt_bulletin_filename_from_content + Add bulletin data (full header, timestamp, station ID, BBB) to incomplete filename 
Works essentially the same way as its v2 counterpart, except it can get the bulletin file contents 2 ways. 1. By the sr3 message content 2. By opening and reading the path to the file directly. - - Decoding of the data is done in the same way of the encoder in flowcb/gather/am.py - - The plugin essentially captures what was done on the V2 converter and ties it up with Sundew source code logic to make it more generalized. + The plugin captures what was done on the V2 converter and ties it up with Sundew source code logic to make it more generalized. What it can do that the V2 plugin cannot: - Add the station ID in the filename - Add the BBB in the filename - Fetch bulletin data multiple ways + Decoding of the data is done in the same way of the encoder in flowcb/gather/am.py + Examples: RAW Ninjo file (4 letter station ID) WACN07 CWAO 082327 CZEG AIRMET E1 VALID 080105/080505 CWEG- - Output: WACN07_CWAO_082327_CZEG__00001 + Output filename: WACN07_CWAO_082327_CZEG__00001 Another RAW Ninjo file FTCN32 CWAO 100500 AAM (...) - Output: FTCN32_CWAO_100500_AAM__00002 + Output filename: FTCN32_CWAO_100500_AAM__00002 A CACN bulletin missing the correct filename - CACN00 CWAO 141600 - PQU + Input filename: CA__12345 + + Contents: + CACN00 CWAO 141600 + PQU - Output: CACN00_CWAO_141600_PQU__00003 + Output filename: CACN00_CWAO_141600_PQU__00003 Usage: callback rename.raw2bulletin @@ -41,6 +44,7 @@ Andre LeBlanc - First author (2024/02) Improvements: + Delegate some of the generalized methods to a parent class. To be callable by other plugins. 
Add more Sundew logic if ever some bulletins end up failing when implemented """ From e32f51968858e70c5bb92fbcf84aefb48531249d Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Tue, 13 Feb 2024 19:49:17 +0000 Subject: [PATCH 26/30] Improve comments and cleanup #918 --- sarracenia/flowcb/rename/raw2bulletin.py | 32 ++++++++---------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py index 7105d2950..72026db8c 100644 --- a/sarracenia/flowcb/rename/raw2bulletin.py +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -60,7 +60,6 @@ class Raw2bulletin(FlowCB): def __init__(self,options) : super().__init__(options,logger) self.seq = 0 - # self.o.add_option('headers2rename', 'list', ['CA', 'MA' , 'RA']) # Need to redeclare these options to have their default values be initialized. self.o.add_option('inputCharset', 'str', 'utf-8') self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG']) @@ -77,7 +76,7 @@ def after_accept(self,worklist): filenameFirstChars = msg['new_file'].split('_')[0] # AM bulletins that need their filename rewritten with data should only have two chars before the first underscore - # This is concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71 + # This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71 # These messages are still good, so we will add them to the good_msgs list if len(filenameFirstChars) != 2: good_msgs.append(msg) @@ -89,20 +88,6 @@ def after_accept(self,worklist): worklist.rejected.append(msg) continue - ### Alternative to check for bulletins that need their filename rewritten ### - - # ok = 0 - # for header in self.o.headers2rename: - # _len = len(header) - # # Check if first chars of header match the ones we want to rename - # if data.split(b'\n')[0][0:_len] == header: - # ok = 1 - # break - - # # If 
nothing has matched, skip to the next iteration - # if ok == 0: - # continue - lines = data.split('\n') #first_line = lines[0].strip('\r') #first_line = first_line.strip(' ') @@ -118,20 +103,20 @@ def after_accept(self,worklist): worklist.rejected.append(msg) continue - # Get the station timestamp from the file contents + # Get the station timestamp from bulletin ddhhmm = self.getTime(data) if ddhhmm == None: logger.error("Unable to get julian time. Skipping message") worklist.rejected.append(msg) continue - # Get the BBB + # Get the BBB from bulletin BBB = self.getBBB(first_line) - # Get the station ID + # Get the station ID from bulletin stn_id = self.getStation(data) - # Get sequence (random ints) + # Generate a sequence (random ints) seq = self.getSequence() # Rename file with data fetched @@ -154,6 +139,11 @@ def after_accept(self,worklist): def getData(self, msg, path): + """Get the bulletin data. + We can either get the bulletin data via + 1. Sarracenia message content + 2. Locally downloaded file + """ # Read file data from message or from file path directly if message content not found. try: @@ -196,8 +186,6 @@ def getSequence(self): def getStation(self, data): """Extracted from Sundew code: https://github.com/MetPX/Sundew/blob/main/lib/bulletin.py#L327-L408 Get the station ID from the bulletin contents. - Some station ID's are located on different lines (depends on the bulletin) - Use stn_id_loc to determine which line holds the station ID. Examples: CACN00 CWAO -> Station ID located on second line. FTCN32 CWAO -> Station ID located on first line (with header) From 9da9947e827808cd9dc4fae294481c188832e714 Mon Sep 17 00:00:00 2001 From: Andre LeBlanc Date: Wed, 14 Feb 2024 16:37:21 +0000 Subject: [PATCH 27/30] Add better error handling for erronous bulletins. 
Write out file with _PROBLEM at suffix --- sarracenia/flowcb/rename/raw2bulletin.py | 92 ++++++++++++++---------- 1 file changed, 55 insertions(+), 37 deletions(-) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py index 72026db8c..bb356d636 100644 --- a/sarracenia/flowcb/rename/raw2bulletin.py +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -51,7 +51,8 @@ from sarracenia.flowcb import FlowCB import logging from base64 import b64encode -import time +import time, datetime +import subprocess logger = logging.getLogger(__name__) @@ -94,8 +95,6 @@ def after_accept(self,worklist): #first_line = first_line.strip('\t') first_line = lines[0].split(' ') - ddhhmm = None - # Build header from bulletin header = self.buildHeader(first_line) if header == None: @@ -106,9 +105,7 @@ def after_accept(self,worklist): # Get the station timestamp from bulletin ddhhmm = self.getTime(data) if ddhhmm == None: - logger.error("Unable to get julian time. Skipping message") - worklist.rejected.append(msg) - continue + logger.error("Unable to get julian time.") # Get the BBB from bulletin BBB = self.getBBB(first_line) @@ -121,7 +118,25 @@ def after_accept(self,worklist): # Rename file with data fetched try: - new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq + # We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems + # We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins + if ddhhmm == None: + timehandler = datetime.datetime.now() + + # Add current time as new timestamp to filename + new_file = header + "_" + timehandler.strftime('%d%H%M') + "_" + BBB + "_" + stn_id + "_" + seq + "_PROBLEM" + + # Write the file manually as the messages don't get posted downstream. 
+ # The message won't also get downloaded further downstream + msg['new_file'] = new_file + new_path = msg['new_dir'] + '/' + msg['new_file'] + + with open(new_path, 'w') as f: f.write(data) + + logger.error(f"New filename (for problem file): {new_file}") + raise Exception + else: + new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq msg['new_file'] = new_file new_path = msg['new_dir'] + '/' + msg['new_file'] @@ -131,7 +146,7 @@ def after_accept(self,worklist): good_msgs.append(msg) except Exception as e: - logger.error(f"Unable to rename filename. Error message: {e}") + logger.error(f"Error in renaming. Error message: {e}") worklist.rejected.append(msg) continue @@ -302,41 +317,44 @@ def getTime(self, data): hhmm = hour and mins """ - parts = data.split(',') + try: + parts = data.split(',') - if len(parts) < 4: return None + if len(parts) < 4: return None - year = parts[1] - jul = parts[2] - hhmm = parts[3] + year = parts[1] + jul = parts[2] + hhmm = parts[3] - # passe-passe pour le jour julien en float parfois ? - f = float(jul) - i = int(f) - jul = '%s' % i - # fin de la passe-passe + # passe-passe pour le jour julien en float parfois ? 
+ f = float(jul) + i = int(f) + jul = '%s' % i + # fin de la passe-passe - # strange 0 filler + # strange 0 filler - while len(hhmm) < 4: - hhmm = '0' + hhmm - while len(jul) < 3: - jul = '0' + jul + while len(hhmm) < 4: + hhmm = '0' + hhmm + while len(jul) < 3: + jul = '0' + jul - # problematic 2400 for 00z + # problematic 2400 for 00z - if hhmm != '2400': - emissionStr = year + jul + hhmm - timeStruct = time.strptime(emissionStr, '%Y%j%H%M') - ddHHMM = time.strftime("%d%H%M", timeStruct) - return ddHHMM + if hhmm != '2400': + emissionStr = year + jul + hhmm + timeStruct = time.strptime(emissionStr, '%Y%j%H%M') + ddHHMM = time.strftime("%d%H%M", timeStruct) + return ddHHMM - # sometime hhmm is 2400, to avoid exception - # set time to 00, increase by 24 hr + # sometime hhmm is 2400, to avoid exception + # set time to 00, increase by 24 hr - jul00 = year + jul + '0000' - timeStruct = time.strptime(jul00, '%Y%j%H%M') - ep_emission = time.mktime(timeStruct) + 24 * 60 * 60 - timeStruct = time.localtime(self.ep_emission) - ddHHMM = time.strftime('%d%H%M', timeStruct) - return ddHHMM + jul00 = year + jul + '0000' + timeStruct = time.strptime(jul00, '%Y%j%H%M') + ep_emission = time.mktime(timeStruct) + 24 * 60 * 60 + timeStruct = time.localtime(ep_emission) + ddHHMM = time.strftime('%d%H%M', timeStruct) + return ddHHMM + except Exception as e: + return None From f3f6376e63516831e2eb732b0813acc6c5712e22 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 14 Feb 2024 19:08:54 +0000 Subject: [PATCH 28/30] updating changelog --- debian/changelog | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/debian/changelog b/debian/changelog index 283b7b861..c47cb7de6 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,10 +1,11 @@ metpx-sr3 (3.00.52rc1) UNRELEASED; urgency=medium + * fix #907 declare exchange in admin.conf wasn't working. * fix #912 enable v2 style retry processing with new --retry_refilter flag.
+ * fix #918 plugin for AM to name files with incomplete headers properly. * fix #920 fix inflight mtime not working in watch. * fix #920 replaced nodupe_fileAge(Min and Max) with fileAge(Min and Max) * fix #920 inflight mtime merged with fileMinAge - * fix #907 declare exchange in admin.conf wasn't working. * fix #922 sr3 showing wrong exchange on status/show. -- peter Fri, 02 Feb 2024 09:00:18 -0500 From 745613d99c7ecaf9f9c3199712723d3793df55f3 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 14 Feb 2024 19:14:35 +0000 Subject: [PATCH 29/30] there was a fix without an issue... add that --- debian/changelog | 2 ++ 1 file changed, 2 insertions(+) diff --git a/debian/changelog b/debian/changelog index c47cb7de6..e7ad1281a 100644 --- a/debian/changelog +++ b/debian/changelog @@ -7,6 +7,8 @@ metpx-sr3 (3.00.52rc1) UNRELEASED; urgency=medium * fix #920 replaced nodupe_fileAge(Min and Max) with fileAge(Min and Max) * fix #920 inflight mtime merged with fileMinAge * fix #922 sr3 showing wrong exchange on status/show. + * processing of rename field in messages suppressed when download=False + * (required for common processing of messages.) -- peter Fri, 02 Feb 2024 09:00:18 -0500 From 96c62d89a066fca517eb390ff05d2dd4da2c4848 Mon Sep 17 00:00:00 2001 From: Peter Silva Date: Wed, 14 Feb 2024 19:37:19 +0000 Subject: [PATCH 30/30] more changelog updates for release --- debian/changelog | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/debian/changelog b/debian/changelog index e7ad1281a..00edfcaf0 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,5 +1,6 @@ -metpx-sr3 (3.00.52rc1) UNRELEASED; urgency=medium +metpx-sr3 (3.00.52rc1) unstable; urgency=medium + * nodupe_fileAgeX (X=Min or Max) name change to fileAgeX replacing inflight. * fix #907 declare exchange in admin.conf wasn't working. * fix #912 enable v2 style retry processing with new --retry_refilter flag. * fix #918 plugin for AM to name files with incomplete headers properly. 
@@ -10,7 +11,7 @@ metpx-sr3 (3.00.52rc1) UNRELEASED; urgency=medium * processing of rename field in messages suppressed when download=False * (required for common processing of messages.) - -- peter Fri, 02 Feb 2024 09:00:18 -0500 + -- peter Wed, 14 Feb 2024 09:00:18 -0500 metpx-sr3 (3.00.51) unstable; urgency=medium