diff --git a/debian/changelog b/debian/changelog index 14ffe646e..00edfcaf0 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,18 @@ +metpx-sr3 (3.00.52rc1) unstable; urgency=medium + + * nodupe_fileAgeX (X=Min or Max) name change to fileAgeX replacing inflight. + * fix #907 declare exchange in admin.conf wasn't working. + * fix #912 enable v2 style retry processing with new --retry_refilter flag. + * fix #918 plugin for AM to name files with incomplete headers properly. + * fix #920 fix inflight mtime not working in watch. + * fix #920 replaced nodupe_fileAge(Min and Max) with fileAge(Min and Max) + * fix #920 inflight mtime merged with fileMinAge + * fix #922 sr3 showing wrong exchange on status/show. + * processing of rename field in messages suppressed when download=False + * (required for common processing of messages.) + + -- peter Wed, 14 Feb 2024 09:00:18 -0500 + metpx-sr3 (3.00.51) unstable; urgency=medium * PR #910 detect v02 messages without content_type header. diff --git a/docs/source/Explanation/CommandLineGuide.rst b/docs/source/Explanation/CommandLineGuide.rst index 727812c36..a1fdde4e4 100644 --- a/docs/source/Explanation/CommandLineGuide.rst +++ b/docs/source/Explanation/CommandLineGuide.rst @@ -333,7 +333,7 @@ View all configuration settings (the result of all parsing... what the flow comp 'message_ttl': 0, 'mirror': True, 'msg_total_interval': '0', - 'nodupe_fileAgeMax': 0, + 'fileAgeMax': 0, 'nodupe_ttl': 0, 'overwrite': True, 'permCopy': True, @@ -954,8 +954,10 @@ These options set what files the user wants to be notified for and where - **accept [rename=] (must be set)** - **reject (optional)** - **permDefault (default: 0o400)** -- **nodupe_fileAgeMax (default 30d)** +- **fileAgeMax (default 30d)** +nodupe_fileAgeMax should be less than nodupe_ttl when using duplicate suppression, +to avoid re-ingesting of files that have aged out of the nodupe cache. The option *filename* can be used to set a global rename to the products. 
Ex.: @@ -1019,12 +1021,12 @@ other nodes participating in the poll, when they don't have the vip, will subscribe to the output of the poll to keep their duplicate suppression caches current. -files that are more than nodupe_fileAgeMax are ignored. However, this +files that are more than fileAgeMax are ignored. However, this can be modified to any specified time limit in the configurations by using -the option *nodupe_fileAgeMax *. By default in components +the option *fileAgeMax *. By default in components other than poll, it is disabled by being set to zero (0). As it is a duration option, units are in seconds by default, but minutes, hours, -days, and weeks, are available. In the poll component, nodupe_fileAgeMax +days, and weeks, are available. In the poll component, fileAgeMax defaults to 30 days. Advanced Polling @@ -1116,7 +1118,7 @@ The notification protocol is defined here `sr_post(7) <../Reference/sr_post.7.ht **poll** connects to a *broker*. Every *sleep* seconds, it connects to a *pollUrl* (sftp, ftp, ftps). For each of the *directory* defined, it lists the contents. Polling is only intended to be used for recently modified -files. The *nodupe_fileAgeMax* option eliminates files that are too old +files. The *fileAgeMax* option eliminates files that are too old from consideration. When a file is found that matches a pattern given by *accept*, **poll** builds a notification message for that product. diff --git a/docs/source/Explanation/DetectFileReady.rst b/docs/source/Explanation/DetectFileReady.rst index 0a401d074..a17e3eec3 100644 --- a/docs/source/Explanation/DetectFileReady.rst +++ b/docs/source/Explanation/DetectFileReady.rst @@ -88,12 +88,12 @@ File Detection Strategy Table |sr_watch with| | | |inflight |Minimum age (modification time) |Last choice, guarantees delay only if | |number |of the file before it is considered |no other method works. | -|(mtime) |complete. | | +|(mtime) |complete. 
(aka: fileAgeMin) | | | | |Receiving from uncooperative | |Alternate | - Adds delay in every transfer. |sources. | |setting | - Vulnerable to network failures. | | | | - Vulnerable to clock skew. |(ok choice with PDS) | -|nodupe\_\ | | | +| | | | |fileAgeMin | |If a process is re-writing a file | | | |often, can use mtime to smooth out | | | |the i/o pattern, by slowing posts. | diff --git a/docs/source/Explanation/DuplicateSuppression.rst b/docs/source/Explanation/DuplicateSuppression.rst index a74d4ebe1..d123b66bf 100644 --- a/docs/source/Explanation/DuplicateSuppression.rst +++ b/docs/source/Explanation/DuplicateSuppression.rst @@ -170,13 +170,18 @@ Files That are Too Old in the configuration file:: - nodupe_fileAgeMax 600 + fileAgeMax 600 Files which are older than 600 seconds (10 minutes) will not be considerred for transfer. This is usually used with polls that have very long lasting directories on a remote server. example: a remote server has a permanent database of remote files. +It is often the case that nodupe_ttl should be greater than nodupe_fileAgeMax to prevent +files from aging out of the cache before they are considered "too old" and then being +(erroneously) re-ingested. A warning message is emitted if this is the case in a poll +on startup. + Roll Your Own ------------- diff --git a/docs/source/Explanation/FileCompletion.rst b/docs/source/Explanation/FileCompletion.rst index 2dcb551e6..6b08e4c62 100644 --- a/docs/source/Explanation/FileCompletion.rst +++ b/docs/source/Explanation/FileCompletion.rst @@ -49,7 +49,7 @@ Inflight Table | number |of the file before it is considered |guaranteed delay added | | (mtime) |complete. | | | | |Receiving from uncooperative | -| |Adds delay in every transfer. |sources. | +| fileAgeMin |Adds delay in every transfer. |sources. | | |Vulnerable to network failures. | | | |Vulnerable to clock skew. 
|(ok choice with PDS) | +-------------+---------------------------------------+--------------------------------------+ @@ -62,7 +62,7 @@ NOTES: On versions of sr_sender prior to 2.18, the default was NONE, but was documented as '.tmp' To ensure compatibility with later versions, it is likely better to explicitly write - the *inflight* setting. + the *inflight* setting. The numeric variant is the same as setting *fileAgeMin* *inflight* was renamed from the old *lock* option in January 2017. For compatibility with older versions, can use *lock*, but name is deprecated. diff --git a/docs/source/How2Guides/UPGRADING.rst b/docs/source/How2Guides/UPGRADING.rst index a4311b7e3..3611a1696 100644 --- a/docs/source/How2Guides/UPGRADING.rst +++ b/docs/source/How2Guides/UPGRADING.rst @@ -39,6 +39,23 @@ Installation Instructions git --- +3.0.52 +------ + +*CHANGE*: Additional messageCountMax argument to flowcb.gather() entry point. +When implementing flow callbacks for scheduled flows, or poll overrides, the +gather entry point now takes one additional argument indicating the maximum +number of messages that the routine should return. + +To be compatible with previous versions, one can establish a default value +on the gather:: + + def gather(self, messageCountMax=None): + +With the default value, plugins are downward compatible. (earlier versions +will call with only self as an argument.) 
+ + 3.0.51 ------- @@ -309,7 +326,7 @@ V2 to Sr3 all participants in a vip update ls_files nodes subscribe to the output exchange poll builds strings to describe files poll builds stat(2) like paramiko.SftpAttributes() participants rely on their ls_files for state poll uses flowcb.nodupe module like rest of sr3 - file_time_limit to ignore older files nodupe_fileAgeMax + file_time_limit to ignore older files fileAgeMax *destination* gives where to poll *pollUrl* *directory* gives remote directory to list *path* used like in *post* and *watch* need *accept* per *directory* need only one *accept* @@ -367,7 +384,7 @@ V2 to Sr3 e fileEvents events fileEvents exchange_split exchangeSplit - file_time_limit nodupe_fileAgeMax + file_time_limit fileAgeMax hb_memory_baseline_file MemoryBaseLineFile hb_memory_max MemoryMax hb_memory_multiplier MemoryMultiplier diff --git a/docs/source/How2Guides/subscriber.rst b/docs/source/How2Guides/subscriber.rst index 8170a3d8a..1b4f444f0 100644 --- a/docs/source/How2Guides/subscriber.rst +++ b/docs/source/How2Guides/subscriber.rst @@ -30,10 +30,10 @@ The usual intent is to automatically download the data wanted to a directory on a subscriber machine where other software can process it. Please note: -- the tool is entirely command line driven (there is no GUI) More accurately, it is mostly configuration file driven. - most of the *interface* involves using a text editor to modify configuration files. -- while written to be compatible with other environments, the focus is on Linux usage. -- the tool can be used as either an end-user tool, or a system-wide transfer engine. +- The tool is entirely command line driven (there is no GUI) More accurately, it is mostly configuration file driven. + Most of the *interface* involves using a text editor to modify configuration files. +- While written to be compatible with other environments, the focus is on Linux usage. 
+- The tool can be used as either an end-user tool, or a system-wide transfer engine. This guide is focused on the end-user case. - All documentation of the package is available at https://metpx.github.io/sarracenia diff --git a/docs/source/Reference/sr3_options.7.rst b/docs/source/Reference/sr3_options.7.rst index 0520c2cd4..3ef81e96e 100644 --- a/docs/source/Reference/sr3_options.7.rst +++ b/docs/source/Reference/sr3_options.7.rst @@ -503,6 +503,12 @@ Once connected to an AMQP broker, the user needs to bind a queue to exchanges and topics to determine the notification messages of interest. +bufsize (default: 1MB) +----------------------------- + +Files will be copied in *bufsize*-byte blocks. for use by transfer protocols. + + byteRateMax (default: 0) -------------------------------- @@ -906,6 +912,7 @@ can also be specified as a time interval, for example, 10 for 10 seconds. When set to a time interval, file posting process ensures that it waits until the file has not been modified in that interval. So a file will not be processed until it has stayed the same for at least 10 seconds. +This is the same as setting the **fileAgeMin** setting. Lastly, **inflight** can be set to *NONE*, which case the file is written directly with the final name, where the recipient will wait to receive a post notifying it @@ -941,7 +948,17 @@ will not be posted. inlineByteMax -------------------- -the maximums size of messages to inline. +The maximum size of messages to inline. + +inlineEncoding text|binary|guess (default: guess) +_________________________________________________ + +when inlining file content, what sort of encoding should be done? Three choices: + + * text: the file content is assumed to be utf-8 text and encoded as such. + * binary: the file content is unconditionally converted to base64 binary encoding. + * guess: try making text, if that fails fall back to binary. 
+ inlineOnly ---------- @@ -1153,14 +1170,18 @@ or: More information: `Duplicate Suppresion <../Explanation/DuplicateSuppression.html>`_ -nodupe_fileAgeMax ------------------ +fileAgeMax +---------- -If files are older than this setting (default: 30d in poll, 0 in other components), +If files are older than this setting (default: 7h in poll, 0 in other components), then ignore them, they are too old to post. 0 deactivates the setting. -nodupe_fileAgeMin ------------------ +In a Poll: + * default is 7 hours. should be less than nodupe_ttl to prevent re-ingest of duplicate data. + (full discussion here: https://github.com/MetPX/sarracenia/issues/904 ) + +fileAgeMin +---------- If files are newer than this setting (default: 0), then ignore them, they are too new to post. 0 deactivates the setting. @@ -1182,6 +1203,9 @@ dropped, so duplicates separated by a large enough interval will get through. A value of 1d (day) or 1w (week) can be appropriate. Setting the option without specifying a time will result in 300 seconds (or 5 minutes) being the expiry interval. +Default value in a Poll is 8 hours, should be longer than nodupe_fileAgeMax to prevent +re-ingesting files that have aged out of the duplicate suppression cache. + **Use of the cache is incompatible with the default *parts 0* strategy**, one must specify an alternate strategy. One must use either a fixed blocksize, or always never partition files. One must avoid the dynamic algorithm that will change the partition size used as a file grows. @@ -1528,6 +1552,19 @@ retryEmptyBeforeExit: (default: False) Used for sr_insects flow tests. Prevents Sarracenia from exiting while there are messages remaining in the retry queue(s). By default, a post will cleanly exit once it has created and attempted to publish messages for all files in the specified directory. If any messages are not successfully published, they will be saved to disk to retry later. 
If a post is only run once, as in the flow tests, these messages will never be retried unless retryEmptyBeforeExit is set to True. +retry_refilter (default: False) +----------------------------------------- + +The **retry_refilter** option alters how messages are reloaded when they are retrieved from +a retry queue. The default way (value: False) is to repeat the transfer using exactly +the same message as before. If **retry_refilter** is set (value: True) then all the +message's calculated fields will be discarded, and the processing re-started from the gather +phase (accept/reject processing will be repeated, destinations re-calculated.) + +The normal retry behaviour is used when the remote has had a failure, and needs to +re-send later, while the retry_refilter option is used when recovering from configuration +file errors, and some messages had incorrect selection or destination criteria. + retry_ttl (default: same as expire) ---------------------------------------------- @@ -1759,6 +1796,14 @@ The **timeout** option, sets the number of seconds to wait before aborting a connection or download transfer (applied per buffer during transfer). +timezone (default: UTC) +-------------------------------- + +Interpret listings from an FTP server as being in the given timezone as per `pytz `_ +Examples: Canada/Pacific, Pacific/Nauru, Canada/Eastern, Europe/Paris +Has no effect other than when polling an FTP server. + + tlsRigour (default: medium) --------------------------- diff --git a/docs/source/Reference/sr_post.7.rst b/docs/source/Reference/sr_post.7.rst index acf41a2c1..17f70c52f 100644 --- a/docs/source/Reference/sr_post.7.rst +++ b/docs/source/Reference/sr_post.7.rst @@ -129,9 +129,12 @@ The headers are an array of name:value pairs:: "content" - for smaller files, the content may be embedded. 
{ - "encoding" : "utf-8" | "base64" , + "encoding" : "utf-8" | "base64" | "iso-8859-1" , "value" " "encoded file content" - } + } + Note that the iso-8859-1 encoding is only an allowance for legacy data flows. + Should normally not be used. + "contentType" : "string" - MIME-type information referring to the data. For "v03.report" topic notification messages the following addtional diff --git a/docs/source/fr/CommentFaire/MiseANiveau.rst b/docs/source/fr/CommentFaire/MiseANiveau.rst index e9ce4ba79..48ee92d4c 100644 --- a/docs/source/fr/CommentFaire/MiseANiveau.rst +++ b/docs/source/fr/CommentFaire/MiseANiveau.rst @@ -38,6 +38,22 @@ Instructions d’installation git --- +3.0.52 +------ + +*CHANGEMENT* : argument messageCountMax supplémentaire au point d'entrée flowcb.gather(). +lors de la mise en œuvre de rappels de flow pour les flux cédulés ou de remplacements de poll, +le point d'entrée de *gather* prend désormais un argument supplémentaire indiquant le nombre +maximum de messages que la routine doit retourner. + +Pour être compatible avec les versions précédentes, on peut établir une valeur par défaut +sur le rassemblement :: + + def gather(self, messageCountMax=None): + +Avec la valeur par défaut, les plugins sont compatibles avec les versions précédentes. 
+ + 3.0.51 ------ @@ -295,7 +311,7 @@ V2 to Sr3 e fileEvents events fileEvents exchange_split exchangeSplit - file_time_limit nodupe_fileAgeMax + file_time_limit fileAgeMax hb_memory_baseline_file MemoryBaseLineFile hb_memory_max MemoryMax hb_memory_multiplier MemoryMultiplier diff --git a/docs/source/fr/Explication/AssurerLaLivraison.rst b/docs/source/fr/Explication/AssurerLaLivraison.rst index b0520e7f6..122ce8502 100644 --- a/docs/source/fr/Explication/AssurerLaLivraison.rst +++ b/docs/source/fr/Explication/AssurerLaLivraison.rst @@ -49,9 +49,9 @@ Tableau de Inflight +-------------+---------------------------------------+--------------------------------------+ | |Âge minimum (temps de modification) |Dernier choix, ne garantit un délai | | entier |du fichier avant que le transfer soit |que si aucun autre moyen peut servir | -| (mtime) |considéré Complèté. | | +| (mtime) |considéré Complèté. (fileAgeMin pareil)| | | | |Réception de ceux qui ne coopèrent pas| -| |Retard tous les avis | | +| fileAgeMin |Retarde tous les avis | | | |Vulnérable aux pannes de réseau. 
| (choix acceptable pour PDS) | | |Vulnérable aux horloges en désaccord | | +-------------+---------------------------------------+--------------------------------------+ diff --git a/docs/source/fr/Explication/GuideLigneDeCommande.rst b/docs/source/fr/Explication/GuideLigneDeCommande.rst index 7121902b5..683a35e60 100644 --- a/docs/source/fr/Explication/GuideLigneDeCommande.rst +++ b/docs/source/fr/Explication/GuideLigneDeCommande.rst @@ -334,7 +334,7 @@ Afficher tous les paramètres de configuration (le résultat de toutes les analy 'message_ttl': 0, 'mirror': True, 'msg_total_interval': '0', - 'nodupe_fileAgeMax': 0, + 'fileAgeMax': 0, 'nodupe_ttl': 0, 'overwrite': True, 'permCopy': True, @@ -960,8 +960,10 @@ Ces options définissent les fichiers pour lesquels l’utilisateur souhaite êt - **accept [rename=] (doit être défini)** - **reject (facultatif)** - **permDefault (par défaut: 0o400)** -- **nodupe_fileAgeMax (par défaut 30d)** +- **fileAgeMax (par défaut 30d)** +fileAgeMax doit être inférieur à nodupe_ttl lors de l'utilisation de la suppression des doublons, +pour éviter la réingestion de fichiers obsolètes une fois partie du cache nodupe. L’option *filename* peut être utilisée pour définir un changement de nom global pour les produits. Ex.: @@ -1024,12 +1026,12 @@ Comme pour tous les autres composants, l’option **vip** peut être utilisée p qu’un poll doit être actif sur seulement un seul nœud d’un cluster. Notez que quand d’autres nœuds participant au poll et wu’ils n’ont pas le vip, ils -les fichiers qui sont plus vieux que nodupe_fileAgeMax sont ignorés. Cela +les fichiers qui sont plus vieux que fileAgeMax sont ignorés. Cela peut être modifié à n’importe quelle limite de temps spécifiée dans les configurations en utilisant -l’option *nodupe_fileAgeMax *. Par défaut, dans les composants +l’option *fileAgeMax *. Par défaut, dans les composants autre que poll, cette option est désactivé en étant défini à zéro (0). 
Comme il s’agit d’une option de durée, les unités sont en secondes par défaut, mais il est possible de definir l'option -en utilisant des minutes, heures, jours ou des semaines. Dans la composante de poll, nodupe_fileAgeMax +en utilisant des minutes, heures, jours ou des semaines. Dans la composante de poll, fileAgeMax est défini à 30 jours par défaut. Sondage avancé (Advanced Polling) @@ -1118,7 +1120,7 @@ Le protocle de notification est défini ici `sr3_post(7) <../Reference/sr3_post. **poll** se connecte à un *broker*. À toutes les secondes de *sleep*, il se connecte à une *pollUrl* (sftp, ftp, ftps). Pour chacun des *directory* définis, les contenus sont listés. Le poll est seulement destinée à être utilisée pour les fichiers récemment modifiés. -L’option *nodupe_fileAgeMax* élimine les fichiers trop anciens. Lorsqu’un fichier correspondant +L’option *fileAgeMax* élimine les fichiers trop anciens. Lorsqu’un fichier correspondant à un modèle donné est trouvé by *accept*, **poll** crée un message de notification pour ce produit. Le message est ensuite verifié dans la cache dupliqué (limité en temps par l'option diff --git a/docs/source/fr/Explication/SupprimerLesDoublons.rst b/docs/source/fr/Explication/SupprimerLesDoublons.rst index 62866a4d8..316fb55bd 100644 --- a/docs/source/fr/Explication/SupprimerLesDoublons.rst +++ b/docs/source/fr/Explication/SupprimerLesDoublons.rst @@ -172,7 +172,7 @@ Fichiers trop vieux dans les fichiers de configuration:: - nodupe_fileAgeMax 600 + fileAgeMax 600 Les messages notificationsfichiers pour des fichiers qui sont agés de plus que 600 secondes (10 minutes) seront supprimés. @@ -181,6 +181,11 @@ Ceci est généralement utilisé avec des sondages (poll) qui ont des répertoir Exemple : un serveur distant dispose d'une base de données permanente de fichiers distants. ca ne sert à rien de reexaminer de fichiers vieux de deux ans. 
+Il arrive souvent que nodupe_ttl soit supérieur à nodupe_fileAgeMax pour éviter +les fichiers soient oubliés par la cache avant d'être considérés comme "trop vieux" et ensuite d'être +(à tort) ré-ingéré. Un message d'avertissement est émis si c'est le cas dans un sondage +au démarrage. + A votre gout! diff --git a/docs/source/fr/Reference/sr3_options.7.rst b/docs/source/fr/Reference/sr3_options.7.rst index e5ae1dd68..479c170c3 100644 --- a/docs/source/fr/Reference/sr3_options.7.rst +++ b/docs/source/fr/Reference/sr3_options.7.rst @@ -491,6 +491,10 @@ L’option broker indique à chaque composant quel courtier contacter. Une fois connecté à un courtier AMQP, l’utilisateur doit lier une fil d’attente aux échanges et aux thèmes pour déterminer le messages d'annonce en question. +bufsize (défaut: 1m) +--------------------------- + +Les fichiers seront copiés en tranches de *bufsize* octets. Utilisé par les protocoles de transfert. byteRateMax (défaut: 0) ------------------------------ @@ -890,6 +894,7 @@ peut également être spécifié comme une intervalle de temps, par exemple, 10 Lorsque l'option est défini sur une intervalle de temps, le processus de publication de fichiers attends jusqu’à ce que le fichier n’ai pas été modifié pendant cet intervalle. Ainsi, un fichier ne peux pas être traité tant qu’il n’est pas resté le même pendant au moins 10 secondes. +C'est un effet pareil à un choix de valeur pou *fileAgeMin*. Enfin, **inflight** peut être réglé a *NONE*. Dans ce cas, le fichier est écrit directement avec le nom final, où le destinataire attendra de recevoir un poste pour notifier l’arrivée du fichier. @@ -926,6 +931,18 @@ inlineByteMax la taille maximale des fichiers dont le contenu est à inclure dans un messages d'annonce (envoyé inline.) 
+ + +inlineEncoding text|binary|guess (défaut: guess) +_________________________________________________ + +Quand on inclut les données dans le message on l'encode dans un format choisi: + + * text: le fichier doit être du utf-8 valide (ou érreur.) + * binary: le fichier peut être encodé n'importe comment, il sera en format base64. + * guess: essaie le format text en premier, utilise binary en cas d'erreur. + + inlineOnly ---------- ignorer les messages d´annonce si les données ne sont pas inline. @@ -1130,14 +1147,18 @@ ou: Pour plus d´information: `Supprimer les doublons <../Explication/SupprimerLesDoublons.html>`_ -nodupe_fileAgeMax ------------------ +fileAgeMax +---------- Si les fichiers sont plus anciens que ce paramètre (défaut: 30d), ignorez-les, ils sont trop ancien pour qu'il puisse être posté. -nodupe_fileAgeMin ------------------ +Dans un Poll : + * La valeur par défaut est 7 heures. doit être inférieur à nodupe_ttl pour empêcher la réabsorption de données en double. + (discussion complète ici (en anglais): https://github.com/MetPX/sarracenia/issues/904) + +fileAgeMin +---------- Si les fichiers sont plus neuf que ce paramètre (défaut: 0 ... désactivé), ignorez-les, ils sont trop neufs pour qu'ils puissent être postés. @@ -1503,6 +1524,21 @@ publiés avec succès, ils seront enregistrés sur le disque pour réessayer ult exécutée qu’une seule fois, comme dans les tests de flux, ces messages d'annonce ne seront jamais réessayés, sauf si retryEmptyBeforeExit est défini à True. +retry_refilter (par défaut : False) +--------------------------------------------- + +L'option **retry_refilter** modifie la façon dont les messages sont rechargés lorsqu'ils sont récupérés à partir +d'une file d'attente pour une nouvelle tentive de transfer (retry). La méthode par défaut (valeur : False) +consiste à répéter le transfert en utilisant exactement le même message que précédemment. 
Si **retry_refilter** +est défini (valeur : True), alors tous les Les champs calculés du message + seront supprimés et le +traitement redémarrera à partir de la phase *gather* (le traitement d'acceptation/rejet sera +répété, les destinations recalculées.) + +Le comportement normal de nouvelle tentative (retry) est utilisé lorsque la destination a subit une +panne et doit renvoyer plus tard, tandis que l'option **retry_refilter** est utilisée lors de la récupération +de la configuration + + retry_ttl (défaut: identique à expire) ------------------------------------------------- @@ -1723,6 +1759,15 @@ timeout (défaut: 0) L’option **timeout** définit le nombre de secondes à attendre avant d’interrompre un transfert de connexion ou de téléchargement (appliqué pendant le transfert). +timezone (défaut: UTC) +-------------------------------- + +Établir le fuseau horaire pour les dates afficher pour les fichiers sur un serveur FTP. +La valeur est tel que décrit timezone as per `pytz `_ +exemples: Canada/Pacific, Pacific/Nauru, Europe/Paris +Seulement actif dans le contexte de sondage de serveur FTP. + + tlsRigour (défaut: medium) -------------------------- diff --git a/docs/source/fr/Reference/sr_post.7.rst b/docs/source/fr/Reference/sr_post.7.rst index 390d4209e..9bbe52ffc 100644 --- a/docs/source/fr/Reference/sr_post.7.rst +++ b/docs/source/fr/Reference/sr_post.7.rst @@ -94,7 +94,6 @@ Les en-têtes sont un tableau de paires nom:valeur:: "type": "Feature" "geometry": RFC 7946 (geoJSON) spécification géographique compatible. - "size" - le nombre d’octets annoncés. "blocks" - si le fichier publié est partitionné, alors : { @@ -117,9 +116,12 @@ Les en-têtes sont un tableau de paires nom:valeur:: "content" - pour les fichiers plus petits, le contenu peut être incorporé. 
{ - "encoding" : "utf-8" | "base64" , + "encoding" : "utf-8" | "base64" | "iso-8859-1" , "value" " "contenu de fichier encodé" - } + } + Noter que iso-8859-1 est là pour des raisons de transmission inaltéré de formats obsolètes. + + "contentType" : "chaine mime-type" - indique le format des données. Pour le messages de thème "v03.report", les en-têtes additionnelles qui suivent seront présents: @@ -128,9 +130,8 @@ Les en-têtes sont un tableau de paires nom:valeur:: "message" : - message de rapport d’état documenté dans `Report Messages`_ } - "contentType" : "chaine mime-type" - indique le format des données. - "type": "Feature" - utilisé pour la compatibilité geoJSON. - "geometry" : ... selon la compatibilité GoJSON RFC7946. + Autres champs optionnels: + des paires supplémentaires nom:valeur définies par l’utilisateur sont autorisées. diff --git a/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb b/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb index c000d4665..dfe11e6df 100644 --- a/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb +++ b/docs/source/fr/Tutoriel/1_CLI_introduction.ipynb @@ -214,7 +214,7 @@ "messageAgeMax=0, messageCountMax=10, messageDebugDump=False, messageRateMax=0,\n", "messageRateMin=0,\n", "message_strategy=\"...ubborn': True, 'failure_duration': '5m'}\", message_ttl=0,\n", - "mirror=False, no=0, nodupe_fileAgeMax=0, nodupe_ttl=0, overwrite=True,\n", + "mirror=False, no=0, fileAgeMax=0, nodupe_ttl=0, overwrite=True,\n", "permCopy=True, permDefault=0, permDirDefault=509, permLog=384,\n", "pid_filename=\"...e/hpfx_amis//subscribe_hpfx_amis_00.pid'\", plugins_early=[],\n", "plugins_late=['sarracenia.flowcb.log.Log'], post_baseDir=None,\n", diff --git a/sarracenia/_version.py b/sarracenia/_version.py index 068fe814b..9335bd111 100755 --- a/sarracenia/_version.py +++ b/sarracenia/_version.py @@ -1 +1 @@ -__version__ = "3.00.51" +__version__ = "3.00.52rc1" diff --git a/sarracenia/config.py b/sarracenia/config.py index b26973392..26e0078a7 100755 --- 
a/sarracenia/config.py +++ b/sarracenia/config.py @@ -113,6 +113,7 @@ def __repr__(self) -> str: 'recursive' : True, 'report': False, 'retryEmptyBeforeExit': False, + 'retry_refilter': False, 'sanity_log_dead': 9999, 'sourceFromExchange': False, 'sundew_compat_regex_first_match_is_zero': False, @@ -134,15 +135,15 @@ def __repr__(self) -> str: 'follow_symlinks', 'force_polling', 'inline', 'inlineOnly', 'inplace', 'logMetrics', 'logStdout', 'logReject', 'restore', \ 'messageDebugDump', 'mirror', 'timeCopy', 'notify_only', 'overwrite', 'post_on_start', \ 'permCopy', 'queueBind', 'queueDeclare', 'randomize', 'recursive', 'realpathPost', \ - 'reconnect', 'report', 'reset', 'retryEmptyBeforeExit', 'save', 'sundew_compat_regex_first_match_is_zero', \ + 'reconnect', 'report', 'reset', 'retry_refilter', 'retryEmptyBeforeExit', 'save', 'sundew_compat_regex_first_match_is_zero', \ 'sourceFromExchange', 'statehost', 'users', 'v2compatRenameDoublePost' ] float_options = [ ] duration_options = [ - 'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'nodupe_fileAgeMax', 'retry_ttl', - 'sanity_log_dead', 'sleep', 'timeout', 'varTimeOffset' + 'expire', 'housekeeping', 'logRotateInterval', 'message_ttl', 'fileAgeMax', 'fileAgeMin', \ + 'retry_ttl', 'sanity_log_dead', 'sleep', 'timeout', 'varTimeOffset' ] list_options = [ 'path', 'vip' ] @@ -163,7 +164,8 @@ def __repr__(self) -> str: str_options = [ 'action', 'admin', 'baseDir', 'broker', 'cluster', 'directory', 'exchange', - 'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'identity', 'logLevel', + 'exchange_suffix', 'feeder', 'filename', 'flatten', 'flowMain', 'header', 'identity', + 'inlineEncoding', 'logLevel', 'pollUrl', 'post_baseUrl', 'post_baseDir', 'post_broker', 'post_exchange', 'post_exchangeSuffix', 'post_format', 'post_topic', 'queueName', 'sendTo', 'rename', 'report_exchange', 'source', 'strip', 'timezone', 'nodupe_ttl', 'nodupe_driver', @@ -665,7 +667,9 @@ class Config: 
'exchange_split': 'exchangeSplit', 'exchange_suffix': 'exchangeSuffix', 'expiry': 'expire', - 'file_time_limit' : 'nodupe_fileAgeMax', + 'file_time_limit' : 'fileAgeMax', + 'nodupe_fileAgeMax' : 'fileAgeMax', + 'nodupe_fileAgeMin' : 'fileAgeMin', 'fp' : 'force_polling', 'fs' : 'follow_symlinks', 'h' : 'help', @@ -767,7 +771,8 @@ def __init__(self, parent=None) -> 'Config': self.bufsize = 1024 * 1024 self.byteRateMax = 0 - self.nodupe_fileAgeMax = 0 # disabled. + self.fileAgeMax = 0 # disabled. + self.fileAgeMin = 0 # disabled. self.timezone = 'UTC' self.debug = False self.declared_exchanges = [] @@ -1552,7 +1557,9 @@ def parse_file(self, cfg, component=None): self._parse_v3unplugin(v) elif k in ['inflight', 'lock']: if v[:-1].isnumeric(): - setattr(self, k, durationToSeconds(v)) + vv = durationToSeconds(v) + setattr(self, k, vv) + self.fileAgeMin = vv else: if line[1].lower() in ['none', 'off', 'false']: setattr(self, k, None) @@ -1911,6 +1918,10 @@ def finalize(self, component=None, config=None): self._resolve_exchange() self._resolveQueueName(component,cfg) + valid_inlineEncodings = [ 'guess', 'text', 'binary' ] + if hasattr(self, 'inlineEncoding') and self.inlineEncoding not in valid_inlineEncodings: + logger.error( f"invalid inlineEncoding: {self.inlineEncoding} must be one of: {','.join(valid_inlineEncodings)}" ) + if hasattr(self, 'no'): if self.statehost: hostdir = self.hostdir @@ -2511,6 +2522,10 @@ def parse_args(self, isPost=False): help= 'allows simultaneous use of multiple versions and types of messages' ) + parser.add_argument('--retry_refilter', + action='store_true', + default=self.retry_refilter, + help='repeat message processing when retrying transfers (default just resends as previous attempt.)') #FIXME: select/accept/reject in parser not implemented. 
parser.add_argument( '--select', diff --git a/sarracenia/examples/flow/amserver.conf b/sarracenia/examples/flow/amserver.conf index 97e1a8035..b141b4e79 100644 --- a/sarracenia/examples/flow/amserver.conf +++ b/sarracenia/examples/flow/amserver.conf @@ -1,13 +1,19 @@ +# For message content purposes +post_topicPrefix v03.post +post_format v03 callback gather.am callback post.message +# Add appropriate data to filename +callback rename.raw2bulletin + post_broker amqp://tsource@localhost post_exchange xs_tsource_am post_baseUrl file:// download on -directory /tmp/andre_am_receiver +directory /tmp/am_receiver accept .* sum sha512 AllowIPs 127.0.0.1 diff --git a/sarracenia/flow/__init__.py b/sarracenia/flow/__init__.py index 7483dfb78..233301d6c 100644 --- a/sarracenia/flow/__init__.py +++ b/sarracenia/flow/__init__.py @@ -806,7 +806,8 @@ def updateFieldsAccepted(self, msg, urlstr, pattern, maskDir, relPath = u.path[1:] + '/' + relPath # FIXME... why the % ? why not just assign it to copy the value? - if 'rename' in msg: relPath = '%s' % msg['rename'] + if self.o.download and 'rename' in msg: + relPath = '%s' % msg['rename'] token = relPath.split('/') filename = token[-1] @@ -1070,9 +1071,10 @@ def filter(self) -> None: (len(self.worklist.incoming), len(self.worklist.rejected))) def gather(self) -> None: + so_far=0 for p in self.plugins["gather"]: try: - new_incoming = p() + new_incoming = p(self.o.batch-so_far) except Exception as ex: logger.error( f'flowCallback plugin {p} crashed: {ex}' ) logger.debug( "details:", exc_info=True ) @@ -1080,6 +1082,12 @@ def gather(self) -> None: if len(new_incoming) > 0: self.worklist.incoming.extend(new_incoming) + so_far += len(new_incoming) + + # if we gathered enough with a subset of plugins then return. 
+ if so_far >= self.o.batch: + return + def do(self) -> None: diff --git a/sarracenia/flow/poll.py b/sarracenia/flow/poll.py index 8f99b17c4..f6dd49d32 100644 --- a/sarracenia/flow/poll.py +++ b/sarracenia/flow/poll.py @@ -33,7 +33,7 @@ 'post_on_start': False, 'sleep': -1, 'nodupe_ttl': 7 * 60 * 60, - 'nodupe_fileAgeMax': 30 * 24 * 60 * 60, + 'fileAgeMax': 30 * 24 * 60 * 60, } # 'sumflg': 'cod,md5', @@ -74,6 +74,9 @@ def __init__(self, options): self.plugins['load'].insert(0, 'sarracenia.flowcb.post.message.Message') + if self.o.nodupe_ttl < self.o.fileAgeMax: + logger.warning( f"nodupe_ttl < fileAgeMax means some files could age out of the cache and be re-ingested ( see : https://github.com/MetPX/sarracenia/issues/904") + if not features['ftppoll']['present']: if hasattr( self.o, 'pollUrl' ) and ( self.o.pollUrl.startswith('ftp') ): logger.critical( f"attempting to configure an FTP poll pollUrl={self.o.pollUrl}, but missing python modules: {' '.join(features['ftppoll']['modules_needed'])}" ) diff --git a/sarracenia/flowcb/__init__.py b/sarracenia/flowcb/__init__.py index cdd2cd276..5ec2c94de 100755 --- a/sarracenia/flowcb/__init__.py +++ b/sarracenia/flowcb/__init__.py @@ -80,9 +80,9 @@ def ack(self,messagelist) -> None:: Task: acknowledge messages from a gather source. - def gather(self) -> list:: + def gather(self, messageCountMax) -> list:: - Task: gather messages from a source... return a list of messages. + Task: gather messages from a source... return a list of messages in a poll, gather is always called, regardless of vip posession. 
in all other components, gather is only called when in posession diff --git a/sarracenia/flowcb/gather/file.py b/sarracenia/flowcb/gather/file.py index e5e157914..8238cb4dc 100755 --- a/sarracenia/flowcb/gather/file.py +++ b/sarracenia/flowcb/gather/file.py @@ -51,30 +51,6 @@ def __init__(self, parent): self.on_moved = parent.on_moved super().__init__() -def path_inflight_tooNew(inflight, lstat): - """ - check the inflight, compare fail age against it. - return True if the file is too new to be posted. - """ - - if not type(inflight) in [float, int] : - #logger.debug("ok inflight unused") - return False - - if lstat == None or not hasattr(lstat,'st_mtime'): - #logger.debug("ok lstat None") - return False - - age = nowflt() - lstat.st_mtime - if age < inflight: - logger.debug("%d vs (inflight setting) %d seconds. Too New!" % \ - (age,inflight) ) - return True - - return False - - - class File(FlowCB): """ @@ -142,7 +118,6 @@ def __init__(self, options): self.o.create_modify = ('create' in self.o.fileEvents) or ( 'modify' in self.o.fileEvents) - def post_delete(self, path, key=None, value=None,is_directory=False): #logger.debug("post_delete %s (%s,%s)" % (path, key, value)) @@ -439,7 +414,9 @@ def post1move(self, src, dst): def process_event(self, event, src, dst): """ - return a list of messages. + return a tuple: pop? + list of messages. 
+ + """ #logger.debug("process_event %s %s %s " % (event,src,dst) ) @@ -447,19 +424,19 @@ def process_event(self, event, src, dst): if event == 'delete' : if event in self.o.fileEvents: - return self.post1file(src, None) - return [] + return (True, self.post1file(src, None)) + return (True, []) if event == 'rmdir' : if event in self.o.fileEvents: - return self.post1file(src, None, is_directory=True) - return [] + return (True, self.post1file(src, None, is_directory=True)) + return (True, []) # move if event == 'move': if self.o.create_modify: - return self.post1move(src, dst) + return (True, self.post1move(src, dst)) # create or modify @@ -472,27 +449,36 @@ def process_event(self, event, src, dst): if os.path.islink(src): if 'link' in self.o.fileEvents: - return self.post1file(src, None) - return [] + return (True, self.post1file(src, None)) + return (True, []) # file : must exists # (may have been deleted since event caught) - if not os.path.exists(src): return [] + if not os.path.exists(src): return (True, []) # file : must be old enough lstat = sarracenia.stat(src) - if path_inflight_tooNew(self.o.inflight, lstat): return [] + if lstat and hasattr(lstat,'st_mtime'): + age = time.time() - lstat.st_mtime + + if age < self.o.fileAgeMin: + logger.debug( "%d vs (inflight setting) %d seconds. Too New!" % (age,self.o.fileAgeMin) ) + return (False, []) + + if self.o.fileAgeMax > 0 and age > self.o.fileAgeMax: + logger.debug("%d vs (fileAgeMax setting) %d seconds. Too Old!" 
% (age,self.o.fileAgeMax) ) + return (True, []) # post it if event == 'mkdir' and 'mkdir' in self.o.fileEvents: - return self.post1file(src, lstat, is_directory=True) + return (True, self.post1file(src, lstat, is_directory=True)) elif self.o.create_modify: - return self.post1file(src, lstat) - return [] + return (True, self.post1file(src, lstat)) + return (True, []) def set_blocksize(self, bssetting, fsiz): @@ -529,9 +515,11 @@ def wakeup(self): messages = [] for key in self.cur_events: + event_done=False event, src, dst = self.cur_events[key] try: - messages.extend(self.process_event(event, src, dst)) + (event_done, new_messages) = self.process_event(event, src, dst) + messages.extend(new_messages) except OSError as err: """ This message is reduced to debug priority because it often happens when files @@ -542,7 +530,9 @@ def wakeup(self): logger.debug("skipping event that could not be processed: ({}): {}".format( event, err)) logger.debug("Exception details:", exc_info=True) - self.left_events.pop(key) + event_done=True + if event_done: + self.left_events.pop(key) return messages def walk(self, src): @@ -680,7 +670,7 @@ def on_start(self): self.queued_messages = [] self.primed = False - def gather(self): + def gather(self, messageCountMax): """ from sr_post.py/run diff --git a/sarracenia/flowcb/gather/message.py b/sarracenia/flowcb/gather/message.py index e45f0bf7f..d155b8795 100755 --- a/sarracenia/flowcb/gather/message.py +++ b/sarracenia/flowcb/gather/message.py @@ -28,7 +28,7 @@ def __init__(self, options) -> None: else: logger.critical('missing required broker specification') - def gather(self) -> list: + def gather(self, messageCountMax) -> list: """ return a current list of messages. 
""" diff --git a/sarracenia/flowcb/log.py b/sarracenia/flowcb/log.py index 3b134e28a..e49aa8c51 100755 --- a/sarracenia/flowcb/log.py +++ b/sarracenia/flowcb/log.py @@ -32,14 +32,14 @@ def __init__(self, options): logger.info(f'{self.o.component} initialized with: logEvents: {self.o.logEvents}, logMessageDump: {self.o.logMessageDump}') if self.o.component in ['sender']: self.action_verb = 'sent' + elif self.o.component in ['subscribe', 'sarra' ]: + self.action_verb = 'downloaded' elif self.o.component in ['post', 'poll', 'watch']: self.action_verb = 'noticed' - elif self.o.component in ['shovel']: - self.action_verb = 'filtered' - elif self.o.component in ['flow']: - self.action_verb = 'flowed' + elif self.o.component in [ 'flow', 'shovel', 'winnow']: + self.action_verb = self.o.component + 'ed' else: - self.action_verb = 'downloaded' + self.action_verb = 'done' self.started = nowflt() self.rxTopicSeparator='.' @@ -60,9 +60,9 @@ def __reset(self): def metricsReport(self): return { 'lagMax': self.lagMax, 'lagTotal':self.lagTotal, 'lagMessageCount':self.msgCount, 'rejectCount':self.rejectCount } - def gather(self): + def gather(self, messageCountMax): if set(['gather']) & self.o.logEvents: - logger.info('') + logger.info( f' messageCountMax: {messageCountMax} ') return [] diff --git a/sarracenia/flowcb/nodupe/disk.py b/sarracenia/flowcb/nodupe/disk.py index cc31d502d..ada9eeba2 100755 --- a/sarracenia/flowcb/nodupe/disk.py +++ b/sarracenia/flowcb/nodupe/disk.py @@ -30,10 +30,10 @@ class Disk(NoDupe): The expiry based on nodupe_ttl is applied every housekeeping interval. - nodupe_fileAgeMax - the oldest file that will be considered for processing. + fileAgeMax - the oldest file that will be considered for processing. files older than this threshold will be rejected. - nodupe_fileAgeMin - the newest file that can be considered for processing. + fileAgeMin - the newest file that can be considered for processing. files newer than this threshold will be rejected. 
if not specified, the value of option *inflight* may be referenced if it is an integer value. @@ -55,8 +55,6 @@ def __init__(self, options): logging.basicConfig(format=self.o.logFormat, level=getattr(logging, self.o.logLevel.upper())) self.o.add_option( 'nodupe_ttl', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMax', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMin', 'duration', 0 ) logger.info('time_to_live=%d, ' % (self.o.nodupe_ttl)) @@ -149,15 +147,13 @@ def check_message(self, msg) -> bool : def after_accept(self, worklist): new_incoming = [] self.now = nowflt() - if self.o.nodupe_fileAgeMax > 0: - min_mtime = self.now - self.o.nodupe_fileAgeMax + if self.o.fileAgeMax > 0: + min_mtime = self.now - self.o.fileAgeMax else: min_mtime = 0 - if self.o.nodupe_fileAgeMin > 0: - max_mtime = self.now - self.o.nodupe_fileAgeMin - elif type(self.o.inflight) in [ int, float ] and self.o.inflight > 0: - max_mtime = self.now - self.o.inflight + if self.o.fileAgeMin > 0: + max_mtime = self.now - self.o.fileAgeMin else: # FIXME: should we add some time here to allow for different clocks? # 100 seconds in the future? hmm... @@ -179,7 +175,7 @@ def after_accept(self, worklist): worklist.rejected.append(m) continue - if self.check_message(m): + if '_isRetry' in m or self.check_message(m): new_incoming.append(m) else: m['_deleteOnPost'] |= set(['reject']) diff --git a/sarracenia/flowcb/nodupe/redis.py b/sarracenia/flowcb/nodupe/redis.py index 20d92d94a..c9ce9701b 100644 --- a/sarracenia/flowcb/nodupe/redis.py +++ b/sarracenia/flowcb/nodupe/redis.py @@ -32,10 +32,10 @@ class Redis(NoDupe): The expiry based on nodupe_ttl is applied every housekeeping interval. - nodupe_fileAgeMax - the oldest file that will be considered for processing. + fileAgeMax - the oldest file that will be considered for processing. files older than this threshold will be rejected. - nodupe_fileAgeMin - the newest file that can be considered for processing. 
+ fileAgeMin - the newest file that can be considered for processing. files newer than this threshold will be rejected. if not specified, the value of option *inflight* may be referenced if it is an integer value. @@ -50,8 +50,6 @@ def __init__(self, options): logging.basicConfig(format=self.o.logFormat, level=getattr(logging, self.o.logLevel.upper())) self.o.add_option( 'nodupe_ttl', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMax', 'duration', 0 ) - self.o.add_option( 'nodupe_fileAgeMin', 'duration', 0 ) logger.info('time_to_live=%d, ' % (self.o.nodupe_ttl)) @@ -145,13 +143,13 @@ def on_housekeeping(self): def after_accept(self, worklist): new_incoming = [] self.now = nowflt() - if self.o.nodupe_fileAgeMax > 0: - min_mtime = self.now - self.o.nodupe_fileAgeMax + if self.o.fileAgeMax > 0: + min_mtime = self.now - self.o.fileAgeMax else: min_mtime = 0 - if self.o.nodupe_fileAgeMin > 0: - max_mtime = self.now - self.o.nodupe_fileAgeMin + if self.o.fileAgeMin > 0: + max_mtime = self.now - self.o.fileAgeMin elif type(self.o.inflight) in [ int, float ] and self.o.inflight > 0: max_mtime = self.now - self.o.inflight else: @@ -175,7 +173,7 @@ def after_accept(self, worklist): worklist.rejected.append(m) continue - if self._is_new(m): + if '_isRetry' in m or self._is_new(m): new_incoming.append(m) else: m['_deleteOnPost'] |= set(['reject']) diff --git a/sarracenia/flowcb/rename/raw2bulletin.py b/sarracenia/flowcb/rename/raw2bulletin.py new file mode 100644 index 000000000..bb356d636 --- /dev/null +++ b/sarracenia/flowcb/rename/raw2bulletin.py @@ -0,0 +1,360 @@ +""" +Description: + sr3 equivalent of the V2 configuration cvt_bulletin_filename_from_content + Add bulletin data (full header, timestamp, station ID, BBB) to incomplete filename + + Works essentially the same way as its v2 counterpart, except it can get the bulletin file contents 2 ways. + 1. By the sr3 message content + 2. By opening and reading the path to the file directly. 
+ The plugin captures what was done on the V2 converter and ties it up with Sundew source code logic to make it more generalized. + What it can do that the V2 plugin cannot: + - Add the station ID in the filename + - Add the BBB in the filename + - Fetch bulletin data multiple ways + + Decoding of the data is done in the same way of the encoder in flowcb/gather/am.py + +Examples: + + RAW Ninjo file (4 letter station ID) + WACN07 CWAO 082327 + CZEG AIRMET E1 VALID 080105/080505 CWEG- + + Output filename: WACN07_CWAO_082327_CZEG__00001 + + Another RAW Ninjo file + FTCN32 CWAO 100500 AAM + (...) + + Output filename: FTCN32_CWAO_100500_AAM__00002 + + A CACN bulletin missing the correct filename + Input filename: CA__12345 + + Contents: + CACN00 CWAO 141600 + PQU + + Output filename: CACN00_CWAO_141600_PQU__00003 + +Usage: + callback rename.raw2bulletin + +Contributions: + Andre LeBlanc - First author (2024/02) + +Improvements: + Delegate some of the generalized methods to a parent class. To be callable by other plugins. + Add more Sundew logic if ever some bulletins end up failing when implemented +""" + +from sarracenia.flowcb import FlowCB +import logging +from base64 import b64encode +import time, datetime +import subprocess + +logger = logging.getLogger(__name__) + +class Raw2bulletin(FlowCB): + + def __init__(self,options) : + super().__init__(options,logger) + self.seq = 0 + # Need to redeclare these options to have their default values be initialized. 
+ self.o.add_option('inputCharset', 'str', 'utf-8') + self.o.add_option('binaryInitialCharacters', 'list', [b'BUFR' , b'GRIB', b'\211PNG']) + + # If file was converted, get rid of extensions it had + def after_accept(self,worklist): + + good_msgs = [] + + for msg in worklist.incoming: + + path = msg['new_dir'] + '/' + msg['new_file'] + + filenameFirstChars = msg['new_file'].split('_')[0] + + # AM bulletins that need their filename rewritten with data should only have two chars before the first underscore + # This is in concordance with Sundew logic -> https://github.com/MetPX/Sundew/blob/main/lib/bulletinAm.py#L70-L71 + # These messages are still good, so we will add them to the good_msgs list + if len(filenameFirstChars) != 2: + good_msgs.append(msg) + continue + + data = self.getData(msg, path) + + if data == None: + worklist.rejected.append(msg) + continue + + lines = data.split('\n') + #first_line = lines[0].strip('\r') + #first_line = first_line.strip(' ') + #first_line = first_line.strip('\t') + first_line = lines[0].split(' ') + + # Build header from bulletin + header = self.buildHeader(first_line) + if header == None: + logger.error("Unable to fetch header contents. 
Skipping message") + worklist.rejected.append(msg) + continue + + # Get the station timestamp from bulletin + ddhhmm = self.getTime(data) + if ddhhmm == None: + logger.error("Unable to get julian time.") + + # Get the BBB from bulletin + BBB = self.getBBB(first_line) + + # Get the station ID from bulletin + stn_id = self.getStation(data) + + # Generate a sequence (random ints) + seq = self.getSequence() + + # Rename file with data fetched + try: + # We can't disseminate bulletins downstream if they're missing the timestamp, but we want to keep the bulletins to troubleshoot source problems + # We'll append "_PROBLEM" to the filename to be able to identify erronous bulletins + if ddhhmm == None: + timehandler = datetime.datetime.now() + + # Add current time as new timestamp to filename + new_file = header + "_" + timehandler.strftime('%d%H%M') + "_" + BBB + "_" + stn_id + "_" + seq + "_PROBLEM" + + # Write the file manually as the messages don't get posted downstream. + # The message won't also get downloaded further downstream + msg['new_file'] = new_file + new_path = msg['new_dir'] + '/' + msg['new_file'] + + with open(new_path, 'w') as f: f.write(data) + + logger.error(f"New filename (for problem file): {new_file}") + raise Exception + else: + new_file = header + "_" + ddhhmm + "_" + BBB + "_" + stn_id + "_" + seq + + msg['new_file'] = new_file + new_path = msg['new_dir'] + '/' + msg['new_file'] + + logger.info(f"New filename (with path): {new_path}") + + good_msgs.append(msg) + + except Exception as e: + logger.error(f"Error in renaming. Error message: {e}") + worklist.rejected.append(msg) + continue + + worklist.incoming = good_msgs + + + def getData(self, msg, path): + """Get the bulletin data. + We can either get the bulletin data via + 1. Sarracenia message content + 2. Locally downloaded file + """ + + # Read file data from message or from file path directly if message content not found. 
+ try: + + binary = 0 + if msg['content']: + data = msg['content']['value'] + else: + + fp = open(path, 'rb') + data = fp.read() + # bulletin = Bulletin(data) + fp.close() + + # Decode data, binary and text. Integrate inputCharset + if data.splitlines()[1][:4] in self.o.binaryInitialCharacters: + binary = 1 + + if not binary: + data = data.decode(self.o.inputCharset) + else: + data = b64encode(data).decode('ascii') + + return data + + except Exception as e: + logger.error(f"Could not fetch file data of from either message content or {path}. Error details: {e}") + return None + + + def getSequence(self): + """ sequence number to make the file unique... + """ + self.seq = self.seq + 1 + if self.seq > 99999: + self.seq = 1 + return str(self.seq).zfill(5) + + + def getStation(self, data): + """Extracted from Sundew code: https://github.com/MetPX/Sundew/blob/main/lib/bulletin.py#L327-L408 + Get the station ID from the bulletin contents. + Examples: + CACN00 CWAO -> Station ID located on second line. + FTCN32 CWAO -> Station ID located on first line (with header) + """ + + station = '' + data = data.lstrip('\n') + data = data.split('\n') + + try: + premiereLignePleine = "" + deuxiemeLignePleine = "" + + # special case, need to get the next full line. + i = 0 + for ligne in data[1:]: + i += 1 + premiereLignePleine = ligne + if len(premiereLignePleine) > 1: + if len(data) > i+1 : deuxiemeLignePleine = data[i+1] + break + + #print " ********************* header = ", data[0][0:7] + # switch depends on bulletin type. 
+ if data[0][0:2] == "SA": + if data[1].split()[0] in ["METAR","LWIS"]: + station = premiereLignePleine.split()[1] + else: + station = premiereLignePleine.split()[0] + + elif data[0][0:2] == "SP": + station = premiereLignePleine.split()[1] + + elif data[0][0:2] in ["SI","SM"]: + station = premiereLignePleine.split()[0] + if station == "AAXX" : + if deuxiemeLignePleine != "" : + station = deuxiemeLignePleine.split()[0] + else : + station = '' + + elif data[0][0:6] in ["SRCN40","SXCN40","SRMT60","SXAK50", "SRND20", "SRND30"]: + #elif data[0][0:6] in self.wmo_id: + station = premiereLignePleine.split()[0] + + elif data[0][0:2] in ["FC","FT"]: + if premiereLignePleine.split()[1] == "AMD": + station = premiereLignePleine.split()[2] + else: + station = premiereLignePleine.split()[1] + + elif data[0][0:2] in ["UE","UG","UK","UL","UQ","US"]: + parts = premiereLignePleine.split() + if parts[0][:2] in ['EE', 'II', 'QQ', 'UU']: + station = parts[1] + elif parts[0][:2] in ['PP', 'TT']: + station = parts[2] + else: + station = '' + + elif data[0][0:2] in ["RA","MA","CA"]: + station = premiereLignePleine.split()[0].split('/')[0] + + except Exception: + station = '' + + if station != '' : + while len(station) > 1 and station[0] == '?' : + station = station[1:] + if station[0] != '?' : + station = station.split('?')[0] + if station[-1] == '=' : station = station[:-1] + else : + station = '' + + return station + + + def getBBB(self, first_line): + """Get the BBB. If none found, return empty string. + The BBB is the field of the bulletin header that states if it was amended or not. 
+ """ + + if len(first_line) != 4: + BBB = '' + else: + BBB = first_line[3] + + return BBB + + def buildHeader(self, first_line): + """ Build header from file contents + """ + + try: + T1T2A1A2ii = first_line[0] + CCCC = first_line[1] + # YYGGgg = parts[2] + + header = T1T2A1A2ii + "_" + CCCC # + "_" + YYGGgg + + except Exception: + header = None + + return header + + + def getTime(self, data): + """ extract time from the data of the ca station + the data's first line looks like this : x,yyyy,jul,hhmm,... + where x is an integer of no importance, followed by obs'time + yyyy = year + jul = julian day + hhmm = hour and mins + """ + + try: + parts = data.split(',') + + if len(parts) < 4: return None + + year = parts[1] + jul = parts[2] + hhmm = parts[3] + + # passe-passe pour le jour julien en float parfois ? + f = float(jul) + i = int(f) + jul = '%s' % i + # fin de la passe-passe + + # strange 0 filler + + while len(hhmm) < 4: + hhmm = '0' + hhmm + while len(jul) < 3: + jul = '0' + jul + + # problematic 2400 for 00z + + if hhmm != '2400': + emissionStr = year + jul + hhmm + timeStruct = time.strptime(emissionStr, '%Y%j%H%M') + ddHHMM = time.strftime("%d%H%M", timeStruct) + return ddHHMM + + # sometime hhmm is 2400, to avoid exception + # set time to 00, increase by 24 hr + + jul00 = year + jul + '0000' + timeStruct = time.strptime(jul00, '%Y%j%H%M') + ep_emission = time.mktime(timeStruct) + 24 * 60 * 60 + timeStruct = time.localtime(self.ep_emission) + ddHHMM = time.strftime('%d%H%M', timeStruct) + return ddHHMM + except Exception as e: + return None diff --git a/sarracenia/flowcb/retry.py b/sarracenia/flowcb/retry.py index 9bedb8371..9cf47f04e 100755 --- a/sarracenia/flowcb/retry.py +++ b/sarracenia/flowcb/retry.py @@ -54,6 +54,11 @@ def __init__(self, options) -> None: return self.o.add_option( 'retry_driver', 'str', 'disk') + + # retry_refilter False -- rety to send with existing processing. 
+ # retry_refilter True -- re-ingest and re-apply processing (if it has changed.) + self.o.add_option( 'retry_refilter', 'flag', False) + #queuedriver = os.getenv('SR3_QUEUEDRIVER', 'disk') if self.o.retry_driver == 'redis': @@ -69,12 +74,43 @@ def __init__(self, options) -> None: logger.debug('logLevel=%s' % self.o.logLevel) + + def gather(self, qty) -> None: + """ + If there are only a few new messages, get some from the download retry queue and put them into + `worklist.incoming`. + + Do this in the gather() entry point if retry_refilter is True. + + """ + if not features['retry']['present'] or not self.o.retry_refilter: + return [] + + if qty <= 0: return [] + + message_list = self.download_retry.get(qty) + + # eliminate calculated values so it is refiltered from scratch. + for m in message_list: + for k in m: + if k in m['_deleteOnPost'] or k.startswith('new_'): + del m[k] + m['_isRetry'] = True + m['_deleteOnPost'] = set( [ '_isRetry' ] ) + + + return message_list + + def after_accept(self, worklist) -> None: """ If there are only a few new messages, get some from the download retry queue and put them into `worklist.incoming`. + + Do this in the after_accept() entry point if retry_refilter is False. + """ - if not features['retry']['present'] : + if not features['retry']['present'] or self.o.retry_refilter: return qty = (self.o.batch / 2) - len(worklist.incoming) diff --git a/sarracenia/flowcb/run.py b/sarracenia/flowcb/run.py index 5b9d00e09..c183522ab 100755 --- a/sarracenia/flowcb/run.py +++ b/sarracenia/flowcb/run.py @@ -68,7 +68,7 @@ def run_script(self, script): logger.error("subprocess.run failed err={}".format(err)) logger.debug("Exception details:", exc_info=True) - def gather(self): + def gather(self, messageCountMax): """ FIXME: this does not make sense. need to figure out how to get the messages back from the script, perhaps using a json file reader? 
diff --git a/sarracenia/flowcb/scheduled/__init__.py b/sarracenia/flowcb/scheduled/__init__.py index f2af22e06..dd770e746 100644 --- a/sarracenia/flowcb/scheduled/__init__.py +++ b/sarracenia/flowcb/scheduled/__init__.py @@ -87,7 +87,7 @@ def __init__(self,options,logger=logger): now=datetime.datetime.fromtimestamp(time.time(),datetime.timezone.utc) self.update_appointments(now) - def gather(self): + def gather(self,messageCountMax): # for next expected post self.wait_until_next() diff --git a/sarracenia/flowcb/scheduled/wiski.py b/sarracenia/flowcb/scheduled/wiski.py index 9e3ddc4a2..256b036a5 100644 --- a/sarracenia/flowcb/scheduled/wiski.py +++ b/sarracenia/flowcb/scheduled/wiski.py @@ -131,7 +131,7 @@ def submit_tokenization_request(self): return submit_tokenization_request - def gather(self): # placeholder + def gather(self,messageCountMax): # placeholder messages=[] @@ -216,5 +216,5 @@ def gather(self): # placeholder logging.basicConfig(level=logging.DEBUG) me = Wiski(flow.o) - me.gather() + me.gather(flow.o.batch) diff --git a/sarracenia/flowcb/v2wrapper.py b/sarracenia/flowcb/v2wrapper.py index 57e1b244f..82e7b17ab 100755 --- a/sarracenia/flowcb/v2wrapper.py +++ b/sarracenia/flowcb/v2wrapper.py @@ -61,6 +61,7 @@ def sumstrFromMessage( msg ) -> str: if 'fileOp' in msg: if 'rename' in msg['fileOp']: msg['oldname'] = msg['fileOp']['rename'] + if 'link' in msg['fileOp']: hash = sha512() hash.update( bytes( msg['fileOp']['link'], encoding='utf-8' ) ) @@ -77,7 +78,6 @@ def sumstrFromMessage( msg ) -> str: sumstr = 'r,%s' % hash.hexdigest() else: sumstr = 'm,%s' % hash.hexdigest() - else: logger.error('unknown fileOp: %s' % msg['fileOp'] ) return sumstr diff --git a/sarracenia/sr.py b/sarracenia/sr.py index 70feae07d..42e35e5d2 100755 --- a/sarracenia/sr.py +++ b/sarracenia/sr.py @@ -301,15 +301,6 @@ def _read_configs(self): self.default_cfg = sarracenia.config.default_config() - #self.default_cfg = sarracenia.config.Config() - #if os.path.exists( "default.conf" 
): - # self.default_cfg.parse_file("default.conf") - #if os.path.exists( "admin.conf" ): - # self.default_cfg.parse_file("admin.conf") - - #self.admin_cfg = copy.deepcopy( self.default_cfg ) - #if os.path.exists( "admin.conf" ): - # self.admin_cfg.parse_file("admin.conf") os.chdir(self.user_config_dir) for c in self.components: @@ -749,6 +740,37 @@ def __resolved_exchanges(self, c, cfg, o): exl.append(x) return exl + def __resolved_post_exchanges(self, c, cfg, o): + """ + Guess the name of an exchange. looking at either a direct setting, + or an existing queue state file, or lastly just guess based on conventions. + """ + exl = [] + #if hasattr(o,'declared_exchanges'): + # exl.extend(o.declared_exchanges) + + if hasattr(o, 'post_exchange'): + if type(o.post_exchange) == list: + exl.extend(o.post_exchange) + else: + exl.append(o.post_exchange) + return exl + + x = 'xs_%s' % o.post_broker.url.username + + if hasattr(o, 'post_exchangeSuffix'): + x += '_%s' % o.post_exchangeSuffix + + if hasattr(o, 'post_exchangeSplit'): + l = [] + for i in range(0, o.instances): + y = x + '%02d' % i + l.append(y) + return l + else: + exl.append(x) + return exl + def __guess_queueName(self, c, cfg, o): """ Guess the name of a queue. 
looking at either a direct setting, @@ -832,17 +854,8 @@ def _resolve_brokers(self): if hasattr(o, 'post_broker') and o.post_broker is not None and o.post_broker.url is not None: host = self._init_broker_host(o.post_broker.url.netloc) - #o.broker = o.post_broker - if hasattr(o, 'post_exchange'): - o.exchange = o.post_exchange - if hasattr(o, 'post_exchangeSplit'): - o.exchangeSplit = o.post_exchangeSplit - if hasattr(o, 'post_exchangeSuffix'): - o.exchangeSuffix = o.post_exchangeSuffix - - xl = self.__resolved_exchanges(c, cfg, o) - - self.configs[c][cfg]['options'].resolved_exchanges = xl + self.configs[c][cfg]['options'].resolved_exchanges = \ + self.__resolved_post_exchanges(c, cfg, o) if hasattr(o, 'post_exchange'): self.brokers[host]['exchange'] = o.post_exchange @@ -1339,6 +1352,20 @@ def declare(self): self.default_cfg.declared_users[u_url.username], u_url.username, u_url.password, self.options.dry_run ) + # declare admin exchanges. + if hasattr(self,'default_cfg'): + logger.info( f"Declaring exchnges for admin.conf using {self.default_cfg.admin} ") + if hasattr(self.default_cfg, 'declared_exchanges'): + xdc = sarracenia.moth.Moth.pubFactory( + { + 'broker': self.default_cfg.admin, + 'dry_run': self.options.dry_run, + 'exchange': self.default_cfg.declared_exchanges, + 'message_strategy': { 'stubborn':True } + }) + xdc.putSetup() + xdc.close() + # declare exchanges first. 
for f in self.filtered_configurations: if self.please_stop: @@ -1404,7 +1431,6 @@ def declare(self): flow.runCallbacksTime('on_declare') del flow flow=None - def disable(self): if len(self.filtered_configurations) == 0: diff --git a/tests/sarracenia/flowcb/nodupe/disk_test.py b/tests/sarracenia/flowcb/nodupe/disk_test.py index ebefb87e4..273122b9b 100644 --- a/tests/sarracenia/flowcb/nodupe/disk_test.py +++ b/tests/sarracenia/flowcb/nodupe/disk_test.py @@ -557,8 +557,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): nodupe = Disk(BaseOptions) nodupe.o.nodupe_ttl = 100000 - nodupe.o.nodupe_fileAgeMin = 1000 - nodupe.o.nodupe_fileAgeMax = 1000 + nodupe.o.fileAgeMin = 1000 + nodupe.o.fileAgeMax = 1000 nodupe.open() nodupe.now = nowflt() + 10 diff --git a/tests/sarracenia/flowcb/nodupe/nodupe_test.py b/tests/sarracenia/flowcb/nodupe/nodupe_test.py index c2b2fd93f..e976b4d36 100644 --- a/tests/sarracenia/flowcb/nodupe/nodupe_test.py +++ b/tests/sarracenia/flowcb/nodupe/nodupe_test.py @@ -242,8 +242,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): #Disk nodupe_disk = NoDupe_Disk(BaseOptions) nodupe_disk.o.nodupe_ttl = 100000 - nodupe_disk.o.nodupe_fileAgeMin = 1000 - nodupe_disk.o.nodupe_fileAgeMax = 1000 + nodupe_disk.o.fileAgeMin = 1000 + nodupe_disk.o.fileAgeMax = 1000 nodupe_disk.now = message_now nodupe_disk.on_start() @@ -255,8 +255,8 @@ def test_after_accept__WithFileAges(tmp_path, capsys): #Redis nodupe_redis = NoDupe_Redis(BaseOptions) nodupe_redis.o.nodupe_ttl = 100000 - nodupe_redis.o.nodupe_fileAgeMin = 1000 - nodupe_redis.o.nodupe_fileAgeMax = 1000 + nodupe_redis.o.fileAgeMin = 1000 + nodupe_redis.o.fileAgeMax = 1000 nodupe_redis.now = message_now nodupe_redis.on_start() diff --git a/tests/sarracenia/flowcb/nodupe/redis_test.py b/tests/sarracenia/flowcb/nodupe/redis_test.py index 9bb1a95fb..b4c973e1a 100644 --- a/tests/sarracenia/flowcb/nodupe/redis_test.py +++ b/tests/sarracenia/flowcb/nodupe/redis_test.py @@ -253,8 +253,8 @@ def 
test_after_accept__WithFileAges(tmp_path, capsys): nodupe = Redis(BaseOptions) nodupe.o.nodupe_ttl = 100000 - nodupe.o.nodupe_fileAgeMin = 1000 - nodupe.o.nodupe_fileAgeMax = 1000 + nodupe.o.fileAgeMin = 1000 + nodupe.o.fileAgeMax = 1000 nodupe.now = nowflt() + 10