diff --git a/config/nowcast.yaml b/config/nowcast.yaml index d3436fcc..6f94ee6c 100644 --- a/config/nowcast.yaml +++ b/config/nowcast.yaml @@ -245,7 +245,7 @@ temperature salinity: observations: - # Dataset that maps high resolution lon/lat grid on to NEMO j/i indices + # Dataset that maps high-resolution lon/lat grid on to NEMO j/i indices lon/lat to NEMO ji map: /SalishSeaCast/grid/grid_from_lat_lon_mask999.nc # ONC Strait of Georgia nodes real-time CTD data ctd data: @@ -259,12 +259,69 @@ observations: dest dir: /results/observations/ONC/CTD/ # Template for ONC CTD T&S data file path # **Must be quoted to project {} characters** - filepath template: '{station}/{station}_CTD_15m_{yyyymmdd}.nc' + filepath template: "{station}/{station}_CTD_15m_{yyyymmdd}.nc" # ONC Strait of Georgia ferry platforms real-time data ferry data: - ferries: {} - # see PR#109 for TWDP ferry example of this section and corresponding config tests + ferries: + # Tsawwassen - Duke Point route + TWDP: + route name: Tsawwassen - Duke Point + ONC station description: Mobile Platforms, British Columbia Ferries, Tsawwassen - Duke Point + location: + # ONC scalardata.getByStation API query parameters: + # + # ONC station identifiers to use for nav data, in priority order + stations: + - TWDP.N1 + - TWDP.N2 + device category: NAV + sensors: + - longitude + - latitude + terminals: + # Terminal names from salishsea_tools.places.PLACES + - Tsawwassen + - Duke Pt. + devices: + # ONC device category + TSG: + # device sensor names + sensors: + # ERDDAP sensor name: ONC sensor name + temperature: temperature + conductivity: conductivity + salinity: salinity + OXYSENSOR: + sensors: + o2_saturation: oxygen_saturation + o2_concentration_corrected: oxygen_corrected + o2_temperature: temperature + TURBCHLFL: + sensors: + cdom_fluorescence: cdom_fluorescence + chlorophyll: chlorophyll + turbidity: turbidity + CO2SENSOR: + sensors: + co2_partial_pressure: partial_pressure + co2_concentration_linearized: co2 + TEMPHUMID: + sensors: + air_temperature: air_temperature + relative_humidity: rel_humidity + BARPRESS: + sensors: + barometric_pressure: barometric_pressure + PYRANOMETER: + sensors: + solar_radiation: solar_radiation + PYRGEOMETER: + sensors: + longwave_radiation: downward_radiation + # Template for ONC ferry data file path + # **Must be quoted to project {} characters** + filepath template: "{ferry_platform}/{ferry_platform}_TSG_O2_TURBCHLFL_CO2_METEO_1m_{yyyymmdd}.nc" # Destination directory for ONC ferry data netCDF files dest dir: /results/observations/ONC/ferries/ @@ -274,7 +331,7 @@ observations: csv dir: /opp/observations/AISDATA/ # Destination directory for VFPA HADCP data netCDF files dest dir: /opp/observations/AISDATA/netcdf/ - # Template for VFPA HADCP data file path + # Template for the VFPA HADCP data file path # **Must be quoted to project {} characters** filepath template: 'VFPA_2ND_NARROWS_HADCP_2s_{yyyymm}.nc' @@ -429,7 +486,7 @@ erddap: # out of service since 22-Dec-2019; repair ETA unknown # - ubcONCUSDDLCTD15mV1 TWDP-ferry: - - ubcONCTWDP1mV1 + - ubcONCTWDP1mV18-01 nowcast-green: - ubcSSg3DBiologyFields1hV21-11 - ubcSSg3DLightFields1hV21-11 diff --git a/nowcast/workers/get_onc_ctd.py b/nowcast/workers/get_onc_ctd.py index ef91bc2b..fe5e141b 100644 --- a/nowcast/workers/get_onc_ctd.py +++ b/nowcast/workers/get_onc_ctd.py @@ -42,9 +42,7 @@ def main(): - """Set up and run the worker. - - For command-line usage see: + """For command-line usage see: :command:`python -m nowcast.workers.get_onc_ctd -h` """ @@ -61,6 +59,7 @@ def main(): help="UTC date to get ONC node CTD data for.", ) worker.run(get_onc_ctd, success, failure) + return worker def success(parsed_args): @@ -80,15 +79,15 @@ def failure(parsed_args): def get_onc_ctd(parsed_args, config, *args): ymd = parsed_args.data_date.format("YYYY-MM-DD") logger.info(f"requesting ONC {parsed_args.onc_station} CTD T&S data for {ymd}") - TOKEN = os.environ["ONC_USER_TOKEN"] onc_data = data_tools.get_onc_data( "scalardata", - "getByStation", - TOKEN, - station=parsed_args.onc_station, - deviceCategory="CTD", - sensors="salinity,temperature", + "getByLocation", + os.environ["ONC_USER_TOKEN"], + locationCode=parsed_args.onc_station, + deviceCategoryCode="CTD", + sensorCategoryCodes="salinity,temperature", dateFrom=data_tools.onc_datetime(f"{ymd} 00:00", "utc"), + dateTo=data_tools.onc_datetime(f"{ymd} 23:59", "utc"), ) try: ctd_data = data_tools.onc_json_to_dataset(onc_data) diff --git a/nowcast/workers/get_onc_ferry.py b/nowcast/workers/get_onc_ferry.py index 9c638392..0cca9996 100644 --- a/nowcast/workers/get_onc_ferry.py +++ b/nowcast/workers/get_onc_ferry.py @@ -15,9 +15,9 @@ """SalishSeaCast worker that downloads data for a specified UTC day from an ONC BC Ferries measurement platform. -The data are filtered to include only values for which qaqcFlag == 1 +The data are filtered to include only values for which qaqcFlag <= 1 or qaqcFlag >= 7 (meaning that all of ONC's automated QA/QC tests were passed). -After filtering the data are aggregated into 1 minute bins. +After filtering, the data are aggregated into 1-minute bins. The aggregation functions are mean, standard deviation, and sample count. The data are stored as a netCDF-4/HDF5 file that is accessible via @@ -138,6 +138,7 @@ def get_onc_ferry(parsed_args, config, *args): os.fspath(nc_filepath), encoding=encoding, unlimited_dims=("time",) ) checklist = {ferry_platform: os.fspath(nc_filepath)} + return checklist def _get_nav_data(ferry_platform, ymd, location_config): @@ -148,12 +149,15 @@ def _get_nav_data(ferry_platform, ymd, location_config): try: onc_data = data_tools.get_onc_data( "scalardata", - "getByStation", + "getByLocation", os.environ["ONC_USER_TOKEN"], - station=station, - deviceCategory=device_category, - sensors=sensors, + locationCode=station, + deviceCategoryCode=device_category, + sensorCategoryCodes=sensors, dateFrom=(data_tools.onc_datetime(f"{ymd} 00:00", "utc")), + dateTo=data_tools.onc_datetime(f"{ymd} 23:59", "utc"), + resampleType="avg", + resamplePeriod=1, ) except requests.HTTPError as e: msg = ( @@ -260,12 +264,15 @@ def _get_water_data(ferry_platform, device_category, ymd, devices_config): try: onc_data = data_tools.get_onc_data( "scalardata", - "getByStation", + "getByLocation", os.environ["ONC_USER_TOKEN"], - station=ferry_platform, - deviceCategory=device_category, - sensors=sensors, - dateFrom=(data_tools.onc_datetime(f"{ymd} 00:00", "utc")), + locationCode=ferry_platform, + deviceCategoryCode=device_category, + sensorCategoryCodes=sensors, + dateFrom=data_tools.onc_datetime(f"{ymd} 00:00", "utc"), + dateTo=data_tools.onc_datetime(f"{ymd} 23:59", "utc"), + resampleType="avg", + resamplePeriod=1, ) except requests.HTTPError as e: if e.response.status_code == 504: @@ -286,11 +293,13 @@ def _get_water_data(ferry_platform, device_category, ymd, devices_config): return device_data -def _empty_device_data(ferry_platform, device_category, ymd, sensors): +def _empty_device_data( + ferry_platform, device_category, ymd, sensors, time_coord="sampleTime" +): # Response from ONC contains no sensor data, so return an # empty DataArray logger.warning( - f"No ONC {ferry_platform} {device_category} data for {ymd}; " + f"No ONC {ferry_platform} {device_category} {sensors} data for {ymd}; " f"substituting empty dataset" ) onc_units = { @@ -305,7 +314,7 @@ def _empty_device_data(ferry_platform, device_category, ymd, sensors): "partial_pressure": "pCO2 uatm", "co2": "umol/mol", "air_temperature": "C", - "REL_HUMIDITY": "%", + "rel_humidity": "%", "barometric_pressure": "hPa", "solar_radiation": "W/m^2", "downward_radiation": "W/m^2", @@ -314,15 +323,21 @@ def _empty_device_data(ferry_platform, device_category, ymd, sensors): sensor: xarray.DataArray( name=sensor, data=numpy.array([], dtype=float), - coords={"sampleTime": numpy.array([], dtype="datetime64[ns]")}, - dims="sampleTime", + coords={time_coord: numpy.array([], dtype="datetime64[ns]")}, + dims=time_coord, attrs={ + "device_category": device_category, "qaqcFlag": numpy.array([], dtype=numpy.int64), "unitOfMeasure": onc_units[sensor], + "units": "degrees_Celcius" + if sensor in {"temperature", "air_temperature"} + else onc_units[sensor], }, ) for sensor in sensors.split(",") } + if len(data_arrays) == 1: + return data_arrays[sensors] return xarray.Dataset(data_arrays) @@ -332,26 +347,36 @@ def _qaqc_filter(ferry_platform, device, device_data, ymd, devices_config): for sensor, onc_sensor in devices_config[device]["sensors"].items(): logger.debug( f"filtering ONC {ferry_platform} {device} {onc_sensor} data " - f"for {ymd} to exlude qaqcFlag!=1" + f"for {ymd} to exclude 1= 7 ) - onc_data = getattr(device_data, onc_sensor) - not_nan_mask = numpy.logical_not(numpy.isnan(onc_data.values)) - sensor_qaqc_mask = onc_data.attrs["qaqcFlag"] <= 1 try: cf_units = cf_units_mapping[onc_data.unitOfMeasure] except KeyError: cf_units = onc_data.unitOfMeasure - sensor_data_arrays.append( - xarray.DataArray( + if not sensor_qaqc_mask.any(): + data_array = _empty_device_data( + ferry_platform, device, ymd, onc_sensor, time_coord="time" + ) + else: + data_array = xarray.DataArray( name=sensor, - data=onc_data[not_nan_mask][sensor_qaqc_mask].values, - coords={ - "time": onc_data.sampleTime[not_nan_mask][sensor_qaqc_mask].values - }, + data=onc_data[sensor_qaqc_mask].values, + coords={"time": onc_data.sampleTime[sensor_qaqc_mask].values}, dims="time", attrs={"device_category": device, "units": cf_units}, ) - ) + sensor_data_arrays.append(data_array) return sensor_data_arrays @@ -377,12 +402,12 @@ def count(values, axis): else: try: data_array = array.resample(time="1Min").mean() - except IndexError: + except (IndexError, ValueError): # array is empty, meaning there are no observations with - # qaqcFlag!=1, so substitute a DataArray full of NaNs + # qaqcFlag<=1 or qaqcFlac>=7, so substitute a DataArray full of NaNs logger.warning( f"ONC {ferry_platform} {array.device_category} " - f"{array.name} data for {ymd} contains no qaqcFlag==1 " + f"{array.name} data for {ymd} contains no qaqcFlag<=1 or qaqcFlac>=7 " f"values; substituting NaNs" ) nan_values = numpy.empty_like(data_vars["longitude"].values) @@ -408,7 +433,11 @@ def count(values, axis): sample_count_var = f"{var}_sample_count" sample_count_array = array.resample(time="1Min").count() sample_count_array.attrs = array.attrs - del sample_count_array.attrs["units"] + try: + del sample_count_array.attrs["units"] + except KeyError: + # empty data arrays lack units attributes + pass data_vars[sample_count_var] = _create_dataarray( sample_count_var, sample_count_array, ferry_platform, location_config ) @@ -418,7 +447,7 @@ def count(values, axis): coords={"time": data_arrays.longitude.time.values}, attrs={ "history": f"""{now} Download raw data from ONC scalardata API. -{now} Filter to exclude data with qaqcFlag != 1. +{now} Filter to exclude data with 1