diff --git a/act/io/__init__.py b/act/io/__init__.py index 4638ce4bac..4616848d58 100644 --- a/act/io/__init__.py +++ b/act/io/__init__.py @@ -7,7 +7,18 @@ __getattr__, __dir__, __all__ = lazy.attach( __name__, - submodules=['arm', 'text', 'icartt', 'mpl', 'neon', 'noaagml', 'noaapsl', 'pysp2', 'hysplit'], + submodules=[ + 'arm', + 'ameriflux', + 'text', + 'icartt', + 'mpl', + 'neon', + 'noaagml', + 'noaapsl', + 'pysp2', + 'hysplit', + ], submod_attrs={ 'arm': [ 'WriteDataset', @@ -17,6 +28,7 @@ 'check_if_tar_gz_file', 'read_arm_mmcr', ], + 'ameriflux': ['format_as_ameriflux'], 'text': ['read_csv'], 'icartt': ['read_icartt'], 'mpl': ['proc_sigma_mplv5_read', 'read_sigma_mplv5'], diff --git a/act/io/ameriflux.py b/act/io/ameriflux.py new file mode 100644 index 0000000000..0bfb63aa35 --- /dev/null +++ b/act/io/ameriflux.py @@ -0,0 +1,181 @@ +""" +This module contains I/O operations for the U.S. Department of Energy +AmeriFlux program (https://ameriflux.lbl.gov/). +""" + +import numpy as np +import pandas as pd +import warnings + + +def convert_to_ameriflux( + ds, + variable_mapping=None, + soil_mapping=None, + depth_profile=[2.5, 5, 10, 15, 20, 30, 35, 50, 75, 100], + include_missing_variables=False, + **kwargs, +): + """ + + Returns `xarray.Dataset` with stored data and metadata from a user-defined + query of ARM-standard netCDF files from a single datastream. Has some procedures + to ensure time is correctly fomatted in returned Dataset. + + Parameters + ---------- + ds : xarray.Dataset + Dataset of data to convert to AmeriFlux format + variable_mapping : dict + Dictionary of variables mappings. The key should be the name of the variable + in the Dataset with the values being dictionaries of the AmeriFlux name and units. + For example: + var_mapping = { + 'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'}, + } + soil_mapping : dict + Dictionary of soil variables mappings following the same formatting as variable_mapping. + It is understood that the AmeriFlux name may be the same for some variables. This + script attempts to automatically name these measurements. If a variable is not dimensioned + by a depth nor has a sensor_height attribute, it will automatically assume that it's + at the first depth in the depth_profile variable. + depth_profile : list + List of depths that the variables will be mapped to. If a depth is not in this list, + the index chosen will be the one closest to the depth value. + include_missing_variables : boolean + If there variables that are completely missing (-9999) chose whether or not to include + them in the DataFrame. + + Returns + ------- + df : pandas.DataFrame (or None) + Returns a pandas dataframe for easy writing to csv + + """ + # Use ARM variable mappings if none provided + if variable_mapping is None: + warnings.warn('Variable mapping was not provided, using default ARM mapping') + # Define variable mapping and units + # The key is the variable name in the data and the name in the dictionary + # is the AmeriFlux Name + var_mapping = { + 'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'}, + 'co2_molar_fraction': {'name': 'CO2', 'units': 'nmol/mol'}, + 'co2_mixing_ratio': {'name': 'CO2_MIXING_RATIO', 'units': 'umol/mol'}, + 'h2o_mole_fraction': {'name': 'H2O', 'units': 'mmol/mol'}, + 'h2o_mixing_ratio': {'name': 'H2O_MIXING_RATIO', 'units': 'mmol/mol'}, + 'ch4_mole_fraction': {'name': 'CH4', 'units': 'nmol/mol'}, + 'ch4_mixing_ratio': {'name': 'CH4_MIXING_RATIO', 'units': 'nmol/mol'}, + 'momentum_flux': {'name': 'TAU', 'units': 'kg/(m s^2)'}, + 'sensible_heat_flux': {'name': 'H', 'units': 'W/m^2'}, + 'latent_flux': {'name': 'LE', 'units': 'W/m^2'}, + 'air_temperature': {'name': 'TA', 'units': 'deg C'}, + 'air_pressure': {'name': 'PA', 'units': 'kPa'}, + 'relative_humidity': {'name': 'RH', 'units': '%'}, + 'sonic_temperature': {'name': 'T_SONIC', 'units': 'deg C'}, + 'water_vapor_pressure_defecit': {'name': 'VPD', 'units': 'hPa'}, + 'Monin_Obukhov_length': {'name': 'MO_LENGTH', 'units': 'm'}, + 'Monin_Obukhov_stability_parameter': {'name': 'ZL', 'units': ''}, + 'mean_wind': {'name': 'WS', 'units': 'm/s'}, + 'wind_direction_from_north': {'name': 'WD', 'units': 'deg'}, + 'friction_velocity': {'name': 'USTAR', 'units': 'm/s'}, + 'maximum_instantaneous_wind_speed': {'name': 'WS_MAX', 'units': 'm/s'}, + 'down_short_hemisp': {'name': 'SW_IN', 'units': 'W/m^2'}, + 'up_short_hemisp': {'name': 'SW_OUT', 'units': 'W/m^2'}, + 'down_long': {'name': 'LW_IN', 'units': 'W/m^2'}, + 'up_long': {'name': 'LW_OUT', 'units': 'W/m^2'}, + 'albedo': {'name': 'ALB', 'units': '%'}, + 'net_radiation': {'name': 'NETRAD', 'units': 'W/m^2'}, + 'par_inc': {'name': 'PPFD_IN', 'units': 'umol/(m^2 s)'}, + 'par_ref': {'name': 'PPFD_OUT', 'units': 'umol/(m^2 s)'}, + 'precip': {'name': 'P', 'units': 'mm'}, + } + + # Use ARM variable mappings if none provided + # Similar to the above. This has only been tested on the ARM + # ECOR, SEBS, STAMP, and AMC combined. The automated naming may + # not work for all cases + if soil_mapping is None: + warnings.warn('Soil variable mapping was not provided, using default ARM mapping') + soil_mapping = { + 'surface_soil_heat_flux': {'name': 'G', 'units': 'W/m^2'}, + 'soil_temp': {'name': 'TS', 'units': 'deg C'}, + 'temp': {'name': 'TS', 'units': 'deg C'}, + 'soil_moisture': {'name': 'SWC', 'units': '%'}, + 'soil_specific_water_content': {'name': 'SWC', 'units': '%'}, + 'vwc': {'name': 'SWC', 'units': '%'}, + } + + # Loop through variables and update units to the AmeriFlux standard + for v in ds: + if v in var_mapping: + ds = ds.utils.change_units(variables=v, desired_unit=var_mapping[v]['units']) + + # Get start/end time stamps + ts_start = ds['time'].dt.strftime('%Y%m%d%H%M').values + ts_end = [ + pd.to_datetime(t + np.timedelta64(30, 'm')).strftime('%Y%m%d%H%M') + for t in ds['time'].values + ] + data = {} + data['TIMESTAMP_START'] = ts_start + data['TIMESTAMP_END'] = ts_end + + # Loop through the variables in the var mapping dictionary and add data to dictionary + for v in var_mapping: + if v in ds: + if 'missing_value' not in ds[v].attrs: + ds[v].attrs['missing_value'] = -9999 + if np.all(ds[v].isnull()): + if include_missing_variables: + data[var_mapping[v]['name']] = ds[v].values + else: + data[var_mapping[v]['name']] = ds[v].values + else: + if include_missing_variables: + data[var_mapping[v]['name']] = np.full(ds['time'].shape, -9999) + + # Automated naming for the soil variables + # Again, this may not work for other cases. Careful review is needed. + prev_var = '' + for var in soil_mapping: + if soil_mapping[var]['name'] != prev_var: + h = 1 + r = 1 + prev_var = soil_mapping[var]['name'] + soil_vars = [ + v2 + for v2 in list(ds) + if (v2.startswith(var)) & ('std' not in v2) & ('qc' not in v2) & ('net' not in v2) + ] + for i, svar in enumerate(soil_vars): + vert = 1 + if ('avg' in svar) | ('average' in svar): + continue + soil_data = ds[svar].values + data_shape = soil_data.shape + if len(data_shape) > 1: + coords = ds[svar].coords + depth_name = list(coords)[-1] + depth_values = ds[depth_name].values + for depth_ind in range(len(depth_values)): + soil_data_depth = soil_data[:, depth_ind] + vert = np.where(depth_profile == depth_values[depth_ind])[0][0] + 1 + new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)]) + data[new_name] = soil_data_depth + else: + if 'sensor_height' in ds[svar].attrs: + sensor_ht = ds[svar].attrs['sensor_height'].split(' ') + depth = abs(float(sensor_ht[0])) + units = sensor_ht[1] + if units == 'cm': + vert = np.argmin(np.abs(np.array(depth_profile) - depth)) + 1 + new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)]) + data[new_name] = soil_data + h += 1 + + # Convert dictionary to dataframe and return + df = pd.DataFrame(data) + df = df.fillna(-9999.0) + + return df diff --git a/act/tests/sample_files.py b/act/tests/sample_files.py index fdc4a0f732..11991023f5 100644 --- a/act/tests/sample_files.py +++ b/act/tests/sample_files.py @@ -60,6 +60,11 @@ EXAMPLE_EBBR3 = DATASETS.fetch('sgp30ebbrE13.b1.20190601.000000.nc') EXAMPLE_ECOR = DATASETS.fetch('sgp30ecorE14.b1.20190601.000000.cdf') EXAMPLE_SEBS = DATASETS.fetch('sgpsebsE14.b1.20190601.000000.cdf') +EXAMPLE_SEBS_E39 = DATASETS.fetch('sgpsebsE39.b1.20230601.000000.cdf') +EXAMPLE_ECORSF_E39 = DATASETS.fetch('sgpecorsfE39.b1.20230601.000000.nc') +EXAMPLE_STAMP_E39 = DATASETS.fetch('sgpstampE39.b1.20230601.000000.nc') +EXAMPLE_STAMPPCP_E39 = DATASETS.fetch('sgpstamppcpE39.b1.20230601.000000.nc') +EXAMPLE_AMC_E39 = DATASETS.fetch('sgpamcE39.b1.20230601.000000.nc') EXAMPLE_MFAS_SODAR = DATASETS.fetch('sodar.20230404.mnd') EXAMPLE_ENA_MET = DATASETS.fetch('enametC1.b1.20221109.000000.cdf') EXAMPLE_CCN = DATASETS.fetch('sgpaosccn2colaE13.b1.20170903.000000.nc') diff --git a/examples/io/plot_convert_ameriflux.py b/examples/io/plot_convert_ameriflux.py new file mode 100644 index 0000000000..4517d6400d --- /dev/null +++ b/examples/io/plot_convert_ameriflux.py @@ -0,0 +1,123 @@ +""" +Convert Data to AmeriFlux Format +-------------------------------- + +This script shows how to convert ARM data to AmeriFlux format +using an ACT function, and write it out to csv. More information +on AmeriFlux and their file formats and naming conventions can be +found here: https://ameriflux.lbl.gov/ + +Author: Adam Theisen + +""" + +import act +import glob +import xarray as xr +import os +import matplotlib.pyplot as plt + +# Read in the ECOR data +files = glob.glob(act.tests.sample_files.EXAMPLE_ECORSF_E39) +ds_ecor = act.io.arm.read_arm_netcdf(files) + +# The ECOR time stamp as at the end of the Averaging period so adjusting +# it to be consistent with the other systems +ds_ecor = act.utils.datetime_utils.adjust_timestamp(ds_ecor) + +# Clean up and QC the data based on embedded QC and ARM DQRs +ds_ecor.clean.cleanup() +ds_ecor = act.qc.arm.add_dqr_to_qc(ds_ecor) +ds_ecor.qcfilter.datafilter( + del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect'] +) + +# Then we do this same thing for the other instruments +# SEBS +files = glob.glob(act.tests.sample_files.EXAMPLE_SEBS_E39) +ds_sebs = act.io.arm.read_arm_netcdf(files) +# SEBS does not have a time_bounds variable so we have to manually adjust it +ds_sebs = act.utils.datetime_utils.adjust_timestamp(ds_sebs, offset=-30 * 60) +ds_sebs.clean.cleanup() +ds_sebs = act.qc.arm.add_dqr_to_qc(ds_sebs) +ds_sebs.qcfilter.datafilter( + del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect'] +) + +# STAMP +files = glob.glob(act.tests.sample_files.EXAMPLE_STAMP_E39) +ds_stamp = act.io.arm.read_arm_netcdf(files) +ds_stamp.clean.cleanup() +ds_stamp = act.qc.arm.add_dqr_to_qc(ds_stamp) +ds_stamp.qcfilter.datafilter( + del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect'] +) + +# STAMP Precipitation +files = glob.glob(act.tests.sample_files.EXAMPLE_STAMPPCP_E39) +ds_stamppcp = act.io.arm.read_arm_netcdf(files) +ds_stamppcp.clean.cleanup() +ds_stamppcp = act.qc.arm.add_dqr_to_qc(ds_stamppcp) +ds_stamppcp.qcfilter.datafilter( + del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect'] +) +# These are minute data so we need to resample and sum up to 30 minutes +ds_stamppcp = ds_stamppcp['precip'].resample(time='30Min').sum() + +# AMC +files = glob.glob(act.tests.sample_files.EXAMPLE_AMC_E39) +ds_amc = act.io.arm.read_arm_netcdf(files) +ds_amc.clean.cleanup() +ds_amc = act.qc.arm.add_dqr_to_qc(ds_amc) +ds_amc.qcfilter.datafilter( + del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect'] +) + +# Merge these datasets together +ds = xr.merge([ds_ecor, ds_sebs, ds_stamp, ds_stamppcp, ds_amc], compat='override') + +# Convert the data to AmeriFlux format and get a DataFrame in return +# Note, this does not return an xarray Dataset as it's assumed the data +# will just be written out to csv format. +df = act.io.ameriflux.convert_to_ameriflux(ds) + +# Write the data out to file +site = 'US-A14' +directory = './' + site + 'mergedflux/' +if not os.path.exists(directory): + os.makedirs(directory) + +# Following the AmeriFlux file naming convention +filename = ( + site + + '_HH_' + + str(df['TIMESTAMP_START'].iloc[0]) + + '_' + + str(df['TIMESTAMP_END'].iloc[-1]) + + '.csv' +) +df.to_csv(directory + filename, index=False) + + +# Plot up merged data for visualization +display = act.plotting.TimeSeriesDisplay(ds, subplot_shape=(4,), figsize=(12, 10)) +display.plot('latent_flux', subplot_index=(0,)) +display.plot('co2_flux', subplot_index=(0,)) +display.plot('sensible_heat_flux', subplot_index=(0,)) +display.day_night_background(subplot_index=(0,)) + +display.plot('precip', subplot_index=(1,)) +display.day_night_background(subplot_index=(1,)) + +display.plot('surface_soil_heat_flux_1', subplot_index=(2,)) +display.plot('surface_soil_heat_flux_2', subplot_index=(2,)) +display.plot('surface_soil_heat_flux_3', subplot_index=(2,)) +display.day_night_background(subplot_index=(2,)) + +display.plot('soil_specific_water_content_west', subplot_index=(3,)) +display.axes[3].set_ylim(display.axes[3].get_ylim()[::-1]) + +display.day_night_background(subplot_index=(3,)) + +plt.subplots_adjust(hspace=0.35) +plt.show() diff --git a/tests/io/test_ameriflux.py b/tests/io/test_ameriflux.py new file mode 100644 index 0000000000..395983de39 --- /dev/null +++ b/tests/io/test_ameriflux.py @@ -0,0 +1,33 @@ +import act +import glob +import xarray as xr + + +def test_convert_to_ameriflux(): + files = glob.glob(act.tests.sample_files.EXAMPLE_ECORSF_E39) + ds_ecor = act.io.arm.read_arm_netcdf(files) + + df = act.io.ameriflux.convert_to_ameriflux(ds_ecor) + + assert 'FC' in df + assert 'WS_MAX' in df + + files = glob.glob(act.tests.sample_files.EXAMPLE_SEBS_E39) + ds_sebs = act.io.arm.read_arm_netcdf(files) + + ds = xr.merge([ds_ecor, ds_sebs]) + df = act.io.ameriflux.convert_to_ameriflux(ds) + + assert 'SWC_2_1_1' in df + assert 'TS_3_1_1' in df + assert 'G_2_1_1' in df + + files = glob.glob(act.tests.sample_files.EXAMPLE_STAMP_E39) + ds_stamp = act.io.arm.read_arm_netcdf(files) + + ds = xr.merge([ds_ecor, ds_sebs, ds_stamp], compat='override') + df = act.io.ameriflux.convert_to_ameriflux(ds) + + assert 'SWC_6_10_1' in df + assert 'G_2_1_1' in df + assert 'TS_5_2_1' in df