Add join concatenation parameter distinction #181

Merged
merged 6 commits into from Oct 24, 2024
26 changes: 15 additions & 11 deletions cytotable/convert.py
@@ -995,7 +995,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
     page_keys: Dict[str, str],
     data_type_cast_map: Optional[Dict[str, str]] = None,
     **kwargs,
-) -> Union[Dict[str, List[Dict[str, Any]]], str]:
+) -> Union[Dict[str, List[Dict[str, Any]]], List[Any], str]:
     """
     Export data to parquet.
 
@@ -1244,15 +1244,19 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
             ).result()
         ]
 
-        # concat our join chunks together as one cohesive dataset
-        # return results in common format which includes metadata
-        # for lineage and debugging
-        results = _concat_join_sources(
-            dest_path=expanded_dest_path,
-            join_sources=[join.result() for join in join_sources_result],
-            sources=evaluated_results,
-            sort_output=sort_output,
-        )
+        if concat:
+            # concat our join chunks together as one cohesive dataset
+            # return results in common format which includes metadata
+            # for lineage and debugging
+            results = _concat_join_sources(
+                dest_path=expanded_dest_path,
+                join_sources=[join.result() for join in join_sources_result],
+                sources=evaluated_results,
+                sort_output=sort_output,
+            )
+        else:
+            # else we leave the joined chunks as-is and return them
+            return evaluate_futures(join_sources_result)
 
     # wrap the final result as a future and return
     return evaluate_futures(results)
@@ -1278,7 +1282,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals
     preset: Optional[str] = "cellprofiler_csv",
     parsl_config: Optional[parsl.Config] = None,
     **kwargs,
-) -> Union[Dict[str, List[Dict[str, Any]]], str]:
+) -> Union[Dict[str, List[Dict[str, Any]]], List[Any], str]:
     """
     Convert file-based data from various sources to Pycytominer-compatible standards.
 
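For a caller, the practical effect of these changes is the following (a hedged sketch: the parameter usage mirrors the new test added below, while the file paths are placeholders):

```python
# sketch of the new join/concat distinction: with join=True and
# concat=False, convert() skips _concat_join_sources and returns the
# joined chunk results as a list (hence the List[Any] member added to
# the return type unions above)
import cytotable

chunk_files = cytotable.convert(
    source_path="all_cellprofiler.sqlite",  # placeholder input path
    dest_path="NF1_data.parquet",  # placeholder output path
    dest_datatype="parquet",
    source_datatype="sqlite",
    preset="cellprofiler_sqlite",
    chunk_size=100,  # small chunks so multiple chunk files are produced
    join=True,
    concat=False,
)
# chunk_files may then be read together, for example via
# pyarrow.parquet.ParquetDataset(chunk_files).read()
```

With concat left enabled, the joined chunks are instead concatenated into one cohesive dataset as before.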
6 changes: 4 additions & 2 deletions cytotable/utils.py
@@ -537,14 +537,16 @@ def _unwrap_source(
     return _unwrap_value(source)
 
 
-def evaluate_futures(sources: Union[Dict[str, List[Dict[str, Any]]], str]) -> Any:
+def evaluate_futures(
+    sources: Union[Dict[str, List[Dict[str, Any]]], List[Any], str]
+) -> Any:
     """
     Evaluates any Parsl futures for use within other tasks.
     This enables a pattern of Parsl app usage as "tasks" and delayed
     future result evaluation for concurrency.
 
     Args:
-        sources: Union[Dict[str, List[Dict[str, Any]]], str]
+        sources: Union[Dict[str, List[Dict[str, Any]]], List[Any], str]
             Sources are an internal data structure used by CytoTable for
             processing and organizing data results. They may include futures
             which require asynchronous processing through Parsl, so we
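The `List[Any]` member added to the union reflects that join-without-concat results arrive as a flat list whose members may be Parsl futures. A minimal sketch of that new branch (illustrative only, not the library's exact implementation; see `cytotable.utils.evaluate_futures` for the real logic):

```python
from typing import Any, Dict, List, Union


def _evaluate_futures_sketch(
    sources: Union[Dict[str, List[Dict[str, Any]]], List[Any], str]
) -> Any:
    # new shape from join=True, concat=False: a flat list whose members
    # may be futures, so resolve each one via .result() when present
    if isinstance(sources, list):
        return [
            item.result() if hasattr(item, "result") else item
            for item in sources
        ]
    # dict and str shapes elided here; the real implementation handles them
    return sources
```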
44 changes: 44 additions & 0 deletions tests/test_convert.py
@@ -845,6 +845,50 @@ def test_convert_cellprofiler_sqlite(
     assert test_result.equals(control_result)
 
 
+def test_convert_cellprofiler_sqlite_join_with_no_concat(
+    load_parsl_default: None,
+    fx_tempdir: str,
+    data_dir_cellprofiler: str,
+    cellprofiler_merged_nf1data: pa.Table,
+):
+    """
+    Tests convert with cellprofiler sqlite exports
+    using joins but no concatenation.
+    """
+
+    control_result = cellprofiler_merged_nf1data
+
+    # gather results as a joined list of chunk files which aren't concatenated
+    test_result_data = convert(
+        source_path=(
+            f"{data_dir_cellprofiler}/NF1_SchwannCell_data/all_cellprofiler.sqlite"
+        ),
+        dest_path=f"{fx_tempdir}/NF1_data.parquet",
+        dest_datatype="parquet",
+        source_datatype="sqlite",
+        preset="cellprofiler_sqlite",
+        # explicitly set the chunk size to receive multiple chunk files
+        chunk_size=100,
+        join=True,
+        concat=False,
+    )
+
+    # read the result files as a single pyarrow table
+    test_result = parquet.ParquetDataset(path_or_paths=test_result_data).read()
+
+    # sort all values by the same columns
+    # we do this due to the potential for inconsistently ordered results
+    control_result = control_result.sort_by(
+        [(colname, "ascending") for colname in control_result.column_names]
+    )
+    test_result = test_result.sort_by(
+        [(colname, "ascending") for colname in test_result.column_names]
+    )
+
+    assert test_result.shape == control_result.shape
+    assert test_result.equals(control_result)
+
+
 def test_convert_cellprofiler_csv(
     load_parsl_default: None,
     fx_tempdir: str,
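Assuming a standard pytest setup for this repository, the new behavior can be exercised selectively with `pytest tests/test_convert.py -k join_with_no_concat`.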