Skip to content

Commit

Permalink
fix: update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
js2264 committed Jul 31, 2024
1 parent 9528580 commit 7e347d9
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 11 deletions.
162 changes: 162 additions & 0 deletions src/momics/Momics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import pandas as pd
from momics import utils
import os
import tiledb
import numpy as np
import pyBigWig
from datetime import datetime


class Momics:

def _get_table(self, tdb: str):
path = os.path.join(self.path, tdb)
if os.path.exists(path) == False:
return None

with tiledb.open(path, "r") as A:
a = A.df[:]

return a

def chroms(self, **kwargs):
"""Extract chromosome lengths table
Returns
-------
Pandas DataFrame
"""
tdb = os.path.join("genome", "chroms.tdb")
chroms = self._get_table(tdb)
if chroms is None:
return pd.DataFrame(columns=["chrom_index", "chr", "length"])

return chroms

def tracks(self, **kwargs):
"""Extract table of ingested bigwigs
Returns
-------
Pandas
"""
tdb = os.path.join("coverage", "tracks.tdb")
tracks = self._get_table(tdb)
if tracks is None:
return pd.DataFrame(columns=["idx", "label", "path"])

return tracks

def add_tracks(self, bws: dict):

# Abort if `chroms` have not been filled
if self.chroms().empty == True:
raise ValueError("Please fill out `chroms` table first.")

# Abort if chr lengths in provided bw do not match those in `chroms`
utils._check_chr_lengths(bws, self.chroms())

# Abort if bw labels already exist
utils._check_track_names(bws, self.tracks())

# If `path/coverage/tracks.tdb` (and `{chroms.tdb}`) do not exist, create it
tdb = os.path.join(self.path, "coverage", "tracks.tdb")
if self.tracks().empty == True:
dom = tiledb.Domain(
tiledb.Dim(name="idx", domain=(0, 9999), dtype=np.int64),
)
attr1 = tiledb.Attr(name="label", dtype="ascii")
attr2 = tiledb.Attr(name="path", dtype="ascii")
schema = tiledb.ArraySchema(domain=dom, attrs=[attr1, attr2], sparse=False)
tiledb.Array.create(tdb, schema)
chroms = self.chroms()
for chrom in chroms["chr"]:
chrom_length = np.array(chroms[chroms["chr"] == chrom]["length"])[0]
tdb = os.path.join(self.path, "coverage", f"{chrom}.tdb")
dom = tiledb.Domain(
tiledb.Dim(
name="position",
domain=(0, chrom_length - 1),
dtype=np.int64,
),
tiledb.Dim(name="idx", domain=(0, 999), dtype=np.int64),
)
attr = tiledb.Attr(
name="scores",
dtype=np.float32,
filters=tiledb.FilterList([tiledb.ZstdFilter()]),
)
schema = tiledb.ArraySchema(domain=dom, attrs=[attr], sparse=True)
tiledb.Array.create(tdb, schema)
n = 0
else:
n = self.tracks().shape[0]

# Populate `path/coverage/tracks.tdb`
tdb = os.path.join(self.path, "coverage", "tracks.tdb")
with tiledb.DenseArray(tdb, mode="w") as array:
array[n : (n + len(bws))] = {
"label": list(bws.keys()),
"path": list(bws.values()),
}

# Populate each `path/coverage/{chrom}.tdb`
chroms = self.chroms()
for chrom in chroms["chr"]:
chrom_length = np.array(chroms[chroms["chr"] == chrom]["length"])[0]
tdb = os.path.join(self.path, "coverage", f"{chrom}.tdb")
for idx, bwf in enumerate(bws):
with pyBigWig.open(bws[bwf]) as bw:
arr = np.array(bw.values(chrom, 0, chrom_length), dtype=np.float32)
print(arr.shape)
print(np.arange(0, chrom_length).shape)
print(np.arange(idx, idx + 1).shape)
with tiledb.open(tdb, mode="w") as A:
# A[np.arange(0, chrom_length), np.arange(idx, idx + 1)] = 0
A[np.arange(0, chrom_length), np.array(0)] = arr

def add_chroms(self, chr_lengths: dict, genome_version=""):
if self.chroms().empty == False:
raise ValueError("`chroms` table has already been filled out.")

if os.path.exists(os.path.join(self.path, "genome", "chroms.tdb")) == False:
# Create the `chroms` array and add it to the `genome` group
path = self.path
tdb = os.path.join(path, "genome", "chroms.tdb")
dom_genome = tiledb.Domain(
tiledb.Dim(
name="chrom_index", domain=(0, len(chr_lengths) - 1), dtype=np.int32
)
)
attr_chr = tiledb.Attr(name="chr", dtype="ascii", var=True)
attr_length = tiledb.Attr(name="length", dtype=np.int64)
schema = tiledb.ArraySchema(
domain=dom_genome, attrs=[attr_chr, attr_length], sparse=True
)
tiledb.Array.create(tdb, schema)

# Populate `chrom` array
chr = list(chr_lengths.keys())
length = list(chr_lengths.values())
with tiledb.open(tdb, "w") as array:
indices = np.arange(len(chr))
array[indices] = {"chr": np.array(chr, dtype="S"), "length": length}
array.meta["genome_assembly_version"] = genome_version
array.meta["timestamp"] = datetime.now().isoformat()

def __init__(self, path: str):

self.path = path
genome_path = os.path.join(path, "genome")
seq_path = os.path.join(path, "genome", "sequence")
coverage_path = os.path.join(path, "coverage")
features_path = os.path.join(path, "features")

if os.path.exists(path) == False:
tiledb.group_create(path)
tiledb.group_create(genome_path)
tiledb.group_create(coverage_path)
tiledb.group_create(features_path)
tiledb.group_create(seq_path)
26 changes: 15 additions & 11 deletions src/momics/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@

def get_chr_lengths(bw):
with pyBigWig.open(bw) as bw:
{chrom: length for chrom, length in bw.chroms().items()}
a = bw.chroms()
bw.close()
return a


def _check_chr_lengths(bw_files):
reference_lengths = {}
with pyBigWig.open(list(bw_files.values())[0]) as bw:
reference_lengths = {chrom: length for chrom, length in bw.chroms().items()}

# Compare chromosome lengths with other files
def _check_chr_lengths(bw_files, chroms):
reference_lengths = dict(zip(chroms["chr"], chroms["length"]))
for file in list(bw_files.values())[1:]:
with pyBigWig.open(file) as bw:
lengths = {chrom: length for chrom, length in bw.chroms().items()}
lengths = bw.chroms()
if lengths != reference_lengths:
raise Exception(
"Provided .bw files do not have identical chromomosome lengths."
f"{file} files do not have identical chromomosome lengths."
)

# return np.array(list(reference_lengths.values()), dtype=np.int64)
return reference_lengths

def _check_track_names(bw_files, tracks):
labels = set(tracks["label"])
for element in list(bw_files.keys()):
if element in labels:
raise ValueError(
f"Provided label '{element}' already present in `tracks` table"
)

0 comments on commit 7e347d9

Please sign in to comment.