Skip to content

Commit

Permalink
Create common function for converting data to parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
tomjholland committed Nov 29, 2024
1 parent 3b781e5 commit c38e2a3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 44 deletions.
138 changes: 95 additions & 43 deletions pyprobe/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
import shutil
import time
import warnings
from typing import Callable, Dict, List, Literal, Optional
import zipfile
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable, Dict, List, Literal, Optional

import distinctipy
import polars as pl
Expand Down Expand Up @@ -62,10 +61,11 @@ def check_and_set_name(
values = info
return values

@validate_call
def process_cycler_file(
def _convert_to_parquet(
self,
cycler: str,
cycler: Literal[
"neware", "biologic", "biologic_MB", "arbin", "basytec", "maccor", "generic"
],
folder_path: str,
input_filename: str | Callable[[str], str],
output_filename: str | Callable[[str], str],
Expand All @@ -74,36 +74,40 @@ def process_cycler_file(
"performance", "file size", "uncompressed"
] = "performance",
overwrite_existing: bool = False,
column_dict: Optional[Dict[str, str]] = None,
) -> None:
"""Convert cycler file into PyProBE format.
"""Convert a file into PyProBE format.
Args:
cycler (str):
The cycler used to produce the data. Available cyclers are:
- 'neware'
- 'biologic'
- 'biologic_MB' (for Modulo Bat Biologic data)
folder_path (str):
cycler:
The cycler used to produce the data.
folder_path:
The path to the folder containing the data file.
input_filename (str | function):
input_filename:
A filename string or a function to generate the file name for cycler
data.
output_filename (str | function):
output_filename:
A filename string or a function to generate the file name for PyProBE
data.
filename_inputs (list):
filename_inputs:
The list of inputs to input_filename and output_filename, if they are
functions. These must be keys of the cell info.
compression_priority (str):
compression_priority:
The priority of the compression algorithm to use on the resulting
parquet file. Available options are:
- 'performance': Use the 'lz4' compression algorithm (default).
- 'file size': Use the 'zstd' compression algorithm.
- 'uncompressed': Do not use compression.
overwrite_existing (bool):
overwrite_existing:
If True, any existing parquet file with the output_filename will be
overwritten. If False, the function will skip the conversion if the
parquet file already exists.
column_dict:
A dictionary mapping the column names in the cycler file to the PyProBE
column names. The keys of the dictionary are the cycler column names and
the values are the PyProBE column names. You must use asterisks to
indicate the units of the columns. Only used for the 'generic' cycler.
E.g. :code:`{"V (*)": "Voltage [*]"}`.
"""
input_data_path = self._get_data_paths(
folder_path, input_filename, filename_inputs
Expand All @@ -120,9 +124,22 @@ def process_cycler_file(
"neware": neware.Neware,
"biologic": biologic.Biologic,
"biologic_MB": biologic.BiologicMB,
"arbin": arbin.Arbin,
"basytec": basytec.Basytec,
"maccor": maccor.Maccor,
}
t1 = time.time()
importer = cycler_dict[cycler](input_data_path=input_data_path)
if cycler == "generic":
if column_dict is None:
raise ValueError(
"column_dict must be provided for the generic cycler."
)
importer = basecycler.BaseCycler(
input_data_path=input_data_path,
column_dict=column_dict,
)
else:
importer = cycler_dict[cycler](input_data_path=input_data_path)
compression_dict = {
"uncompressed": "uncompressed",
"performance": "lz4",
Expand All @@ -137,6 +154,58 @@ def process_cycler_file(
else:
print(f"File {output_data_path} already exists. Skipping.")

@validate_call
def process_cycler_file(
    self,
    cycler: Literal[
        "neware", "biologic", "biologic_MB", "arbin", "basytec", "maccor", "generic"
    ],
    folder_path: str,
    input_filename: str | Callable[[str], str],
    output_filename: str | Callable[[str], str],
    filename_inputs: Optional[List[str]] = None,
    compression_priority: Literal[
        "performance", "file size", "uncompressed"
    ] = "performance",
    overwrite_existing: bool = False,
    column_dict: Optional[Dict[str, str]] = None,
) -> None:
    """Convert a file into PyProBE format.

    Args:
        cycler:
            The cycler used to produce the data.
        folder_path:
            The path to the folder containing the data file.
        input_filename:
            A filename string or a function to generate the file name for cycler
            data.
        output_filename:
            A filename string or a function to generate the file name for PyProBE
            data.
        filename_inputs:
            The list of inputs to input_filename and output_filename, if they are
            functions. These must be keys of the cell info.
        compression_priority:
            The priority of the compression algorithm to use on the resulting
            parquet file. Available options are:
            - 'performance': Use the 'lz4' compression algorithm (default).
            - 'file size': Use the 'zstd' compression algorithm.
            - 'uncompressed': Do not use compression.
        overwrite_existing:
            If True, any existing parquet file with the output_filename will be
            overwritten. If False, the function will skip the conversion if the
            parquet file already exists.
        column_dict:
            A dictionary mapping the column names in the cycler file to the
            PyProBE column names, using asterisks to indicate units.
            E.g. :code:`{"V (*)": "Voltage [*]"}`. Required only when
            ``cycler`` is ``'generic'``; ignored otherwise.
    """
    # Forward column_dict so the 'generic' cycler is usable through this entry
    # point; without it, _convert_to_parquet raises ValueError for 'generic'
    # even though the Literal type accepts that value.
    self._convert_to_parquet(
        cycler,
        folder_path,
        input_filename,
        output_filename,
        filename_inputs,
        compression_priority,
        overwrite_existing,
        column_dict=column_dict,
    )

@validate_call
def process_generic_file(
self,
Expand Down Expand Up @@ -169,39 +238,22 @@ def process_generic_file(
filename_inputs (list):
The list of inputs to input_filename and output_filename.
These must be keys of the cell info.
compression_priority (str):
compression_priority:
The priority of the compression algorithm to use on the resulting
parquet file. Available options are:
- 'performance': Use the 'lz4' compression algorithm (default).
- 'file size': Use the 'zstd' compression algorithm.
- 'uncompressed': Do not use compression.
"""
input_data_path = self._get_data_paths(
folder_path, input_filename, filename_inputs
)
output_data_path = self._get_data_paths(
folder_path, output_filename, filename_inputs
)
output_data_path = self._verify_parquet(output_data_path)
if "*" in output_data_path:
raise ValueError("* characters are not allowed for a complete data path")

t1 = time.time()
importer = basecycler.BaseCycler(
input_data_path=input_data_path,
self._convert_to_parquet(
"generic",
folder_path,
input_filename,
output_filename,
filename_inputs,
compression_priority,
column_dict=column_dict,
)
compression_dict = {
"uncompressed": "uncompressed",
"performance": "lz4",
"file size": "zstd",
}
self._write_parquet(
importer,
output_data_path,
compression=compression_dict[compression_priority],
)
print(f"\tparquet written in {time.time()-t1: .2f} seconds.")

@validate_call
def add_procedure(
Expand Down
Binary file modified tests/sample_data/neware/sample_data_neware.parquet
Binary file not shown.
9 changes: 8 additions & 1 deletion tests/test_cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,14 @@ def test_process_cycler_file(cell_instance, lazyframe_fixture):
folder_path = "tests/sample_data/neware/"
file_name = "sample_data_neware.xlsx"
output_name = "sample_data_neware.parquet"
cell_instance.process_cycler_file("neware", folder_path, file_name, output_name)
cell_instance.process_cycler_file(
"neware",
folder_path,
file_name,
output_name,
compression_priority="file size",
overwrite_existing=True,
)
expected_dataframe = lazyframe_fixture.collect()
expected_dataframe = expected_dataframe.with_columns(
pl.col("Date").dt.cast_time_unit("us")
Expand Down

0 comments on commit c38e2a3

Please sign in to comment.