diff --git a/.gitignore b/.gitignore
index 8ac2a08..a6a443f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,3 +66,5 @@ perfs_benchmark.log
 bench/
 .chromnn
 *.keras
+*.fa
+*.fa.fai
diff --git a/src/momics/cli/__init__.py b/src/momics/cli/__init__.py
index e7804da..19faecf 100644
--- a/src/momics/cli/__init__.py
+++ b/src/momics/cli/__init__.py
@@ -39,6 +39,8 @@ def cli(ctx):
     query,
     remove,
     tree,
+    consolidate,
+    manifest,
 )
 
 __all__ = [
@@ -52,4 +54,6 @@ def cli(ctx):
     "cp",
     "binnify",
     "cloudconfig",
+    "consolidate",
+    "manifest",
 ]
diff --git a/src/momics/cli/consolidate.py b/src/momics/cli/consolidate.py
new file mode 100644
index 0000000..dee0f8d
--- /dev/null
+++ b/src/momics/cli/consolidate.py
@@ -0,0 +1,37 @@
+import logging
+import click
+
+from momics import momics as m
+
+from . import cli
+
+
+@cli.command()
+@click.option(
+    "--vacuum",
+    "-v",
+    help="Flag to indicate whether fragments should be vacuumed after consolidation.",
+    is_flag=True,
+    required=False,
+)
+@click.argument("path", metavar="MOMICS_REPO", required=True)
+@click.pass_context
+def consolidate(ctx, path, vacuum):
+    """Consolidate a momics repository.
+
+    Consolidation is the process of compacting all the fragments from the repository
+    to remove any unused space. This process is useful to reduce the size of the
+    repository and improve performance.
+
+    The vacuum flag indicates whether the consolidated
+    array should be vacuumed after consolidation. This process is useful to further
+    reduce the size of the repository.
+    """
+
+    mom = m.Momics(path)
+    s = mom.size()
+    mom.consolidate(vacuum=vacuum)
+    sf = mom.size()
+    logging.info(f"Final repository size: {sf/(1024*1024)} Mb.")
+    logging.info(f"Space saved after consolidation: {(s - sf)/(1024*1024)} Mb.")
+    return True
diff --git a/src/momics/cli/cp.py b/src/momics/cli/cp.py
index ef100e3..40ec8ae 100644
--- a/src/momics/cli/cp.py
+++ b/src/momics/cli/cp.py
@@ -40,13 +40,14 @@ def cp(ctx, path, type, label, force, output):
     """Copy sequence/track/feature set from a momics repo
     to a fa/bigwig/bed file."""
 
-    if not force:
-        click.confirm(
-            f"{output} file already exists. \
-            Are you sure you want to overwrite it",
-            abort=True,
-        )
-        os.remove(output)
+    if not force and output:
+        if os.path.exists(output):
+            click.confirm(
+                f"{output} file already exists. "
+                "Are you sure you want to overwrite it",
+                abort=True,
+            )
+            os.remove(output)
 
     m = momics.Momics(path)
     if type == "sequence":
diff --git a/src/momics/cli/manifest.py b/src/momics/cli/manifest.py
new file mode 100644
index 0000000..502595a
--- /dev/null
+++ b/src/momics/cli/manifest.py
@@ -0,0 +1,52 @@
+import json
+import os
+import click
+
+from momics import momics as m
+
+from . import cli
+
+
+@cli.command()
+@click.option(
+    "--output",
+    "-o",
+    help="Path of output JSON file to write.",
+    type=str,
+    required=False,
+)
+@click.option(
+    "--force",
+    "-f",
+    help="Force overwrite of existing files.",
+    is_flag=True,
+    default=False,
+    show_default=True,
+)
+@click.argument("path", metavar="MOMICS_REPO", required=True)
+@click.pass_context
+def manifest(ctx, path, output, force):
+    """Print the manifest of a momics repository.
+
+    The manifest is printed to standard output as JSON, or written to the
+    file given with --output.
+    """
+
+    if not force and output:
+        if os.path.exists(output):
+            click.confirm(
+                f"{output} file already exists. Are you sure you want to overwrite it",
+                abort=True,
+            )
+            os.remove(output)
+
+    mom = m.Momics(path)
+    man = mom.manifest()
+
+    if not output:
+        click.echo(json.dumps(man, indent=2))
+        return None
+
+    with open(output, "w") as f:
+        json.dump(man, f, indent=2)
+    return True
diff --git a/src/momics/momics.py b/src/momics/momics.py
index e6e18a9..3c60db2 100644
--- a/src/momics/momics.py
+++ b/src/momics/momics.py
@@ -130,12 +130,12 @@ def _create_chroms_schema(self, chr_lengths: dict) -> None:
             tiledb.Dim(
                 name="chrom_index",
                 domain=(0, len(chr_lengths) - 1),
-                dtype=np.int32,
+                dtype=np.uint32,
                 tile=len(chr_lengths),
             )
         )
         attr_chr = tiledb.Attr(name="chrom", dtype="ascii", var=True)
