Skip to content

Commit

Permalink
New API workflow (#616)
Browse files Browse the repository at this point in the history
* move transform into api, publish data from message

* as_string and _meta

* fix as_string for bytes

* as_string base64_encode=True

* minor updates plus function execute_api_process using async

* access jobs-url from within docker-stack by replacing url !

* fix job-location url

* fix conversion via API

* update template name for campbell csv2bufr

* back to wis2box-api:latest

* remove data-mappings.yml

* corrections following Tom's review

* add warnings to Grafana dashboard

* fix MessageData-plugin

* add test counter

* reduce count congo in test

* update -another- count in test

* add requested changes

* add doc-string for as_bytes and as_string
  • Loading branch information
maaikelimper authored Mar 13, 2024
1 parent 9c47bb3 commit b161192
Show file tree
Hide file tree
Showing 15 changed files with 466 additions and 486 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ services:

wis2box-api:
container_name: wis2box-api
image: ghcr.io/wmo-im/wis2box-api:1.0b7
image: ghcr.io/wmo-im/wis2box-api:latest
restart: always
env_file:
- wis2box.env
Expand Down
35 changes: 34 additions & 1 deletion grafana/dashboards/home.json
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,42 @@
"legendFormat": "",
"queryType": "range",
"refId": "B"
},
{
"datasource": {
"type": "loki",
"uid": "P55348B596EBB51C3"
},
"editorMode": "builder",
"expr": "{compose_service=\"wis2box-management\"} |= `WARNING`",
"hide": false,
"queryType": "range",
"refId": "A"
},
{
"datasource": {
"type": "loki",
"uid": "P55348B596EBB51C3"
},
"editorMode": "builder",
"expr": "{compose_service=\"wis2box-api\"} |= `ERROR`",
"hide": false,
"queryType": "range",
"refId": "C"
},
{
"datasource": {
"type": "loki",
"uid": "P55348B596EBB51C3"
},
"editorMode": "builder",
"expr": "{compose_service=\"wis2box-api\"} |= `WARNING` != `int() argument must be a string`",
"hide": false,
"queryType": "range",
"refId": "D"
}
],
"title": "wis2box ERRORs",
"title": "wis2box ERRORs and WARNINGs",
"type": "logs"
}
],
Expand Down
65 changes: 0 additions & 65 deletions tests/data/data-mappings.yml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ wis2box:
plugins:
csv:
- plugin: wis2box.data.csv2bufr.ObservationDataCSV2BUFR
template: synop_bufr
template: CampbellAfrica-v1-template
notify: true
file-pattern: '^WIGOS_(\d-\d+-\d+-\w+)_.*\.csv$'
bufr4:
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ def test_message_api():
'mw_met_centre': 25,
'roma_met_centre': 33,
'alger_met_centre': 29,
'rnimh': 116,
'brazza_met_centre': 15,
'rnimh': 111,
'brazza_met_centre': 14,
'wmo-test': 151
}
for key, value in counts.items():
Expand Down Expand Up @@ -291,7 +291,7 @@ def test_message_api():
assert not props['data_id'].startswith('wis2')
assert not props['data_id'].startswith('origin/a/wis2')
assert props['data_id'].startswith('cd')
assert props['content']['size'] == 257
assert props['content']['size'] == 253
assert props['content']['encoding'] == 'base64'
assert props['content']['value'] is not None

Expand Down
12 changes: 4 additions & 8 deletions wis2box-management/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
#
###############################################################################

FROM ghcr.io/wmo-im/dim_eccodes_baseimage:latest
FROM ubuntu:focal

LABEL maintainer="[email protected]"
LABEL maintainer="[email protected]; [email protected]"

ARG WIS2BOX_PIP3_EXTRA_PACKAGES
ENV TZ="Etc/UTC" \
DEBIAN_FRONTEND="noninteractive" \
DEBIAN_PACKAGES="cron bash vim curl git libffi-dev python3-cryptography libssl-dev libudunits2-0 python3-paho-mqtt python3-dateparser python3-tz python3-setuptools unzip"
DEBIAN_PACKAGES="cron bash vim curl git libffi-dev python3-cryptography libssl-dev libudunits2-0 python3 python3-pip curl python3-paho-mqtt python3-dateparser python3-tz python3-setuptools unzip"

RUN if [ "$WIS2BOX_PIP3_EXTRA_PACKAGES" = "None" ]; \
then export WIS2BOX_PIP3_EXTRA_PACKAGES=echo; \
Expand All @@ -39,11 +39,7 @@ RUN if [ "$WIS2BOX_PIP3_EXTRA_PACKAGES" = "None" ]; \
RUN apt-get update -y && apt-get install -y ${DEBIAN_PACKAGES} \
# install wis2box data pipeline dependencies
&& pip3 install --no-cache-dir \
https://github.com/wmo-im/csv2bufr/archive/refs/tags/v0.7.4.zip \
https://github.com/wmo-im/bufr2geojson/archive/refs/tags/v0.5.1.zip \
https://github.com/wmo-im/pymetdecoder/archive/refs/tags/v0.1.10.zip \
https://github.com/wmo-cop/pyoscar/archive/master.zip \
https://github.com/wmo-im/synop2bufr/archive/refs/tags/v0.6.2.zip \
https://github.com/wmo-cop/pyoscar/archive/refs/tags/0.6.4.zip \
https://github.com/geopython/pygeometa/archive/master.zip \
https://github.com/wmo-im/pywis-topics/archive/refs/tags/0.2.0.zip \
# install shapely
Expand Down
56 changes: 54 additions & 2 deletions wis2box-management/wis2box/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@

