Kmer preclustering #92

Closed · wants to merge 38 commits

Commits (38)
d5d191e  towards allowing independent sample clusterings (AroneyS, Dec 5, 2023)
e5e9cb1  move preclusters to params (AroneyS, Dec 5, 2023)
bbf161f  get correct checkpoint output (AroneyS, Dec 5, 2023)
601c38d  fix target_elusive and cluster_graph rules (AroneyS, Dec 5, 2023)
68e8a2d  change final rule for tests (AroneyS, Dec 5, 2023)
31a98df  fix another final rule for tests (AroneyS, Dec 5, 2023)
71f488e  fix build (AroneyS, Dec 5, 2023)
e993cd3  group targets and edges with target prefix (AroneyS, Dec 5, 2023)
530d281  group clusters with overall reorder and rename (AroneyS, Dec 5, 2023)
962e069  add sourmash to Ibis env (AroneyS, Dec 5, 2023)
779d9ba  initial sourmash clustering ideas (AroneyS, Dec 6, 2023)
c916d22  kmer clustering with sourmash api and scipy (AroneyS, Dec 6, 2023)
6cd3406  fix is_in (AroneyS, Dec 6, 2023)
547de2e  remove deprecated working_dir (AroneyS, Dec 6, 2023)
0c4bbdf  add new args without implementation (AroneyS, Dec 6, 2023)
e29380b  save precluster clusters to file (AroneyS, Dec 6, 2023)
a1717d2  implement kmer-precluster arg (AroneyS, Dec 6, 2023)
963baa4  fix build (AroneyS, Dec 6, 2023)
6c5baa0  add target taxa filtering to precluster_samples (AroneyS, Dec 6, 2023)
f007707  fix build test (AroneyS, Dec 6, 2023)
ff51594  account for Ns in otu sequences (AroneyS, Dec 7, 2023)
c410df1  remove edges and targets combined outputs (AroneyS, Dec 7, 2023)
9f20c87  add cluster logging (AroneyS, Dec 7, 2023)
92cfa9b  switch to multithreaded distance calculations (AroneyS, Dec 8, 2023)
105655c  chunk distance calculations (AroneyS, Dec 8, 2023)
4e5340f  fix linkage and cluster method calls (AroneyS, Dec 8, 2023)
7577088  switch to spawn method (AroneyS, Dec 8, 2023)
4250c20  increase t precision (AroneyS, Dec 8, 2023)
1dcdfdb  change chunking (AroneyS, Dec 11, 2023)
2a502fc  add scaled test (AroneyS, Dec 11, 2023)
3430a9a  separate sketch and distance from preclustering (AroneyS, Dec 12, 2023)
713eda6  fix test filename (AroneyS, Dec 12, 2023)
5f99000  switch to single linkage (AroneyS, Dec 12, 2023)
9f79cfb  set criterion to inconsistent (AroneyS, Dec 12, 2023)
df2c334  choose largest cluster with size < cutoff for each sample (AroneyS, Dec 12, 2023)
4b6113c  switch to iterative clustering (AroneyS, Dec 12, 2023)
8d213eb  update kmer-precluster-samples (AroneyS, Jan 5, 2024)
53db344  fix residual ibis names (AroneyS, Jan 7, 2024)
3 changes: 3 additions & 0 deletions .github/workflows/test-binchicken.yml
@@ -39,11 +39,14 @@ jobs:
      - name: Run unit tests
        run: |
          python test/test_query_processing.py -b
          python test/test_precluster_samples.py -b
          python test/test_sketch_samples.py -b
          python test/test_target_elusive.py -b
          python test/test_cluster_graph.py -b
          python test/test_collect_reference_bins.py -b
          python test/test_summarise_coassemblies.py -b
          python test/test_no_genomes.py -b
          python test/test_group_clusters.py -b
          python test/test_aviary_commands.py
          python test/test_is_interleaved.py
          python test/test_build.py
1 change: 1 addition & 0 deletions binchicken.yml
@@ -15,4 +15,5 @@ dependencies:
  - pigz=2.3.*
  - pyarrow=12.0.*
  - parallel=20230522
  - sourmash=4.8.*
  - pyopenssl>22.1.0 # see https://github.com/pyca/cryptography/issues/7959#issuecomment-1368711852
27 changes: 26 additions & 1 deletion binchicken/binchicken.py
@@ -19,6 +19,9 @@

FAST_AVIARY_MODE = "fast"
COMPREHENSIVE_AVIARY_MODE = "comprehensive"
PRECLUSTER_NEVER_MODE = "never"
PRECLUSTER_SIZE_DEP_MODE = "large"
PRECLUSTER_ALWAYS_MODE = "always"

def build_reads_list(forward, reverse):
    if reverse:
@@ -309,6 +312,8 @@ def set_standard_args(args):
    args.max_coassembly_samples = None
    args.max_coassembly_size = None
    args.max_recovery_samples = 1
    args.kmer_precluster = PRECLUSTER_NEVER_MODE
    args.max_precluster_size = 1000
    args.prodigal_meta = False

    return(args)
@@ -438,6 +443,18 @@ def coassemble(args):
        args.max_coassembly_samples = 1
        args.max_coassembly_size = None

    if args.kmer_precluster == PRECLUSTER_NEVER_MODE:
        kmer_precluster = False
    elif args.kmer_precluster == PRECLUSTER_SIZE_DEP_MODE:
        if len(forward_reads) > 1000:
            kmer_precluster = True
        else:
            kmer_precluster = False
    elif args.kmer_precluster == PRECLUSTER_ALWAYS_MODE:
        kmer_precluster = True
    else:
        raise ValueError(f"Invalid kmer precluster mode: {args.kmer_precluster}")

    try:
        build_status = args.build
    except AttributeError:
@@ -462,6 +479,8 @@ def coassemble(args):
        "max_coassembly_samples": args.max_coassembly_samples if args.max_coassembly_samples else args.num_coassembly_samples,
        "max_coassembly_size": args.max_coassembly_size,
        "max_recovery_samples": args.max_recovery_samples,
        "kmer_precluster": kmer_precluster,
        "max_precluster_size": args.max_precluster_size,
        "prodigal_meta": args.prodigal_meta,
        # Coassembly config
        "assemble_unmapped": args.assemble_unmapped,
@@ -880,6 +899,7 @@ def build(args):
    args.run_qc = True
    args.coassemblies = None
    args.singlem_metapackage = "."
    args.kmer_precluster = PRECLUSTER_NEVER_MODE

    # Create mock input files
    forward_reads = [os.path.join(args.output, "sample_" + s + ".1.fq") for s in ["1", "2", "3"]]
@@ -941,6 +961,8 @@
    args.output = os.path.join(output_dir, "build_aviary")
    mapping_files = [os.path.join(args.output, "coassemble", "mapping", "sample_" + s + "_unmapped." + n + ".fq.gz") for s in ["1", "2", "3"] for n in ["1", "2"]]
    mapping_done = os.path.join(args.output, "coassemble", "mapping", "done")
    elusive_edges = os.path.join(args.output, "coassemble", "target", "elusive_edges.tsv")
    targets = os.path.join(args.output, "coassemble", "target", "targets.tsv")
    elusive_clusters = os.path.join(args.output, "coassemble", "target", "elusive_clusters.tsv")
    summary_file = os.path.join(args.output, "coassemble", "summary.tsv")

@@ -950,7 +972,7 @@
    with open(clusters_path, "w") as f:
        f.write(clusters_text)

    for item in mapping_files + [mapping_done, elusive_clusters, summary_file]:
    for item in mapping_files + [mapping_done, elusive_edges, targets, elusive_clusters, summary_file]:
        os.makedirs(os.path.dirname(item), exist_ok=True)
        subprocess.check_call(f"touch {item}", shell=True)

@@ -1120,6 +1142,9 @@ def add_coassemble_arguments(parser):
    coassemble_clustering.add_argument("--max-coassembly-samples", type=int, help="Upper bound for number of samples per coassembly cluster [default: --num-coassembly-samples]", default=None)
    coassemble_clustering.add_argument("--max-coassembly-size", type=int, help="Maximum size (Gbp) of coassembly cluster [default: 50Gbp]", default=50)
    coassemble_clustering.add_argument("--max-recovery-samples", type=int, help="Upper bound for number of related samples to use for differential abundance binning [default: 20]", default=20)
    coassemble_clustering.add_argument("--kmer-precluster", help="Run kmer preclustering of unbinned window sequences. [default: large (perform preclustering when given >1000 samples)]",
                                       default=PRECLUSTER_SIZE_DEP_MODE, choices=[PRECLUSTER_NEVER_MODE, PRECLUSTER_SIZE_DEP_MODE, PRECLUSTER_ALWAYS_MODE])
    coassemble_clustering.add_argument("--max-precluster-size", type=int, help="Largest allowed precluster [default: 1000]", default=1000)
    coassemble_clustering.add_argument("--prodigal-meta", action="store_true", help="Use prodigal \"-p meta\" argument (for testing)")
    # Coassembly options
    coassemble_coassembly = parser.add_argument_group("Coassembly options")
2 changes: 2 additions & 0 deletions binchicken/config/template_coassemble.yaml
@@ -18,6 +18,8 @@ min_coassembly_coverage: 1
num_coassembly_samples: 1
max_coassembly_samples: 1
max_recovery_samples: 1
kmer_precluster: false
max_precluster_size: 1
unmapping_min_appraised: 1
unmapping_max_identity: 1
unmapping_max_alignment: 1
106 changes: 93 additions & 13 deletions binchicken/workflow/coassemble.smk
@@ -54,8 +54,20 @@ def get_reads(wildcards, forward=True, version=None):
def get_cat(wildcards):
    return "zcat" if [r for r in get_reads(wildcards).values()][0].endswith(".gz") else "cat"

def get_elusive_clusters(wildcards):
    checkpoint_output = checkpoints.precluster_samples.get(**wildcards).output[0]

    return expand(output_dir + "/target/elusive_clusters_{precluster}.tsv",
        precluster=glob_wildcards(os.path.join(checkpoint_output, "unbinned_{precluster}.otu_table.tsv")).precluster)

def get_preclusters(wildcards):
    checkpoint_output = checkpoints.precluster_samples.get(**wildcards).output[0]

    return expand("{precluster}",
        precluster=glob_wildcards(os.path.join(checkpoint_output, "unbinned_{precluster}.otu_table.tsv")).precluster)
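
These two input functions rely on Snakemake's checkpoint mechanism: checkpoints.precluster_samples.get(**wildcards) blocks until the checkpoint has finished, and glob_wildcards then scans its output directory to discover the precluster IDs. A minimal standalone illustration of that discovery step, a sketch run outside Snakemake with hypothetical file names:

# Sketch of the wildcard discovery used above, assuming precluster/ holds
# files such as unbinned_1.otu_table.tsv and unbinned_2.otu_table.tsv.
from snakemake.io import glob_wildcards, expand

preclusters = glob_wildcards("precluster/unbinned_{precluster}.otu_table.tsv").precluster
print(preclusters)  # e.g. ['1', '2']

# Each discovered ID then expands into a per-precluster clusters file.
print(expand("target/elusive_clusters_{precluster}.tsv", precluster=preclusters))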

def get_reads_coassembly(wildcards, forward=True, recover=False):
    checkpoint_output = checkpoints.cluster_graph.get(**wildcards).output[0]
    checkpoint_output = checkpoints.group_clusters.get(**wildcards).output.elusive_clusters
    elusive_clusters = pl.read_csv(checkpoint_output, separator="\t")

    if recover:
@@ -76,7 +88,7 @@ def get_reads_coassembly(wildcards, forward=True, recover=False):
    return [reads[n] for n in sample_names]

def get_coassemblies(wildcards):
    checkpoint_output = checkpoints.cluster_graph.get().output[0]
    checkpoint_output = checkpoints.group_clusters.get().output.elusive_clusters
    elusive_clusters = pl.read_csv(checkpoint_output, separator="\t")
    coassemblies = elusive_clusters.get_column("coassembly").to_list()

@@ -355,12 +367,66 @@ rule count_bp_reads:
        "::: {params.names} :::+ {input.reads_1} :::+ {input.reads_2} "
        "> {output}"

rule sketch_samples:
    input:
        unbinned = output_dir + "/appraise/unbinned.otu_table.tsv",
    output:
        sketch = output_dir + "/sketch/samples.sig"
    params:
        taxa_of_interest = config["taxa_of_interest"],
    threads: 16
    resources:
        mem_mb=125*1000,
        runtime = "6h",
    log:
        logs_dir + "/precluster/sketching.log"
    script:
        "scripts/sketch_samples.py"

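The sketching script itself is not part of this diff. A plausible core, going by the sourmash Python API and this rule (ksize 60 to match the -k 60 passed to sourmash compare below); the file paths, column names, scaled value, and overall structure are assumptions, not the PR's actual code:

# Hypothetical sketch of scripts/sketch_samples.py: one FracMinHash sketch
# per sample, built from its unbinned OTU window sequences.
import polars as pl
import sourmash

otu_table = pl.read_csv("appraise/unbinned.otu_table.tsv", separator="\t")

signatures = []
for sample in otu_table.get_column("sample").unique().to_list():
    sequences = otu_table.filter(pl.col("sample") == sample).get_column("sequence")
    mh = sourmash.MinHash(n=0, ksize=60, scaled=1000)
    for seq in sequences:
        mh.add_sequence(seq, force=True)  # force=True skips k-mers containing Ns
    signatures.append(sourmash.SourmashSignature(mh, name=sample))

with open("sketch/samples.sig", "w") as f:
    sourmash.save_signatures(signatures, f)
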
rule distance_samples:
    input:
        sketch = output_dir + "/sketch/samples.sig",
    output:
        distance = output_dir + "/sketch/samples.mat"
    threads: 64
    resources:
        mem_mb=500*1000,
        runtime = "168h",
    log:
        logs_dir + "/precluster/distance.log"
    shell:
        "sourmash compare "
        "{input.sketch} "
        "-o {output.distance} "
        "-k 60 "
        "--distance-matrix "
        "-p {threads} "
        "&> {log} "

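With --distance-matrix, sourmash compare writes a numpy binary matrix (distance = 1 - similarity) plus a companion <output>.labels.txt file recording the sample order. Downstream code can load the pair roughly like this, a sketch assuming default sourmash output conventions:

# Sketch: load the sourmash compare output for clustering.
import numpy as np

distances = np.load("sketch/samples.mat")  # square, symmetric distance matrix
with open("sketch/samples.mat.labels.txt") as f:
    labels = [line.strip() for line in f]
assert distances.shape == (len(labels), len(labels))
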
checkpoint precluster_samples:
    input:
        distance = output_dir + "/sketch/samples.mat" if config["kmer_precluster"] else [],
        unbinned = output_dir + "/appraise/unbinned.otu_table.tsv",
    output:
        directory(output_dir + "/precluster"),
    params:
        kmer_precluster = config["kmer_precluster"],
        max_precluster_size = config["max_precluster_size"],
    threads: 1
    resources:
        mem_mb=8*1000,
        runtime = "12h",
    log:
        logs_dir + "/precluster/precluster_samples.log"
    script:
        "scripts/precluster_samples.py"

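scripts/precluster_samples.py is likewise not shown in this diff. Going by the commit trail (single linkage in 5f99000, size cutoffs in df2c334, iterative clustering in 4b6113c), its core plausibly resembles the following scipy sketch; the function name, threshold handling, and criterion choice here are assumptions:

# Hypothetical core of precluster_samples: single-linkage clustering of the
# sample distance matrix, keeping clusters within max_precluster_size.
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import squareform

def precluster(distances, threshold, max_precluster_size):
    condensed = squareform(distances, checks=False)  # square matrix -> condensed form
    tree = linkage(condensed, method="single")
    assignments = fcluster(tree, t=threshold, criterion="distance")
    clusters = [np.flatnonzero(assignments == c) for c in np.unique(assignments)]
    # The real implementation iterates: oversized clusters are re-clustered
    # at tighter thresholds until each fits under max_precluster_size.
    return [c for c in clusters if len(c) <= max_precluster_size]
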
rule target_elusive:
    input:
        unbinned = output_dir + "/appraise/unbinned.otu_table.tsv"
        unbinned = output_dir + "/precluster/unbinned_{precluster}.otu_table.tsv"
    output:
        output_edges = output_dir + "/target/elusive_edges.tsv",
        output_targets = output_dir + "/target/targets.tsv",
        output_edges = output_dir + "/target/elusive_edges_{precluster}.tsv",
        output_targets = output_dir + "/target/targets_{precluster}.tsv",
    params:
        min_coassembly_coverage = config["min_coassembly_coverage"],
        max_coassembly_samples = config["max_coassembly_samples"],
@@ -371,36 +437,50 @@ rule target_elusive:
        mem_mb=get_mem_mb,
        runtime = "24h",
    log:
        logs_dir + "/target/target_elusive.log"
        logs_dir + "/target/target_elusive_{precluster}.log"
    benchmark:
        benchmarks_dir + "/target/target_elusive.tsv"
        benchmarks_dir + "/target/target_elusive_{precluster}.tsv"
    script:
        "scripts/target_elusive.py"

checkpoint cluster_graph:
rule cluster_graph:
    input:
        elusive_edges = output_dir + "/target/elusive_edges.tsv",
        elusive_edges = output_dir + "/target/elusive_edges_{precluster}.tsv",
        read_size = output_dir + "/read_size.csv",
    output:
        elusive_clusters = output_dir + "/target/elusive_clusters.tsv"
        elusive_clusters = output_dir + "/target/elusive_clusters_{precluster}.tsv"
    params:
        max_coassembly_size = config["max_coassembly_size"],
        num_coassembly_samples = config["num_coassembly_samples"],
        max_coassembly_samples = config["max_coassembly_samples"],
        max_recovery_samples = config["max_recovery_samples"],
        coassembly_samples = config["coassembly_samples"],
        exclude_coassemblies = config["exclude_coassemblies"],
    threads: 64
    threads: 32
    resources:
        mem_mb=get_mem_mb,
        runtime = "168h",
    log:
        logs_dir + "/target/cluster_graph.log"
        logs_dir + "/target/cluster_graph_{precluster}.log"
    benchmark:
        benchmarks_dir + "/target/cluster_graph.tsv"
        benchmarks_dir + "/target/cluster_graph_{precluster}.tsv"
    script:
        "scripts/cluster_graph.py"

checkpoint group_clusters:
    input:
        elusive_clusters = get_elusive_clusters,
    output:
        elusive_clusters = output_dir + "/target/elusive_clusters.tsv",
    params:
        preclusters = get_preclusters,
    threads: 64
    localrule: True
    log:
        logs_dir + "/target/group_clusters.log"
    script:
        "scripts/group_clusters.py"

#######################
### SRA downloading ###
#######################
111 changes: 111 additions & 0 deletions binchicken/workflow/scripts/group_clusters.py
@@ -0,0 +1,111 @@
#########################
### group_clusters.py ###
#########################
# Author: Samuel Aroney

# polars is imported in the __main__ block below, after POLARS_MAX_THREADS is set
import os

EDGES_COLUMNS = {
    "style": str,
    "cluster_size": int,
    "samples": str,
    "target_ids": str,
}

TARGETS_COLUMNS = {
    "gene": str,
    "sample": str,
    "sequence": str,
    "num_hits": int,
    "coverage": float,
    "taxonomy": str,
    "target": str,
}

ELUSIVE_CLUSTERS_COLUMNS = {
    "samples": str,
    "length": int,
    "total_targets": int,
    "total_size": int,
    "recover_samples": str,
    "coassembly": str,
}

def edges_processing(edges, preclusters):
    if len(edges) == 1:
        return edges[0]

    dfs = []
    for edge, precluster in zip(edges, preclusters):
        dfs.append(
            edge
            .with_columns(precluster = pl.lit(precluster))
            .with_columns(pl.col("target_ids").str.split(","))
            .explode("target_ids")
            .select(
                "style", "cluster_size", "samples",
                target_ids = pl.concat_str("precluster", pl.lit("_"), "target_ids")
            )
            .group_by("style", "cluster_size", "samples")
            .agg(pl.col("target_ids"))
            .with_columns(pl.col("target_ids").list.join(","))
        )

    return (
        pl.concat(dfs)
    )

def targets_processing(targets, preclusters):
    if len(targets) == 1:
        return targets[0]

    dfs = []
    for target, precluster in zip(targets, preclusters):
        dfs.append(
            target
            .with_columns(precluster = pl.lit(precluster))
            .with_columns(pl.col("target").str.split(","))
            .explode("target")
            .select(
                "gene", "sample", "sequence", "num_hits", "coverage", "taxonomy",
                target = pl.concat_str("precluster", pl.lit("_"), "target")
            )
            .group_by("gene", "sample", "sequence", "num_hits", "coverage", "taxonomy")
            .agg(pl.col("target"))
            .with_columns(pl.col("target").list.join(","))
        )

    return (
        pl.concat(dfs)
    )

def cluster_processing(clusters, _):
    if len(clusters) == 1:
        return clusters[0]

    return (
        pl.concat(clusters)
        .drop("coassembly")
        .sort("total_targets", "total_size", descending=[True, False])
        .with_row_count("coassembly")
        .select(
            "samples", "length", "total_targets", "total_size", "recover_samples",
            coassembly = pl.lit("coassembly_") + pl.col("coassembly").cast(pl.Utf8)
        )
    )

if __name__ == "__main__":
    # Set before the first polars import so the thread pool size takes effect
    os.environ["POLARS_MAX_THREADS"] = str(snakemake.threads)
    import polars as pl
    print(f"Polars using {str(pl.threadpool_size())} threads")

    input_clusters = snakemake.input.elusive_clusters
    output_clusters_path = snakemake.output.elusive_clusters
    preclusters = snakemake.params.preclusters

    clusters = []
    for cluster in input_clusters:
        clusters.append(pl.scan_csv(cluster, separator="\t", dtypes=ELUSIVE_CLUSTERS_COLUMNS))
    output_clusters = cluster_processing(clusters, preclusters)
    output_clusters.collect().write_csv(output_clusters_path, separator="\t")
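
To make the renumbering concrete, a small worked example (hypothetical data) of cluster_processing merging two per-precluster tables: clusters are pooled, re-sorted by total_targets descending then total_size ascending, and renamed coassembly_0, coassembly_1, and so on:

# Hypothetical usage of cluster_processing with two tiny precluster tables.
import polars as pl

c1 = pl.LazyFrame({
    "samples": ["sample_1,sample_2"], "length": [2], "total_targets": [5],
    "total_size": [10], "recover_samples": ["sample_1,sample_2"],
    "coassembly": ["coassembly_0"],
})
c2 = pl.LazyFrame({
    "samples": ["sample_3,sample_4"], "length": [2], "total_targets": [9],
    "total_size": [8], "recover_samples": ["sample_3,sample_4"],
    "coassembly": ["coassembly_0"],
})

merged = cluster_processing([c1, c2], ["1", "2"]).collect()
# The precluster-2 cluster becomes coassembly_0 (9 targets > 5), and the
# per-precluster coassembly_0 names no longer collide after renaming.
print(merged.select("coassembly", "samples", "total_targets"))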