From bb509d7a2f297838b11c63544d2a2fe1426ac1d7 Mon Sep 17 00:00:00 2001 From: Ken Kehoe Date: Wed, 11 Dec 2024 09:34:35 -0700 Subject: [PATCH 1/3] Adding step detection test --- act/qc/qctests.py | 139 +++++++++++++++++++++++++++++ examples/qc/plot_qc_step_change.py | 60 +++++++++++++ tests/qc/test_qctests.py | 88 ++++++++++++++++++ 3 files changed, 287 insertions(+) create mode 100644 examples/qc/plot_qc_step_change.py diff --git a/act/qc/qctests.py b/act/qc/qctests.py index 4d57661489..efdb0f205b 100644 --- a/act/qc/qctests.py +++ b/act/qc/qctests.py @@ -1629,3 +1629,142 @@ def add_atmospheric_pressure_test( ) return result + + def add_step_change_test( + self, + var_name, + k=1.0, + detrend=True, + n_flagged=2, + add_nan=False, + test_meaning=None, + test_assessment='Indeterminate', + test_number=None, + flag_value=False, + prepend_text=None, + ): + """ + Method to detect a shift change in values using the CUSUM (cumulative sum control chart) test. + + Parameters + ---------- + var_name : str + Data variable name in Dataset to use for testing. Results are inserted into + accompanying embedded quality control variable. + k : float + Reference value. This is typically around the value of the shift size change to + to be detected when detrend=True. Will typically be around half the value + of the shift size when no detrend is applied. + detrend : bool + Remove the trend in the data by differencing the data before + applying the CUSUM algorithm. Needed for most data that have atmospheric variability. + n_flagged : int + Number of time steps to flag in the quality control variable. Default is to + flag the point of and one after the step change since we will not know which + value of the two will be suspect. Can set to any number of time steps + or -1 to flag the remaining data after the detected step change. + add_nan : bool + Should a NaN value be added to the data where a value is missing. A value is + determined to be missing when the time step is larger than the mode of the + difference in time values. This will stop the reporting of a step when there + is an outage and the data normally rises/decends but with the gap is appears + as a step change. + test_meaning : str + Optional text description to add to flag_meanings + describing the test. Will use a default if not set. + test_assessment : str + Optional single word describing the assessment of the test. + Will use a default if not set. + test_number : int + Optional test number to use. If not set will use next available + test number. + flag_value : boolean + Indicates that the tests are stored as integers + not bit packed values in quality control variable. + prepend_text : str + Optional text to prepend to the test meaning. + Example is indicate what institution added the test. + + Returns + ------- + test_info : tuple + A tuple containing test information including var_name, qc + variable name, test_number, test_meaning, test_assessment + + """ + + def cusum(data, k, mean_val, lower=False): + """ + CUSUM algrithm used to detect step changes. + + data : numpy array + 1D numpy array of time series data to analze + k : float + Reference value. This is typically half the value of the shift size change to + to be detected. + mean_val : float + Mean value of data. + lower : bool + Option to loof for lower shifts vs. upper shifts + + """ + + C = np.zeros(data.size) + if lower: + for ii in range(1, data.size): + C[ii] = max(0, C[ii - 1] - (data[ii] - mean_val + k)) + else: + for ii in range(1, data.size): + C[ii] = max(0, C[ii - 1] + (data[ii] - mean_val - k)) + + return C + + data = self._ds[var_name].values + time = self._ds['time'].values + if add_nan: + from act.utils.data_utils import add_in_nan + + time, data = add_in_nan(time, data) + + data = data.astype(float) + if detrend: + data = np.diff(data) + data = np.append(np.nan, data) + + if n_flagged < 0: + n_flagged = data.size + + mean_val = np.nanmean(data) + index = np.full(data.size, False) + for lower in [False, True]: + C = cusum(data, k, mean_val, lower=lower) + found_ind = np.where(np.diff(C) > 0.0)[0] + for ind in found_ind: + ind = np.arange(ind, ind + n_flagged) + ind = ind[ind < data.size] + index[ind] = True + + if add_nan: + import xarray as xr + + da = xr.DataArray(index, dims=['time'], coords=[time]) + result = da.sel(time=self._ds['time']) + index = result.values + del result + + if test_meaning is None: + test_meaning = f'Shift in data detected with CUSUM algrithm: k={round(k, 2)}' + + if prepend_text is not None: + test_meaning = ': '.join((prepend_text, test_meaning)) + + result = self._ds.qcfilter.add_test( + var_name, + index=index, + test_number=test_number, + test_meaning=test_meaning, + test_assessment=test_assessment, + flag_value=flag_value, + ) + + return result diff --git a/examples/qc/plot_qc_step_change.py b/examples/qc/plot_qc_step_change.py new file mode 100644 index 0000000000..f725a3d273 --- /dev/null +++ b/examples/qc/plot_qc_step_change.py @@ -0,0 +1,60 @@ +""" + +This is an example for how to use the step change detection test. +The test uses the cumulative sum control chart to detect when +a sudden shift in values occurs. It has an option to insert +NaN value when there is a data gap to not have those periods +returned as a data shift. This example produces two plots, +one with the data gap flagged and one without. + +""" + +from matplotlib import pyplot as plt +import numpy as np + +from arm_test_data import DATASETS +from act.io.arm import read_arm_netcdf + + +# Get example data from ARM Test Data repository +EXAMPLE_MET = DATASETS.fetch('sgpmetE13.b1.20190101.000000.cdf') +variable = 'temp_mean' +ds = read_arm_netcdf(EXAMPLE_MET, keep_variables=variable) + +# Add shifts in the data +data = ds[variable].values +data[600:] += 2 +data[1000:] -= 2 +ds[variable].values = data + +# Remove data from the Dataset to simulate instrument being off-line +ds = ds.where((ds["time.hour"] < 3) | (ds["time.hour"] > 5), drop=True) + +# Add step change test +ds.qcfilter.add_step_change_test(variable) + +# Add step change test but insert NaN values during period of missing data +# so it does not trip the test. +ds.qcfilter.add_step_change_test(variable, add_nan=True) + +# Make plot with results from the step change test for when the missing data +# is included and a second plot without including the missing data gap. +title = 'Step change detection' +for ii in range(1, 3): + plt.figure(figsize=(10, 6)) + plt.plot(ds['time'].values, ds[variable].values, label='Data') + plt.xlabel('Time') + plt.ylabel(f"{ds[variable].attrs['long_name']} ({ds[variable].attrs['units']})") + plt.title(title) + plt.grid(lw=2, ls=':') + + label = 'Step change' + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=ii) + for jj in np.where(index)[0]: + plt.axvline(x=ds['time'].values[jj], color='orange', linestyle='--', label=label) + label = None + + title += ' with NaN added in data gaps' + + plt.legend() + plt.show() diff --git a/tests/qc/test_qctests.py b/tests/qc/test_qctests.py index 0a244eea7c..2e41ab70f9 100644 --- a/tests/qc/test_qctests.py +++ b/tests/qc/test_qctests.py @@ -417,3 +417,91 @@ def test_add_atmospheric_pressure_test(): ds.close del ds + + +def test_add_step_change_test(): + variable = 'temp_mean' + qc_variable = f"qc_{variable}" + ds = read_arm_netcdf(EXAMPLE_MET1, keep_variables=['temp_mean', 'atmos_pressure']) + ds.load() + + result = ds.qcfilter.add_step_change_test(variable) + assert result == { + 'test_number': 1, + 'test_meaning': 'Shift in data detected with CUSUM algrithm: k=1.0', + 'test_assessment': 'Indeterminate', + 'qc_variable_name': qc_variable, + 'variable_name': variable, + } + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) + assert len(np.where(index)[0]) == 0 + assert ds[qc_variable].attrs['flag_meanings'] == [ + 'Shift in data detected with CUSUM algrithm: k=1.0' + ] + assert ds[qc_variable].attrs['flag_assessments'] == ['Indeterminate'] + + data = ds[variable].values + data[100:] -= 5 + data[600:] += 4 + data[800:] += 10 + data[1000:] -= 2 + ds[variable].values = data + + ds.qcfilter.add_step_change_test(variable) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=2) + assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800, 999, 1000]) + assert ( + ds[qc_variable].attrs['flag_meanings'][1] + == 'Shift in data detected with CUSUM algrithm: k=1.0' + ) + assert ds[qc_variable].attrs['flag_assessments'][1] == 'Indeterminate' + + ds.qcfilter.add_step_change_test(variable, k=4, prepend_text='ARM') + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=3) + assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800]) + assert ( + ds[qc_variable].attrs['flag_meanings'][2] + == 'ARM: Shift in data detected with CUSUM algrithm: k=4' + ) + + ds.qcfilter.add_step_change_test(variable, n_flagged=3) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=4) + assert np.all( + np.where(index)[0] == [99, 100, 101, 599, 600, 601, 799, 800, 801, 999, 1000, 1001] + ) + + ds.qcfilter.add_step_change_test(variable, n_flagged=-1, k=5.1, test_assessment='Suspect') + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=5) + assert np.all(np.where(index)[0] == np.arange(799, 1440)) + assert ( + ds[qc_variable].attrs['flag_meanings'][4] + == 'Shift in data detected with CUSUM algrithm: k=5.1' + ) + assert ds[qc_variable].attrs['flag_assessments'][4] == 'Suspect' + + variable = 'atmos_pressure' + ds.qcfilter.add_step_change_test(variable, detrend=False) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) + assert len(np.where(index)[0]) == 0 + + ds.close + del ds + + # Test add_nan keyword + variable = 'temp_mean' + ds = read_arm_netcdf(EXAMPLE_MET1, keep_variables=variable) + data = ds[variable].values + data[600:] += 2 + ds[variable].values = data + + ds = ds.where((ds["time.hour"] < 3) | (ds["time.hour"] > 5), drop=True) + + ds.qcfilter.add_step_change_test(variable, add_nan=False) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) + assert np.all(np.where(index)[0] == [179, 180, 419, 420]) + + ds.qcfilter.add_step_change_test(variable, add_nan=True) + index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=2) + assert np.all(np.where(index)[0] == [419, 420]) + + del ds From 161737c102ed543a1c527b7754c0dd776cadb44a Mon Sep 17 00:00:00 2001 From: Ken Kehoe Date: Wed, 11 Dec 2024 12:09:59 -0700 Subject: [PATCH 2/3] Changing method in cusum() to not require running twice. --- act/qc/qctests.py | 59 +++++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/act/qc/qctests.py b/act/qc/qctests.py index efdb0f205b..b4e619d240 100644 --- a/act/qc/qctests.py +++ b/act/qc/qctests.py @@ -1693,38 +1693,39 @@ def add_step_change_test( """ - def cusum(data, k, mean_val, lower=False): + def cusum(data, k): """ - CUSUM algrithm used to detect step changes. + CUSUM algorithm used to detect step changes. data : numpy array 1D numpy array of time series data to analze k : float Reference value. This is typically half the value of the shift size change to - to be detected. - mean_val : float - Mean value of data. - lower : bool - Option to loof for lower shifts vs. upper shifts + to be detected or the size of the shift change if the data is detrended by + differencing. + + Returns + ------- + C : numpy array + Numpy array containing a 0 when there is no shift detected + or positive value when a shift is detected. """ - C = np.zeros(data.size) - if lower: - for ii in range(1, data.size): - C[ii] = max(0, C[ii - 1] - (data[ii] - mean_val + k)) - else: - for ii in range(1, data.size): - C[ii] = max(0, C[ii - 1] + (data[ii] - mean_val - k)) + mean_val = np.nanmean(data) + Cu = np.zeros(data.size, dtype=np.float16) + Cl = np.zeros(data.size, dtype=np.float16) + for ii in range(1, data.size): + Cl[ii] = max(0.0, Cl[ii - 1] - (data[ii] - mean_val + k)) + Cu[ii] = max(0.0, Cu[ii - 1] + (data[ii] - mean_val - k)) - return C + return np.maximum(Cu, Cl) data = self._ds[var_name].values - time = self._ds['time'].values if add_nan: from act.utils.data_utils import add_in_nan - time, data = add_in_nan(time, data) + time, data = add_in_nan(self._ds['time'].values, data) data = data.astype(float) if detrend: @@ -1734,29 +1735,27 @@ def cusum(data, k, mean_val, lower=False): if n_flagged < 0: n_flagged = data.size - mean_val = np.nanmean(data) index = np.full(data.size, False) - for lower in [False, True]: - C = cusum(data, k, mean_val, lower=lower) - found_ind = np.where(np.diff(C) > 0.0)[0] - for ind in found_ind: - ind = np.arange(ind, ind + n_flagged) - ind = ind[ind < data.size] - index[ind] = True + C = cusum(data, k) + found_ind = np.where(np.diff(C) > 0.0)[0] + for ind in found_ind: + ind = np.arange(ind, ind + n_flagged) + ind = ind[ind < data.size] + index[ind] = True if add_nan: import xarray as xr da = xr.DataArray(index, dims=['time'], coords=[time]) - result = da.sel(time=self._ds['time']) - index = result.values - del result + da = da.sel(time=self._ds['time']) + index = da.values + del da if test_meaning is None: - test_meaning = f'Shift in data detected with CUSUM algrithm: k={round(k, 2)}' + test_meaning = f'Shift in data detected with CUSUM algorithm: k={round(k, 2)}' if prepend_text is not None: - test_meaning = ': '.join((prepend_text, test_meaning)) + test_meaning = f'{prepend_text}: {test_meaning}' result = self._ds.qcfilter.add_test( var_name, From 4eddc2ad830571f788583734976edcc77ed2b5b3 Mon Sep 17 00:00:00 2001 From: Ken Kehoe Date: Wed, 11 Dec 2024 12:10:16 -0700 Subject: [PATCH 3/3] Correcting a misspelling. --- tests/qc/test_qctests.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/qc/test_qctests.py b/tests/qc/test_qctests.py index 2e41ab70f9..89e3b48830 100644 --- a/tests/qc/test_qctests.py +++ b/tests/qc/test_qctests.py @@ -428,7 +428,7 @@ def test_add_step_change_test(): result = ds.qcfilter.add_step_change_test(variable) assert result == { 'test_number': 1, - 'test_meaning': 'Shift in data detected with CUSUM algrithm: k=1.0', + 'test_meaning': 'Shift in data detected with CUSUM algorithm: k=1.0', 'test_assessment': 'Indeterminate', 'qc_variable_name': qc_variable, 'variable_name': variable, @@ -436,7 +436,7 @@ def test_add_step_change_test(): index = ds.qcfilter.get_qc_test_mask(var_name=variable, test_number=1) assert len(np.where(index)[0]) == 0 assert ds[qc_variable].attrs['flag_meanings'] == [ - 'Shift in data detected with CUSUM algrithm: k=1.0' + 'Shift in data detected with CUSUM algorithm: k=1.0' ] assert ds[qc_variable].attrs['flag_assessments'] == ['Indeterminate'] @@ -452,7 +452,7 @@ def test_add_step_change_test(): assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800, 999, 1000]) assert ( ds[qc_variable].attrs['flag_meanings'][1] - == 'Shift in data detected with CUSUM algrithm: k=1.0' + == 'Shift in data detected with CUSUM algorithm: k=1.0' ) assert ds[qc_variable].attrs['flag_assessments'][1] == 'Indeterminate' @@ -461,7 +461,7 @@ def test_add_step_change_test(): assert np.all(np.where(index)[0] == [99, 100, 599, 600, 799, 800]) assert ( ds[qc_variable].attrs['flag_meanings'][2] - == 'ARM: Shift in data detected with CUSUM algrithm: k=4' + == 'ARM: Shift in data detected with CUSUM algorithm: k=4' ) ds.qcfilter.add_step_change_test(variable, n_flagged=3) @@ -475,7 +475,7 @@ def test_add_step_change_test(): assert np.all(np.where(index)[0] == np.arange(799, 1440)) assert ( ds[qc_variable].attrs['flag_meanings'][4] - == 'Shift in data detected with CUSUM algrithm: k=5.1' + == 'Shift in data detected with CUSUM algorithm: k=5.1' ) assert ds[qc_variable].attrs['flag_assessments'][4] == 'Suspect'