Skip to content

Commit

Permalink
Removed battutaStream usage from eea-direct adapter (#1071)
Browse files Browse the repository at this point in the history
* removed battutaStream from eea-direct adapter

---------

Co-authored-by: Gabriel Fosse <[email protected]>
  • Loading branch information
majesticio and Gabriel Fosse authored Jan 29, 2024
1 parent 62b7c23 commit 04376bc
Showing 1 changed file with 56 additions and 95 deletions.
151 changes: 56 additions & 95 deletions src/adapters/eea-direct.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,9 @@ import { acceptableParameters } from '../lib/utils.js';
import log from '../lib/logger.js';
import got from 'got';
import { DateTime } from 'luxon';
import tzlookup from 'tz-lookup';
import sj from 'scramjet';
import { default as JSONStream } from 'JSONStream';
const { MultiStream, DataStream, StringStream } = sj;

// URL of the JSON document of EEA stations; downloaded with got.stream and
// parsed element-by-element by getBattutaStream.
const stationsLink =
'http://battuta.s3.amazonaws.com/eea-stations-all.json';

export const name = 'eea-direct';

export function fetchStream (source) {
Expand All @@ -18,13 +13,11 @@ export function fetchStream (source) {

log.debug(`Fetch stream called: ${source.name}`);

fetchMetadata(source)
.then((stations) => fetchPollutants(source, stations))
.then((stream) => stream.pipe(out))
.catch((error) => {
log.error(`Error fetching stream: ${error.message}`);
out.end();
});
const stream = fetchPollutants(source);
stream.pipe(out).catch((error) => {
log.error(`Error fetching stream: ${error.message}`);
out.end();
});

return out;
}
Expand All @@ -39,43 +32,19 @@ export async function fetchData (source, cb) {
}
}

// Module-level cache for the station stream; populated on the first call to
// getBattutaStream and reused afterwards.
let _battuta = null;

/**
 * Returns a rewound view of the cached station stream, creating it on first use.
 *
 * On the first call the station list at `stationsLink` is requested with
 * `got.stream` and piped through `JSONStream.parse('*')`. Later calls skip the
 * download and only call `rewind()` on the cached stream.
 *
 * @returns {*} whatever `_battuta.rewind()` returns (a scramjet stream —
 *   NOTE(review): presumably replays the chunks retained by `keep(Infinity)`;
 *   confirm against the scramjet docs).
 */
function getBattutaStream () {
  if (_battuta) {
    return _battuta.rewind();
  }

  const stationsRequest = got.stream(stationsLink);

  // On a pipeline error: tear down the HTTP request, end the failing stream,
  // log the error, then rethrow so the failure still propagates.
  const abortOnError = (err) => {
    stationsRequest.destroy();
    err.stream.end();
    log.debug(err);
    throw err;
  };

  const parsedStations = DataStream.pipeline(
    stationsRequest,
    JSONStream.parse('*')
  );
  _battuta = parsedStations.catch(abortOnError).keep(Infinity);

  return _battuta.rewind();
}
/**
 * Collects the stations belonging to the source's country into a lookup
 * object keyed by `stationId`.
 *
 * @param {object} source - adapter source; `source.country` is used as the
 *   prefix that a station's `stationId` must start with.
 * @returns {Promise<object>} result of the stream's `accumulate` call
 *   (NOTE(review): presumably resolves to the `{ [stationId]: station }`
 *   accumulator once the stream ends — confirm against the scramjet docs).
 */
async function fetchMetadata(source) {
  const inCountry = (station) => station.stationId.startsWith(source.country);
  const indexById = (lookup, station) => (lookup[station.stationId] = station);

  const countryStations = getBattutaStream().filter(inCountry);
  return countryStations.accumulate(indexById, {});
}

function fetchPollutants(source, stations) {
function fetchPollutants (source) {
const pollutants = acceptableParameters.map((pollutant) =>
pollutant === 'pm25' ? 'PM2.5' : pollutant.toUpperCase()
);

const timeThreshold = source.datetime
? DateTime.fromISO(source.datetime)
: DateTime.utc().minus({ hours: source.offset || 2 });

return new MultiStream(
pollutants.map((pollutant) => {
const url =
source.url + source.country + '_' + pollutant + '.csv';
const offset = source.offset || 2;
const timeLastInsert = source.datetime
? source.datetime
: DateTime.utc().minus({ hours: offset });
let header;
const url = source.url + source.country + '_' + pollutant + '.csv';
let rowCount = 0;

return new StringStream()
.use((stream) => {
Expand All @@ -92,49 +61,48 @@ function fetchPollutants(source, stations) {
delimiter: ',',
skipEmptyLines: true,
})
.shift(1, (columns) => (header = columns[0]))
.filter((o) => o.length === header.length)
.map((o) =>
header.reduce((a, c, i) => {
a[c] = o[i];
return a;
}, {})
)
.filter((o) => {
const isoDate = o.value_datetime_inserted.replace(' ', 'T');
return (
DateTime.fromISO(isoDate, { setZone: true })
.toUTC()
.toMillis() > timeLastInsert.toMillis()
);
})

.filter((o) => o.value_validity == 1)
.filter((o) => o.value_numeric.trim() !== '')
.filter((o) => o.station_code in stations)
.map((record) => {
const matchedStation = stations[record.station_code];
const timeZone = tzlookup(
matchedStation.latitude,
matchedStation.longitude
);
.map(record => {
rowCount++;
if (rowCount === 1) return null; // Skip the first row (header)

const latitude = parseFloat(record[9]);
const longitude = parseFloat(record[8]);

if (isNaN(latitude) || isNaN(longitude)) {
log.error(`Invalid coordinate value for record: ${record}`);
return null;
}

const utcDate = record[16] && DateTime.fromSQL(record[16], { zone: 'utc' }).toISO({ suppressMilliseconds: true });
const localDate = record[15] && DateTime.fromSQL(record[15]).toISO({ suppressMilliseconds: true });

if (!utcDate || !localDate) {
log.error(`Invalid date value for record: ${record}`);
return null;
}

if (DateTime.fromISO(utcDate).toMillis() < timeThreshold.toMillis()) {
return null;
}
// fix units and convert values
if (record[23] === 'mg/m3' || record[23] === 'mg/m³') {
record[19] = parseFloat(record[19]) * 1000;
record[23] = 'µg/m³';
}
return {
location: record.station_code,
city: matchedStation.city
? matchedStation.city
: matchedStation.location
? matchedStation.location
: source.city,
location: record[1],
city: record[2],
coordinates: {
latitude: Number(matchedStation.latitude),
longitude: Number(matchedStation.longitude),
latitude,
longitude,
},
parameter: record[5].toLowerCase().replace('.', ''),
date: {
utc: utcDate,
local: localDate,
},
parameter: record.pollutant
.toLowerCase()
.replace('.', ''),
date: makeDate(record.value_datetime_end, timeZone),
value: Number(record.value_numeric),
unit: record.value_unit,
value: parseFloat(record[19]),
unit: record[23] === 'ug/m3' || record[23] === 'µg/m3' ? 'µg/m³' : record[23],
attribution: [
{
name: 'EEA',
Expand All @@ -146,17 +114,10 @@ function fetchPollutants(source, stations) {
value: 1,
},
};
});
})
.filter(record => record !== null)
})
).mux();
).mux().catch((error) => {
log.debug("Error in MultiStream:", error);
});
}

/**
 * Builds the `{ utc, local }` date pair for a measurement timestamp.
 *
 * The first space in `date` is replaced with 'T' before the string is handed
 * to `DateTime.fromISO` with `setZone: true`. The parsed value is then
 * re-zoned to `timeZone` for the local representation.
 *
 * @param {string} date - timestamp text; must support `.replace`.
 * @param {string} timeZone - zone passed to `setZone` for the local value.
 * @returns {{utc: string, local: string}} ISO strings rendered with
 *   `suppressMilliseconds: true`.
 */
const makeDate = (date, timeZone) => {
  const isoText = date.replace(' ', 'T');
  const parsed = DateTime.fromISO(isoText, { setZone: true });
  const zoned = parsed.setZone(timeZone);

  const utc = parsed.toUTC().toISO({ suppressMilliseconds: true });
  const local = zoned.toISO({ suppressMilliseconds: true });

  return { utc, local };
};

0 comments on commit 04376bc

Please sign in to comment.