improve target_elusive memory usage
lazy streaming collect
categorical/integer types
AroneyS committed Apr 25, 2024
1 parent c773cc3 commit a7d164f
Showing 1 changed file with 26 additions and 24 deletions.
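The commit message names two memory-saving techniques: building the query lazily and collecting it with the streaming engine, and swapping repeated strings for categorical/integer encodings. A minimal sketch of the combined pattern (the input file and column names are illustrative, not taken from target_elusive.py):

    import polars as pl

    with pl.StringCache():  # shared cache so Categorical columns are comparable
        result = (
            pl.scan_csv("samples.tsv", separator="\t")            # lazy: nothing is read yet
            .with_columns(pl.col("sample").cast(pl.Categorical))  # dictionary-encode repeated strings
            .group_by("sample")
            .agg(pl.sum("coverage"))
            .collect(streaming=True)  # execute in bounded chunks rather than all at once
        )

Streaming collect keeps only a batch of rows in memory at a time, and a Categorical column stores each distinct string once plus a small integer code per row, which is what makes the large joins in the diff below affordable.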
binchicken/workflow/scripts/target_elusive.py (26 additions, 24 deletions)
@@ -107,18 +107,15 @@ def pipeline(
"gene", "sample", "sequence", "num_hits", "coverage", "taxonomy",
pl.first("target").over(["gene", "sequence"]).rank("dense") - 1,
)
.with_columns(
pl.col("target").cast(pl.Utf8)
)
)

if unbinned.height == 0:
logging.warning("No SingleM sequences found for the given samples")
return unbinned, pl.DataFrame(schema=EDGES_COLUMNS)
return unbinned.with_columns(pl.col("target").cast(pl.Utf8)), pl.DataFrame(schema=EDGES_COLUMNS)

def process_groups(df):
if df.height == 1:
return pl.DataFrame(schema={"style": str, "cluster_size": pl.Int64, "samples": str, "target": str})
return pl.DataFrame(schema={"style": str, "cluster_size": pl.Int64, "samples": str, "target": pl.UInt32})

# Direct matching samples in pairs with coverage > MIN_COASSEMBLY_COVERAGE
dfs = [
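This first hunk stops eagerly casting target to Utf8: the ids produced by rank("dense") - 1 are plain integers, and keeping them that way through the joins and group-bys avoids one heap-allocated string per row; the cast to Utf8 now happens only when a frame is returned. A small illustration with toy data, not the script's real schema:

    import polars as pl

    df = pl.DataFrame({"sequence": ["s1", "s1", "s2", "s3"]})
    # Dense-ranking assigns each distinct value a small integer id.
    ids = df.select(target=pl.col("sequence").rank("dense") - 1)
    print(ids["target"].to_list())  # [0, 0, 1, 2]
    print(ids.dtypes)               # an integer dtype, a few bytes per row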
@@ -163,21 +160,24 @@ def process_groups(df):

     if sample_preclusters is not None:
         logging.info("Using chosen clusters to find appropriate targets")
-        sparse_edges = (
-            sample_preclusters
-            .with_columns(sample_ids = pl.col("samples").str.split(","))
-            .with_columns(cluster_size = pl.col("sample_ids").list.len())
-            .explode("sample_ids")
-            .join(unbinned.select("target", "coverage", sample_ids="sample"), on="sample_ids")
-            .group_by("samples", "cluster_size", "target")
-            .agg(pl.sum("coverage"), count = pl.len())
-            .filter(pl.col("count") == pl.col("cluster_size"))
-            .filter(pl.col("coverage") > MIN_COASSEMBLY_COVERAGE)
-            .group_by("samples", "cluster_size")
-            .agg(target_ids = pl.col("target").sort().str.concat(","))
-            .with_columns(style = pl.lit("match"))
-            .select("style", "cluster_size", "samples", "target_ids")
-        )
+        with pl.StringCache():
+            sparse_edges = (
+                sample_preclusters
+                .lazy()
+                .with_columns(sample_ids = pl.col("samples").str.split(",").cast(pl.List(pl.Categorical)))
+                .with_columns(cluster_size = pl.col("sample_ids").list.len())
+                .explode("sample_ids")
+                .join(unbinned.lazy().select("target", "coverage", sample_ids=pl.col("sample").cast(pl.Categorical)), on="sample_ids")
+                .group_by("samples", "cluster_size", "target")
+                .agg(pl.sum("coverage"), count = pl.len())
+                .filter(pl.col("count") == pl.col("cluster_size"))
+                .filter(pl.col("coverage") > MIN_COASSEMBLY_COVERAGE)
+                .group_by("samples", "cluster_size")
+                .agg(target_ids = pl.col("target").cast(pl.Utf8).sort().str.concat(","))
+                .with_columns(style = pl.lit("match"))
+                .select("style", "cluster_size", "samples", "target_ids")
+                .collect(streaming=True)
+            )
     else:
         logging.info("Grouping targets into paired matches and pooled samples for clusters of size 3+")
         sparse_edges = (
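The replacement block is the heart of the commit: the precluster table and the unbinned table are joined as lazy frames on Categorical sample ids, and the whole chain executes with collect(streaming=True). The pl.StringCache() context is needed because polars only joins or compares Categorical columns whose dictionary encodings come from the same cache; otherwise it raises an error. A self-contained illustration of that requirement, using toy frames rather than the script's data:

    import polars as pl

    with pl.StringCache():
        left = pl.DataFrame({"sample_ids": ["s1", "s2"]}).with_columns(
            pl.col("sample_ids").cast(pl.Categorical)
        )
        right = pl.DataFrame({"sample_ids": ["s2", "s3"], "coverage": [10.0, 12.0]}).with_columns(
            pl.col("sample_ids").cast(pl.Categorical)
        )
        # Both columns share one cache, so the join compares integer codes,
        # never the strings themselves.
        print(left.join(right, on="sample_ids"))

Note also that target is cast back to Utf8 only inside the final agg, after the heavy filtering has already run on the integer representation.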
@@ -190,10 +190,10 @@ def process_groups(df):
.group_by("target")
.map_groups(process_groups)
.group_by(["style", "cluster_size", "samples"])
.agg(target_ids = pl.col("target").sort().str.concat(","))
.agg(target_ids = pl.col("target").cast(pl.Utf8).sort().str.concat(","))
)

return unbinned, sparse_edges
return unbinned.with_columns(pl.col("target").cast(pl.Utf8)), sparse_edges

if __name__ == "__main__":
os.environ["POLARS_MAX_THREADS"] = str(snakemake.threads)
@@ -216,9 +216,9 @@ def process_groups(df):
     edges_path = snakemake.output.output_edges
     samples = set(snakemake.params.samples)
 
-    unbinned = pl.read_csv(unbinned_path, separator="\t")
-
     if distances_path:
+        os.environ["POLARS_STREAMING_CHUNK_SIZE"] = "1"
+        import polars as pl
         from sourmash import fig
         distances, samples = fig.load_matrix_and_labels(distances_path)
         sample_preclusters = get_clusters(
@@ -230,6 +230,8 @@ def process_groups(df):
     else:
         sample_preclusters = None
 
+    unbinned = pl.read_csv(unbinned_path, separator="\t")
+
     targets, edges = pipeline(
         unbinned,
         samples,
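The final hunks reorder setup in __main__: when preclusters are in play, POLARS_STREAMING_CHUNK_SIZE is set to "1" (and polars imported) before the streaming query, and the large unbinned TSV is now read only after preclustering, so it is not resident during the sourmash work. As I read polars, this environment variable overrides the streaming engine's chunk-size heuristic, so "1" trades throughput for the smallest possible working set; treat that reading as an assumption. A runnable toy showing the ordering, with illustrative data:

    import os

    # Must be set before the streaming query runs; "1" forces minimal batches.
    os.environ["POLARS_STREAMING_CHUNK_SIZE"] = "1"

    import polars as pl

    lf = pl.LazyFrame({"sample": ["s1", "s1", "s2"], "coverage": [5.0, 7.0, 9.0]})
    print(lf.group_by("sample").agg(pl.sum("coverage")).collect(streaming=True))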
