Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Template refactor #125

Merged
merged 21 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ jobs:
pip3 install -r requirements.txt
pip3 install -r requirements-dev.txt
python3 setup.py install
mkdir -p /opt/csv2bufr
cd /opt/csv2bufr
wget https://github.com/wmo-im/csv2bufr-templates/archive/refs/tags/v0.1.tar.gz
tar -zxf v0.1.tar.gz --strip-components=1 csv2bufr-templates-0.1/templates
- name: run tests ⚙️
run: |
pytest
Expand Down
93 changes: 33 additions & 60 deletions csv2bufr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import logging
import os.path
import threading
from typing import Any, Iterator, Union

from eccodes import (codes_bufr_new_from_samples,
Expand All @@ -38,7 +39,7 @@
codes_bufr_keys_iterator_delete,
codes_bufr_keys_iterator_get_name, CodesInternalError)

from jsonschema import validate
import csv2bufr.templates as c2bt

# some 'constants'
SUCCESS = True
Expand All @@ -56,9 +57,6 @@

LOGGER = logging.getLogger(__name__)

THISDIR = os.path.dirname(os.path.realpath(__file__))
MAPPINGS = f"{THISDIR}{os.sep}resources{os.sep}mappings"

BUFR_TABLE_VERSION = 38 # default BUFR table version
# list of BUFR attributes
ATTRIBUTES = ['code', 'units', 'scale', 'reference', 'width']
Expand Down Expand Up @@ -94,7 +92,7 @@
'typicalSecond': 'const:0'
}

_warnings = []
_warnings_global = {}

# status codes
FAILED = 0
Expand Down Expand Up @@ -124,7 +122,8 @@

# function to find position in array of requested element
def index_(key, mapping):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
idx = 0
for item in mapping:
if item['eccodes_key'] == key:
Expand All @@ -133,7 +132,7 @@ def index_(key, mapping):
if NULLIFY_INVALID:
msg = f"Warning: key {key} not found in {mapping}"
LOGGER.warning(msg)
_warnings.append(msg)
_warnings_global[tidx].append(msg)
return None
else:
msg = f"Error: key {key} not found in {mapping}"
Expand All @@ -142,7 +141,9 @@ def index_(key, mapping):


def parse_value(element: str, data: dict):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"

data_type = element.split(":")
if data_type[0] == "const":
value = data_type[1]
Expand All @@ -156,7 +157,7 @@ def parse_value(element: str, data: dict):
msg = f"Column {column} not found in input data: {data}"
if NULLIFY_INVALID:
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[tidx].append(msg)
else:
# LOGGER.error(msg) # noqa
raise ValueError(msg)
Expand All @@ -182,7 +183,8 @@ def parse_value(element: str, data: dict):

# function to retrieve data
def get_(key: str, mapping: dict, data: dict):
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
# get position in mapping
try:
idx = index_(key, mapping)
Expand All @@ -192,40 +194,13 @@ def get_(key: str, mapping: dict, data: dict):
msg = f"Warning ({e}) raised getting value for {key}, None returned for {key}" # noqa
if NULLIFY_INVALID:
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[tidx].append(msg)
value = None
else:
raise KeyError(msg)
return value


# function to validate mapping file against JSON schema
def validate_mapping(mapping: dict) -> bool:
    """
    Validates dictionary containing mapping to BUFR against internal schema.
    Returns SUCCESS if the dictionary passes and raises an error otherwise.

    :param mapping: dictionary containing mappings to specified BUFR
                    sequence using ecCodes key.

    :raises RuntimeError: if `mapping` does not validate against the
                          schema; the underlying exception is chained
                          as the cause.

    :returns: `bool` of validation result
    """
    # load internal file schema for mappings
    file_schema = f"{MAPPINGS}{os.sep}mapping_schema.json"
    # read the schema as UTF-8 explicitly rather than relying on the
    # platform/locale default encoding
    with open(file_schema, encoding="utf-8") as fh:
        schema = json.load(fh)

    # now validate
    try:
        validate(mapping, schema)
    except Exception as e:
        msg = f"Warning ({e}). Invalid BUFR template mapping file: {mapping}"
        # chain the original error so the validation failure is kept
        # in the traceback
        raise RuntimeError(msg) from e

    return SUCCESS