import click
import logging
import requests

from time import sleep

from wis2box import cli_helpers
from wis2box.api.backend import load_backend
from wis2box.api.config import load_config
from wis2box.env import (BROKER_HOST, BROKER_USERNAME, BROKER_PASSWORD,
BROKER_PORT)
BROKER_PORT, DOCKER_API_URL, API_URL)
from wis2box.plugin import load_plugin, PLUGINS


LOGGER = logging.getLogger(__name__)


Expand All @@ -44,6 +46,56 @@ def refresh_data_mappings():
local_broker.pub('wis2box/data_mappings/refresh', '{}', qos=0)


def execute_api_process(process_name: str, payload: dict) -> dict:
"""
Executes a process on the API
:param process_name: process name
:param payload: payload to send to process
:returns: `dict` with execution-result
"""

LOGGER.debug('Posting data to wis2box-api')
headers = {
'accept': 'application/json',
'Content-Type': 'application/json',
'prefer': 'respond-async'
}
url = f'{DOCKER_API_URL}/processes/{process_name}/execution'

response = requests.post(url, headers=headers, json=payload)
if response.status_code >= 400:
msg = f'Failed to post data to wis2box-api: {response.status_code}' # noqa
if response.text:
msg += f'\nError message: {response.text}'
LOGGER.error(msg)
raise ValueError(msg)

if response.status_code == 200:
return response.json()

headers_json = dict(response.headers)
location = headers_json['Location']
location = location.replace(API_URL, DOCKER_API_URL)

status = 'accepted'
while status in ['accepted', 'running']:
# get the job status
headers = {
'accept': 'application/json',
'Content-Type': 'application/json'
}
response = requests.get(location, headers=headers)
response_json = response.json()
if 'status' in response_json:
status = response_json['status']
sleep(0.1)
# get result from location/results?f=json
response = requests.get(f'{location}/results?f=json', headers=headers) # noqa
return response.json()


def setup_collection(meta: dict = {}) -> bool:
"""
Add collection to api backend and configuration
Expand Down
51 changes: 43 additions & 8 deletions wis2box-management/wis2box/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# under the License.
#
###############################################################################

import base64
import json
import logging
from pathlib import Path
Expand Down Expand Up @@ -54,10 +54,10 @@ def __init__(self, defs: dict) -> None:
self.filename = None
self.incoming_filepath = None
self.topic_hierarchy = TopicHierarchy(defs['topic_hierarchy'])
self.template = defs['template']
self.file_filter = defs['pattern']
self.enable_notification = defs['notify']
self.buckets = defs['buckets']
self.template = defs.get('template', None)
self.file_filter = defs.get('pattern', '.*')
self.enable_notification = defs.get('notify', False)
self.buckets = defs.get('buckets', ())
self.output_data = {}
self.discovery_metadata = {}
# if discovery_metadata:
Expand All @@ -73,7 +73,7 @@ def publish_failure_message(self, description, wsi=None):
# load plugin for local broker
defs = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa
'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}', # noqa
'client_type': 'failure-publisher'
}
local_broker = load_plugin('pubsub', defs)
Expand Down Expand Up @@ -169,7 +169,7 @@ def notify(self, identifier: str, storage_path: str,
# load plugin for local broker
defs_local = {
'codepath': PLUGINS['pubsub']['mqtt']['plugin'],
'url': f"mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}", # noqa
'url': f'mqtt://{BROKER_USERNAME}:{BROKER_PASSWORD}@{BROKER_HOST}:{BROKER_PORT}', # noqa
'client_type': 'notify-publisher'
}
local_broker = load_plugin('pubsub', defs_local)
Expand Down Expand Up @@ -312,7 +312,13 @@ def get_public_filepath(self):

@staticmethod
def as_bytes(input_data):
"""Get data as bytes"""
"""Return input data as bytes
:param input_data: `str`, `bytes` or `Path` of data
:returns: `bytes` of data
"""

LOGGER.debug(f'input data is type: {type(input_data)}')
if isinstance(input_data, bytes):
return input_data
Expand All @@ -325,5 +331,34 @@ def as_bytes(input_data):
LOGGER.warning('Invalid data type')
return None

@staticmethod
def as_string(input_data, base64_encode=False):
"""Return input data as string
:param input_data: `str`, `bytes` or `Path` of data
:param base64_encode: `bool` if to use base64-encode before decoding
:returns: `str` of data
"""

LOGGER.debug(f'input data is type: {type(input_data)}')
if isinstance(input_data, bytes):
if base64_encode:
return base64.b64encode(input_data).decode('utf-8')
else:
return input_data.decode('utf-8')
elif isinstance(input_data, str):
return input_data
elif isinstance(input_data, Path):
if base64_encode:
with input_data.open('rb') as fh:
return base64.b64encode(fh.read()).decode('utf-8')
else:
with input_data.open('r') as fh:
return fh.read()
else:
LOGGER.warning('Invalid data type')
return None

def __repr__(self):
return '<BaseAbstractData>'
Loading

0 comments on commit b161192

Please sign in to comment.