Skip to content

Commit

Permalink
feat: manifest and consolidate methods
Browse files Browse the repository at this point in the history
  • Loading branch information
js2264 committed Oct 23, 2024
1 parent f2296b7 commit 4a7608b
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ perfs_benchmark.log
bench/
.chromnn
*.keras
*.fa
*.fa.fai
4 changes: 4 additions & 0 deletions src/momics/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def cli(ctx):
query,
remove,
tree,
consolidate,
manifest,
)

__all__ = [
Expand All @@ -52,4 +54,6 @@ def cli(ctx):
"cp",
"binnify",
"cloudconfig",
"consolidate",
"manifest",
]
37 changes: 37 additions & 0 deletions src/momics/cli/consolidate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
import click

from momics import momics as m

from . import cli


@cli.command()
@click.option(
"--vacuum",
"-v",
help="Flag to indicate wether fragments should be vacuumed after consolidation.",
is_flag=True,
required=False,
)
@click.argument("path", metavar="MOMICS_REPO", required=True)
@click.pass_context
def consolidate(ctx, path, vacuum):
"""Consolidate a momics repository.
Consolidation is the process of compacting all the fragments from the repository
to remove any unused space. This process is useful to reduce the size of the
repository and improve performance.
The vacuum flag indicates wether the consolidated
array should be vacuumed after consolidation. This process is useful to further
reduce the size of the repository.
"""

mom = m.Momics(path)
s = mom.size()
mom.consolidate(vacuum=vacuum)
sf = mom.size()
logging.info(f"Final repository size: {sf/(1024*1024)} Mb.")
logging.info(f"Space saved after consolidation: {(s - sf)/(1024*1024)} Mb.")
return True
15 changes: 8 additions & 7 deletions src/momics/cli/cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@
def cp(ctx, path, type, label, force, output):
"""Copy sequence/track/feature set from a momics repo to a fa/bigwig/bed file."""

if not force:
click.confirm(
f"{output} file already exists. \
Are you sure you want to overwrite it",
abort=True,
)
os.remove(output)
if not force and output:
if os.path.exists(output):
click.confirm(
f"{output} file already exists. \
Are you sure you want to overwrite it",
abort=True,
)
os.remove(output)

m = momics.Momics(path)
if type == "sequence":
Expand Down
48 changes: 48 additions & 0 deletions src/momics/cli/manifest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import json
import os
import click

from momics import momics as m

from . import cli


@cli.command()
@click.option(
"--output",
"-o",
help="Path of output JSON file to write.",
type=str,
required=False,
)
@click.option(
"--force",
"-f",
help="Force overwrite of existing files.",
is_flag=True,
default=False,
show_default=True,
)
@click.argument("path", metavar="MOMICS_REPO", required=True)
@click.pass_context
def manifest(ctx, path, output, force):
"""Print the manifest of a momics repository."""

if not force and output:
if os.path.exists(output):
click.confirm(
f"{output} file already exists. Are you sure you want to overwrite it",
abort=True,
)
os.remove(output)

mom = m.Momics(path)
man = mom.manifest()