def apply_scaling(value: Union[NUMBERS], scale: Union[NUMBERS],
offset: Union[NUMBERS]) -> Union[NUMBERS]:
"""
Expand Down Expand Up @@ -270,7 +245,8 @@ def validate_value(key: str, value: Union[NUMBERS],

:returns: validated value
"""
global _warnings
global _warnings_global
tidx = f"t-{threading.get_ident()}"
# TODO move this function to the class as part of set value

if value is None:
Expand All @@ -284,7 +260,7 @@ def validate_value(key: str, value: Union[NUMBERS],
msg = f"{key}: Value ({value}) out of valid range ({valid_min} - {valid_max})." # noqa
if nullify_on_fail:
LOGGER.warning(f"{msg}; Element set to missing")
_warnings.append(f"{msg}; Element set to missing")
_warnings_global[tidx].append(f"{msg}; Element set to missing")
return None
else:
raise ValueError(msg)
Expand All @@ -309,7 +285,7 @@ def __init__(self, descriptors: list,
:param table_version: version of Master Table 0 to use, default 36

"""
global _warnings
self.warnings = []
# ================================
# first create empty bufr messages
# ================================
Expand Down Expand Up @@ -380,7 +356,6 @@ def __init__(self, descriptors: list,
# ============================================

def create_template(self) -> None:
global _warnings
template = {}
template["inputDelayedDescriptorReplicationFactor"] = \
self.delayed_replications
Expand Down Expand Up @@ -450,6 +425,7 @@ def reset(self) -> None:
for key in self.dict:
self.dict[key]["value"] = None
self.bufr = None
self.warnings = []

def set_element(self, key: str, value: object) -> None:
"""
Expand All @@ -460,7 +436,6 @@ def set_element(self, key: str, value: object) -> None:

:returns: `None`
"""
global _warnings
# TODO move value validation here

if value is not None and not isinstance(value, list):
Expand All @@ -475,7 +450,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
self.warnings.append(f"{e}: Unable to convert value {value} to int for {key}, set to None") # noqa
else:
raise RuntimeError(f"{e}: Unable to convert value {value} to int for {key}") # noqa
elif expected_type == "float" and not isinstance(value, float):
Expand All @@ -485,7 +460,7 @@ def set_element(self, key: str, value: object) -> None:
if NULLIFY_INVALID:
value = None
LOGGER.warning(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
_warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
self.warnings.append(f"{e}: Unable to convert value {value} to float for {key}, set to None") # noqa
else:
raise RuntimeError(f"{e}: Unable to convert value {value} to float for {key}") # noqa
else:
Expand All @@ -500,7 +475,6 @@ def get_element(self, key: str) -> Any:

:returns: value of the element
"""
global _warnings
result = None
try:
# check if we want value or an attribute (indicated by ->)
Expand All @@ -514,7 +488,7 @@ def get_element(self, key: str) -> Any:
if NULLIFY_INVALID:
result = None
LOGGER.warning(f"Error {e} whilst fetching {key} from data, None returned") # noqa
_warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
self.warnings.append(f"Error {e} whilst fetching {key} from data, None returned") # noqa
else:
msg = f"Error {e} whilst fetching {key} from data."
raise RuntimeError(msg)
Expand All @@ -530,7 +504,6 @@ def as_bufr(self, use_cached: bool = False) -> bytes:

:returns: bytes containing BUFR data
"""
global _warnings
if use_cached and (self.bufr is not None):
return self.bufr
# ===========================
Expand Down Expand Up @@ -570,7 +543,7 @@ def as_bufr(self, use_cached: bool = False) -> bytes:
except CodesInternalError as e:
msg = f"Error ({e}) calling codes_set({bufr_msg}, 'pack', True). Null message returned" # noqa
LOGGER.warning(f"{msg}") # noqa
_warnings.append(f"{msg}") # noqa
self.warnings.append(f"{msg}") # noqa
codes_release(bufr_msg)
return self.bufr
except Exception as e:
Expand Down Expand Up @@ -609,7 +582,6 @@ def md5(self) -> str:

:returns: md5 of BUFR message
"""
global _warnings
return self._hash

def parse(self, data: dict, mappings: dict) -> None:
Expand All @@ -628,7 +600,6 @@ def parse(self, data: dict, mappings: dict) -> None:
# ==================================================
# Parse the data.
# ==================================================
global _warnings
for section in ("header", "data"):
for element in mappings[section]:
# get eccodes key
Expand Down Expand Up @@ -664,7 +635,7 @@ def parse(self, data: dict, mappings: dict) -> None:
except Exception as e:
if NULLIFY_INVALID:
LOGGER.warning(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
_warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
self.warnings.append(f"Error raised whilst validating {element['eccodes_key']}, value set to None\ndata: {data}") # noqa
value = None
else:
# LOGGER.error(f"Error raised whilst validating {element['eccodes_key']}, raising error") # noqa
Expand Down Expand Up @@ -704,7 +675,6 @@ def get_datetime(self) -> datetime:
:returns: `datetime.datetime` of ISO8601 representation of the
characteristic date/time
"""
global _warnings
if None in [
self.get_element("typicalYear"),
self.get_element("typicalMonth"),
Expand Down Expand Up @@ -763,11 +733,14 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:

:returns: iterator
"""
global _warnings
global _warnings_global
job_id = f"t-{threading.get_ident()}" # job ID based on thread
_warnings_global[job_id] = []
# ======================
# validate mapping files
# ======================
e = validate_mapping(mappings)
e = c2bt.validate_template(mappings)

if e is not SUCCESS:
raise ValueError("Invalid mappings")

Expand Down Expand Up @@ -826,7 +799,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if _delimiter not in [",", ";", "|", "\t"]:
msg = "Invalid delimiter specified in mapping template, reverting to comma ','" # noqa
LOGGER.warning(msg)
_warnings.append(msg)
_warnings_global[job_id].append(msg)
_delimiter = ","
else:
_delimiter = DELIMITER
Expand Down Expand Up @@ -894,7 +867,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
if NULLIFY_INVALID:
msg = f"csv read error, non ASCII data detected ({val}), skipping row" # noqa
LOGGER.warning(msg) # noqa
_warnings.append(msg)
_warnings_global[job_id].append(msg)
LOGGER.debug(row)
continue
else:
Expand Down Expand Up @@ -942,7 +915,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
"code": PASSED,
"message": "",
"errors": [],
"warnings": _warnings
"warnings": message.warnings + _warnings_global[job_id]
}
cksum = message.md5()
# now identifier based on WSI and observation date as identifier
Expand Down Expand Up @@ -977,7 +950,7 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
"code": FAILED,
"message": "Error encoding row, BUFR set to None",
"errors": [f"{msg}\n\t\tData: {data_dict}"],
"warnings": _warnings
"warnings": message.warnings + _warnings_global[job_id]
}
result["_meta"] = {
"id": None,
Expand All @@ -1000,6 +973,6 @@ def transform(data: str, mappings: dict) -> Iterator[dict]:
# now yield result back to caller
yield result
# clear warnings
_warnings = []
del _warnings_global[job_id]

fh.close()
7 changes: 4 additions & 3 deletions csv2bufr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ def mappings():
@click.command('list')
@click.pass_context
def list_mappings(ctx):
for mapping in os.listdir(MAPPINGS):
msg = f"{mapping} => {MAPPINGS}{os.sep}{mapping}"
click.echo(msg)
templates = c2bt.list_templates()
click.echo(json.dumps(templates))
for tmpl in templates.items():
click.echo(json.dumps(tmpl, indent=4))


@click.command('create')
Expand Down
Loading
Loading