diff --git a/src/genomicarrays/__init__.py b/src/genomicarrays/__init__.py
index f9f3ba5..185b801 100644
--- a/src/genomicarrays/__init__.py
+++ b/src/genomicarrays/__init__.py
@@ -14,3 +14,6 @@
     __version__ = "unknown"
 finally:
     del version, PackageNotFoundError
+
+from .build_genomicarray import build_genomicarray
+from .build_options import MatrixOptions, SampleMetadataOptions
\ No newline at end of file
diff --git a/src/genomicarrays/build_genomicarray.py b/src/genomicarrays/build_genomicarray.py
new file mode 100644
index 0000000..a019074
--- /dev/null
+++ b/src/genomicarrays/build_genomicarray.py
@@ -0,0 +1,253 @@
+"""Build the `GenomicArrayDataset`.
+
+The `GenomicArrayDataset` is designed to store genomic range-based
+datasets from BigWig, BigBed or other similar files.
+
+Example:
+
+    .. code-block:: python
+
+        import pyBigWig as bw
+        import numpy as np
+        import tempfile
+        from genomicarrays import build_genomicarray, MatrixOptions
+
+        # Create a temporary directory
+        tempdir = tempfile.mkdtemp()
+
+        # Read BigWig objects
+        bw1 = bw.open("path/to/object1.bw", "r")
+        # or just provide the path
+        bw2 = "path/to/object2.bw"
+
+        # Build GenomicArray
+        dataset = build_genomicarray(
+            output_path=tempdir,
+            files=[bw1, bw2],
+            matrix_options=MatrixOptions(dtype=np.float32),
+        )
+"""
+
+import itertools
+import os
+import warnings
+from multiprocessing import Pool
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import pyBigWig as bw
+import tiledb
+from cellarr import buildutils_tiledb_frame as utf
+
+from . import build_options as bopt
+from . import buildutils_tiledb_array as uta
+from . import utils_bw as ubw
+
+# from .GenomicArrayDataset import GenomicArrayDataset
+
+__author__ = "Jayaram Kancherla"
+__copyright__ = "Jayaram Kancherla"
+__license__ = "MIT"
+
+
+# TODO: Accept files as a dictionary with names to each dataset.
+# TODO: options to only extract specific regions.
+def build_genomicarray(
+    files: list,
+    output_path: str,
+    filter_to_regions: Union[str, pd.DataFrame] = None,
+    genome: Union[str, pd.DataFrame] = "hg38",
+    sample_metadata: Union[pd.DataFrame, str] = None,
+    sample_metadata_options: bopt.SampleMetadataOptions = bopt.SampleMetadataOptions(),
+    matrix_options: bopt.MatrixOptions = bopt.MatrixOptions(),
+    optimize_tiledb: bool = True,
+    num_threads: int = 1,
+):
+    """Generate the `GenomicArrayDataset`.
+
+    All files are expected to be consistent; any modifications
+    to make them consistent are outside the scope of this function
+    and package.
+
+    Args:
+        files:
+            List of file paths to `BigWig` files.
+
+        output_path:
+            Path to where the output TileDB files should be stored.
+
+        filter_to_regions:
+            Regions to restrict extraction to. Currently unused;
+            reserved for a future release (see the TODO above).
+            Defaults to None.
+
+        genome:
+            A UCSC genome assembly name (e.g. "hg38"), in which case
+            chromosome sizes are downloaded from UCSC, or a
+            :py:class:`~pandas.DataFrame` with columns "chrom" and "length".
+            Defaults to "hg38".
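+
+            For example, a minimal chromosome-sizes frame (a sketch; the
+            lengths are hg38 values, shown for illustration):
+
+            .. code-block:: python
+
+                import pandas as pd
+
+                genome = pd.DataFrame(
+                    {"chrom": ["chr1", "chr2"], "length": [248956422, 242193529]}
+                )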
+
+        sample_metadata:
+            A :py:class:`~pandas.DataFrame` containing the sample
+            metadata for each file in ``files``. Hence, the number of rows
+            in the dataframe must match the number of ``files``.
+
+            Alternatively, may provide a path to a file containing the
+            concatenated sample metadata across all BigWig files. In this case,
+            the first row is expected to contain the column names.
+
+            Additionally, the order of rows is expected to be in the same
+            order as the input list of ``files``.
+
+            Defaults to `None`, in which case a simple sample metadata
+            dataframe is created containing the list of datasets, i.e.
+            one per BigWig file. Each dataset is named ``sample_{i}``,
+            where `i` refers to the index position of the object in ``files``.
+
+        sample_metadata_options:
+            Optional parameters when generating the ``sample_metadata`` store.
+
+        matrix_options:
+            Optional parameters when generating the ``matrix`` store.
+
+        optimize_tiledb:
+            Whether to run TileDB's vacuum and consolidation (may take long).
+
+        num_threads:
+            Number of threads.
+            Defaults to 1.
+    """
+    if not os.path.isdir(output_path):
+        raise ValueError("'output_path' must be a directory.")
+
+    ####
+    ## Fetch chromosome sizes for the genome
+    ####
+    if isinstance(genome, str):
+        chrom_sizes = pd.read_csv(
+            f"https://hgdownload.soe.ucsc.edu/goldenpath/{genome}/bigZips/{genome}.chrom.sizes",
+            sep="\t",
+            header=None,
+            names=["chrom", "length"],
+        )
+    elif isinstance(genome, pd.DataFrame):
+        chrom_sizes = genome
+        if "chrom" not in chrom_sizes:
+            raise ValueError("'genome' does not contain column: 'chrom'.")
+
+        if "length" not in chrom_sizes:
+            raise ValueError("'genome' does not contain column: 'length'.")
+
+    else:
+        raise TypeError(
+            "'genome' is not an expected type (either 'str' or 'DataFrame')."
+        )
+
+    ####
+    ## Writing the sample metadata file
+    ####
+    _samples = []
+    for idx, _ in enumerate(files):
+        _samples.append(f"sample_{idx + 1}")
+
+    if sample_metadata is None:
+        warnings.warn(
+            "Sample metadata is not provided, each dataset in 'files' is considered a sample.",
+            UserWarning,
+        )
+
+        sample_metadata = pd.DataFrame({"genarr_sample": _samples})
+    elif isinstance(sample_metadata, str):
+        sample_metadata = pd.read_csv(sample_metadata, header=0)
+        if "genarr_sample" not in sample_metadata.columns:
+            sample_metadata["genarr_sample"] = _samples
+    elif isinstance(sample_metadata, pd.DataFrame):
+        if "genarr_sample" not in sample_metadata.columns:
+            sample_metadata["genarr_sample"] = _samples
+    else:
+        raise TypeError("'sample_metadata' is not an expected type.")
+
+    if not sample_metadata_options.skip:
+        _col_types = utf.infer_column_types(
+            sample_metadata, sample_metadata_options.column_types
+        )
+
+        _sample_output_uri = (
+            f"{output_path}/{sample_metadata_options.tiledb_store_name}"
+        )
+        utf.create_tiledb_frame_from_dataframe(
+            _sample_output_uri, sample_metadata, column_types=_col_types
+        )
+
+        if optimize_tiledb:
+            uta.optimize_tiledb_array(_sample_output_uri)
+
+    ####
+    ## Writing the genomic ranges file
+    ####
+    if not matrix_options.skip:
+        tiledb.group_create(f"{output_path}/ranges")
+
+        _chrm_group_base = f"{output_path}/ranges"
+        for idx, seq in chrom_sizes.iterrows():
+            uta.create_tiledb_array_chrm(
+                f"{_chrm_group_base}/{seq['chrom']}",
+                x_dim_length=seq["length"],
+                y_dim_length=len(files),
+                matrix_attr_name=matrix_options.matrix_attr_name,
+                matrix_dim_dtype=matrix_options.dtype,
+            )
+
+        # One task per (BigWig file, chromosome) pair.
+        all_bws = [(_chrm_group_base, bwpath, idx) for idx, bwpath in enumerate(files)]
+        input_options = list(itertools.product(all_bws, chrom_sizes["chrom"]))
+
+        _write_bws_to_tiledb(input_options, num_threads=num_threads)
+
+    # if optimize_tiledb:
+    #     uta.optimize_tiledb_array(_counts_uri)
+
+    # return GenomicArrayDataset(
+    #     dataset_path=output_path,
+    #     sample_metadata_uri=sample_metadata_options.tiledb_store_name,
+    #     matrix_tdb_uri=matrix_options.tiledb_store_name,
+    # )
+
+
+def _range_writer(outpath, bwpath, bwidx, chrm):
+    """Extract intervals for one chromosome of one BigWig file and write them to TileDB."""
+    data = ubw.extract_bw_intervals(bwpath, chrm)
+
+    if data is not None:
+        uta.write_frame_intervals_to_tiledb(f"{outpath}/{chrm}", data=data, y_idx=bwidx)
+
+
+def _wrapper_extract_info(args):
+    iopt, chrm = args
+    return _range_writer(iopt[0], iopt[1], iopt[2], chrm)
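+
+
+# Each element of `input_options` pairs one (group_uri, bw_path, sample_index)
+# tuple with a chromosome name, e.g. (the values here are placeholders):
+#
+#     (("tmpdir/ranges", "path/to/object1.bw", 0), "chr1")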
+
+
+def _write_bws_to_tiledb(
+    input_options: list,
+    num_threads: int = 1,
+):
+    with Pool(num_threads) as p:
+        return p.map(_wrapper_extract_info, input_options)
diff --git a/src/genomicarrays/build_options.py b/src/genomicarrays/build_options.py
new file mode 100644
index 0000000..6745a52
--- /dev/null
+++ b/src/genomicarrays/build_options.py
@@ -0,0 +1,77 @@
+from dataclasses import dataclass
+from typing import Dict, Optional
+
+import numpy as np
+
+__author__ = "Jayaram Kancherla"
+__copyright__ = "Jayaram Kancherla"
+__license__ = "MIT"
+
+
+@dataclass
+class MatrixOptions:
+    """Optional arguments for the ``matrix`` store for
+    :py:func:`~genomicarrays.build_genomicarray.build_genomicarray`.
+
+    Attributes:
+        skip:
+            Whether to skip generating the matrix TileDB.
+            Defaults to False.
+
+        matrix_attr_name:
+            Name of the matrix to be stored in the TileDB file.
+            Defaults to "data".
+
+        dtype:
+            NumPy dtype for the values in the matrix.
+            Defaults to np.uint16.
+
+            Note: make sure the matrix values fit
+            within the range limits of the chosen dtype.
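+
+    Example (a minimal sketch):
+
+        .. code-block:: python
+
+            import numpy as np
+
+            opt = MatrixOptions(matrix_attr_name="data", dtype=np.float32)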
+    """
+
+    skip: bool = False
+    matrix_attr_name: str = "data"
+    dtype: np.dtype = np.uint16
+
+
+@dataclass
+class SampleMetadataOptions:
+    """Optional arguments for the ``sample`` store for
+    :py:func:`~genomicarrays.build_genomicarray.build_genomicarray`.
+
+    Attributes:
+        skip:
+            Whether to skip generating the sample TileDB.
+            Defaults to False.
+
+        dtype:
+            NumPy dtype for the sample dimension.
+            Defaults to np.uint32.
+
+            Note: make sure the number of samples fits
+            within the integer limits of the chosen dtype.
+
+        tiledb_store_name:
+            Name of the TileDB file.
+            Defaults to "sample_metadata".
+
+        column_types:
+            A dictionary with column names as keys and values
+            representing the dtype used to store each column in TileDB.
+
+            If `None`, all columns are cast as 'ascii'.
+    """
+
+    skip: bool = False
+    dtype: np.dtype = np.uint32
+    tiledb_store_name: str = "sample_metadata"
+    column_types: Optional[Dict[str, np.dtype]] = None
diff --git a/src/genomicarrays/buildutils_tiledb_array.py b/src/genomicarrays/buildutils_tiledb_array.py
new file mode 100644
index 0000000..f1dfd33
--- /dev/null
+++ b/src/genomicarrays/buildutils_tiledb_array.py
@@ -0,0 +1,193 @@
+import os
+import shutil
+from typing import Union
+
+import numpy as np
+import pandas as pd
+import tiledb
+
+__author__ = "Jayaram Kancherla"
+__copyright__ = "Jayaram Kancherla"
+__license__ = "MIT"
+
+
+def create_tiledb_array_chrm(
+    tiledb_uri_path: str,
+    x_dim_length: int = None,
+    y_dim_length: int = None,
+    x_dim_name: str = "base",
+    y_dim_name: str = "sample_index",
+    matrix_attr_name: str = "data",
+    x_dim_dtype: np.dtype = np.uint32,
+    y_dim_dtype: np.dtype = np.uint32,
+    matrix_dim_dtype: np.dtype = np.uint32,
+    is_sparse: bool = True,
+):
+    """Create a TileDB file with the provided attributes to persistent storage.
+
+    This will materialize the array directory and all
+    related schema files.
+
+    Args:
+        tiledb_uri_path:
+            Path to create the array TileDB file.
+
+        x_dim_length:
+            Number of entries along the x/fastest-changing dimension,
+            e.g. the number of bases in the chromosome.
+            Defaults to None, in which case the max integer value of
+            ``x_dim_dtype`` is used.
+
+        y_dim_length:
+            Number of entries along the y dimension,
+            e.g. the number of samples (BigWig files).
+            Defaults to None, in which case the max integer value of
+            ``y_dim_dtype`` is used.
+
+        x_dim_name:
+            Name for the x-dimension.
+            Defaults to "base".
+
+        y_dim_name:
+            Name for the y-dimension.
+            Defaults to "sample_index".
+
+        matrix_attr_name:
+            Name for the attribute in the array.
+            Defaults to "data".
+
+        x_dim_dtype:
+            NumPy dtype for the x-dimension.
+            Defaults to np.uint32.
+
+        y_dim_dtype:
+            NumPy dtype for the y-dimension.
+            Defaults to np.uint32.
+
+        matrix_dim_dtype:
+            NumPy dtype for the values in the matrix.
+            Defaults to np.uint32.
+
+        is_sparse:
+            Whether the matrix is sparse.
+            Defaults to True.
+    """
+
+    if x_dim_length is None:
+        x_dim_length = np.iinfo(x_dim_dtype).max
+
+    if y_dim_length is None:
+        y_dim_length = np.iinfo(y_dim_dtype).max
+
+    xdim = tiledb.Dim(name=x_dim_name, domain=(0, x_dim_length - 1), dtype=x_dim_dtype)
+    ydim = tiledb.Dim(name=y_dim_name, domain=(0, y_dim_length - 1), dtype=y_dim_dtype)
+
+    dom = tiledb.Domain(xdim, ydim)
+
+    # expecting counts
+    tdb_attr = tiledb.Attr(
+        name=matrix_attr_name,
+        dtype=matrix_dim_dtype,
+        filters=tiledb.FilterList([tiledb.GzipFilter()]),
+    )
+
+    schema = tiledb.ArraySchema(domain=dom, sparse=is_sparse, attrs=[tdb_attr])
+
+    if os.path.exists(tiledb_uri_path):
+        shutil.rmtree(tiledb_uri_path)
+
+    tiledb.Array.create(tiledb_uri_path, schema)
+
+    tdbfile = tiledb.open(tiledb_uri_path, "w")
+    tdbfile.close()
+
+
+def write_frame_intervals_to_tiledb(
+    tiledb_array_uri: Union[str, tiledb.SparseArray],
+    data: pd.DataFrame,
+    y_idx: int,
+    value_dtype: np.dtype = np.uint32,
+):
+    """Append and save intervals to TileDB.
+
+    Args:
+        tiledb_array_uri:
+            TileDB array object or path to a TileDB object.
+
+        data:
+            Input dataframe to write to TileDB; must contain
+            columns "start", "end" and "value".
+
+        y_idx:
+            Index along the sample dimension to write the values to.
+
+        value_dtype:
+            NumPy dtype to reformat the matrix values.
+            Defaults to ``uint32``.
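+
+    Example (a minimal sketch; coordinates and values are illustrative):
+
+        .. code-block:: python
+
+            import pandas as pd
+
+            df = pd.DataFrame({"start": [0, 100], "end": [10, 150], "value": [2, 5]})
+            write_frame_intervals_to_tiledb("ranges/chr1", data=df, y_idx=0)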
+__license__ = "MIT" + + +def extract_bw_values( + bw_path: str, + chrom: str, +) -> Tuple[np.ndarray, int]: + bwfile = bw.open(bw_path) + if chrom not in bwfile.chroms(): + return None, None + + chrom_length = bwfile.chroms(chrom) + data = bwfile.values(chrom, 0, chrom_length) + return np.array(data), chrom_length + + +def extract_bw_intervals( + bw_path: str, + chrom: str, +) -> Optional[pd.DataFrame]: + bwfile = bw.open(bw_path) + if chrom not in bwfile.chroms(): + return None + + data = pd.DataFrame(bwfile.intervals(chrom), columns=["start", "end", "value"]) + data["chrom"] = chrom + data_nnz = data[data["value"] > 0] + + data_nnz["tmp_group"] = (data_nnz["value"] != data_nnz["value"].shift()).cumsum() + result = data_nnz.groupby("tmp_group").agg( + {"start": "first", "end": "last", "value": "first"} + ) + return result