ETL - Extract, Transform, Load for sensor data

Sources for all ETL of the Smart Emission Platform. This ETL was originally developed for the Smart Emission Project Nijmegen and the Intemo Josene Sensor Device (2015-2017). To accommodate other sensor devices, such as the EU JRC AirSensEUR and LuftDaten.info kits, the ETL framework was generalized (2018/2019).

The ETL uses deployment-specific variables for databases, passwords, etc. (not stored in GitHub).

All ETL is developed using Stetl, a Python framework and programming model for any ETL process. The essence of Stetl is that each ETL process is a chain of linked Input, Filter and Output Python classes specified in a Stetl config file.
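
To illustrate the chain idea, here is a minimal plain-Python sketch. It does not use the Stetl API: the class names `ListInput`, `ScaleFilter` and `CollectOutput` and the `run_chain` helper are hypothetical and only mimic the Input, Filter, Output pattern that a Stetl config file wires together.

```python
# Illustrative only: mimics an Input -> Filter -> Output chain in plain Python.
# None of these classes are part of Stetl; all names are hypothetical.


class ListInput:
    """Input: yields records from an in-memory list."""

    def __init__(self, records):
        self.records = records

    def read(self):
        yield from self.records


class ScaleFilter:
    """Filter: multiplies the 'value' field of each record by a factor."""

    def __init__(self, factor):
        self.factor = factor

    def process(self, record):
        return {**record, "value": record["value"] * self.factor}


class CollectOutput:
    """Output: collects records in a list (stand-in for a database writer)."""

    def __init__(self):
        self.written = []

    def write(self, record):
        self.written.append(record)


def run_chain(source, filters, sink):
    """Push every record from the input through all filters to the output."""
    for record in source.read():
        for f in filters:
            record = f.process(record)
        sink.write(record)
    return sink


sink = run_chain(
    ListInput([{"device": 1, "value": 2}, {"device": 2, "value": 5}]),
    [ScaleFilter(10)],
    CollectOutput(),
)
print(sink.written)
```

In Stetl itself the components and their order are declared in the .cfg file rather than in code, so the same classes can be recombined per ETL process.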

Each .sh file invokes a Stetl ETL process via Docker, using a Stetl config (.cfg) file specific to that ETL process.

Additional Python files implement specific ETL modules not defined in the Stetl Framework and are available under the Python smartem package.

All ETL processes are invoked using the same SE Stetl Docker image. They can be scheduled via Kubernetes or cron.

The Docker image is hosted on Docker Hub as smartemission/se-stetl.

The main ETL consists of multiple steps, described below.

Step 1: Harvesters - Fetching raw sensor data

The SE ETL follows a "pull" model: raw sensor data is "harvested" from data collector servers and other sensor networks.

The following ETL configs/processes are available:

  • Harvester Whale: get all raw timeseries sensor-values from the Whale API for Intemo Josene sensor devices, see harvester_whale.cfg
  • Harvester Influx: get all raw timeseries sensor-values from an InfluxDB, initially for AirSensEUR (ASE) devices, see harvester_influx.cfg

As a result all raw sensor-data is stored in PostGIS using the schema db-schema-raw.sql. The Raw Data fetched via the Harvesters is further processed in Step 2 Refiner.

Step 2: Refiners

In this step all raw harvested timeseries data is "refined". Refinement involves the following:

  • validation: remove outliers (pre and post)
  • conversion: convert raw sensor values to standard units (e.g. temperature from milliKelvin to degrees Celsius)
  • calibration: calibrate raw sensor gas-values to standard units using an Artificial Neural Network (ANN) (e.g. resistance/Ohm to AQ ug/m3 concentration)
  • aggregation: make hourly average values for each sensor ("uurwaarden", Dutch for hourly values)

See refiner.cfg and smartem/refiner. In particular, the above steps are driven by the type of sensor device. The learning process for ANN calibration is implemented under smartem/calibrator.
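
The validation, conversion and aggregation steps can be sketched roughly as follows. This is not the code from smartem/refiner: the function names, the tuple layout of the records and the plausibility bounds are assumptions made for illustration, and ANN calibration is left out.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# Hypothetical plausibility bounds for air temperature in degrees Celsius.
MIN_C, MAX_C = -40.0, 60.0


def millikelvin_to_celsius(mk):
    """Conversion: milliKelvin -> degrees Celsius."""
    return mk / 1000.0 - 273.15


def refine(raw_records):
    """Validate, convert and aggregate raw records to hourly averages.

    raw_records: iterable of (device_id, datetime, temperature_mK) tuples.
    Returns {(device_id, hour_start): mean_celsius}.
    """
    buckets = defaultdict(list)
    for device_id, ts, mk in raw_records:
        celsius = millikelvin_to_celsius(mk)
        # Validation: drop values outside the assumed plausible range.
        if not MIN_C <= celsius <= MAX_C:
            continue
        hour = ts.replace(minute=0, second=0, microsecond=0)
        buckets[(device_id, hour)].append(celsius)
    # Aggregation: one average per device per hour.
    return {key: round(mean(vals), 2) for key, vals in buckets.items()}


raw = [
    (7, datetime(2018, 5, 1, 10, 5), 293150),   # about 20 degrees Celsius
    (7, datetime(2018, 5, 1, 10, 35), 295150),  # about 22 degrees Celsius
    (7, datetime(2018, 5, 1, 10, 50), 999999),  # implausible, dropped
    (7, datetime(2018, 5, 1, 11, 10), 291150),  # about 18 degrees Celsius
]
hourly = refine(raw)
```

Bucketing on the truncated timestamp keeps the aggregation independent of how many samples arrive within each hour.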

As a result of this step, sensor-data timeseries (hour-values) are stored in PostGIS (db-schema-refined.sql) and in InfluxDB.

Step 3: Publishers

In this step all refined/aggregated timeseries data is published to various IoT/SWE services. The following publishers are present:

  • SOSPublisher - publish to a remote SOS via SOS-T(ransactional) protocol sospublisher.cfg
  • STAPublisher - publish to a remote SensorThings API (STA) via REST stapublisher.cfg

All publication/output ETL uses plain Python string templates (no need for Jinja2 yet) with parameter substitution, e.g. smartem/publisher/sostemplates for SOS and smartem/publisher/statemplates for STA.
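
As a rough illustration of template-based publishing, the sketch below fills a SensorThings Observation payload by parameter substitution. It is not taken from smartem/publisher/statemplates: the template text and the `render_observation` helper are hypothetical, and the use of `string.Template` (rather than, say, `str.format`) is an assumption.

```python
import json
from string import Template

# Hypothetical template; the real ones live under smartem/publisher/statemplates.
OBSERVATION_TEMPLATE = Template("""{
  "result": $value,
  "phenomenonTime": "$time",
  "Datastream": {"@iot.id": $datastream_id}
}""")


def render_observation(value, time, datastream_id):
    """Fill the template and return the payload as a JSON string."""
    return OBSERVATION_TEMPLATE.substitute(
        value=value, time=time, datastream_id=datastream_id
    )


payload = render_observation(21.0, "2018-05-01T10:00:00Z", 42)
# Parsing the result confirms that the rendered text is valid JSON.
parsed = json.loads(payload)
```

Plain substitution like this performs no escaping, so it suits numeric and well-controlled string values such as the ones produced by the Refiner.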

NB publication to WFS and WMS is not explicitly required: these services directly use the timeseries refined tables and Postgres VIEWs from Step 2.

Last Values

This step is special: it is a pass-through from the Raw Sensor API to a single table with (refined) last values for all sensors, which serves the SOS emulation API (sosemu). This ETL process exists for historical reasons: initially no SOS or STA was available, but the project needed last values to develop the SmartApp.

  • Last: get and convert last sensor-values for all devices: last.cfg.

As a result this raw sensor-data is stored in PostGIS db-schema-last.sql.

Calibration

(Currently only for Intemo Josene devices)

To collect reference data and generate the ANN Calibration Estimator, three additional ETL processes were added later in the project (December 2016):

  • Extractor: to extract raw (Jose) Sensor Values from the Harvested (Step 1) RawDBInput into InfluxDB
  • Harvester_RIVM: to extract calibrated gas samples (hour averages) from RIVM LML SOS into InfluxDB

The above two datasets in InfluxDB are used to generate the ANN Calibration Estimator object by running the Calibrator ETL process:

  • Calibrator: to read/merge RIVM and Jose values from InfluxDB to create the ANN Estimator object (pickled)
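
The "train once, pickle, reuse" pattern can be sketched as below. The real Calibrator trains an ANN on data read from InfluxDB; here a single scale factor stands in for the network, the sample values are made up, and the functions `fit_scale_model` and `predict` are hypothetical.

```python
import pickle
from statistics import mean


def fit_scale_model(raw_values, reference_values):
    """Trivial stand-in for ANN training: learn a single scale factor."""
    return {"factor": mean(reference_values) / mean(raw_values)}


def predict(model, raw_value):
    """Apply the learned factor to one raw sensor value."""
    return raw_value * model["factor"]


# Hypothetical paired samples: raw readings and reference concentrations.
raw_values = [100.0, 200.0, 300.0]
reference_values = [10.0, 20.0, 30.0]

model = fit_scale_model(raw_values, reference_values)

# Serialize the trained object once; another process can load it
# and predict without retraining.
blob = pickle.dumps(model)
restored = pickle.loads(blob)
calibrated = predict(restored, 150.0)
```

The sketch keeps the pickled bytes in memory; a deployment would write them to a file that the calibration step of the Refiner can read.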

Deployment

Deployment options are listed below per ETL process. Deployment mainly involves setting the proper environment variables. By convention, a variable used in a config file gets a stetl_ prefix when it is set in the environment. For example, pg_database within last.cfg becomes stetl_pg_database within a K8s or other Docker deployment.
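
The naming convention amounts to stripping a prefix. The following sketch shows that mapping only; it is not how Stetl itself resolves variables, the helper `config_args_from_env` is hypothetical, and the example values are made up.

```python
import os

PREFIX = "stetl_"


def config_args_from_env(environ=None):
    """Collect stetl_-prefixed variables and strip the prefix.

    Returns a dict such as {"pg_database": "gis"} keyed by the names
    used inside the config files.
    """
    environ = os.environ if environ is None else environ
    return {
        name[len(PREFIX):]: value
        for name, value in environ.items()
        if name.startswith(PREFIX)
    }


# Example with an explicit mapping instead of the real process environment.
example_env = {
    "stetl_pg_host": "postgis",
    "stetl_pg_database": "gis",
    "PATH": "/usr/bin",
}
args = config_args_from_env(example_env)
```

Calling `config_args_from_env()` without arguments reads the actual process environment.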

ETL process: (Harvester) Last (Values)

Config file: last.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_raw_device_url_1
stetl_raw_device_url_2
stetl_intemo_token
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_rt
stetl_pg_schema_calibrated
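
A pre-flight check along the following lines can catch an incomplete deployment before the ETL starts. It is a sketch, not part of the repository: the helper `missing_vars` is hypothetical, and the list simply repeats the variables named above for last.cfg.

```python
# Variables listed in this README for last.cfg.
REQUIRED_LAST = [
    "stetl_raw_device_url_1",
    "stetl_raw_device_url_2",
    "stetl_intemo_token",
    "stetl_pg_host",
    "stetl_pg_database",
    "stetl_pg_user",
    "stetl_pg_password",
    "stetl_pg_schema_rt",
    "stetl_pg_schema_calibrated",
]


def missing_vars(required, environ):
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not environ.get(name)]


# Example with a deliberately incomplete, made-up environment.
example_env = {
    name: "x" for name in REQUIRED_LAST if name != "stetl_pg_password"
}
missing = missing_vars(REQUIRED_LAST, example_env)
```

In a container entrypoint the second argument would be `os.environ`, and a non-empty result would abort the run with a clear message. The same function works for the other ETL processes by swapping in their variable lists.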

ETL process: Harvester Whale

Config file: harvester_whale.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_raw_device_url_1
stetl_raw_device_url_2
stetl_intemo_token
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_raw
stetl_max_proc_time_secs
stetl_api_interval_secs

ETL process: RIVM (SOS) Harvester

Config file: harvester_rivm.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_rivm_sos_base_url
stetl_max_proc_time_secs
stetl_api_interval_secs
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_raw
stetl_pg_schema_harvest_rivm
stetl_influx_host
stetl_influx_port
stetl_influx_se_database
stetl_influx_se_measurement_rivm
stetl_influx_se_writer
stetl_influx_se_writer_password

ETL process: InfluxDB Harvester

Config file: harvester_influx.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_influx_dc1_host
stetl_influx_port
stetl_influx_as_database
stetl_influx_as_reader
stetl_influx_as_reader_password
stetl_max_proc_time_secs
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_raw

ETL process: Refiner

Config file: refiner.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_refiner_max_input_records
stetl_refiner_raw_read_once
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_raw
stetl_pg_schema_refined
stetl_pg_schema_calibrated
stetl_influx_host
stetl_influx_port
stetl_influx_se_database
stetl_influx_se_measurement_refined
stetl_influx_se_writer
stetl_influx_se_writer_password

ETL process: STA Publisher

Config file: stapublisher.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_refined
stetl_stapublisher_max_input_records
stetl_sta_host
stetl_sta_port
stetl_sta_path
stetl_sta_user
stetl_sta_password

ETL process: SOS Publisher

Config file: sospublisher.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_refined
stetl_sospublisher_max_input_records
stetl_sos_host
stetl_sos_port
stetl_sos_path

ETL process: Extractor

Config file: extractor.cfg. The following environment vars need to be set, either via docker-compose or Kubernetes.

Environment variable
stetl_extractor_max_input_records
stetl_extractor_raw_read_once
stetl_pg_host
stetl_pg_database
stetl_pg_user
stetl_pg_password
stetl_pg_schema_raw
stetl_pg_schema_refined
stetl_pg_schema_extracted
stetl_influx_host
stetl_influx_port
stetl_influx_se_database
stetl_influx_se_measurement_extract
stetl_influx_se_writer
stetl_influx_se_writer_password