From 67d774f1406b65d5e4d058718d0c95b73000f781 Mon Sep 17 00:00:00 2001 From: d33bs Date: Tue, 30 Jul 2024 17:05:42 -0600 Subject: [PATCH 01/27] move to keyset pagination --- cytotable/convert.py | 209 ++++++++++++++++++++++--------------------- cytotable/presets.py | 38 ++++++++ cytotable/utils.py | 38 +++----- 3 files changed, 157 insertions(+), 128 deletions(-) diff --git a/cytotable/convert.py b/cytotable/convert.py index a6e3e26..fc92d41 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -183,13 +183,14 @@ def _prep_cast_column_data_types( @python_app -def _get_table_chunk_offsets( +def _get_table_keyset_pagination_sets( chunk_size: int, + page_key: str, source: Optional[Dict[str, Any]] = None, sql_stmt: Optional[str] = None, -) -> Union[List[int], None]: +) -> Union[List[Any], None]: """ - Get table data chunk offsets for later use in capturing segments + Get table data chunk keys for later use in capturing segments of values. This work also provides a chance to catch problematic input data which will be ignored with warnings. @@ -199,15 +200,15 @@ def _get_table_chunk_offsets( file or table of some kind. chunk_size: int The size in rowcount of the chunks to create. + page_key: str + The column name to be used as the key for pagination. Returns: - List[int] - List of integers which represent offsets to use for reading - the data later on. + List[Any] + List of keys to use for reading the data later on. """ import logging - import pathlib import duckdb from cloudpathlib import AnyPath, CloudPath @@ -223,18 +224,15 @@ def _get_table_chunk_offsets( source_type = str(source_path.suffix).lower() try: - # gather the total rowcount from csv or sqlite data input sources with _duckdb_reader() as ddb_reader: - rowcount = int( - ddb_reader.execute( - # nosec - f"SELECT COUNT(*) from read_csv_auto('{source_path}', header=TRUE, delim=',')" - if source_type == ".csv" - else f"SELECT COUNT(*) from sqlite_scan('{source_path}', '{table_name}')" - ).fetchone()[0] - ) + if source_type == ".csv": + sql_query = f"SELECT {page_key} FROM read_csv_auto('{source_path}', header=TRUE, delim=',') ORDER BY {page_key}" + else: + sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}" + + keys = ddb_reader.execute(sql_query).fetchall() + keys = [key[0] for key in keys] - # catch input errors which will result in skipped files except ( duckdb.InvalidInputException, NoInputDataException, @@ -245,26 +243,19 @@ def _get_table_chunk_offsets( return None - # find chunk offsets from sql statement elif sql_stmt is not None: - # gather the total rowcount from csv or sqlite data input sources with _duckdb_reader() as ddb_reader: - rowcount = int( - ddb_reader.execute( - # nosec - f"SELECT COUNT(*) FROM ({sql_stmt})" - ).fetchone()[0] - ) + sql_query = f"SELECT {page_key} FROM ({sql_stmt}) ORDER BY {page_key}" + keys = ddb_reader.execute(sql_query).fetchall() + keys = [key[0] for key in keys] + + # Create chunks of keys + chunks = [ + ((values := keys[i : i + chunk_size])[0], values[-1]) + for i in range(0, len(keys), chunk_size) + ] - return list( - range( - 0, - # gather rowcount from table and use as maximum for range - rowcount, - # step through using chunk size - chunk_size, - ) - ) + return chunks @python_app @@ -272,7 +263,7 @@ def _source_chunk_to_parquet( source_group_name: str, source: Dict[str, Any], chunk_size: int, - offset: int, + pageset: Tuple[int, int], dest_path: str, sort_output: bool, ) -> str: @@ -287,8 +278,10 @@ def _source_chunk_to_parquet( 
file or table of some kind along with collected information about table. chunk_size: int Row count to use for chunked output. - offset: int - The offset for chunking the data from source. + page_keys: Dict[str, str] + ... + pageset: Tuple[int, int] + The pageset for chunking the data from source. dest_path: str Path to store the output data. sort_output: bool @@ -303,7 +296,6 @@ def _source_chunk_to_parquet( import duckdb from cloudpathlib import AnyPath - from pyarrow import parquet from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import ( @@ -324,21 +316,7 @@ def _source_chunk_to_parquet( if "table_name" not in source.keys() else f"{source['source_path']}_table_{source['table_name']}" ) - # build the column selection block of query - # add cytotable metadata columns - cytotable_metadata_cols = [ - ( - f"CAST( '{source_path_str}' " - f"AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path']})" - ' AS "cytotable_meta_source_path"' - ), - f"CAST( {offset} AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset']}) AS \"cytotable_meta_offset\"", - ( - f"CAST( (row_number() OVER ()) AS {CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum']})" - ' AS "cytotable_meta_rownum"' - ), - ] # add source table columns casted_source_cols = [ # here we cast the column to the specified type ensure the colname remains the same @@ -349,7 +327,7 @@ def _source_chunk_to_parquet( # create selection statement from lists above select_columns = ",".join( # if we should sort the output, add the metadata_cols - cytotable_metadata_cols + casted_source_cols + casted_source_cols if sort_output else casted_source_cols ) @@ -364,7 +342,7 @@ def _source_chunk_to_parquet( base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')" result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}" - result_filepath = f"{result_filepath_base}-{offset}.parquet" + result_filepath = f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet" # Attempt to read the data to parquet file # using duckdb for extraction and pyarrow for @@ -377,14 +355,14 @@ def _source_chunk_to_parquet( table=ddb_reader.execute( f""" {base_query} + WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]} /* order by all columns for deterministic output */ - ORDER BY ALL - LIMIT {chunk_size} OFFSET {offset} + ORDER BY {source['page_key']} """ if sort_output else f""" {base_query} - LIMIT {chunk_size} OFFSET {offset} + WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]} """ ).arrow(), where=result_filepath, @@ -407,8 +385,8 @@ def _source_chunk_to_parquet( source_path=str(source["source_path"]), table_name=str(source["table_name"]), chunk_size=chunk_size, - offset=offset, - add_cytotable_meta=True if sort_output else False, + page_key=source["page_key"], + pageset=pageset, sort_output=sort_output, ), where=result_filepath, @@ -741,8 +719,6 @@ def _prepare_join_sql( """ import pathlib - from cytotable.constants import CYOTABLE_META_COLUMN_TYPES - # replace with real location of sources for join sql order_by_tables = [] for key, val in sources.items(): @@ -754,17 +730,8 @@ def _prepare_join_sql( ) order_by_tables.append(table_name) - # create order by statement with from all tables using cytotable metadata - order_by_sql = "ORDER BY " + ", ".join( - [ - f"{table}.{meta_column}" - for table in order_by_tables - for meta_column in CYOTABLE_META_COLUMN_TYPES - ] - ) - # add the order by statements to the join - 
return joins + order_by_sql if sort_output else joins + return joins @python_app @@ -772,7 +739,9 @@ def _join_source_chunk( dest_path: str, joins: str, chunk_size: int, - offset: int, + page_key: str, + pageset: Tuple[int, int], + sort_output: bool, drop_null: bool, ) -> str: """ @@ -798,31 +767,45 @@ def _join_source_chunk( import pathlib - from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata - # Attempt to read the data to parquet file - # using duckdb for extraction and pyarrow for - # writing data to a parquet file. - # read data with chunk size + offset - # and export to parquet - exclude_meta_cols = [ - f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys()) - ] + print( + f""" + WITH joined AS ( + {joins} + ) + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} + ORDER BY {page_key}; + """ + if sort_output + else f""" + WITH joined AS ( + {joins} + ) + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}; + """ + ) with _duckdb_reader() as ddb_reader: result = ddb_reader.execute( f""" - WITH joined AS ( + WITH joined AS ( {joins} - LIMIT {chunk_size} OFFSET {offset} - ) - SELECT - /* exclude metadata columns from the results - by using a lambda on column names based on exclude_meta_cols. */ - COLUMNS (c -> ({" AND ".join(exclude_meta_cols)})) - FROM joined; - """ + ) + SELECT * + FROM joined + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} + ORDER BY {page_key}; + """ + if sort_output + else f""" + WITH joined AS ( + {joins} + ) + SELECT * + FROM joined + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}; + """ ).arrow() # drop nulls if specified @@ -1042,6 +1025,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals infer_common_schema: bool, drop_null: bool, sort_output: bool, + page_keys: Dict[str, str], data_type_cast_map: Optional[Dict[str, str]] = None, **kwargs, ) -> Union[Dict[str, List[Dict[str, Any]]], str]: @@ -1082,6 +1066,9 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals Whether to drop null results. sort_output: bool Specifies whether to sort cytotable output or not. + page_keys: Dict[str, str] + A dictionary which defines which column names are used for keyset pagination + in order to perform data extraction. data_type_cast_map: Dict[str, str] A dictionary mapping data type groups to specific types. 
Roughly includes Arrow data types language from: @@ -1112,16 +1099,24 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # expand the destination path expanded_dest_path = _expand_path(path=dest_path) - # prepare offsets for chunked data export from source tables - offsets_prepared = { + # prepare pagesets for chunked data export from source tables + pagesets_prepared = { source_group_name: [ dict( source, **{ - "offsets": _get_table_chunk_offsets( + "page_key": ( + page_key := [ + value + for key, value in page_keys.items() + if key.lower() in source_group_name.lower() + ][0] + ), + "pagesets": _get_table_keyset_pagination_sets( source=source, chunk_size=chunk_size, - ) + page_key=page_key, + ), }, ) for source in source_group_vals @@ -1136,10 +1131,10 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # ensure we have offsets source for source in source_group_vals - if source["offsets"] is not None + if source["pagesets"] is not None ] for source_group_name, source_group_vals in evaluate_futures( - offsets_prepared + pagesets_prepared ).items() # ensure we have source_groups with at least one source table if len(source_group_vals) > 0 @@ -1177,7 +1172,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals source_group_name=source_group_name, source=source, chunk_size=chunk_size, - offset=offset, + pageset=pageset, dest_path=expanded_dest_path, sort_output=sort_output, ), @@ -1186,7 +1181,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals metadata=metadata, compartments=compartments, ) - for offset in source["offsets"] + for pageset in source["pagesets"] ] }, ) @@ -1243,6 +1238,10 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals sources=evaluated_results, joins=joins, sort_output=sort_output ).result() + page_key_join = [ + value for key, value in page_keys.items() if key.lower() == "join" + ][0] + # map joined results based on the join groups gathered above # note: after mapping we end up with a list of strings (task returns str) join_sources_result = [ @@ -1253,15 +1252,18 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals dest_path=expanded_dest_path, joins=prepared_joins_sql, chunk_size=chunk_size, - offset=offset, + page_key=page_key_join, + pageset=pageset, + sort_output=sort_output, drop_null=drop_null, ) # create join group for querying the concatenated # data in order to perform memory-safe joining # per user chunk size specification. 
- for offset in _get_table_chunk_offsets( + for pageset in _get_table_keyset_pagination_sets( sql_stmt=prepared_joins_sql, chunk_size=chunk_size, + page_key=page_key_join, ).result() ] @@ -1293,6 +1295,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals infer_common_schema: bool = True, drop_null: bool = False, data_type_cast_map: Optional[Dict[str, str]] = None, + page_keys: Dict[str, str] = None, sort_output: bool = True, preset: Optional[str] = "cellprofiler_csv", parsl_config: Optional[parsl.Config] = None, @@ -1440,6 +1443,11 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals if chunk_size is None else chunk_size ) + page_keys = ( + cast(dict, config[preset]["CONFIG_PAGE_KEYS"]) + if page_keys is None + else page_keys + ) # send sources to be written to parquet if selected if dest_datatype == "parquet": @@ -1458,6 +1466,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals drop_null=drop_null, data_type_cast_map=data_type_cast_map, sort_output=sort_output, + page_keys=page_keys, **kwargs, ) diff --git a/cytotable/presets.py b/cytotable/presets.py index c53e958..7100d5b 100644 --- a/cytotable/presets.py +++ b/cytotable/presets.py @@ -22,6 +22,13 @@ "Parent_Cells", "Parent_Nuclei", ), + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number" + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -61,6 +68,13 @@ "Parent_Cells", "Parent_Nuclei", ), + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number" + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -104,6 +118,13 @@ "Parent_Cells", "Parent_Nuclei", ), + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number" + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -155,6 +176,13 @@ "Cells_Number_Object_Number", "Nuclei_Number_Object_Number", ), + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number" + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. @@ -203,6 +231,13 @@ "Cells_ObjectNumber", "Nuclei_ObjectNumber", ), + "CONFIG_PAGE_KEYS": { + "image": "ImageNumber", + "cells": "ObjectNumber", + "nuclei": "ObjectNumber", + "cytoplasm": "ObjectNumber", + "join": "Cytoplasm_Number_Object_Number" + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. 
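
A note on how these new CONFIG_PAGE_KEYS entries are meant to be consumed: they surface through the `page_keys` argument this patch adds to `convert()`, and may also be passed explicitly to override a preset. The following is a minimal usage sketch rather than part of the patch; the preset name and file paths are illustrative, and the mapping mirrors the preset values introduced in this series:

    import cytotable

    result = cytotable.convert(
        source_path="example_cellprofiler.sqlite",  # hypothetical input
        dest_path="example_cellprofiler.parquet",
        dest_datatype="parquet",
        preset="cellprofiler_sqlite_pycytominer",
        # one numeric pagination column per table, plus one for the join
        # step; passing this explicitly overrides the preset's
        # CONFIG_PAGE_KEYS entries
        page_keys={
            "image": "ImageNumber",
            "cells": "Cells_Number_Object_Number",
            "nuclei": "Nuclei_Number_Object_Number",
            "cytoplasm": "Cytoplasm_Number_Object_Number",
            "join": "Cytoplasm_Number_Object_Number",
        },
        chunk_size=1000,
    )
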
@@ -248,6 +283,9 @@ "Z", "T", ), + "CONFIG_PAGE_KEYS": { + "test": "OBJECT ID", + }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data # and system used by this library. diff --git a/cytotable/utils.py b/cytotable/utils.py index c95d4fb..d7be010 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -5,7 +5,7 @@ import logging import os import pathlib -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Union, cast, Tuple import duckdb import parsl @@ -174,9 +174,9 @@ def _sqlite_mixed_type_query_to_parquet( source_path: str, table_name: str, chunk_size: int, - offset: int, + page_key: str, + pageset: Tuple[int, int], sort_output: bool, - add_cytotable_meta: bool = False, ) -> str: """ Performs SQLite table data extraction where one or many @@ -190,8 +190,10 @@ def _sqlite_mixed_type_query_to_parquet( The name of the table being queried. chunk_size: int: Row count to use for chunked output. - offset: int: - The offset for chunking the data from source. + page_key: str, + ... + pageset: Tuple[int, int]: + The pageset for chunking the data from source. sort_output: bool Specifies whether to sort cytotable output or not. add_cytotable_meta: bool, default=False: @@ -206,7 +208,6 @@ def _sqlite_mixed_type_query_to_parquet( import pyarrow as pa from cytotable.constants import ( - CYOTABLE_META_COLUMN_TYPES, SQLITE_AFFINITY_DATA_TYPE_SYNONYMS, ) from cytotable.exceptions import DatatypeException @@ -268,40 +269,21 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str: for col in column_info ] - if add_cytotable_meta: - query_parts += [ - ( - f"CAST( '{f'{source_path}_table_{table_name}'}' " - f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_source_path'].lower())}) " - "AS cytotable_meta_source_path" - ), - ( - f"CAST( {offset} " - f"AS {_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_offset'].lower())}) " - "AS cytotable_meta_offset" - ), - ( - f"CAST( (ROW_NUMBER() OVER ()) AS " - f"{_sqlite_affinity_data_type_lookup(CYOTABLE_META_COLUMN_TYPES['cytotable_meta_rownum'].lower())}) " - "AS cytotable_meta_rownum" - ), - ] - # perform the select using the cases built above and using chunksize + offset sql_stmt = ( f""" SELECT {', '.join(query_parts)} FROM {table_name} - ORDER BY {', '.join([col['column_name'] for col in column_info])} - LIMIT {chunk_size} OFFSET {offset}; + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} + ORDER BY {page_key}; """ if sort_output else f""" SELECT {', '.join(query_parts)} FROM {table_name} - LIMIT {chunk_size} OFFSET {offset}; + WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}; """ ) From 1a4c6ed0b1ca94c97685a3af3e809ab089db7503 Mon Sep 17 00:00:00 2001 From: d33bs Date: Tue, 30 Jul 2024 17:08:14 -0600 Subject: [PATCH 02/27] linting --- cytotable/presets.py | 10 +++++----- cytotable/utils.py | 6 ++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/cytotable/presets.py b/cytotable/presets.py index 7100d5b..22cffeb 100644 --- a/cytotable/presets.py +++ b/cytotable/presets.py @@ -27,7 +27,7 @@ "cells": "ObjectNumber", "nuclei": "ObjectNumber", "cytoplasm": "ObjectNumber", - "join": "Cytoplasm_Number_Object_Number" + "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent 
on data @@ -73,7 +73,7 @@ "cells": "ObjectNumber", "nuclei": "ObjectNumber", "cytoplasm": "ObjectNumber", - "join": "Cytoplasm_Number_Object_Number" + "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data @@ -123,7 +123,7 @@ "cells": "ObjectNumber", "nuclei": "ObjectNumber", "cytoplasm": "ObjectNumber", - "join": "Cytoplasm_Number_Object_Number" + "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data @@ -181,7 +181,7 @@ "cells": "ObjectNumber", "nuclei": "ObjectNumber", "cytoplasm": "ObjectNumber", - "join": "Cytoplasm_Number_Object_Number" + "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data @@ -236,7 +236,7 @@ "cells": "ObjectNumber", "nuclei": "ObjectNumber", "cytoplasm": "ObjectNumber", - "join": "Cytoplasm_Number_Object_Number" + "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data diff --git a/cytotable/utils.py b/cytotable/utils.py index d7be010..de266c4 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -5,7 +5,7 @@ import logging import os import pathlib -from typing import Any, Dict, List, Optional, Union, cast, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union, cast import duckdb import parsl @@ -207,9 +207,7 @@ def _sqlite_mixed_type_query_to_parquet( import pyarrow as pa - from cytotable.constants import ( - SQLITE_AFFINITY_DATA_TYPE_SYNONYMS, - ) + from cytotable.constants import SQLITE_AFFINITY_DATA_TYPE_SYNONYMS from cytotable.exceptions import DatatypeException # open sqlite3 connection From df1f7a55323114c85db51127e02bd9fd934dfd30 Mon Sep 17 00:00:00 2001 From: d33bs Date: Wed, 31 Jul 2024 10:30:05 -0600 Subject: [PATCH 03/27] adjustments for further data integration --- cytotable/convert.py | 57 ++++++++++++-------------------------------- cytotable/presets.py | 14 +++++------ cytotable/utils.py | 3 --- 3 files changed, 22 insertions(+), 52 deletions(-) diff --git a/cytotable/convert.py b/cytotable/convert.py index fc92d41..c44beb3 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -109,13 +109,8 @@ def _get_table_columns_and_types( arrow_data_tbl = _sqlite_mixed_type_query_to_parquet( source_path=str(source["source_path"]), table_name=str(source["table_name"]), - # chunk size is set to 5 as a limit similar - # to above SQL within select_query variable - chunk_size=5, - # offset is set to 0 start at first row - # result from table - offset=0, - add_cytotable_meta=False, + page_key=source["page_key"], + pageset=source["pagesets"][0], sort_output=sort_output, ) with _duckdb_reader() as ddb_reader: @@ -357,12 +352,7 @@ def _source_chunk_to_parquet( {base_query} WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]} /* order by all columns for deterministic output */ - ORDER BY {source['page_key']} - """ - if sort_output - else f""" - {base_query} - WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]} + {"ORDER BY " + source['page_key'] if sort_output else ""}; """ ).arrow(), where=result_filepath, @@ -384,7 +374,6 @@ def 
_source_chunk_to_parquet( table=_sqlite_mixed_type_query_to_parquet( source_path=str(source["source_path"]), table_name=str(source["table_name"]), - chunk_size=chunk_size, page_key=source["page_key"], pageset=pageset, sort_output=sort_output, @@ -450,7 +439,7 @@ def _prepend_column_name( if len(targets) == 0: logger.warning( msg=( - "Skipping column name prepend operations" + "Skipping column name prepend operations " "because no compartments or metadata were provided." ) ) @@ -769,23 +758,6 @@ def _join_source_chunk( from cytotable.utils import _duckdb_reader, _write_parquet_table_with_metadata - print( - f""" - WITH joined AS ( - {joins} - ) - WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} - ORDER BY {page_key}; - """ - if sort_output - else f""" - WITH joined AS ( - {joins} - ) - WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}; - """ - ) - with _duckdb_reader() as ddb_reader: result = ddb_reader.execute( f""" @@ -795,16 +767,7 @@ def _join_source_chunk( SELECT * FROM joined WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} - ORDER BY {page_key}; - """ - if sort_output - else f""" - WITH joined AS ( - {joins} - ) - SELECT * - FROM joined - WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}; + {"ORDER BY " + page_key if sort_output else ""}; """ ).arrow() @@ -1449,6 +1412,16 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals else page_keys ) + # Raise an exception for scenarios where one configures CytoTable to join + # but does not provide a pagination key for the joins. + if join and (page_keys is None or "join" not in page_keys.keys()): + raise CytoTableException( + ( + "When using join=True one must pass a 'join' pagination key " + "in the page_keys parameter." + ) + ) + # send sources to be written to parquet if selected if dest_datatype == "parquet": output = _to_parquet( diff --git a/cytotable/presets.py b/cytotable/presets.py index 22cffeb..02225ee 100644 --- a/cytotable/presets.py +++ b/cytotable/presets.py @@ -70,9 +70,9 @@ ), "CONFIG_PAGE_KEYS": { "image": "ImageNumber", - "cells": "ObjectNumber", - "nuclei": "ObjectNumber", - "cytoplasm": "ObjectNumber", + "cells": "Cells_Number_Object_Number", + "nuclei": "Nuclei_Number_Object_Number", + "cytoplasm": "Cytoplasm_Number_Object_Number", "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues @@ -178,9 +178,9 @@ ), "CONFIG_PAGE_KEYS": { "image": "ImageNumber", - "cells": "ObjectNumber", - "nuclei": "ObjectNumber", - "cytoplasm": "ObjectNumber", + "cells": "Cells_Number_Object_Number", + "nuclei": "Nuclei_Number_Object_Number", + "cytoplasm": "Cytoplasm_Number_Object_Number", "join": "Cytoplasm_Number_Object_Number", }, # chunk size to use for join operations to help with possible performance issues @@ -284,7 +284,7 @@ "T", ), "CONFIG_PAGE_KEYS": { - "test": "OBJECT ID", + "test": '"OBJECT ID"', }, # chunk size to use for join operations to help with possible performance issues # note: this number is an estimate and is may need changes contingent on data diff --git a/cytotable/utils.py b/cytotable/utils.py index de266c4..4f43a74 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -173,7 +173,6 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection: def _sqlite_mixed_type_query_to_parquet( source_path: str, table_name: str, - chunk_size: int, page_key: str, pageset: Tuple[int, int], sort_output: bool, @@ -188,8 +187,6 @@ def _sqlite_mixed_type_query_to_parquet( A str which is a path to a SQLite database file. 
table_name: str: The name of the table being queried. - chunk_size: int: - Row count to use for chunked output. page_key: str, ... pageset: Tuple[int, int]: From 23e8f0b2a4bf7674ee7b892fecdec8b26b39e1b6 Mon Sep 17 00:00:00 2001 From: d33bs Date: Wed, 31 Jul 2024 16:01:26 -0600 Subject: [PATCH 04/27] test adjustments --- cytotable/convert.py | 65 +++++++++++++++++++++++++++++--------- cytotable/utils.py | 9 +----- tests/test_convert.py | 73 +++++++++++++++++++++++++++++-------------- 3 files changed, 101 insertions(+), 46 deletions(-) diff --git a/cytotable/convert.py b/cytotable/convert.py index c44beb3..ccca171 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -43,10 +43,7 @@ def _get_table_columns_and_types( list of dictionaries which each include column level information """ - import pathlib - import duckdb - from cloudpathlib import AnyPath from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet @@ -102,6 +99,7 @@ def _get_table_columns_and_types( ) except duckdb.Error as e: + print(source["pagesets"]) # if we see a mismatched type error # run a more nuanced query through sqlite # to handle the mixed types @@ -206,7 +204,8 @@ def _get_table_keyset_pagination_sets( import logging import duckdb - from cloudpathlib import AnyPath, CloudPath + import sqlite3 + from contextlib import closing from cytotable.exceptions import NoInputDataException from cytotable.utils import _duckdb_reader @@ -225,8 +224,20 @@ def _get_table_keyset_pagination_sets( else: sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}" - keys = ddb_reader.execute(sql_query).fetchall() - keys = [key[0] for key in keys] + keys = [key[0] for key in ddb_reader.execute(sql_query).fetchall()] + + # exception case for when we have mixed types + # (i.e. integer col with string and ints) in a sqlite column + except duckdb.TypeMismatchException: + with closing(sqlite3.connect(source_path)) as cx: + with cx: + keys = [ + key[0] + for key in cx.execute( + f"SELECT {page_key} FROM {table_name} ORDER BY {page_key};" + ).fetchall() + if isinstance(key[0], (int, float)) + ] except ( duckdb.InvalidInputException, @@ -245,10 +256,25 @@ def _get_table_keyset_pagination_sets( keys = [key[0] for key in keys] # Create chunks of keys - chunks = [ - ((values := keys[i : i + chunk_size])[0], values[-1]) - for i in range(0, len(keys), chunk_size) - ] + chunks = [] + i = 0 + while i < len(keys): + start_key = keys[i] + end_index = min(i + chunk_size, len(keys)) - 1 + end_key = keys[end_index] + + # Ensure non-overlapping by incrementing the start of the next range + next_start_index = end_index + 1 + if next_start_index < len(keys): + next_start_key = keys[next_start_index] + while next_start_key == end_key and next_start_index + 1 < len(keys): + next_start_index += 1 + next_start_key = keys[next_start_index] + chunks.append((start_key, next_start_key - 1)) + else: + chunks.append((start_key, end_key)) + + i = next_start_index return chunks @@ -687,7 +713,6 @@ def _concat_source_group( def _prepare_join_sql( sources: Dict[str, List[Dict[str, Any]]], joins: str, - sort_output: bool, ) -> str: """ Prepare join SQL statement with actual locations of data based on the sources. 
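
For context on how the pagination sets generated above are consumed downstream: each (start, end) pair later becomes a closed BETWEEN filter over the ordered key column, rather than a LIMIT/OFFSET window that must rescan skipped rows. A standalone sketch of that read pattern, assuming a hypothetical example.csv with a numeric ImageNumber key column:

    import duckdb

    # hypothetical pages over an ordered numeric key column
    pagesets = [(1, 3), (4, 6), (7, 9)]

    with duckdb.connect() as ddb:
        for start, end in pagesets:
            page = ddb.execute(
                # a closed key interval reads only the page's rows
                "SELECT * FROM read_csv_auto('example.csv', header=TRUE) "
                f"WHERE ImageNumber BETWEEN {start} AND {end} "
                "ORDER BY ImageNumber"
            ).arrow()
            print(page.num_rows)  # e.g. hand each page to a parquet writer
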
@@ -727,7 +752,6 @@ def _prepare_join_sql( def _join_source_chunk( dest_path: str, joins: str, - chunk_size: int, page_key: str, pageset: Tuple[int, int], sort_output: bool, @@ -1062,6 +1086,16 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # expand the destination path expanded_dest_path = _expand_path(path=dest_path) + # check that each source group name has a pagination key + for source_group_name in sources.keys(): + matching_keys = [ + key for key in page_keys.keys() if key.lower() in source_group_name.lower() + ] + if not matching_keys: + raise CytoTableException( + f"No matching key found in page_keys for source_group_name: {source_group_name}" + ) + # prepare pagesets for chunked data export from source tables pagesets_prepared = { source_group_name: [ @@ -1103,6 +1137,8 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals if len(source_group_vals) > 0 } + print(invalid_files_dropped) + # gather column names and types from source tables column_names_and_types_gathered = { source_group_name: [ @@ -1197,8 +1233,10 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # evaluate the results as they're used multiple times below evaluated_results = evaluate_futures(results) + print(evaluated_results) + prepared_joins_sql = _prepare_join_sql( - sources=evaluated_results, joins=joins, sort_output=sort_output + sources=evaluated_results, joins=joins ).result() page_key_join = [ @@ -1214,7 +1252,6 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # full concat results dest_path=expanded_dest_path, joins=prepared_joins_sql, - chunk_size=chunk_size, page_key=page_key_join, pageset=pageset, sort_output=sort_output, diff --git a/cytotable/utils.py b/cytotable/utils.py index 4f43a74..be209ef 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -271,14 +271,7 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str: {', '.join(query_parts)} FROM {table_name} WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} - ORDER BY {page_key}; - """ - if sort_output - else f""" - SELECT - {', '.join(query_parts)} - FROM {table_name} - WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}; + {"ORDER BY " + page_key if sort_output else ""}; """ ) diff --git a/tests/test_convert.py b/tests/test_convert.py index ead14dc..71c073a 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -61,6 +61,7 @@ def test_config(): "CONFIG_CHUNK_SIZE", "CONFIG_JOINS", "CONFIG_SOURCE_VERSION", + "CONFIG_PAGE_KEYS", ] ) == sorted(config_preset.keys()) @@ -403,8 +404,7 @@ def test_prepare_join_sql( WHERE cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei - """, - sort_output=True, + """ ).result() ).df() @@ -475,21 +475,23 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str): FROM read_parquet('{test_path_a}') as example_a JOIN read_parquet('{test_path_b}') as example_b USING(id1, id2) """, - chunk_size=2, - offset=0, + page_key="id1", + pageset=(1, 2), drop_null=True, + sort_output=True, ).result() assert isinstance(result, str) result_table = parquet.read_table(source=result) + assert result_table.equals( other=pa.Table.from_pydict( { - "field1": ["foo", "bar"], - "field2": [True, False], - "id1": [1, 2], - "id2": ["a", "a"], + "field1": ["foo", "foo", "bar", "bar"], + "field2": [True, True, False, False], + "id1": [1, 1, 2, 2], + "id2": ["a", "b", "a", "b"], }, # use schema from result as a reference for col order 
schema=result_table.schema, @@ -629,6 +631,12 @@ def test_to_parquet( compartments=["cytoplasm", "cells", "nuclei"], metadata=["image"], identifying_columns=["imagenumber"], + page_keys={ + "image": "ImageNumber", + "cells": "Cells_ObjectNumber", + "nuclei": "Nuclei_ObjectNumber", + "cytoplasm": "Cytoplasm_ObjectNumber", + }, concat=False, join=False, joins=None, @@ -687,6 +695,12 @@ def test_to_parquet_unsorted( compartments=["cytoplasm", "cells", "nuclei"], metadata=["image"], identifying_columns=["imagenumber"], + page_keys={ + "image": "ImageNumber", + "cells": "Cells_ObjectNumber", + "nuclei": "Nuclei_ObjectNumber", + "cytoplasm": "Cytoplasm_ObjectNumber", + }, concat=False, join=False, joins=None, @@ -1048,6 +1062,21 @@ def test_convert_cellprofiler_sqlite_pycytominer_merge( # note: we cast into pycytominer_table's schema types in order to # properly perform comparisons as pycytominer and cytotable differ in their # datatyping implementations + + print(cytotable_table.to_pandas().drop_duplicates().shape) + + import pandas as pd + + pd.testing.assert_frame_equal( + pycytominer_table.to_pandas() + .sort_values(by=list(pycytominer_table.schema.names)) + .reset_index(drop=True), + cytotable_table.to_pandas() + .drop_duplicates() + .sort_values(by=list(pycytominer_table.schema.names)) + .reset_index(drop=True), + ) + assert pycytominer_table.schema.equals( cytotable_table.cast(target_schema=pycytominer_table.schema).schema ) @@ -1083,8 +1112,8 @@ def test_sqlite_mixed_type_query_to_parquet( table=_sqlite_mixed_type_query_to_parquet( source_path=example_sqlite_mixed_types_database, table_name=table_name, - chunk_size=2, - offset=0, + page_key="col_integer", + pageset=(1, 2), sort_output=True, ), where=result_filepath, @@ -1106,10 +1135,11 @@ def test_sqlite_mixed_type_query_to_parquet( ] # check the values per column assert parquet.read_table(source=result_filepath).to_pydict() == { - "col_integer": [None, 1], - "col_text": ["sample", "sample"], - "col_blob": [b"another_blob", b"sample_blob"], - "col_real": [None, 0.5], + # note: we drop None / NA values due to pagination keys + "col_integer": [1], + "col_text": ["sample"], + "col_blob": [b"sample_blob"], + "col_real": [0.5], } # run full convert on mixed type database @@ -1119,6 +1149,7 @@ def test_sqlite_mixed_type_query_to_parquet( dest_datatype="parquet", source_datatype="sqlite", compartments=[table_name], + page_keys={"tbl_a": "col_integer"}, join=False, ) @@ -1126,16 +1157,10 @@ def test_sqlite_mixed_type_query_to_parquet( assert parquet.read_table( source=result["Tbl_a.sqlite"][0]["table"][0] ).to_pydict() == { - "Tbl_a_col_integer": [None, 1], - "Tbl_a_col_text": ["sample", "sample"], - "Tbl_a_col_blob": [b"another_blob", b"sample_blob"], - "Tbl_a_col_real": [None, 0.5], - "cytotable_meta_source_path": [ - f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a", - f"{pathlib.Path(fx_tempdir).resolve()}/example_mixed_types.sqlite_table_tbl_a", - ], - "cytotable_meta_offset": [0, 0], - "cytotable_meta_rownum": [2, 1], + "Tbl_a_col_integer": [1], + "Tbl_a_col_text": ["sample"], + "Tbl_a_col_blob": [b"sample_blob"], + "Tbl_a_col_real": [0.5], } From 0d82ecb9e4ccdcd94cd419209c7c7cfd087bd3ab Mon Sep 17 00:00:00 2001 From: d33bs Date: Wed, 31 Jul 2024 20:05:05 -0600 Subject: [PATCH 05/27] linting corrections --- cytotable/convert.py | 8 ++++---- cytotable/utils.py | 4 +--- tests/test_convert.py | 14 +------------- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/cytotable/convert.py 
b/cytotable/convert.py index ccca171..806c704 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -202,11 +202,11 @@ def _get_table_keyset_pagination_sets( """ import logging - - import duckdb import sqlite3 from contextlib import closing + import duckdb + from cytotable.exceptions import NoInputDataException from cytotable.utils import _duckdb_reader @@ -1295,7 +1295,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals infer_common_schema: bool = True, drop_null: bool = False, data_type_cast_map: Optional[Dict[str, str]] = None, - page_keys: Dict[str, str] = None, + page_keys: Optional[Dict[str, str]] = None, sort_output: bool = True, preset: Optional[str] = "cellprofiler_csv", parsl_config: Optional[parsl.Config] = None, @@ -1476,7 +1476,7 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals drop_null=drop_null, data_type_cast_map=data_type_cast_map, sort_output=sort_output, - page_keys=page_keys, + page_keys=cast(dict, page_keys), **kwargs, ) diff --git a/cytotable/utils.py b/cytotable/utils.py index be209ef..e02b07a 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -265,15 +265,13 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str: ] # perform the select using the cases built above and using chunksize + offset - sql_stmt = ( - f""" + sql_stmt = f""" SELECT {', '.join(query_parts)} FROM {table_name} WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]} {"ORDER BY " + page_key if sort_output else ""}; """ - ) # execute the sql stmt cursor.execute(sql_stmt) diff --git a/tests/test_convert.py b/tests/test_convert.py index 71c073a..bef4af8 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -404,7 +404,7 @@ def test_prepare_join_sql( WHERE cells.Cells_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells AND nuclei.Nuclei_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei - """ + """, ).result() ).df() @@ -1065,18 +1065,6 @@ def test_convert_cellprofiler_sqlite_pycytominer_merge( print(cytotable_table.to_pandas().drop_duplicates().shape) - import pandas as pd - - pd.testing.assert_frame_equal( - pycytominer_table.to_pandas() - .sort_values(by=list(pycytominer_table.schema.names)) - .reset_index(drop=True), - cytotable_table.to_pandas() - .drop_duplicates() - .sort_values(by=list(pycytominer_table.schema.names)) - .reset_index(drop=True), - ) - assert pycytominer_table.schema.equals( cytotable_table.cast(target_schema=pycytominer_table.schema).schema ) From e8e95657821b769068f7375e5b11189619f0d5e0 Mon Sep 17 00:00:00 2001 From: d33bs Date: Wed, 31 Jul 2024 20:07:35 -0600 Subject: [PATCH 06/27] run dev workflow --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index adc37c5..fb163a0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,7 +3,7 @@ name: run tests on: push: - branches: [main] + branches: [main, pagination-changes] pull_request: branches: [main] schedule: From b80969559df30cd04f31502e0c428b649ae28e53 Mon Sep 17 00:00:00 2001 From: d33bs Date: Wed, 31 Jul 2024 20:10:18 -0600 Subject: [PATCH 07/27] fix docs --- docs/source/python-api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/python-api.md b/docs/source/python-api.md index 7868f2c..a43ad6f 100644 --- a/docs/source/python-api.md +++ b/docs/source/python-api.md @@ -25,7 +25,7 @@ Convert | -.. autofunction:: _get_table_chunk_offsets +.. 
autofunction:: _get_table_keyset_pagination_sets | From c3bfea5e8512730101e2b1f781f7868f0c2bc10d Mon Sep 17 00:00:00 2001 From: d33bs Date: Wed, 31 Jul 2024 20:43:13 -0600 Subject: [PATCH 08/27] remove custom sql join for large data test --- tests/test_convert_threaded.py | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/tests/test_convert_threaded.py b/tests/test_convert_threaded.py index baac08b..b7b23f2 100644 --- a/tests/test_convert_threaded.py +++ b/tests/test_convert_threaded.py @@ -102,31 +102,6 @@ def test_convert_s3_path_sqlite_join( # sequential s3 SQLite files. See below for more information # https://cloudpathlib.drivendata.org/stable/caching/#automatically local_cache_dir=f"{fx_tempdir}/sqlite_s3_cache/2", - # note: we use a custom join to limit the - # data processing required within the context - # of GitHub Actions runner image resources. - joins=""" - SELECT - image.Image_TableNumber, - image.Metadata_ImageNumber, - image.Metadata_Plate, - image.Metadata_Well, - image.Image_Metadata_Site, - image.Image_Metadata_Row, - cytoplasm.* EXCLUDE (Metadata_ImageNumber), - cells.* EXCLUDE (Metadata_ImageNumber), - nuclei.* EXCLUDE (Metadata_ImageNumber) - FROM - (SELECT * FROM read_parquet('cytoplasm.parquet') LIMIT 5000) AS cytoplasm - LEFT JOIN (SELECT * FROM read_parquet('cells.parquet') LIMIT 5000) AS cells ON - cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber - AND cells.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells - LEFT JOIN (SELECT * FROM read_parquet('nuclei.parquet') LIMIT 5000) AS nuclei ON - nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber - AND nuclei.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei - LEFT JOIN (SELECT * FROM read_parquet('image.parquet') LIMIT 5000) AS image ON - image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber - """, ) # read only the metadata from parquet file From 1e391c2ca97c5c9020447d7d4e3a9bf2a6656313 Mon Sep 17 00:00:00 2001 From: d33bs Date: Thu, 1 Aug 2024 07:35:20 -0600 Subject: [PATCH 09/27] test with rowcount update --- tests/test_convert_threaded.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_convert_threaded.py b/tests/test_convert_threaded.py index b7b23f2..b513562 100644 --- a/tests/test_convert_threaded.py +++ b/tests/test_convert_threaded.py @@ -108,10 +108,10 @@ def test_convert_s3_path_sqlite_join( parquet_file_meta = parquet.ParquetFile(s3_result).metadata # check the shape of the data - assert (parquet_file_meta.num_rows, parquet_file_meta.num_columns) == (5000, 5928) + assert (parquet_file_meta.num_rows, parquet_file_meta.num_columns) == (74226, 5928) # check that dropping duplicates results in the same shape - assert pd.read_parquet(s3_result).drop_duplicates().shape == (5000, 5928) + assert pd.read_parquet(s3_result).drop_duplicates().shape == (74226, 5928) def test_get_source_filepaths( From c2aaadbbb25aa37a40a16bcb2046f215fff6dc48 Mon Sep 17 00:00:00 2001 From: d33bs Date: Thu, 1 Aug 2024 09:13:09 -0600 Subject: [PATCH 10/27] linting, docs, cleanup --- cytotable/constants.py | 7 ------- cytotable/convert.py | 38 +++++++++++--------------------------- cytotable/utils.py | 4 ++-- tests/conftest.py | 9 +-------- tests/test_convert.py | 7 +------ 5 files changed, 15 insertions(+), 50 deletions(-) diff --git a/cytotable/constants.py b/cytotable/constants.py index 2fb3182..591b323 100644 --- a/cytotable/constants.py +++ b/cytotable/constants.py @@ -68,13 +68,6 @@ ], } -# metadata column names and types for internal 
use within CytoTable -CYOTABLE_META_COLUMN_TYPES = { - "cytotable_meta_source_path": "VARCHAR", - "cytotable_meta_offset": "BIGINT", - "cytotable_meta_rownum": "BIGINT", -} - CYTOTABLE_DEFAULT_PARQUET_METADATA = { "data-producer": "https://github.com/cytomining/CytoTable", "data-producer-version": str(_get_cytotable_version()), diff --git a/cytotable/convert.py b/cytotable/convert.py index 806c704..bbcfd58 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -86,7 +86,7 @@ def _get_table_columns_and_types( # with exception handling to read mixed-type data # using sqlite3 and special utility function try: - # isolate using new connection to read data with chunk size + offset + # isolate using new connection to read data based on pageset # and export directly to parquet via duckdb (avoiding need to return data to python) # perform the query and create a list of dictionaries with the column data for table with _duckdb_reader() as ddb_reader: @@ -283,7 +283,6 @@ def _get_table_keyset_pagination_sets( def _source_chunk_to_parquet( source_group_name: str, source: Dict[str, Any], - chunk_size: int, pageset: Tuple[int, int], dest_path: str, sort_output: bool, @@ -297,10 +296,8 @@ def _source_chunk_to_parquet( source: Dict[str, Any] Contains the source data to be chunked. Represents a single file or table of some kind along with collected information about table. - chunk_size: int - Row count to use for chunked output. - page_keys: Dict[str, str] - ... + page_keys: str: + The table and column names to be used for key pagination. pageset: Tuple[int, int] The pageset for chunking the data from source. dest_path: str @@ -318,7 +315,6 @@ def _source_chunk_to_parquet( import duckdb from cloudpathlib import AnyPath - from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import ( _duckdb_reader, _sqlite_mixed_type_query_to_parquet, @@ -332,12 +328,6 @@ def _source_chunk_to_parquet( ) pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True) - source_path_str = ( - source["source_path"] - if "table_name" not in source.keys() - else f"{source['source_path']}_table_{source['table_name']}" - ) - # add source table columns casted_source_cols = [ # here we cast the column to the specified type ensure the colname remains the same @@ -451,10 +441,7 @@ def _prepend_column_name( import pyarrow.parquet as parquet - from cytotable.constants import ( - CYOTABLE_META_COLUMN_TYPES, - CYTOTABLE_ARROW_USE_MEMORY_MAPPING, - ) + from cytotable.constants import CYTOTABLE_ARROW_USE_MEMORY_MAPPING from cytotable.utils import _write_parquet_table_with_metadata logger = logging.getLogger(__name__) @@ -502,10 +489,8 @@ def _prepend_column_name( # source_group_name_stem: 'Cells' # column_name: 'AreaShape_Area' # updated_column_name: 'Cells_AreaShape_Area' - if ( - column_name not in identifying_columns - and not column_name.startswith(source_group_name_stem.capitalize()) - and column_name not in CYOTABLE_META_COLUMN_TYPES + if column_name not in identifying_columns and not column_name.startswith( + source_group_name_stem.capitalize() ): updated_column_names.append(f"{source_group_name_stem}_{column_name}") # if-condition for prepending 'Metadata_' to column name @@ -630,7 +615,6 @@ def _concat_source_group( CYTOTABLE_DEFAULT_PARQUET_METADATA, ) from cytotable.exceptions import SchemaException - from cytotable.utils import _write_parquet_table_with_metadata # build a result placeholder concatted: List[Dict[str, Any]] = [ @@ -868,7 +852,6 @@ def _concat_join_sources( 
CYTOTABLE_ARROW_USE_MEMORY_MAPPING, CYTOTABLE_DEFAULT_PARQUET_METADATA, ) - from cytotable.utils import _write_parquet_table_with_metadata # remove the unjoined concatted compartments to prepare final dest_path usage # (we now have joined results) @@ -1121,11 +1104,11 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals for source_group_name, source_group_vals in sources.items() } - # if offsets is none and we haven't halted, remove the file as there + # if pagesets is none and we haven't halted, remove the file as there # were input formatting errors which will create challenges downstream invalid_files_dropped = { source_group_name: [ - # ensure we have offsets + # ensure we have pagesets source for source in source_group_vals if source["pagesets"] is not None @@ -1166,11 +1149,10 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals "table": [ # perform column renaming and create potential return result _prepend_column_name( - # perform chunked data export to parquet using offsets + # perform chunked data export to parquet using pagesets table_path=_source_chunk_to_parquet( source_group_name=source_group_name, source=source, - chunk_size=chunk_size, pageset=pageset, dest_path=expanded_dest_path, sort_output=sort_output, @@ -1344,6 +1326,8 @@ def convert( # pylint: disable=too-many-arguments,too-many-locals A dictionary mapping data type groups to specific types. Roughly includes Arrow data types language from: https://arrow.apache.org/docs/python/api/datatypes.html + page_keys: str: + The table and column names to be used for key pagination. sort_output: bool (Default value = True) Specifies whether to sort cytotable output or not. drop_null: bool (Default value = False) diff --git a/cytotable/utils.py b/cytotable/utils.py index e02b07a..e75f1f7 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -187,8 +187,8 @@ def _sqlite_mixed_type_query_to_parquet( A str which is a path to a SQLite database file. table_name: str: The name of the table being queried. - page_key: str, - ... + page_key: str: + The column name to be used as the key for pagination. pageset: Tuple[int, int]: The pageset for chunking the data from source. 
sort_output: bool diff --git a/tests/conftest.py b/tests/conftest.py index 43d1fbe..6045ace 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,7 +22,6 @@ from pycytominer.cyto_utils.cells import SingleCells from sqlalchemy.util import deprecations -from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded # filters sqlalchemy 2.0 uber warning @@ -324,13 +323,7 @@ def fixture_example_local_sources( csv.write_csv( # we remove simulated cytotable metadata columns to be more realistic # (incoming sources would not usually contain these) - table.select( - [ - column - for column in table.column_names - if column not in CYOTABLE_META_COLUMN_TYPES - ] - ), + table.select(list(table.column_names)), f"{fx_tempdir}/example/{number}/{name}.csv", ) # write example output diff --git a/tests/test_convert.py b/tests/test_convert.py index bef4af8..9eb1f44 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -21,7 +21,6 @@ from pyarrow import csv, parquet from pycytominer.cyto_utils.cells import SingleCells -from cytotable.constants import CYOTABLE_META_COLUMN_TYPES from cytotable.convert import ( _concat_join_sources, _concat_source_group, @@ -1262,11 +1261,7 @@ def test_in_carta_to_parquet( # drop cytotable metadata columns for comparisons (example sources won't contain these) cytotable_result_table = cytotable_result_table.select( - [ - column - for column in cytotable_result_table.column_names - if column not in CYOTABLE_META_COLUMN_TYPES - ] + list(cytotable_result_table.column_names) ) # check the data against one another From df677f4a0df046c45d44fbd5cde27f15ef1af380 Mon Sep 17 00:00:00 2001 From: d33bs Date: Thu, 1 Aug 2024 10:53:29 -0600 Subject: [PATCH 11/27] cleanup --- cytotable/convert.py | 5 ----- tests/test_convert.py | 2 -- 2 files changed, 7 deletions(-) diff --git a/cytotable/convert.py b/cytotable/convert.py index bbcfd58..b636085 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -99,7 +99,6 @@ def _get_table_columns_and_types( ) except duckdb.Error as e: - print(source["pagesets"]) # if we see a mismatched type error # run a more nuanced query through sqlite # to handle the mixed types @@ -1120,8 +1119,6 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals if len(source_group_vals) > 0 } - print(invalid_files_dropped) - # gather column names and types from source tables column_names_and_types_gathered = { source_group_name: [ @@ -1215,8 +1212,6 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals # evaluate the results as they're used multiple times below evaluated_results = evaluate_futures(results) - print(evaluated_results) - prepared_joins_sql = _prepare_join_sql( sources=evaluated_results, joins=joins ).result() diff --git a/tests/test_convert.py b/tests/test_convert.py index 9eb1f44..3d14c32 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -1062,8 +1062,6 @@ def test_convert_cellprofiler_sqlite_pycytominer_merge( # properly perform comparisons as pycytominer and cytotable differ in their # datatyping implementations - print(cytotable_table.to_pandas().drop_duplicates().shape) - assert pycytominer_table.schema.equals( cytotable_table.cast(target_schema=pycytominer_table.schema).schema ) From 0501350ac5e9829d72759b58ad7ef03e9373b01c Mon Sep 17 00:00:00 2001 From: d33bs Date: Thu, 1 Aug 2024 10:53:47 -0600 Subject: [PATCH 12/27] remove test branch --- .github/workflows/test.yml | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index fb163a0..adc37c5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,7 +3,7 @@ name: run tests on: push: - branches: [main, pagination-changes] + branches: [main] pull_request: branches: [main] schedule: From 6fbf1b2d1f4589666fbe55413c5a775c8eb5b7dd Mon Sep 17 00:00:00 2001 From: d33bs Date: Fri, 2 Aug 2024 13:08:10 -0600 Subject: [PATCH 13/27] rework pageset generation --- cytotable/convert.py | 25 +----------- cytotable/utils.py | 53 +++++++++++++++++++++++++ tests/test_utils.py | 92 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+), 23 deletions(-) create mode 100644 tests/test_utils.py diff --git a/cytotable/convert.py b/cytotable/convert.py index b636085..f7e8d04 100644 --- a/cytotable/convert.py +++ b/cytotable/convert.py @@ -207,7 +207,7 @@ def _get_table_keyset_pagination_sets( import duckdb from cytotable.exceptions import NoInputDataException - from cytotable.utils import _duckdb_reader + from cytotable.utils import _duckdb_reader, _generate_pagesets logger = logging.getLogger(__name__) @@ -254,28 +254,7 @@ def _get_table_keyset_pagination_sets( keys = ddb_reader.execute(sql_query).fetchall() keys = [key[0] for key in keys] - # Create chunks of keys - chunks = [] - i = 0 - while i < len(keys): - start_key = keys[i] - end_index = min(i + chunk_size, len(keys)) - 1 - end_key = keys[end_index] - - # Ensure non-overlapping by incrementing the start of the next range - next_start_index = end_index + 1 - if next_start_index < len(keys): - next_start_key = keys[next_start_index] - while next_start_key == end_key and next_start_index + 1 < len(keys): - next_start_index += 1 - next_start_key = keys[next_start_index] - chunks.append((start_key, next_start_key - 1)) - else: - chunks.append((start_key, end_key)) - - i = next_start_index - - return chunks + return _generate_pagesets(keys, chunk_size) @python_app diff --git a/cytotable/utils.py b/cytotable/utils.py index e75f1f7..3715bc6 100644 --- a/cytotable/utils.py +++ b/cytotable/utils.py @@ -17,6 +17,8 @@ from parsl.errors import NoDataFlowKernelError from parsl.executors import HighThroughputExecutor +from cytotable.exceptions import CytoTableException + logger = logging.getLogger(__name__) # reference the original init @@ -568,3 +570,54 @@ def evaluate_futures(sources: Union[Dict[str, List[Dict[str, Any]]], str]) -> An if isinstance(sources, dict) else _unwrap_value(sources) ) + + +def _generate_pagesets( + keys: List[Union[int, float]], chunk_size: int +) -> List[Tuple[Union[int, float], Union[int, float]]]: + """ + Generate a pageset (keyset pagination) from a list of keys. + + Parameters: + keys List[Union[int, float]]: + List of keys to paginate. + chunk_size int: + Size of each chunk/page. + + Returns: + List[Tuple[Union[int, float], Union[int, float]]]: + List of (start_key, end_key) tuples representing each page. 
+ """ + + # if we have no keys, raise an exception + if not keys: + raise CytoTableException("No keys were provided to pageset generation process.") + + # Initialize an empty list to store the chunks/pages + chunks = [] + + # Start index for iteration through the keys + i = 0 + + while i < len(keys): + # Get the start key for the current chunk + start_key = keys[i] + + # Calculate the end index for the current chunk + end_index = min(i + chunk_size, len(keys)) - 1 + + # Get the end key for the current chunk + end_key = keys[end_index] + + # Ensure non-overlapping by incrementing the start of the next range if there are duplicates + while end_index + 1 < len(keys) and keys[end_index + 1] == end_key: + end_index += 1 + + # Append the current chunk (start_key, end_key) to the list of chunks + chunks.append((start_key, end_key)) + + # Update the index to start from the next chunk + i = end_index + 1 + + # Return the list of chunks/pages + return chunks diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..2bad3bc --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,92 @@ +""" +Testing CytoTable utility functions found within util.py +""" + +import pytest +from cytotable.utils import _generate_pagesets +from cytotable.exceptions import CytoTableException + + +def test_generate_pageset(): + """ + Test the generate_pageset function with various scenarios. + """ + + # Test case with a single element + keys = [1] + chunk_size = 3 + expected = [(1, 1)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case when chunk size is larger than the list + keys = [1, 2, 3] + chunk_size = 10 + expected = [(1, 3)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with all elements being the same + keys = [1, 1, 1, 1, 1] + chunk_size = 2 + expected = [(1, 1)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with one duplicate of chunk size and others + keys = [1, 1, 1, 2, 3, 4] + chunk_size = 3 + expected = [(1, 1), (2, 4)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with a chunk size of one + keys = [1, 2, 3, 4, 5] + chunk_size = 1 + expected = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with no duplicates + keys = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + chunk_size = 3 + expected = [(1, 3), (4, 6), (7, 9), (10, 10)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with non-continuous keys + keys = [1, 3, 5, 7, 9, 12, 14] + chunk_size = 2 + expected = [(1, 3), (5, 7), (9, 12), (14, 14)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Test case with inconsistent duplicates + keys = [1, 1, 3, 4, 5, 5, 8, 8, 8] + chunk_size = 3 + expected = [(1, 3), (4, 5), (8, 8)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Bigger test case with inconsistent duplicates + keys = [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 12, 12, 12] + chunk_size = 3 + expected = [(1, 2), (3, 4), (5, 6), (7, 8), (9, 10), (12, 12)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Float test case with no duplicates + keys = [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.1] + chunk_size = 3 + expected = [(1.1, 3.3), (4.4, 6.6), (7.7, 9.9), (10.1, 10.1)] + assert _generate_pagesets(keys, chunk_size) == expected + + # Float test case with non-continuous float keys + keys = [1.1, 3.3, 5.5, 7.7, 9.9, 12.12, 14.14] + chunk_size = 2 + expected = [(1.1, 3.3), (5.5, 
+    assert _generate_pagesets(keys, chunk_size) == expected
+
+    # Float test case with inconsistent duplicates
+    keys = [1.1, 1.1, 3.3, 4.4, 5.5, 5.5, 8.8, 8.8, 8.8]
+    chunk_size = 3
+    expected = [(1.1, 3.3), (4.4, 5.5), (8.8, 8.8)]
+    assert _generate_pagesets(keys, chunk_size) == expected
+
+    # Test case with an empty list
+    keys = []
+    chunk_size = 3
+    expected = []
+    with pytest.raises(CytoTableException):
+        _generate_pagesets(keys, chunk_size)

From 2517df6c04e8f2afd1dee7ac66fb21da67564428 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Fri, 2 Aug 2024 13:09:24 -0600
Subject: [PATCH 14/27] linting

---
 tests/test_utils.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tests/test_utils.py b/tests/test_utils.py
index 2bad3bc..a30f2b4 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -3,11 +3,12 @@
 """

 import pytest
-from cytotable.utils import _generate_pagesets
+
 from cytotable.exceptions import CytoTableException
+from cytotable.utils import _generate_pagesets


-def test_generate_pageset():
+def test_generate_pageset():  # pylint: disable=too-many-statements
     """
     Test the generate_pageset function with various scenarios.
     """

From 9001152f4853953b3fc75508c1924c22348f87c0 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Fri, 2 Aug 2024 13:16:08 -0600
Subject: [PATCH 15/27] fix types and docstring

Co-Authored-By: Faisal Alquaddoomi
---
 cytotable/convert.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index f7e8d04..418bf03 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -180,7 +180,7 @@ def _get_table_keyset_pagination_sets(
     page_key: str,
     source: Optional[Dict[str, Any]] = None,
     sql_stmt: Optional[str] = None,
-) -> Union[List[Any], None]:
+) -> Union[List[Tuple[Union[int, float], Union[int, float]]], None]:
     """
     Get table data chunk keys for later use in capturing segments
     of values.
@@ -261,7 +261,7 @@ def _source_chunk_to_parquet(
 def _source_chunk_to_parquet(
     source_group_name: str,
     source: Dict[str, Any],
-    pageset: Tuple[int, int],
+    pageset: Tuple[Union[int, float], Union[int, float]],
     dest_path: str,
     sort_output: bool,
 ) -> str:
@@ -274,8 +274,6 @@ def _source_chunk_to_parquet(
         source: Dict[str, Any]
             Contains the source data to be chunked. Represents a single
             file or table of some kind along with collected information about table.
-        page_keys: str:
-            The table and column names to be used for key pagination.
         pageset: Tuple[int, int]
             The pageset for chunking the data from source.
         dest_path: str

From 2deef1e57d276adbb41c3ff08fa21b220a3c3bf3 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Fri, 2 Aug 2024 13:55:54 -0600
Subject: [PATCH 16/27] docs and types

---
 cytotable/convert.py    |  9 +++++++++
 cytotable/presets.py    | 18 ++++++++++++++++++
 cytotable/utils.py      |  2 +-
 docs/source/overview.md | 13 +++++++++++++
 4 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 418bf03..64ed46e 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -194,6 +194,11 @@ def _get_table_keyset_pagination_sets(
             The size in rowcount of the chunks to create.
         page_key: str
             The column name to be used as the key for pagination.
+            Expected to be of numeric type (int, float) for ordering.
+        sql_stmt:
+            Optional sql statement to form the pagination set from.
+            Default behavior extracts pagination sets from the full
+            data source.
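+            As an illustrative example, a numeric page_key such as
+            "ImageNumber" combined with a chunk_size of 1000 could
+            yield pagination sets like [(1, 1000), (1001, 2000), ...]
+            when the key values are continuous.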
     Returns:
         List[Any]
            List of keys to use for reading the data later on.
@@ -1300,6 +1305,10 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
             https://arrow.apache.org/docs/python/api/datatypes.html
         page_keys: str:
             The table and column names to be used for key pagination.
+            Uses the form: {"table_name":"column_name"}.
+            Expects columns to include numeric data (ints or floats).
+            Interacts with the `chunk_size` parameter to form
+            pages of approximately `chunk_size` rows each.
         sort_output: bool (Default value = True)
             Specifies whether to sort cytotable output or not.
         drop_null: bool (Default value = False)
diff --git a/cytotable/presets.py b/cytotable/presets.py
index 02225ee..efaa05c 100644
--- a/cytotable/presets.py
+++ b/cytotable/presets.py
@@ -22,6 +22,9 @@
             "Parent_Cells",
             "Parent_Nuclei",
         ),
+        # pagination keys for use with this data
+        # of the rough format "table" -> "column".
+        # note: page keys are expected to be numeric (int, float)
         "CONFIG_PAGE_KEYS": {
             "image": "ImageNumber",
             "cells": "ObjectNumber",
@@ -68,6 +71,9 @@
             "Parent_Cells",
             "Parent_Nuclei",
         ),
+        # pagination keys for use with this data
+        # of the rough format "table" -> "column".
+        # note: page keys are expected to be numeric (int, float)
         "CONFIG_PAGE_KEYS": {
             "image": "ImageNumber",
             "cells": "Cells_Number_Object_Number",
@@ -118,6 +124,9 @@
             "Parent_Cells",
             "Parent_Nuclei",
         ),
+        # pagination keys for use with this data
+        # of the rough format "table" -> "column".
+        # note: page keys are expected to be numeric (int, float)
         "CONFIG_PAGE_KEYS": {
             "image": "ImageNumber",
             "cells": "ObjectNumber",
@@ -176,6 +185,9 @@
             "Cells_Number_Object_Number",
             "Nuclei_Number_Object_Number",
         ),
+        # pagination keys for use with this data
+        # of the rough format "table" -> "column".
+        # note: page keys are expected to be numeric (int, float)
         "CONFIG_PAGE_KEYS": {
             "image": "ImageNumber",
             "cells": "Cells_Number_Object_Number",
@@ -231,6 +243,9 @@
             "Cells_ObjectNumber",
             "Nuclei_ObjectNumber",
         ),
+        # pagination keys for use with this data
+        # of the rough format "table" -> "column".
+        # note: page keys are expected to be numeric (int, float)
         "CONFIG_PAGE_KEYS": {
             "image": "ImageNumber",
             "cells": "ObjectNumber",
@@ -283,6 +298,9 @@
             "Z",
             "T",
         ),
+        # pagination keys for use with this data
+        # of the rough format "table" -> "column".
+        # note: page keys are expected to be numeric (int, float)
         "CONFIG_PAGE_KEYS": {
             "test": '"OBJECT ID"',
         },
diff --git a/cytotable/utils.py b/cytotable/utils.py
index 3715bc6..cf756dd 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -176,7 +176,7 @@ def _sqlite_mixed_type_query_to_parquet(
     source_path: str,
     table_name: str,
     page_key: str,
-    pageset: Tuple[int, int],
+    pageset: Tuple[Union[int, float], Union[int, float]],
     sort_output: bool,
 ) -> str:
     """
diff --git a/docs/source/overview.md b/docs/source/overview.md
index 5aacc2e..609834d 100644
--- a/docs/source/overview.md
+++ b/docs/source/overview.md
@@ -286,3 +286,16 @@ Also see CytoTable's presets found here: :data:`presets.config
 <cytotable.presets.config>`).
+The ``page_keys`` parameter expects a dictionary where the keys are table names and the values are column names to be used for the keyset pagination pages.
+Pagination is created in conjunction with the ``chunk_size`` parameter which indicates the size of each page.
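+As an illustrative example, :code:`convert(..., chunk_size=1000, page_keys={"image": "ImageNumber"}, ...)` would form pages of roughly 1000 rows from the ``image`` table keyed on its ``ImageNumber`` column (pages may extend slightly to avoid splitting duplicate key values).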
+```

From 73f9ac47f9beb23842cbf68e367aa0e29c0ad720 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Thu, 8 Aug 2024 10:37:27 -0600
Subject: [PATCH 17/27] update to use pageset instead of chunk fxn names

Co-Authored-By: Jenna Tomkinson <107513215+jenna-tomkinson@users.noreply.github.com>
---
 cytotable/convert.py      | 10 +++++-----
 docs/source/python-api.md |  4 ++--
 tests/test_convert.py     |  8 ++++----
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 64ed46e..6691c04 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -33,7 +33,7 @@ def _get_table_columns_and_types(
     Args:
         source: Dict[str, Any]
-            Contains the source data to be chunked. Represents a single
+            Contains source data details. Represents a single
             file or table of some kind.
         sort_output:
             Specifies whether to sort cytotable output or not.
@@ -263,7 +263,7 @@ def _get_table_keyset_pagination_sets(

 @python_app
-def _source_chunk_to_parquet(
+def _source_pageset_to_parquet(
     source_group_name: str,
     source: Dict[str, Any],
     pageset: Tuple[Union[int, float], Union[int, float]],
@@ -714,7 +714,7 @@ def _prepare_join_sql(

 @python_app
-def _join_source_chunk(
+def _join_source_pageset(
     dest_path: str,
     joins: str,
     page_key: str,
@@ -1129,7 +1129,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                 # perform column renaming and create potential return result
                 _prepend_column_name(
                     # perform chunked data export to parquet using pagesets
-                    table_path=_source_chunk_to_parquet(
+                    table_path=_source_pageset_to_parquet(
                         source_group_name=source_group_name,
                         source=source,
                         pageset=pageset,
@@ -1205,7 +1205,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
         # map joined results based on the join groups gathered above
         # note: after mapping we end up with a list of strings (task returns str)
         join_sources_result = [
-            _join_source_chunk(
+            _join_source_pageset(
                 # gather the result of concatted sources prior to
                 # join group merging as each mapped task run will need
                 # full concat results
diff --git a/docs/source/python-api.md b/docs/source/python-api.md
index a43ad6f..ab1fc62 100644
--- a/docs/source/python-api.md
+++ b/docs/source/python-api.md
@@ -33,7 +33,7 @@ Convert

 |

-.. autofunction:: _join_source_chunk
+.. autofunction:: _join_source_pageset

 |

@@ -49,7 +49,7 @@ Convert

 |

-.. autofunction:: _source_chunk_to_parquet
+.. autofunction:: _source_pageset_to_parquet

 |

diff --git a/tests/test_convert.py b/tests/test_convert.py
index 3d14c32..bb823ee 100644
--- a/tests/test_convert.py
+++ b/tests/test_convert.py
@@ -25,7 +25,7 @@
     _concat_join_sources,
     _concat_source_group,
     _infer_source_group_common_schema,
-    _join_source_chunk,
+    _join_source_pageset,
     _prepare_join_sql,
     _prepend_column_name,
     _to_parquet,
@@ -434,9 +434,9 @@ def test_prepare_join_sql(
     }


-def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
+def test_join_source_pageset(load_parsl_default: None, fx_tempdir: str):
     """
-    Tests _join_source_chunk
+    Tests _join_source_pageset
     """

     # form test path a
@@ -467,7 +467,7 @@ def test_join_source_chunk(load_parsl_default: None, fx_tempdir: str):
         where=test_path_b,
     )

-    result = _join_source_chunk(
+    result = _join_source_pageset(
         dest_path=f"{fx_tempdir}/destination.parquet",
         joins=f"""
             SELECT *

From b59aac58526aac13537d30aa0b814d0d5705ed6e Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 08:45:48 -0600
Subject: [PATCH 18/27] update variable names

Co-Authored-By: Gregory Way
---
 cytotable/convert.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 6691c04..4308dd9 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -228,14 +228,14 @@ def _get_table_keyset_pagination_sets(
                 else:
                     sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}"

-                keys = [key[0] for key in ddb_reader.execute(sql_query).fetchall()]
+                page_keys = [results[0] for results in ddb_reader.execute(sql_query).fetchall()]

         # exception case for when we have mixed types
         # (i.e. integer col with string and ints) in a sqlite column
         except duckdb.TypeMismatchException:
             with closing(sqlite3.connect(source_path)) as cx:
                 with cx:
-                    keys = [
+                    page_keys = [
                         key[0]
                         for key in cx.execute(
                             f"SELECT {page_key} FROM {table_name} ORDER BY {page_key};"
@@ -256,10 +256,10 @@ def _get_table_keyset_pagination_sets(
     elif sql_stmt is not None:
         with _duckdb_reader() as ddb_reader:
             sql_query = f"SELECT {page_key} FROM ({sql_stmt}) ORDER BY {page_key}"
-            keys = ddb_reader.execute(sql_query).fetchall()
-            keys = [key[0] for key in keys]
+            page_keys = ddb_reader.execute(sql_query).fetchall()
+            page_keys = [key[0] for key in page_keys]

-    return _generate_pagesets(keys, chunk_size)
+    return _generate_pagesets(page_keys, chunk_size)

From c04ebf32990dbc9111e80c6ee042530cce3288f0 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 10:14:32 -0600
Subject: [PATCH 19/27] better exception docs

---
 cytotable/convert.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 4308dd9..c3d399f 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -1057,7 +1057,8 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
         ]
         if not matching_keys:
             raise CytoTableException(
-                f"No matching key found in page_keys for source_group_name: {source_group_name}"
+                f"No matching key found in page_keys for source_group_name: {source_group_name}. "
+                "Please include a pagination key based on a column name from the table."
             )

@@ -1420,7 +1421,10 @@ def convert(  # pylint: disable=too-many-arguments,too-many-locals
             raise CytoTableException(
                 (
                     "When using join=True one must pass a 'join' pagination key "
-                    "in the page_keys parameter."
+                    "in the page_keys parameter. The 'join' pagination key is a column "
+                    "name found within the joined results based on the SQL provided via "
+                    "the joins parameter. This special key is required as not all columns "
+                    "from the source tables might be included in the joined results."
                 )
             )

From d0c68922656fd7b031694a14681e4deaf4786bc6 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 10:17:25 -0600
Subject: [PATCH 20/27] better docstring parameter description

Co-Authored-By: Gregory Way
---
 cytotable/convert.py | 2 +-
 cytotable/utils.py   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index c3d399f..1e3ed8c 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -193,7 +193,7 @@ def _get_table_keyset_pagination_sets(
         chunk_size: int
             The size in rowcount of the chunks to create.
         page_key: str
-            The column name to be used as the key for pagination.
+            The column name to be used to identify pagination chunks.
             Expected to be of numeric type (int, float) for ordering.
         sql_stmt:
diff --git a/cytotable/utils.py b/cytotable/utils.py
index cf756dd..eff96b4 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -190,7 +190,7 @@ def _sqlite_mixed_type_query_to_parquet(
         table_name: str:
             The name of the table being queried.
         page_key: str:
-            The column name to be used as the key for pagination.
+            The column name to be used to identify pagination chunks.
         pageset: Tuple[int, int]:
             The pageset for chunking the data from source.
         sort_output: bool

From e9681f73a07115d14157439505d8ba4349b52efd Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 10:23:15 -0600
Subject: [PATCH 21/27] better documentation and exception msg

---
 cytotable/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cytotable/utils.py b/cytotable/utils.py
index eff96b4..429a669 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -192,7 +192,7 @@ def _sqlite_mixed_type_query_to_parquet(
         page_key: str:
             The column name to be used to identify pagination chunks.
         pageset: Tuple[int, int]:
-            The pageset for chunking the data from source.
+            The range of values used for paginating data from the source.
         sort_output: bool
             Specifies whether to sort cytotable output or not.
         add_cytotable_meta: bool, default=False:
@@ -591,7 +591,7 @@ def _generate_pagesets(

     # if we have no keys, raise an exception
     if not keys:
-        raise CytoTableException("No keys were provided to pageset generation process.")
+        raise CytoTableException("Keys were not provided to the pageset generation process.")

From 85678e2cde8d6a824690fbcd05e77550ef769185 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 10:24:57 -0600
Subject: [PATCH 22/27] remove exception

---
 cytotable/utils.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/cytotable/utils.py b/cytotable/utils.py
index 429a669..09ca82e 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -589,10 +589,6 @@ def _generate_pagesets(
             List of (start_key, end_key) tuples representing each page.
""" - # if we have no keys, raise an exception - if not keys: - raise CytoTableException("Keys were not provided to the pageset generation process.") - # Initialize an empty list to store the chunks/pages chunks = [] From 178149c4b4419b23c80ab1d138c5dc29c3ad66dc Mon Sep 17 00:00:00 2001 From: d33bs Date: Mon, 26 Aug 2024 10:27:40 -0600 Subject: [PATCH 23/27] point to the example in docs Co-Authored-By: Gregory Way --- docs/source/overview.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/overview.md b/docs/source/overview.md index 609834d..cc99e66 100644 --- a/docs/source/overview.md +++ b/docs/source/overview.md @@ -291,7 +291,7 @@ Within CytoTable, we opt to describe these operations with "join" to avoid confu CytoTable uses keyset pagination to help manage system-specific reasonable memory usage when working with large datasets. [Pagination](https://en.wikipedia.org/wiki/Pagination), sometimes also called paging or "data chunking", allows CytoTable to avoid loading entire datasets into memory at once while accomplishing tasks. -Keyset pagination leverages existing column data as _pagesets_ to perform data extractions which focus on only a subset of the data as defined within the _pageset_ keys. +Keyset pagination leverages existing column data as _pagesets_ to perform data extractions which focus on only a subset of the data as defined within the _pageset_ keys (see example usage below). We use keyset pagination to reduce the overall memory footprint during extractions where other methods inadvertently may not scale for whole dataset work (such as offset-based pagination, which extracts then drops the offset data)([see here for more information](https://use-the-index-luke.com/no-offset)). ```{eval-rst} From e33de2029daa3c2b1ced6743f1c6d1a63a75bdcd Mon Sep 17 00:00:00 2001 From: d33bs Date: Mon, 26 Aug 2024 10:36:41 -0600 Subject: [PATCH 24/27] add further notes about when/why to customize --- docs/source/overview.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/source/overview.md b/docs/source/overview.md index cc99e66..fb24bde 100644 --- a/docs/source/overview.md +++ b/docs/source/overview.md @@ -297,5 +297,8 @@ We use keyset pagination to reduce the overall memory footprint during extractio ```{eval-rst} Keyset pagination definitions may be defined using the ``page_keys`` parameter: :code:`convert(..., page_keys={"table_name": "column_name" }, ...)` (:mod:`convert() `). The ``page_keys`` parameter expects a dictionary where the keys are names of tables and values which are columns to be used for the keyset pagination pages. -Pagination is created in conjunction with the ``chunk_size`` parameter which indicates the size of each page. +Pagination is implemented in conjunction with the ``chunk_size`` parameter which indicates the size of each page. +We provide preset configurations for these parameters through the ``preset`` parameter :code:`convert(..., preset="", ...)`. +Customizing the ``chunk_size`` or ``page_keys`` parameters allows you to tune the process to the size of your data and the resources available on your system. +For large datasets, smaller chunk sizes or specific pagination columns can help manage the workload by enabling smaller, more manageable data extraction at a time. 
 ```

From 22c326b05e7be8d7d1a55c4d56d521a378f18673 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 10:37:46 -0600
Subject: [PATCH 25/27] linting

---
 cytotable/convert.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index 1e3ed8c..d7d2869 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -228,7 +228,9 @@ def _get_table_keyset_pagination_sets(
                 else:
                     sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}"

-                page_keys = [results[0] for results in ddb_reader.execute(sql_query).fetchall()]
+                page_keys = [
+                    results[0] for results in ddb_reader.execute(sql_query).fetchall()
+                ]

From f3a671d2acef9c81598b2eea9b1a1af0e52e7cc8 Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 11:30:09 -0600
Subject: [PATCH 26/27] remove test raise

---
 tests/test_utils.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/tests/test_utils.py b/tests/test_utils.py
index a30f2b4..6e951ec 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -84,10 +84,3 @@ def test_generate_pageset():  # pylint: disable=too-many-statements
     chunk_size = 3
     expected = [(1.1, 3.3), (4.4, 5.5), (8.8, 8.8)]
     assert _generate_pagesets(keys, chunk_size) == expected
-
-    # Test case with an empty list
-    keys = []
-    chunk_size = 3
-    expected = []
-    with pytest.raises(CytoTableException):
-        _generate_pagesets(keys, chunk_size)

From c41796efa295d0bd3a6f13f7096f4a8ec0517dba Mon Sep 17 00:00:00 2001
From: d33bs
Date: Mon, 26 Aug 2024 15:36:36 -0600
Subject: [PATCH 27/27] sorted results from concatenation by natural sort

---
 cytotable/convert.py | 32 ++++++++++++++++++++++--------
 cytotable/utils.py   | 29 +++++++++++++++++++++++++--
 tests/test_utils.py  | 26 ++++++++++++++++++++++++--
 3 files changed, 75 insertions(+), 12 deletions(-)

diff --git a/cytotable/convert.py b/cytotable/convert.py
index d7d2869..48b9514 100644
--- a/cytotable/convert.py
+++ b/cytotable/convert.py
@@ -4,7 +4,6 @@

 import itertools
 import logging
-import uuid
 from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast

 import parsl
@@ -336,6 +335,7 @@ def _source_pageset_to_parquet(
     base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')"
     result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}"

+    # form a filepath which indicates the pageset
     result_filepath = f"{result_filepath_base}-{pageset[0]}-{pageset[1]}.parquet"

     # Attempt to read the data to parquet file
@@ -350,7 +350,7 @@ def _source_pageset_to_parquet(
                     f"""
                     {base_query}
                     WHERE {source['page_key']} BETWEEN {pageset[0]} AND {pageset[1]}
-                    /* order by all columns for deterministic output */
+                    /* optional ordering per pageset */
                     {"ORDER BY " + source['page_key'] if sort_output else ""};
                     """
                 ).arrow(),
@@ -535,6 +535,7 @@ def _concat_source_group(
     source_group: List[Dict[str, Any]],
     dest_path: str,
     common_schema: Optional[List[Tuple[str, str]]] = None,
+    sort_output: bool = True,
 ) -> List[Dict[str, Any]]:
     """
     Concatenate group of source data together as single file.
@@ -581,6 +582,8 @@ def _concat_source_group(
         common_schema: List[Tuple[str, str]] (Default value = None)
             Common schema to use for concatenation amongst arrow tables
             which may have slightly different but compatible schema.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.

     Returns:
         List[Dict[str, Any]]
@@ -598,6 +601,7 @@ def _concat_source_group(
         CYTOTABLE_DEFAULT_PARQUET_METADATA,
     )
     from cytotable.exceptions import SchemaException
+    from cytotable.utils import _natural_sort

     # build a result placeholder
     concatted: List[Dict[str, Any]] = [
@@ -636,7 +640,10 @@ def _concat_source_group(
         # (all must be the same schema)
         with parquet.ParquetWriter(str(destination_path), writer_schema) as writer:
             for source in source_group:
-                for table in [table for table in source["table"]]:
+                tables = [table for table in source["table"]]
+                if sort_output:
+                    tables = _natural_sort(tables)
+                for table in tables:
                     # if we haven't inferred the common schema
                     # check that our file matches the expected schema, otherwise raise an error
                     if common_schema is None and not writer_schema.equals(
@@ -758,6 +765,7 @@ def _join_source_pageset(
                 SELECT *
                 FROM joined
                 WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}
+                /* optional sorting per pageset */
                 {"ORDER BY " + page_key if sort_output else ""};
                 """
             ).arrow()
@@ -784,10 +792,8 @@ def _join_source_pageset(
         f"{str(pathlib.Path(dest_path).parent)}/"
         # use the dest_path stem in the name
         f"{str(pathlib.Path(dest_path).stem)}-"
-        # give the join chunk result a unique to arbitrarily
-        # differentiate from other chunk groups which are mapped
-        # and before they are brought together as one dataset
-        f"{str(uuid.uuid4().hex)}.parquet"
+        # add the pageset indication to the filename
+        f"{pageset[0]}-{pageset[1]}.parquet"
     )

     # write the result
@@ -804,6 +810,7 @@ def _concat_join_sources(
     sources: Dict[str, List[Dict[str, Any]]],
     dest_path: str,
     join_sources: List[str],
+    sort_output: bool = True,
 ) -> str:
     """
     Concatenate join sources from parquet-based chunks.
@@ -820,6 +827,8 @@ def _concat_join_sources(
         join_sources: List[str]:
             List of local filepath destination for join source chunks
             which will be concatenated.
+        sort_output: bool
+            Specifies whether to sort cytotable output or not.
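+            When sorting is enabled, chunk files named by pageset range
+            (for example "...-1-1000.parquet" then "...-1001-2000.parquet",
+            illustrative names) are concatenated in natural numeric order
+            rather than lexicographic order.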

     Returns:
         str
@@ -835,6 +844,7 @@ def _concat_join_sources(
         CYTOTABLE_ARROW_USE_MEMORY_MAPPING,
         CYTOTABLE_DEFAULT_PARQUET_METADATA,
     )
+    from cytotable.utils import _natural_sort

     # remove the unjoined concatted compartments to prepare final dest_path usage
     # (we now have joined results)
@@ -854,7 +864,11 @@ def _concat_join_sources(
             CYTOTABLE_DEFAULT_PARQUET_METADATA
         )
         with parquet.ParquetWriter(str(dest_path), writer_schema) as writer:
-            for table_path in join_sources:
+            for table_path in (
+                join_sources
+                if not sort_output
+                else _natural_sort(list_to_sort=join_sources)
+            ):
                 writer.write_table(
                     parquet.read_table(
                         table_path,
@@ -1185,6 +1199,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                     source_group=source_group_vals[0]["sources"],
                     dest_path=expanded_dest_path,
                     common_schema=source_group_vals[0]["common_schema"],
+                    sort_output=sort_output,
                 )
                 for source_group_name, source_group_vals in evaluate_futures(
                     common_schema_determined
@@ -1236,6 +1251,7 @@ def _to_parquet(  # pylint: disable=too-many-arguments, too-many-locals
                     dest_path=expanded_dest_path,
                     join_sources=[join.result() for join in join_sources_result],
                     sources=evaluated_results,
+                    sort_output=sort_output,
                 )

     # wrap the final result as a future and return
diff --git a/cytotable/utils.py b/cytotable/utils.py
index 09ca82e..1ea9792 100644
--- a/cytotable/utils.py
+++ b/cytotable/utils.py
@@ -17,8 +17,6 @@
 from parsl.errors import NoDataFlowKernelError
 from parsl.executors import HighThroughputExecutor

-from cytotable.exceptions import CytoTableException
-
 logger = logging.getLogger(__name__)

 # reference the original init
@@ -617,3 +615,30 @@ def _generate_pagesets(

     # Return the list of chunks/pages
     return chunks
+
+
+def _natural_sort(list_to_sort):
+    """
+    Sorts the given iterable using natural sort adapted from the approach
+    provided by the following link:
+    https://stackoverflow.com/a/4836734
+
+    Args:
+        list_to_sort: List:
+            The list to sort.
+
+    Returns:
+        List: The sorted list.
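+
+    Example (illustrative, drawn from the test cases below):
+        _natural_sort(["a1", "a10", "a2", "a3"])
+        returns ["a1", "a2", "a3", "a10"] rather than the
+        lexicographic ordering ["a1", "a10", "a2", "a3"].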
+ """ + import re + + return sorted( + list_to_sort, + # use a custom key to sort the list + key=lambda key: [ + # use integer of c if it's a digit, otherwise str + int(c) if c.isdigit() else c + # Split the key into parts, separating numbers from alphabetic characters + for c in re.split("([0-9]+)", str(key)) + ], + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 6e951ec..5df0546 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -4,8 +4,7 @@ import pytest -from cytotable.exceptions import CytoTableException -from cytotable.utils import _generate_pagesets +from cytotable.utils import _generate_pagesets, _natural_sort def test_generate_pageset(): # pylint: disable=too-many-statements @@ -84,3 +83,26 @@ def test_generate_pageset(): # pylint: disable=too-many-statements chunk_size = 3 expected = [(1.1, 3.3), (4.4, 5.5), (8.8, 8.8)] assert _generate_pagesets(keys, chunk_size) == expected + + +@pytest.mark.parametrize( + "input_list, expected", + [ + ([], []), + (["a1"], ["a1"]), + (["a1", "a10", "a2", "a3"], ["a1", "a2", "a3", "a10"]), + (["1", "10", "2", "11", "21", "20"], ["1", "2", "10", "11", "20", "21"]), + (["b1", "a1", "b2", "a2"], ["a1", "a2", "b1", "b2"]), + (["apple1", "Apple10", "apple2"], ["Apple10", "apple1", "apple2"]), + (["a1", "A1", "a10", "A10"], ["A1", "A10", "a1", "a10"]), + ( + ["a-1", "a-10", "b-2", "B-1", "b-3", "a-2", "A-3"], + ["A-3", "B-1", "a-1", "a-2", "a-10", "b-2", "b-3"], + ), + ], +) +def test_natural_sort(input_list, expected): + """ + Tests for _natural_sort + """ + assert _natural_sort(input_list) == expected