From 04376bc720da2378907a7c95eac7350689ed028a Mon Sep 17 00:00:00 2001 From: Gabriel Fosse <67290377+majesticio@users.noreply.github.com> Date: Mon, 29 Jan 2024 12:22:23 -0800 Subject: [PATCH] Removed battutaStream usage from eea-direct adapter (#1071) * removed battutaStream from eea-direct adapter --------- Co-authored-by: Gabriel Fosse --- src/adapters/eea-direct.js | 151 ++++++++++++++----------------------- 1 file changed, 56 insertions(+), 95 deletions(-) diff --git a/src/adapters/eea-direct.js b/src/adapters/eea-direct.js index 954951d5..62b46481 100644 --- a/src/adapters/eea-direct.js +++ b/src/adapters/eea-direct.js @@ -2,14 +2,9 @@ import { acceptableParameters } from '../lib/utils.js'; import log from '../lib/logger.js'; import got from 'got'; import { DateTime } from 'luxon'; -import tzlookup from 'tz-lookup'; import sj from 'scramjet'; -import { default as JSONStream } from 'JSONStream'; const { MultiStream, DataStream, StringStream } = sj; -const stationsLink = - 'http://battuta.s3.amazonaws.com/eea-stations-all.json'; - export const name = 'eea-direct'; export function fetchStream (source) { @@ -18,13 +13,11 @@ export function fetchStream (source) { log.debug(`Fetch stream called: ${source.name}`); - fetchMetadata(source) - .then((stations) => fetchPollutants(source, stations)) - .then((stream) => stream.pipe(out)) - .catch((error) => { - log.error(`Error fetching stream: ${error.message}`); - out.end(); - }); + const stream = fetchPollutants(source); + stream.pipe(out).catch((error) => { + log.error(`Error fetching stream: ${error.message}`); + out.end(); + }); return out; } @@ -39,43 +32,19 @@ export async function fetchData (source, cb) { } } -let _battuta = null; -function getBattutaStream () { - if (!_battuta) { - const gotStream = got.stream(stationsLink); - - _battuta = DataStream.pipeline(gotStream, JSONStream.parse('*')) - .catch((e) => { - gotStream.destroy(); - e.stream.end(); - log.debug(e); - throw e; - }) - .keep(Infinity); - } - - return _battuta.rewind(); -} -async function fetchMetadata(source) { - return getBattutaStream() - .filter(({ stationId }) => stationId.startsWith(source.country)) - .accumulate((acc, item) => (acc[item.stationId] = item), {}); -} - -function fetchPollutants(source, stations) { +function fetchPollutants (source) { const pollutants = acceptableParameters.map((pollutant) => pollutant === 'pm25' ? 'PM2.5' : pollutant.toUpperCase() ); + const timeThreshold = source.datetime + ? DateTime.fromISO(source.datetime) + : DateTime.utc().minus({ hours: source.offset || 2 }); + return new MultiStream( pollutants.map((pollutant) => { - const url = - source.url + source.country + '_' + pollutant + '.csv'; - const offset = source.offset || 2; - const timeLastInsert = source.datetime - ? source.datetime - : DateTime.utc().minus({ hours: offset }); - let header; + const url = source.url + source.country + '_' + pollutant + '.csv'; + let rowCount = 0; return new StringStream() .use((stream) => { @@ -92,49 +61,48 @@ function fetchPollutants(source, stations) { delimiter: ',', skipEmptyLines: true, }) - .shift(1, (columns) => (header = columns[0])) - .filter((o) => o.length === header.length) - .map((o) => - header.reduce((a, c, i) => { - a[c] = o[i]; - return a; - }, {}) - ) - .filter((o) => { - const isoDate = o.value_datetime_inserted.replace(' ', 'T'); - return ( - DateTime.fromISO(isoDate, { setZone: true }) - .toUTC() - .toMillis() > timeLastInsert.toMillis() - ); - }) - - .filter((o) => o.value_validity == 1) - .filter((o) => o.value_numeric.trim() !== '') - .filter((o) => o.station_code in stations) - .map((record) => { - const matchedStation = stations[record.station_code]; - const timeZone = tzlookup( - matchedStation.latitude, - matchedStation.longitude - ); + .map(record => { + rowCount++; + if (rowCount === 1) return null; // Skip the first row (header) + + const latitude = parseFloat(record[9]); + const longitude = parseFloat(record[8]); + + if (isNaN(latitude) || isNaN(longitude)) { + log.error(`Invalid coordinate value for record: ${record}`); + return null; + } + + const utcDate = record[16] && DateTime.fromSQL(record[16], { zone: 'utc' }).toISO({ suppressMilliseconds: true }); + const localDate = record[15] && DateTime.fromSQL(record[15]).toISO({ suppressMilliseconds: true }); + + if (!utcDate || !localDate) { + log.error(`Invalid date value for record: ${record}`); + return null; + } + + if (DateTime.fromISO(utcDate).toMillis() < timeThreshold.toMillis()) { + return null; + } + // fix units and convert values + if (record[23] === 'mg/m3' || record[23] === 'mg/m³') { + record[19] = parseFloat(record[19]) * 1000; + record[23] = 'µg/m³'; + } return { - location: record.station_code, - city: matchedStation.city - ? matchedStation.city - : matchedStation.location - ? matchedStation.location - : source.city, + location: record[1], + city: record[2], coordinates: { - latitude: Number(matchedStation.latitude), - longitude: Number(matchedStation.longitude), + latitude, + longitude, + }, + parameter: record[5].toLowerCase().replace('.', ''), + date: { + utc: utcDate, + local: localDate, }, - parameter: record.pollutant - .toLowerCase() - .replace('.', ''), - date: makeDate(record.value_datetime_end, timeZone), - value: Number(record.value_numeric), - unit: record.value_unit, + value: parseFloat(record[19]), + unit: record[23] === 'ug/m3' || record[23] === 'µg/m3' ? 'µg/m³' : record[23], attribution: [ { name: 'EEA', @@ -146,17 +114,10 @@ function fetchPollutants(source, stations) { value: 1, }, }; - }); + }) + .filter(record => record !== null) }) - ).mux(); + ).mux().catch((error) => { + log.debug("Error in MultiStream:", error); + }); } - -const makeDate = (date, timeZone) => { - date = DateTime.fromISO(date.replace(' ', 'T'), { setZone: true }); - const localDate = date.setZone(timeZone); - - return { - utc: date.toUTC().toISO({ suppressMilliseconds: true }), - local: localDate.toISO({ suppressMilliseconds: true }), - }; -};