if not output:
print(json.dumps(man, indent=2))
return None
else:
with open(output, "w") as f:
json.dump(man, f, indent=2)
return True
137 changes: 130 additions & 7 deletions src/momics/momics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
import concurrent.futures
import logging
import os
Expand Down Expand Up @@ -130,12 +131,12 @@ def _create_chroms_schema(self, chr_lengths: dict) -> None:
tiledb.Dim(
name="chrom_index",
domain=(0, len(chr_lengths) - 1),
dtype=np.int32,
dtype=np.uint32,
tile=len(chr_lengths),
)
)
attr_chr = tiledb.Attr(name="chrom", dtype="ascii", var=True)
attr_length = tiledb.Attr(name="length", dtype=np.int64)
attr_length = tiledb.Attr(name="length", dtype=np.uint32)
schema = tiledb.ArraySchema(
ctx=self.cfg.ctx,
domain=dom_genome,
Expand All @@ -154,7 +155,7 @@ def _create_sequence_schema(self, tile: int) -> None:
tiledb.Dim(
name="position",
domain=(0, chrom_length),
dtype=np.int64,
dtype=np.uint32,
tile=_set_tiledb_tile(tile, chrom_length),
filters=TILEDB_POSITION_FILTERS,
)
Expand All @@ -172,7 +173,7 @@ def _create_track_schema(self, max_bws: int, tile: int) -> None:
# Create /coverage/tracks.tdb
tdb = self._build_uri("coverage", "tracks.tdb")
dom = tiledb.Domain(
tiledb.Dim(name="idx", domain=(0, max_bws), dtype=np.int64, tile=1),
tiledb.Dim(name="idx", domain=(0, max_bws), dtype=np.uint32, tile=1),
)
attr1 = tiledb.Attr(name="label", dtype="ascii")
attr2 = tiledb.Attr(name="path", dtype="ascii")
Expand All @@ -188,7 +189,7 @@ def _create_track_schema(self, max_bws: int, tile: int) -> None:
tiledb.Dim(
name="position",
domain=(0, chrom_length),
dtype=np.int64,
dtype=np.uint32,
tile=_set_tiledb_tile(tile, chrom_length),
filters=TILEDB_POSITION_FILTERS,
)
Expand Down Expand Up @@ -234,14 +235,14 @@ def _create_features_schema(self, max_features: int, tile: int) -> None:
tiledb.Dim(
name="start",
domain=(0, chrom_length),
dtype=np.int64,
dtype=np.uint32,
tile=_set_tiledb_tile(tile, chrom_length),
filters=TILEDB_POSITION_FILTERS,
),
tiledb.Dim(
name="stop",
domain=(0, chrom_length),
dtype=np.int64,
dtype=np.uint32,
tile=_set_tiledb_tile(tile, chrom_length),
filters=TILEDB_POSITION_FILTERS,
),
Expand Down Expand Up @@ -948,3 +949,125 @@ def export_features(self, features: str, output: Path) -> "Momics":
bed = self.features(features)
bed.to_bed(output)
return self

def manifest(self) -> dict:
"""
Returns the manifest of the Momics repository. The manifest lists the
configuration for all the arrays stored in the repository, including
their schema, attributes, and metadata.
"""

def is_tiledb_array(uri, ctx):
try:
return tiledb.object_type(uri, ctx) == "array"
except tiledb.TileDBError:
return False

def is_tiledb_group(uri, ctx):
try:
return tiledb.object_type(uri, ctx) == "group"
except tiledb.TileDBError:
return False

def get_array_schema(uri, ctx):
try:
with tiledb.open(uri, ctx=ctx, mode="r") as array:
schema = array.schema

d = collections.defaultdict(
None,
{
"uri": uri,
"type": "array",
"shape": schema.shape,
"sparse": schema.sparse,
"chunksize": schema.capacity,
"cell_order": schema.cell_order,
"tile_order": schema.tile_order,
},
)

d["dims"] = collections.defaultdict(None)
for dim in schema.domain:
d["dims"][dim.name] = {
"domain": tuple([int(i) for i in dim.domain]),
"dtype": str(dim.dtype),
"tile": int(dim.tile),
"filters": str(dim.filters),
}
d["dims"] = dict(d["dims"])

d["attrs"] = collections.defaultdict(None)
attrs = [schema.attr(x) for x in range(schema.nattr)]
for attr in attrs:
d["attrs"][attr.name] = {
"dtype": str(attr.dtype),
"filters": str(attr.filters),
}
d["attrs"] = dict(d["attrs"])

d["modification_timestamps"] = {frag.uri: frag.timestamp_range for frag in tiledb.FragmentInfoList(uri)}
d = dict(d)

return d

except Exception as e:
print(f"Error reading array schema from {uri}: {e}")
return None

def get_group_metadata(uri):
try:
return {"uri": uri, "type": "group"}
except Exception as e:
print(f"Error reading group metadata from {uri}: {e}")
return None

def traverse_directory(base_dir, ctx):
manifest = collections.defaultdict(None)
for pointer in self.cfg.vfs.ls_recursive(base_dir):
if is_tiledb_array(pointer, ctx):
array_info = get_array_schema(pointer, ctx)
if array_info:
manifest[pointer] = array_info
elif is_tiledb_group(pointer, ctx):
group_info = get_group_metadata(pointer)
if group_info:
manifest[pointer] = group_info

return manifest

man = traverse_directory(self.path, self.cfg.ctx)
return man

def consolidate(self, vacuum: bool = True) -> Literal[True]:
"""
Consolidates the fragments of all arrays in the repository.
Args:
vacuum (bool, optional): Vacuum the consolidated array. Defaults to True.
"""
for pointer in self.cfg.vfs.ls_recursive(self.path):
try:
if tiledb.object_type(pointer) == "array":
logging.debug(f"Consolidating array at: {pointer}")
tiledb.consolidate(pointer)
if vacuum:
tiledb.vacuum(pointer)
except tiledb.TileDBError as e:
print(f"Error processing {pointer}: {e}")

return True

def size(self) -> int:
"""
Returns:
int: The size of the repository in bytes.
"""
size = 0
for pointer in self.cfg.vfs.ls_recursive(self.path):
if self.cfg.vfs.is_file(pointer):
try:
size += self.cfg.vfs.file_size(pointer)
except tiledb.TileDBError as e:
print(f"Error processing {pointer}: {e}")
return size
16 changes: 16 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,22 @@ def test_config(runner):
assert "aws_access_key_id: ABCD" in result.output


def test_consolidate(runner, path):
m = Momics(path)
s = m.size()
result = runner.invoke(cli.consolidate.consolidate, ["--vacuum", path])
assert result.exit_code == 0
sf = m.size()
assert sf < s


def test_manifest(runner, path):
result = runner.invoke(cli.manifest.manifest, ["--output", "out.json", "-f", path])
assert result.exit_code == 0
assert os.path.exists("out.json")
os.remove("out.json")


def test_delete(runner, path):
result = runner.invoke(cli.delete.delete, ["-y", "oiasudhncoaisuhmdcoiaushcd"])
assert result.output == "Repository oiasudhncoaisuhmdcoiaushcd does not exist.\n"
Expand Down
17 changes: 17 additions & 0 deletions tests/test_momics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
from pathlib import Path
import numpy as np
import pyranges as pr
Expand Down Expand Up @@ -165,6 +166,22 @@ def test_Momics_binnify(momics_path: str):
assert q.df.shape == (60, 3)


@pytest.mark.order(2)
def test_Momics_consolidate(momics_path: str):
mom = momics.Momics(momics_path)
s = mom.size()
x = mom.consolidate(vacuum=True)
assert x
assert mom.size() < s


@pytest.mark.order(2)
def test_Momics_manifest(momics_path: str):
mom = momics.Momics(momics_path)
man = mom.manifest()
assert isinstance(man, collections.defaultdict)


@pytest.mark.order(2)
def test_Momics_features(momics_path: str):
mom = momics.Momics(momics_path)
Expand Down

0 comments on commit 4a7608b

Please sign in to comment.