Skip to content

Commit

Permalink
NaN filling between partitions with overlap (#165)
Browse files Browse the repository at this point in the history
* simplify NaN filling in FlashLoader and make it lazy

* add the map_overlap method to do intrafile filling

* add ffill as dfops with tests

* add optional df len computation

* add test for df len compute

* implement multipass ffill

* add config for ffill multipass iteration

* add pyarrow

* load metadata directly for parquet files using pyarrow for speedup

---------

Co-authored-by: Steinn Ymir Agustsson <[email protected]>
  • Loading branch information
zain-sohail and steinnymir authored Oct 10, 2023
1 parent 8f49f40 commit e3af145
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 66 deletions.
75 changes: 71 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tifffile = ">=2022.2.9, <2023.0.0"
tqdm = "^4.62.3"
xarray = "^0.20.2"
joblib = "^1.2.0"
pyarrow = "^13.0.0"
jupyter = {version = "^1.0.0", extras = ["notebook"], optional = true}
ipykernel = {version = "^6.9.1", extras = ["notebook"], optional = true}
sphinx = {version = ">4.4.0", extras = ["docs"], optional = true}
Expand Down
3 changes: 2 additions & 1 deletion sed/config/flash_example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ core:
dataframe:
# The offset correction to the pulseId
ubid_offset: 5

# The number of forward-fill passes applied to the pulseId column.
forward_fill_iterations: 2
# The name of the DAQ system to use. Necessary to resolve the filenames/paths.
daq: fl1user3

Expand Down
55 changes: 55 additions & 0 deletions sed/core/dfops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Union

import dask.dataframe
from dask.diagnostics import ProgressBar
import numpy as np
import pandas as pd

Expand Down Expand Up @@ -138,3 +139,57 @@ def map_columns_2d(
)

return df


def forward_fill_lazy(
    df: dask.dataframe.DataFrame,
    channels: Sequence[str],
    before: Union[str, int] = 'max',
    compute_lengths: bool = False,
    iterations: int = 2,
) -> dask.dataframe.DataFrame:
    """Forward fill the specified columns multiple times in a dask dataframe.

    Allows forward filling between partitions. This is useful for dataframes
    that have sparse data, such as those with many NaNs.
    Running the forward filling multiple times can fix the issue of having
    entire partitions consisting of NaNs. By default we run this twice, which
    is enough to fix the issue for dataframes with no consecutive partitions of NaNs.

    Args:
        df (dask.dataframe.DataFrame): The dataframe to forward fill.
        channels (Sequence[str]): The columns to forward fill.
        before (int, str, optional): The number of rows to include before the current
            partition when overlapping. If 'max', uses the size of the smallest
            partition in the dataframe, so the overlap can always bridge one
            fully-NaN partition. Defaults to 'max'.
        compute_lengths (bool, optional): Whether to eagerly compute the partition
            lengths (with a progress bar) before overlapping. Defaults to False.
        iterations (int, optional): The number of forward-fill passes to run.
            Defaults to 2.

    Raises:
        TypeError: If ``before`` is neither an integer nor the string 'max'.

    Returns:
        dask.dataframe.DataFrame: The dataframe with the specified columns forward filled.
    """

    def forward_fill_partition(partition):
        # Fill NaNs within a single (already overlapped) partition.
        partition[channels] = partition[channels].ffill()
        return partition

    # Determine the overlap size: the length of the smallest partition,
    # which is the most rows we can safely borrow from a neighbor.
    if before == 'max':
        nrows = df.map_partitions(len)
        if compute_lengths:
            with ProgressBar():
                print("Computing dataframe shape...")
                nrows = nrows.compute()
        before = min(nrows)
    elif not isinstance(before, int):
        raise TypeError('before must be an integer or "max"')

    # Each map_overlap pass pulls values across one partition boundary,
    # so `iterations` passes can bridge runs of NaN-only partitions.
    for _ in range(iterations):
        df = df.map_overlap(
            forward_fill_partition,
            before=before,
            after=0,
        )
    return df
Loading

0 comments on commit e3af145

Please sign in to comment.