Skip to content

Commit

Permalink
ADD: Adding code to convert data to AmeriFlux variable naming convent…
Browse files Browse the repository at this point in the history
…ions. (ARM-DOE#830)
  • Loading branch information
AdamTheisen authored May 14, 2024
1 parent 7f282c0 commit 4275696
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 1 deletion.
14 changes: 13 additions & 1 deletion act/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,18 @@

__getattr__, __dir__, __all__ = lazy.attach(
__name__,
submodules=['arm', 'text', 'icartt', 'mpl', 'neon', 'noaagml', 'noaapsl', 'pysp2', 'hysplit'],
submodules=[
'arm',
'ameriflux',
'text',
'icartt',
'mpl',
'neon',
'noaagml',
'noaapsl',
'pysp2',
'hysplit',
],
submod_attrs={
'arm': [
'WriteDataset',
Expand All @@ -17,6 +28,7 @@
'check_if_tar_gz_file',
'read_arm_mmcr',
],
'ameriflux': ['format_as_ameriflux'],
'text': ['read_csv'],
'icartt': ['read_icartt'],
'mpl': ['proc_sigma_mplv5_read', 'read_sigma_mplv5'],
Expand Down
181 changes: 181 additions & 0 deletions act/io/ameriflux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
This module contains I/O operations for the U.S. Department of Energy
AmeriFlux program (https://ameriflux.lbl.gov/).
"""

import numpy as np
import pandas as pd
import warnings


def convert_to_ameriflux(
ds,
variable_mapping=None,
soil_mapping=None,
depth_profile=[2.5, 5, 10, 15, 20, 30, 35, 50, 75, 100],
include_missing_variables=False,
**kwargs,
):
"""
Returns `xarray.Dataset` with stored data and metadata from a user-defined
query of ARM-standard netCDF files from a single datastream. Has some procedures
to ensure time is correctly fomatted in returned Dataset.
Parameters
----------
ds : xarray.Dataset
Dataset of data to convert to AmeriFlux format
variable_mapping : dict
Dictionary of variables mappings. The key should be the name of the variable
in the Dataset with the values being dictionaries of the AmeriFlux name and units.
For example:
var_mapping = {
'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'},
}
soil_mapping : dict
Dictionary of soil variables mappings following the same formatting as variable_mapping.
It is understood that the AmeriFlux name may be the same for some variables. This
script attempts to automatically name these measurements. If a variable is not dimensioned
by a depth nor has a sensor_height attribute, it will automatically assume that it's
at the first depth in the depth_profile variable.
depth_profile : list
List of depths that the variables will be mapped to. If a depth is not in this list,
the index chosen will be the one closest to the depth value.
include_missing_variables : boolean
If there variables that are completely missing (-9999) chose whether or not to include
them in the DataFrame.
Returns
-------
df : pandas.DataFrame (or None)
Returns a pandas dataframe for easy writing to csv
"""
# Use ARM variable mappings if none provided
if variable_mapping is None:
warnings.warn('Variable mapping was not provided, using default ARM mapping')
# Define variable mapping and units
# The key is the variable name in the data and the name in the dictionary
# is the AmeriFlux Name
var_mapping = {
'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'},
'co2_molar_fraction': {'name': 'CO2', 'units': 'nmol/mol'},
'co2_mixing_ratio': {'name': 'CO2_MIXING_RATIO', 'units': 'umol/mol'},
'h2o_mole_fraction': {'name': 'H2O', 'units': 'mmol/mol'},
'h2o_mixing_ratio': {'name': 'H2O_MIXING_RATIO', 'units': 'mmol/mol'},
'ch4_mole_fraction': {'name': 'CH4', 'units': 'nmol/mol'},
'ch4_mixing_ratio': {'name': 'CH4_MIXING_RATIO', 'units': 'nmol/mol'},
'momentum_flux': {'name': 'TAU', 'units': 'kg/(m s^2)'},
'sensible_heat_flux': {'name': 'H', 'units': 'W/m^2'},
'latent_flux': {'name': 'LE', 'units': 'W/m^2'},
'air_temperature': {'name': 'TA', 'units': 'deg C'},
'air_pressure': {'name': 'PA', 'units': 'kPa'},
'relative_humidity': {'name': 'RH', 'units': '%'},
'sonic_temperature': {'name': 'T_SONIC', 'units': 'deg C'},
'water_vapor_pressure_defecit': {'name': 'VPD', 'units': 'hPa'},
'Monin_Obukhov_length': {'name': 'MO_LENGTH', 'units': 'm'},
'Monin_Obukhov_stability_parameter': {'name': 'ZL', 'units': ''},
'mean_wind': {'name': 'WS', 'units': 'm/s'},
'wind_direction_from_north': {'name': 'WD', 'units': 'deg'},
'friction_velocity': {'name': 'USTAR', 'units': 'm/s'},
'maximum_instantaneous_wind_speed': {'name': 'WS_MAX', 'units': 'm/s'},
'down_short_hemisp': {'name': 'SW_IN', 'units': 'W/m^2'},
'up_short_hemisp': {'name': 'SW_OUT', 'units': 'W/m^2'},
'down_long': {'name': 'LW_IN', 'units': 'W/m^2'},
'up_long': {'name': 'LW_OUT', 'units': 'W/m^2'},
'albedo': {'name': 'ALB', 'units': '%'},
'net_radiation': {'name': 'NETRAD', 'units': 'W/m^2'},
'par_inc': {'name': 'PPFD_IN', 'units': 'umol/(m^2 s)'},
'par_ref': {'name': 'PPFD_OUT', 'units': 'umol/(m^2 s)'},
'precip': {'name': 'P', 'units': 'mm'},
}

# Use ARM variable mappings if none provided
# Similar to the above. This has only been tested on the ARM
# ECOR, SEBS, STAMP, and AMC combined. The automated naming may
# not work for all cases
if soil_mapping is None:
warnings.warn('Soil variable mapping was not provided, using default ARM mapping')
soil_mapping = {
'surface_soil_heat_flux': {'name': 'G', 'units': 'W/m^2'},
'soil_temp': {'name': 'TS', 'units': 'deg C'},
'temp': {'name': 'TS', 'units': 'deg C'},
'soil_moisture': {'name': 'SWC', 'units': '%'},
'soil_specific_water_content': {'name': 'SWC', 'units': '%'},
'vwc': {'name': 'SWC', 'units': '%'},
}

# Loop through variables and update units to the AmeriFlux standard
for v in ds:
if v in var_mapping:
ds = ds.utils.change_units(variables=v, desired_unit=var_mapping[v]['units'])

# Get start/end time stamps
ts_start = ds['time'].dt.strftime('%Y%m%d%H%M').values
ts_end = [
pd.to_datetime(t + np.timedelta64(30, 'm')).strftime('%Y%m%d%H%M')
for t in ds['time'].values
]
data = {}
data['TIMESTAMP_START'] = ts_start
data['TIMESTAMP_END'] = ts_end

# Loop through the variables in the var mapping dictionary and add data to dictionary
for v in var_mapping:
if v in ds:
if 'missing_value' not in ds[v].attrs:
ds[v].attrs['missing_value'] = -9999
if np.all(ds[v].isnull()):
if include_missing_variables:
data[var_mapping[v]['name']] = ds[v].values
else:
data[var_mapping[v]['name']] = ds[v].values
else:
if include_missing_variables:
data[var_mapping[v]['name']] = np.full(ds['time'].shape, -9999)

# Automated naming for the soil variables
# Again, this may not work for other cases. Careful review is needed.
prev_var = ''
for var in soil_mapping:
if soil_mapping[var]['name'] != prev_var:
h = 1
r = 1
prev_var = soil_mapping[var]['name']
soil_vars = [
v2
for v2 in list(ds)
if (v2.startswith(var)) & ('std' not in v2) & ('qc' not in v2) & ('net' not in v2)
]
for i, svar in enumerate(soil_vars):
vert = 1
if ('avg' in svar) | ('average' in svar):
continue
soil_data = ds[svar].values
data_shape = soil_data.shape
if len(data_shape) > 1:
coords = ds[svar].coords
depth_name = list(coords)[-1]
depth_values = ds[depth_name].values
for depth_ind in range(len(depth_values)):
soil_data_depth = soil_data[:, depth_ind]
vert = np.where(depth_profile == depth_values[depth_ind])[0][0] + 1
new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)])
data[new_name] = soil_data_depth
else:
if 'sensor_height' in ds[svar].attrs:
sensor_ht = ds[svar].attrs['sensor_height'].split(' ')
depth = abs(float(sensor_ht[0]))
units = sensor_ht[1]
if units == 'cm':
vert = np.argmin(np.abs(np.array(depth_profile) - depth)) + 1
new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)])
data[new_name] = soil_data
h += 1

# Convert dictionary to dataframe and return
df = pd.DataFrame(data)
df = df.fillna(-9999.0)

return df
5 changes: 5 additions & 0 deletions act/tests/sample_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
EXAMPLE_EBBR3 = DATASETS.fetch('sgp30ebbrE13.b1.20190601.000000.nc')
EXAMPLE_ECOR = DATASETS.fetch('sgp30ecorE14.b1.20190601.000000.cdf')
EXAMPLE_SEBS = DATASETS.fetch('sgpsebsE14.b1.20190601.000000.cdf')
EXAMPLE_SEBS_E39 = DATASETS.fetch('sgpsebsE39.b1.20230601.000000.cdf')
EXAMPLE_ECORSF_E39 = DATASETS.fetch('sgpecorsfE39.b1.20230601.000000.nc')
EXAMPLE_STAMP_E39 = DATASETS.fetch('sgpstampE39.b1.20230601.000000.nc')
EXAMPLE_STAMPPCP_E39 = DATASETS.fetch('sgpstamppcpE39.b1.20230601.000000.nc')
EXAMPLE_AMC_E39 = DATASETS.fetch('sgpamcE39.b1.20230601.000000.nc')
EXAMPLE_MFAS_SODAR = DATASETS.fetch('sodar.20230404.mnd')
EXAMPLE_ENA_MET = DATASETS.fetch('enametC1.b1.20221109.000000.cdf')
EXAMPLE_CCN = DATASETS.fetch('sgpaosccn2colaE13.b1.20170903.000000.nc')
Expand Down
123 changes: 123 additions & 0 deletions examples/io/plot_convert_ameriflux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Convert Data to AmeriFlux Format
--------------------------------
This script shows how to convert ARM data to AmeriFlux format
using an ACT function, and write it out to csv. More information
on AmeriFlux and their file formats and naming conventions can be
found here: https://ameriflux.lbl.gov/
Author: Adam Theisen
"""

import act
import glob
import xarray as xr
import os
import matplotlib.pyplot as plt

# Read in the ECOR data
files = glob.glob(act.tests.sample_files.EXAMPLE_ECORSF_E39)
ds_ecor = act.io.arm.read_arm_netcdf(files)

# The ECOR time stamp as at the end of the Averaging period so adjusting
# it to be consistent with the other systems
ds_ecor = act.utils.datetime_utils.adjust_timestamp(ds_ecor)

# Clean up and QC the data based on embedded QC and ARM DQRs
ds_ecor.clean.cleanup()
ds_ecor = act.qc.arm.add_dqr_to_qc(ds_ecor)
ds_ecor.qcfilter.datafilter(
del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect']
)

# Then we do this same thing for the other instruments
# SEBS
files = glob.glob(act.tests.sample_files.EXAMPLE_SEBS_E39)
ds_sebs = act.io.arm.read_arm_netcdf(files)
# SEBS does not have a time_bounds variable so we have to manually adjust it
ds_sebs = act.utils.datetime_utils.adjust_timestamp(ds_sebs, offset=-30 * 60)
ds_sebs.clean.cleanup()
ds_sebs = act.qc.arm.add_dqr_to_qc(ds_sebs)
ds_sebs.qcfilter.datafilter(
del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect']
)

# STAMP
files = glob.glob(act.tests.sample_files.EXAMPLE_STAMP_E39)
ds_stamp = act.io.arm.read_arm_netcdf(files)
ds_stamp.clean.cleanup()
ds_stamp = act.qc.arm.add_dqr_to_qc(ds_stamp)
ds_stamp.qcfilter.datafilter(
del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect']
)

# STAMP Precipitation
files = glob.glob(act.tests.sample_files.EXAMPLE_STAMPPCP_E39)
ds_stamppcp = act.io.arm.read_arm_netcdf(files)
ds_stamppcp.clean.cleanup()
ds_stamppcp = act.qc.arm.add_dqr_to_qc(ds_stamppcp)
ds_stamppcp.qcfilter.datafilter(
del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect']
)
# These are minute data so we need to resample and sum up to 30 minutes
ds_stamppcp = ds_stamppcp['precip'].resample(time='30Min').sum()

# AMC
files = glob.glob(act.tests.sample_files.EXAMPLE_AMC_E39)
ds_amc = act.io.arm.read_arm_netcdf(files)
ds_amc.clean.cleanup()
ds_amc = act.qc.arm.add_dqr_to_qc(ds_amc)
ds_amc.qcfilter.datafilter(
del_qc_var=False, rm_assessments=['Bad', 'Incorrect', 'Indeterminate', 'Suspect']
)

# Merge these datasets together
ds = xr.merge([ds_ecor, ds_sebs, ds_stamp, ds_stamppcp, ds_amc], compat='override')

# Convert the data to AmeriFlux format and get a DataFrame in return
# Note, this does not return an xarray Dataset as it's assumed the data
# will just be written out to csv format.
df = act.io.ameriflux.convert_to_ameriflux(ds)

# Write the data out to file
site = 'US-A14'
directory = './' + site + 'mergedflux/'
if not os.path.exists(directory):
os.makedirs(directory)

# Following the AmeriFlux file naming convention
filename = (
site
+ '_HH_'
+ str(df['TIMESTAMP_START'].iloc[0])
+ '_'
+ str(df['TIMESTAMP_END'].iloc[-1])
+ '.csv'
)
df.to_csv(directory + filename, index=False)


# Plot up merged data for visualization
display = act.plotting.TimeSeriesDisplay(ds, subplot_shape=(4,), figsize=(12, 10))
display.plot('latent_flux', subplot_index=(0,))
display.plot('co2_flux', subplot_index=(0,))
display.plot('sensible_heat_flux', subplot_index=(0,))
display.day_night_background(subplot_index=(0,))

display.plot('precip', subplot_index=(1,))
display.day_night_background(subplot_index=(1,))

display.plot('surface_soil_heat_flux_1', subplot_index=(2,))
display.plot('surface_soil_heat_flux_2', subplot_index=(2,))
display.plot('surface_soil_heat_flux_3', subplot_index=(2,))
display.day_night_background(subplot_index=(2,))

display.plot('soil_specific_water_content_west', subplot_index=(3,))
display.axes[3].set_ylim(display.axes[3].get_ylim()[::-1])

display.day_night_background(subplot_index=(3,))

plt.subplots_adjust(hspace=0.35)
plt.show()
33 changes: 33 additions & 0 deletions tests/io/test_ameriflux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import act
import glob
import xarray as xr


def test_convert_to_ameriflux():
files = glob.glob(act.tests.sample_files.EXAMPLE_ECORSF_E39)
ds_ecor = act.io.arm.read_arm_netcdf(files)

df = act.io.ameriflux.convert_to_ameriflux(ds_ecor)

assert 'FC' in df
assert 'WS_MAX' in df

files = glob.glob(act.tests.sample_files.EXAMPLE_SEBS_E39)
ds_sebs = act.io.arm.read_arm_netcdf(files)

ds = xr.merge([ds_ecor, ds_sebs])
df = act.io.ameriflux.convert_to_ameriflux(ds)

assert 'SWC_2_1_1' in df
assert 'TS_3_1_1' in df
assert 'G_2_1_1' in df

files = glob.glob(act.tests.sample_files.EXAMPLE_STAMP_E39)
ds_stamp = act.io.arm.read_arm_netcdf(files)

ds = xr.merge([ds_ecor, ds_sebs, ds_stamp], compat='override')
df = act.io.ameriflux.convert_to_ameriflux(ds)

assert 'SWC_6_10_1' in df
assert 'G_2_1_1' in df
assert 'TS_5_2_1' in df

0 comments on commit 4275696

Please sign in to comment.