-        attr_length = tiledb.Attr(name="length", dtype=np.int64)
+        attr_length = tiledb.Attr(name="length", dtype=np.uint32)
         schema = tiledb.ArraySchema(
             ctx=self.cfg.ctx,
             domain=dom_genome,
@@ -154,7 +154,7 @@ def _create_sequence_schema(self, tile: int) -> None:
                 tiledb.Dim(
                     name="position",
                     domain=(0, chrom_length),
-                    dtype=np.int64,
+                    dtype=np.uint32,
                     tile=_set_tiledb_tile(tile, chrom_length),
                     filters=TILEDB_POSITION_FILTERS,
                 )
@@ -172,7 +172,7 @@ def _create_track_schema(self, max_bws: int, tile: int) -> None:
         # Create /coverage/tracks.tdb
         tdb = self._build_uri("coverage", "tracks.tdb")
         dom = tiledb.Domain(
-            tiledb.Dim(name="idx", domain=(0, max_bws), dtype=np.int64, tile=1),
+            tiledb.Dim(name="idx", domain=(0, max_bws), dtype=np.uint32, tile=1),
         )
         attr1 = tiledb.Attr(name="label", dtype="ascii")
         attr2 = tiledb.Attr(name="path", dtype="ascii")
@@ -188,7 +188,7 @@ def _create_track_schema(self, max_bws: int, tile: int) -> None:
                 tiledb.Dim(
                     name="position",
                     domain=(0, chrom_length),
-                    dtype=np.int64,
+                    dtype=np.uint32,
                     tile=_set_tiledb_tile(tile, chrom_length),
                     filters=TILEDB_POSITION_FILTERS,
                 )
@@ -234,14 +234,14 @@ def _create_features_schema(self, max_features: int, tile: int) -> None:
                 tiledb.Dim(
                     name="start",
                     domain=(0, chrom_length),
-                    dtype=np.int64,
+                    dtype=np.uint32,
                     tile=_set_tiledb_tile(tile, chrom_length),
                     filters=TILEDB_POSITION_FILTERS,
                 ),
                 tiledb.Dim(
                     name="stop",
                     domain=(0, chrom_length),
-                    dtype=np.int64,
+                    dtype=np.uint32,
                     tile=_set_tiledb_tile(tile, chrom_length),
                     filters=TILEDB_POSITION_FILTERS,
                 ),
@@ -948,3 +948,114 @@ def export_features(self, features: str, output: Path) -> "Momics":
         bed = self.features(features)
         bed.to_bed(output)
         return self
+
+    def manifest(self) -> dict:
+        """
+        Returns the manifest of the Momics repository. The manifest lists the
+        configuration for all the arrays stored in the repository, including
+        their schema, attributes, and metadata.
+
+        Returns:
+            dict: One entry per TileDB array or group found under the repository
+            path, keyed by URI. Arrays whose schema cannot be read are logged
+            and left out of the manifest.
+        """
+        ctx = self.cfg.ctx
+
+        def object_type(uri):
+            # Paths that TileDB cannot inspect are treated as non-TileDB objects
+            # instead of aborting the traversal.
+            try:
+                return tiledb.object_type(uri, ctx)
+            except tiledb.TileDBError:
+                return None
+
+        def describe_array(uri):
+            try:
+                with tiledb.open(uri, ctx=ctx, mode="r") as array:
+                    schema = array.schema
+                    return {
+                        "uri": uri,
+                        "type": "array",
+                        "shape": schema.shape,
+                        "sparse": schema.sparse,
+                        "chunksize": schema.capacity,
+                        "cell_order": schema.cell_order,
+                        "tile_order": schema.tile_order,
+                        "dims": {
+                            dim.name: {
+                                "domain": tuple(int(i) for i in dim.domain),
+                                "dtype": str(dim.dtype),
+                                "tile": int(dim.tile),
+                                "filters": str(dim.filters),
+                            }
+                            for dim in schema.domain
+                        },
+                        "attrs": {
+                            attr.name: {
+                                "dtype": str(attr.dtype),
+                                "filters": str(attr.filters),
+                            }
+                            for attr in (schema.attr(i) for i in range(schema.nattr))
+                        },
+                        "modification_timestamps": {
+                            frag.uri: frag.timestamp_range
+                            for frag in tiledb.FragmentInfoList(uri, ctx=ctx)
+                        },
+                    }
+            except Exception as e:
+                # Best effort: one unreadable array must not abort the whole manifest.
+                logging.error(f"Error reading array schema from {uri}: {e}")
+                return None
+
+        manifest = {}
+        for pointer in self.cfg.vfs.ls_recursive(self.path):
+            kind = object_type(pointer)
+            if kind == "array":
+                array_info = describe_array(pointer)
+                if array_info:
+                    manifest[pointer] = array_info
+            elif kind == "group":
+                manifest[pointer] = {"uri": pointer, "type": "group"}
+
+        return manifest
+
+    def consolidate(self, vacuum: bool = True) -> Literal[True]:
+        """
+        Consolidates the fragments of all arrays in the repository.
+
+        Args:
+            vacuum (bool, optional): Vacuum the consolidated array. Defaults to True.
+
+        Returns:
+            Literal[True]: Always True; arrays that fail to consolidate are logged and skipped.
+        """
+        ctx = self.cfg.ctx
+        for pointer in self.cfg.vfs.ls_recursive(self.path):
+            try:
+                if tiledb.object_type(pointer, ctx) == "array":
+                    logging.debug(f"Consolidating array at: {pointer}")
+                    tiledb.consolidate(pointer, ctx=ctx)
+                    if vacuum:
+                        tiledb.vacuum(pointer, ctx=ctx)
+            except tiledb.TileDBError as e:
+                logging.error(f"Error processing {pointer}: {e}")
+
+        return True
+
+    def size(self) -> int:
+        """
+        Computes the total size of the files stored in the repository.
+
+        Returns:
+            int: The size of the repository in bytes.
+        """
+        vfs = self.cfg.vfs
+        size = 0
+        for pointer in vfs.ls_recursive(self.path):
+            if vfs.is_file(pointer):
+                try:
+                    size += vfs.file_size(pointer)
+                except tiledb.TileDBError as e:
+                    logging.error(f"Error processing {pointer}: {e}")
+        return size
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 4cf1965..bde8c54 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -182,6 +182,22 @@ def test_config(runner):
     assert "aws_access_key_id: ABCD" in result.output
 
 
+def test_consolidate(runner, path):
+    m = Momics(path)
+    s = m.size()
+    result = runner.invoke(cli.consolidate.consolidate, ["--vacuum", path])
+    assert result.exit_code == 0
+    sf = m.size()
+    assert sf < s
+
+
+def test_manifest(runner, path):
+    result = runner.invoke(cli.manifest.manifest, ["--output", "out.json", "-f", path])
+    assert result.exit_code == 0
+    assert os.path.exists("out.json")
+    os.remove("out.json")
+
+
 def test_delete(runner, path):
     result = runner.invoke(cli.delete.delete, ["-y", "oiasudhncoaisuhmdcoiaushcd"])
     assert result.output == "Repository oiasudhncoaisuhmdcoiaushcd does not exist.\n"
diff --git a/tests/test_momics.py b/tests/test_momics.py
index 36c3b3b..a8970cb 100644
--- a/tests/test_momics.py
+++ b/tests/test_momics.py
@@ -165,6 +165,22 @@ def test_Momics_binnify(momics_path: str):
     assert q.df.shape == (60, 3)
 
 
+@pytest.mark.order(2)
+def test_Momics_consolidate(momics_path: str):
+    mom = momics.Momics(momics_path)
+    s = mom.size()
+    x = mom.consolidate(vacuum=True)
+    assert x
+    assert mom.size() < s
+
+
+@pytest.mark.order(2)
+def test_Momics_manifest(momics_path: str):
+    mom = momics.Momics(momics_path)
+    man = mom.manifest()
+    assert isinstance(man, dict)
+
+
 @pytest.mark.order(2)
 def test_Momics_features(momics_path: str):
     mom = momics.Momics(momics_path)