add in averaging for Stream objects (uncscode#367)
* add in averaging for Stream objects
Fixes uncscode#364

* added example to book

* 'Refactored by Sourcery' (uncscode#368)

Co-authored-by: Sourcery AI <>

* fixed test

* pytype skip stream stats

---------

Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Gorkowski and sourcery-ai[bot] authored Nov 14, 2023
1 parent 4fdb046 commit ba69862
Showing 11 changed files with 918 additions and 128 deletions.
1 change: 1 addition & 0 deletions docs/_toc.yml
@@ -15,6 +15,7 @@ parts:
      - file: examples/ionparticle_coagulation
      - file: examples/loading_data_part1
      - file: examples/loading_data_part2
+     - file: examples/stream_stats_part1
  - caption: Documentation
    numbered: false
    chapters:
14 changes: 7 additions & 7 deletions docs/examples/loading_data_part1.ipynb

Large diffs are not rendered by default.

190 changes: 160 additions & 30 deletions docs/examples/loading_data_part2.ipynb

Large diffs are not rendered by default.

456 changes: 456 additions & 0 deletions docs/examples/stream_stats_part1.ipynb

Large diffs are not rendered by default.

20 changes: 6 additions & 14 deletions particula/data/loader.py
@@ -492,20 +492,12 @@ def sizer_data_formatter(

if "convert_scale_from" in data_sizer_reader:
if data_sizer_reader["convert_scale_from"] == "dw":
inverse = True
elif data_sizer_reader["convert_scale_from"] == "dw/dlogdp":
inverse = False
else:
raise ValueError(
"Invalid value for convert_scale_from in data_sizer_reader." +
" Either dw/dlogdp or dw must be specified."
)
for i in range(len(epoch_time)):
data_2d[i, :] = convert.convert_sizer_dn(
diameter=np.array(header).astype(float),
dn_dlogdp=data_2d[i, :],
inverse=inverse
)
for i in range(len(epoch_time)):
data_2d[i, :] = convert.convert_sizer_dn(
diameter=np.array(header).astype(float),
dn_dlogdp=data_2d[i, :],
inverse=True
)

return epoch_time, data_2d, header

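For context, a minimal usage sketch of the conversion this hunk now applies unconditionally. The `convert` import path and the example values are assumptions, not shown in this diff:

import numpy as np
from particula.util import convert  # import path assumed

# bin-center diameters and a dN/dlogDp spectrum (illustrative values)
diameter = np.array([10.0, 20.0, 50.0, 100.0])
dn_dlogdp = np.array([1e3, 5e3, 2e3, 4e2])

# inverse=True mirrors the 'dw' branch that this commit hard-codes
dn = convert.convert_sizer_dn(
    diameter=diameter,
    dn_dlogdp=dn_dlogdp,
    inverse=True,
)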
79 changes: 72 additions & 7 deletions particula/data/settings_generator.py
@@ -19,7 +19,43 @@ def for_general_1d_load(
        time_shift_seconds: int = 0,
        timezone_identifier: str = 'UTC',
) -> dict:
-    """Generate settings file for 1d general file."""
+    """
+    Generate a settings dictionary for loading and checking 1D data from CSV
+    files.
+
+    Parameters:
+    ------------
+    - relative_data_folder (str): The folder path relative to the main script
+        where data files are located. Default is 'instrument_data'.
+    - filename_regex (str): Regular expression pattern to match filenames in
+        the data folder. Default is '*.csv'.
+    - file_min_size_bytes (int): Minimum size in bytes for files to be
+        considered valid. Default is 10.
+    - header_row (int): The index of the row containing column headers
+        (0-indexed). Default is 0.
+    - data_checks (Optional[dict]): A dictionary containing data quality
+        checks such as character length, required character counts, and rows
+        to skip at the beginning or end. Defaults to basic checks if None.
+    - data_column (list of int): List of indices for columns containing data
+        points to be loaded. Default is [3, 5].
+    - data_header (List[str]): List of strings representing the header names
+        for data columns. Default is ['data 1', 'data 3'].
+    - time_column (List[int]): List of indices for columns containing time
+        information. Default is [0, 1].
+    - time_format (str): String format for parsing time columns, using
+        strftime conventions. Default is '%Y-%m-%d %H:%M:%S.%f'.
+    - delimiter (str): Character used to separate values in the file.
+        Default is ','.
+    - time_shift_seconds (int): Number of seconds by which to shift time data
+        (positive or negative). Default is 0.
+    - timezone_identifier (str): Timezone identifier for time conversion.
+        Default is 'UTC'.
+
+    Returns:
+    - dict: A dictionary with settings for data loading procedures, including
+        file paths, size requirements, header information, and data check
+        parameters.
+    """
    if data_checks is None:
        data_checks = {
            "characters": [10, 100],
@@ -61,13 +97,42 @@ def for_general_sizer_1d_2d_load(
        time_shift_seconds: int = 0,
        timezone_identifier: str = 'UTC',
) -> tuple:
-    """Generate settings file for 1d general file loader and
-    2d general sizer file loader.
-    """
+    """
+    Generate settings for the 1D general file loader and the 2D general sizer
+    file loader.
+
+    Parameters:
+    - relative_data_folder (str): Path to the folder containing data files,
+        relative to the script's location.
+    - filename_regex (str): Regex pattern to match filenames for loading.
+    - file_min_size_bytes (int): Minimum file size in bytes for a file to be
+        considered valid for loading.
+    - header_row (int): Row index for the header (0-based) in the data files.
+    - data_checks (dict, optional): Specifications for data integrity checks
+        to apply when loading data.
+    - data_1d_column (list of int): Column indices for 1D data extraction.
+    - data_1d_header (list of str): Header names corresponding to the
+        `data_1d_column` indices.
+    - data_2d_dp_start_keyword (str): Keyword indicating the start of 2D data
+        points in a file.
+    - data_2d_dp_end_keyword (str): Keyword indicating the end of 2D data
+        points in a file.
+    - data_2d_convert_concentration_from (str, optional): Unit to convert
+        from if concentration scaling is needed for 2D data.
+    - time_column (list of int): Column indices for time data extraction.
+    - time_format (str): Format string for parsing time data.
+    - delimiter (str): Delimiter character for splitting data in the file.
+    - time_shift_seconds (int): Seconds to shift the time data by.
+    - timezone_identifier (str): Timezone ID for time data interpretation.
+
+    Returns:
+    - tuple of (dict, dict): A tuple containing two dictionaries with
+        settings for the 1D and 2D data loaders.
+
+    The function defaults `data_checks` to basic validation criteria if not
+    provided. It returns separate dictionaries for settings applicable to the
+    1D and 2D data loaders, which include file paths, size checks, and data
+    parsing rules.
+    """
    if data_checks is None:
        data_checks = {
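A similar sketch for the paired 1D/2D loader settings; the keyword values below are illustrative, and only the parameter names come from the docstring above:

from particula.data import settings_generator

# One call returns two settings dicts: 1D stream and 2D sizer matrix
settings_1d, settings_2d = settings_generator.for_general_sizer_1d_2d_load(
    relative_data_folder='instrument_data',
    data_1d_column=[3, 5],
    data_1d_header=['total conc', 'mean dp'],
    data_2d_dp_start_keyword='Diameter Start',  # illustrative keyword
    data_2d_dp_end_keyword='Diameter End',      # illustrative keyword
    data_2d_convert_concentration_from='dw/dlogdp',
    time_column=[0, 1],
)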
8 changes: 4 additions & 4 deletions particula/data/stream.py
@@ -73,13 +73,13 @@ class StreamAveraged(Stream):
"""A subclass of Stream with additional parameters related to averaging.
Attributes:
average_window (float): The size of the window used for averaging.
average_interval (float): The size of the window used for averaging.
start_time (float): The start time for averaging.
stop_time (float): The stop time for averaging.
standard_deviation (float): The standard deviation of the data.
"""

average_window: float = field(default_factory=float)
average_interval: float = field(default_factory=float)
start_time: float = field(default_factory=float)
stop_time: float = field(default_factory=float)
standard_deviation: np.ndarray = field(
@@ -98,8 +98,8 @@ def validate_averaging_params(self):
        start_time and stop_time are not numbers or if start_time is
        greater than or equal to stop_time.
        """
-        if not isinstance(self.average_window, (int, float)
-                          ) or self.average_window <= 0:
+        if not isinstance(self.average_interval, (int, float)
+                          ) or self.average_interval <= 0:
            raise ValueError("average_interval must be a positive number")
        if not isinstance(self.start_time, (int, float)) or \
                not isinstance(self.stop_time, (int, float)) or \
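A short construction sketch for the renamed field; the values are illustrative, and the field names match the updated tests at the end of this commit:

import numpy as np
from particula.data.stream import StreamAveraged

stream_averaged = StreamAveraged(
    header=['ch1'],
    data=np.array([1.0, 2.0, 3.0]),
    time=np.array([1.0, 2.0, 3.0]),
    files=['file1'],
    average_interval=1.0,  # renamed from average_window in this commit
    start_time=0.0,
    stop_time=2.0,
    standard_deviation=np.array([0.1, 0.2, 0.3]),
)
stream_averaged.validate_averaging_params()  # raises ValueError if invalid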
161 changes: 161 additions & 0 deletions particula/data/stream_stats.py
@@ -0,0 +1,161 @@
# pytype: skip-file
"""Functions to operate on stream objects."""

from typing import Optional, Union
import numpy as np

from particula.util import stats
from particula.data.stream import StreamAveraged


def drop_masked(stream: object, mask: np.ndarray) -> object:
    """Drop time points where the mask is False, and return the data stream.

    Parameters
    ----------
    stream : object
        data stream object
    mask : np.ndarray
        boolean mask applied along the time axis of the data stream

    Returns
    -------
    object
        stream object with masked time points removed
    """
    stream.data = stream.data[:, mask]
    stream.time = stream.time[mask]
    return stream
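A quick sketch of the helper on its own (not part of this file); the Stream constructor fields are assumed to mirror StreamAveraged in the tests below:

import numpy as np
from particula.data.stream import Stream
from particula.data import stream_stats

stream = Stream(
    header=['ch1', 'ch2'],
    data=np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]),
    time=np.array([0.0, 1.0, 2.0]),
    files=['demo'],
)
keep = np.array([True, False, True])  # drop the middle time point
stream = stream_stats.drop_masked(stream, keep)
# stream.data.shape -> (2, 2); stream.time -> array([0., 2.])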


def average_std(
    stream: object,
    average_interval: Union[float, int] = 60,
    new_time_array: Optional[np.ndarray] = None,
) -> object:
    """
    Calculate the average and standard deviation of data within a given
    'stream' object over specified intervals.

    This function takes a 'stream' object, which should contain time-series
    data, and computes the average and standard deviation of the data at
    intervals specified by 'average_interval'. If 'stream.time' is in
    seconds, the interval is in seconds as well (hours if the time axis is in
    hours, and so on). The results are returned as a new 'StreamAveraged'
    object containing the processed data.

    Parameters:
    - stream (object): The input stream object containing 'time' and 'data'
        arrays along with other associated metadata.
    - average_interval (float|int, optional): The time interval over which
        the averaging is to be performed. Defaults to 60.
    - new_time_array (np.ndarray, optional): An optional array of time points
        at which the average and standard deviation are computed.
        If not provided, a new time array is generated based on the start and
        end times within the 'stream.time' object.

    Returns:
    - StreamAveraged (object): An object of type 'StreamAveraged' containing
        the averaged data, time array, start and stop times, the standard
        deviation of the averaged data, and other metadata from the original
        'stream' object.

    The function checks for an existing 'new_time_array' and generates one if
    needed. It then calculates the average and standard deviation for each
    interval and constructs a 'StreamAveraged' object with the results and
    metadata from the original 'stream' object.
    """
    # check for new time array
    if new_time_array is None:
        # generate new time array from start and end times
        new_time_array = np.arange(
            start=stream.time[0],
            stop=stream.time[-1],
            step=average_interval
        )
    # generate empty arrays for averaged data and std to be filled in
    average = np.zeros([len(stream.header), len(new_time_array)])
    std = np.zeros_like(average)

    # average data
    average, std = stats.average_to_interval(
        time_raw=stream.time,
        data_raw=stream.data,
        average_interval=average_interval,
        average_interval_array=new_time_array,
        average_data=average,
        average_data_std=std,
    )
    # write to new StreamAveraged object and return
    return StreamAveraged(
        header=stream.header,
        data=average,
        time=new_time_array,
        files=stream.files,
        average_interval=average_interval,
        start_time=new_time_array[0],
        stop_time=new_time_array[-1],
        standard_deviation=std,
    )
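A usage sketch for the averaging (not part of this file); the Stream constructor is an assumption, mirroring the StreamAveraged fields in the tests below:

import numpy as np
from particula.data.stream import Stream
from particula.data import stream_stats

# Two channels sampled once per second for 100 s
time = np.arange(0.0, 100.0, 1.0)
data = np.vstack([np.sin(time), np.cos(time)])  # shape (channels, time)
stream = Stream(header=['ch1', 'ch2'], data=data, time=time, files=['demo'])

# Average to 10 s bins; the standard deviation comes back alongside
averaged = stream_stats.average_std(stream=stream, average_interval=10)
print(averaged.time.shape)                # 10 bin start times
print(averaged.standard_deviation.shape)  # matches averaged.data.shape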


# pylint: disable=too-many-arguments
def filtering(
    stream: object,
    bottom: Optional[float] = None,
    top: Optional[float] = None,
    value: Optional[float] = None,
    invert: bool = False,
    replace_with: Optional[Union[float, int]] = None,
    drop: Optional[bool] = True,
) -> object:
    """
    Filters the data of the given 'stream' object based on the specified
    bounds or a specific value. The filtered data can be either dropped or
    replaced with a specified value. Note that not all parameters need to be
    specified, but at least one of 'top', 'bottom', or 'value' must be
    provided.

    Parameters:
    - stream (Stream): The input stream object containing 'data' and 'time'
        attributes.
    - bottom (float, optional): The lower bound for filtering data.
        Defaults to None.
    - top (float, optional): The upper bound for filtering data.
        Defaults to None.
    - value (float, optional): Specific value to filter from data.
        Defaults to None.
    - invert (bool): If True, inverts the filter criteria.
        Defaults to False.
    - replace_with (float|int, optional): Value to replace filtered-out data.
        Defaults to None.
    - drop (bool, optional): If True, filtered-out data points are dropped
        from the dataset. Defaults to True.

    Returns:
    - Stream: The 'stream' object with data filtered as specified.

    If 'drop' is True and 'replace_with' is None, filtered data points are
    removed from the 'stream' object. Otherwise, filtered data points are
    replaced with the 'replace_with' value.

    TODO: add option to filter on a specific data row.
    """
    # Create a mask for the data that should be retained or replaced
    mask = stats.mask_outliers(
        data=stream.data,
        bottom=bottom,
        top=top,
        value=value,
        invert=invert
    )

    if drop and replace_with is None:
        # Apply the mask to data and time, dropping filtered values.
        # If any channel is masked at a time point, that whole column
        # (time point) is dropped.
        mask_sum = np.invert(np.sum(np.invert(mask), axis=0) > 0)
        stream = drop_masked(stream, mask_sum)
    elif replace_with is not None:
        stream.data = np.where(mask, stream.data, replace_with)
        # No need to modify 'stream.time' as it remains consistent with
        # 'stream.data'
    return stream
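A filtering sketch reusing the 'stream' built in the averaging example above (not part of this file). Note that filtering mutates the stream in place, so the two calls below are alternatives, not a pipeline:

import numpy as np
from particula.data import stream_stats

# Drop any time point where a channel falls outside [-0.5, 0.5]
clean = stream_stats.filtering(stream=stream, bottom=-0.5, top=0.5)

# Or keep the time axis and replace out-of-range values with NaN
nan_filled = stream_stats.filtering(
    stream=stream, bottom=-0.5, top=0.5, replace_with=np.nan
)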
6 changes: 3 additions & 3 deletions particula/data/tests/stream_test.py
@@ -51,7 +51,7 @@ def test_stream_averaged_initialization():
    data = np.array([1, 2, 3])
    time = np.array([1.0, 2.0, 3.0])
    files = ['file1', 'file2']
-    average_window = 1.0
+    average_interval = 1.0
    start_time = 0.0
    stop_time = 2.0
    standard_deviation = np.array([0.1, 0.2, 0.3])
@@ -62,7 +62,7 @@ def test_stream_averaged_initialization():
        data=data,
        time=time,
        files=files,
-        average_window=average_window,
+        average_interval=average_interval,
        start_time=start_time,
        stop_time=stop_time,
        standard_deviation=standard_deviation)
@@ -72,7 +72,7 @@ def test_stream_averaged_initialization():
    assert np.array_equal(stream_averaged.data, data)
    assert np.array_equal(stream_averaged.time, time)
    assert stream_averaged.files == files
-    assert stream_averaged.average_window == average_window
+    assert stream_averaged.average_interval == average_interval
    assert stream_averaged.start_time == start_time
    assert stream_averaged.stop_time == stop_time
    assert np.array_equal(stream_averaged.standard_deviation,
