From 6d9289349e3cc192ac2392a1e1e05602da3b4b89 Mon Sep 17 00:00:00 2001 From: js2264 Date: Fri, 18 Oct 2024 23:37:10 +0200 Subject: [PATCH] fix: some tests --- src/momics/utils.py | 14 +++++++++----- tests/test_cloud.py | 18 +++++++++--------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/momics/utils.py b/src/momics/utils.py index 3f36400..982975b 100644 --- a/src/momics/utils.py +++ b/src/momics/utils.py @@ -184,14 +184,18 @@ def pyranges_to_bw(pyranges: pr.PyRanges, scores: np.ndarray, output: str) -> No # Save chrom sizes in header bw = pyBigWig.open(output, "w") - chrom_sizes = pyranges.df.groupby("Chromosome", observed=False)["End"].max().to_dict() - chroms = list(chrom_sizes.keys()) - sizes = list(chrom_sizes.values()) - bw.addHeader(list(zip(chroms, sizes))) + # if there is only one chromosome, get its size and add it to the header + if len(pyranges.Chromosome.unique()) == 1: + chrom_size = pyranges.df["End"].max() + bw.addHeader([(next(iter(pyranges.Chromosome)), chrom_size)]) + else: + chrom_sizes = pyranges.df.groupby("Chromosome", observed=False)["End"].max().to_dict() + chroms = list(chrom_sizes.keys()) + sizes = list(chrom_sizes.values()) + bw.addHeader(list(zip(chroms, sizes))) # Iterate over the PyRanges and write corresponding scores df = pyranges.df - df.Start = df.Start for i, (chrom, start, end) in enumerate(zip(df.Chromosome, df.Start, df.End)): score = scores[i] positions = list(range(start, end)) diff --git a/tests/test_cloud.py b/tests/test_cloud.py index de10cf7..e209a43 100644 --- a/tests/test_cloud.py +++ b/tests/test_cloud.py @@ -71,15 +71,15 @@ def test_s3_IO(fa1: str, bw1: str): assert mom.tracks().__eq__(out).all().all() # Query tracks - q = MultiRangeQuery(mom, "I:1-10").query_tracks() + q = MultiRangeQuery(mom, "I:0-10").query_tracks() assert len(q.coverage) == 1 - assert len(q.coverage["bw1"]["I:1-10"]) == 10 + assert len(q.coverage["bw1"]["I:0-10"]) == 10 assert q.to_df()["chrom"].__eq__(pd.Series(["I"] * 10)).all() # Query sequences q.query_sequence() assert len(q.seq) == 1 - assert q.seq["nucleotide"]["I:1-10"] == "ATCGATCGAT" + assert q.seq["nucleotide"]["I:0-10"] == "ATCGATCGAT" ## Purge existing repo res = mom.remove() @@ -133,15 +133,15 @@ def test_gcs_IO(fa1: str, bw1: str): assert mom.tracks().__eq__(out).all().all() # Query tracks - q = MultiRangeQuery(mom, "I:1-10").query_tracks() + q = MultiRangeQuery(mom, "I:0-10").query_tracks() assert len(q.coverage) == 1 - assert len(q.coverage["bw1"]["I:1-10"]) == 10 + assert len(q.coverage["bw1"]["I:0-10"]) == 10 assert q.to_df()["chrom"].__eq__(pd.Series(["I"] * 10)).all() # Query sequences q.query_sequence() assert len(q.seq) == 1 - assert q.seq["nucleotide"]["I:1-10"] == "ATCGATCGAT" + assert q.seq["nucleotide"]["I:0-10"] == "ATCGATCGAT" ## Purge existing repo res = mom.remove() @@ -220,15 +220,15 @@ def remove_directory_until_success(vfs, dir_uri, max_retries=10, retry_delay=2): assert mom.tracks().__eq__(out).all().all() # Query tracks - q = MultiRangeQuery(mom, "I:1-10").query_tracks() + q = MultiRangeQuery(mom, "I:0-10").query_tracks() assert len(q.coverage) == 1 - assert len(q.coverage["bw1"]["I:1-10"]) == 10 + assert len(q.coverage["bw1"]["I:0-10"]) == 10 assert q.to_df()["chrom"].__eq__(pd.Series(["I"] * 10)).all() # Query sequences q.query_sequence() assert len(q.seq) == 1 - assert q.seq["nucleotide"]["I:1-10"] == "ATCGATCGAT" + assert q.seq["nucleotide"]["I:0-10"] == "ATCGATCGAT" ## Purge existing repo res = mom.remove()