diff --git a/.vscode/settings.json b/.vscode/settings.json index 18dc711..94f9c13 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,11 @@ { "cSpell.words": [ + "editdistmax", + "editdistmin", "fusobacteria", "Lactobacillales", + "minmapq", + "ncbi", "taxas" ] } diff --git a/pyproject.toml b/pyproject.toml index bd04d8f..1c36f1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,14 +19,16 @@ PyYAML = "^6.0" pandas = "^1.4.1" scipy = "^1.8.0" pyarrow = "^7.0.0" +logger-tt = "^1.7.0" +psutil = "^5.9.0" + + # scipy = { version = "^1.7.3", python = ">=3.9,<3.11" } # Fit related packages: poetry install --extras fit -logger-tt = {version = "^1.7.0", optional = true} iminuit = {version = "^2.10.0", optional = true} numpyro = {version = "^0.9.1", optional = true} joblib = {version = "^1.1.0", optional = true} -psutil = {version = "^5.9.0", optional = true} numba = {version = "^0.55.1", optional = true} # Viz related packages: poetry install --extras viz diff --git a/src/metaDMG/cli/cli.py b/src/metaDMG/cli/cli.py index 16dd853..9cf789b 100644 --- a/src/metaDMG/cli/cli.py +++ b/src/metaDMG/cli/cli.py @@ -270,18 +270,12 @@ def compute( utils.check_metaDMG_fit() - from metaDMG.fit import ( - get_logger_port_and_path, - make_configs, - run_workflow, - setup_logger, - ) + from metaDMG.fit import get_logger_port_and_path, run_workflow, setup_logger log_port, log_path = get_logger_port_and_path() - setup_logger(log_port=log_port, log_path=log_path) - configs = make_configs( + configs = utils.make_configs( config_file=config_file, log_port=log_port, log_path=log_path, @@ -576,6 +570,26 @@ def mismatch_to_mapDamage( mismatch_to_mapDamage.convert(filename=filename, csv_out=csv_out) +#%% + + +@cli_app.command("PMD") +def compute_PMD( + config_file: Path = typer.Argument( + ..., + file_okay=True, + help="Path to the config-file.", + ), +): + """Compute the PMD scores for the samples in the config file.""" + + from metaDMG.utils import make_configs, run_PMD 
+ + configs = make_configs(config_file) + for config in configs: + run_PMD(config["bam"], config["path_pmd"]) + + #%% # needed for sphinx documentation diff --git a/src/metaDMG/fit/__init__.py b/src/metaDMG/fit/__init__.py index 7b56468..ff4f1a7 100644 --- a/src/metaDMG/fit/__init__.py +++ b/src/metaDMG/fit/__init__.py @@ -1,3 +1,2 @@ -from metaDMG.fit.fit_utils import make_configs from metaDMG.fit.workflow import run_workflow from metaDMG.loggers.loggers import get_logger_port_and_path, setup_logger diff --git a/src/metaDMG/fit/fit_utils.py b/src/metaDMG/fit/fit_utils.py index 8d8047e..43b18fb 100644 --- a/src/metaDMG/fit/fit_utils.py +++ b/src/metaDMG/fit/fit_utils.py @@ -6,7 +6,6 @@ import numpy as np import pandas as pd -import psutil import typer import yaml from iminuit import describe @@ -15,8 +14,6 @@ from scipy.special import erf, erfinv from scipy.stats import chi2 as sp_chi2 -from metaDMG.utils import _update_old_config - #%% @@ -30,192 +27,6 @@ #%% -class Config(dict): - """Config contains the parameters related to specific alignment file.""" - - pass - - -class Configs(dict): - """Configs contains the parameters related to config file. - Inherits from dict. Implements iterations. 
- """ - - def __iter__(self) -> Iterator[Config]: - """Iteration - - Yields - ------ - Iterator[Config] - Allow for iteration - """ - dir_lca = self["output_dir"] / "lca" - samples = self["samples"].keys() - for sample in samples: - config = Config(self) - config["sample"] = sample - config["bam"] = config["samples"][sample] - - config["path_mismatches_txt"] = dir_lca / f"{sample}.mismatches.txt.gz" - - if config["damage_mode"] == "lca": - config["path_mismatches_stat"] = ( - dir_lca / f"{sample}.mismatches.stat.txt.gz" - ) - else: - config["path_mismatches_stat"] = dir_lca / f"{sample}.stat.txt" - - config["path_lca"] = dir_lca / f"{sample}.lca.txt.gz" - config["path_lca_log"] = dir_lca / f"{sample}.log.txt" - config["path_tmp"] = config["output_dir"] / "tmp" / sample - yield config - - def get_nth(self, n: int) -> Config: - """Gets the n'th config - - Parameters - ---------- - n - The index - - Returns - ------- - Config - A single configuration - """ - return next(islice(self, n, None)) - - def get_first(self) -> Config: - """Get the first config - - Returns - ------- - Config - A single configuration - """ - return self.get_nth(n=0) - - def __len__(self) -> int: - """The number of configs - - Returns - ------- - int - The number of configs - """ - return len(self["samples"].keys()) - - def check_number_of_jobs(self) -> None: - """Compare the number of configs to the number of parallel_samples used.""" - - parallel_samples = min(self["parallel_samples"], len(self["samples"])) - cores_per_sample = self["cores_per_sample"] - N_jobs = parallel_samples * cores_per_sample - max_cores = psutil.cpu_count(logical=True) - max_cores_real = psutil.cpu_count(logical=False) - - if N_jobs > max_cores: - logger.warning( - f"The total number of jobs {N_jobs} are higher " - f"than the number of parallel_samples {max_cores}. " - f"Do not do this unless you know what you are doing. " - f"Try decreasing either 'parallel_samples' or 'parallel_samples-per-sample'." 
- ) - elif N_jobs > max_cores_real: - logger.info( - f"The total number of jobs {N_jobs} are higher " - f"than the real number of parallel_samples {max_cores_real} (non-logical). " - f"This might decrease performance. " - ) - - -def make_configs( - config_file: Optional[Path], - log_port: Optional[int] = None, - log_path: Optional[str] = None, - force: bool = False, -) -> Configs: - """Create an instance of Configs from a config file - - Parameters - ---------- - config_file - The config file to load - log_port - Optional log port, by default None - log_path - Optional log path, by default None - force - Whether or not the computations are force, by default False - - Returns - ------- - An instance of Configs - - Raises - ------ - typer.Abort - If not a proper config file - """ - - if config_file is None: - config_file = Path("config.yaml") - - if not config_file.exists(): - logger.error("Error! Please select a proper config file!") - raise typer.Abort() - - logger.info(f"Using {config_file} as config file.") - with open(config_file, "r") as file: - d = yaml.safe_load(file) - - d = update_old_config(d) - - d["log_port"] = log_port - d["log_path"] = log_path - - d.setdefault("forward_only", False) - d.setdefault("cores_per_sample", 1) - d.setdefault("damage_mode", "lca") - d["force"] = force - - paths = ["names", "nodes", "acc2tax", "output_dir", "config_file"] - for path in paths: - d[path] = Path(d[path]) - for key, val in d["samples"].items(): - d["samples"][key] = Path(val) - - for key, val in d.items(): - if isinstance(val, str): - if val.isdigit(): - d[key] = int(key) - - d["custom_database"] = 0 if d["custom_database"] else 1 - - return Configs(d) - - -#%% - - -def update_old_config(d: dict) -> dict: - if "version" in d: - # new version, not changing anything - return d - - # updating the old version - - logger.warning( - "Using an old version of the config file. Please remake the config file." 
- ) - - d_new = _update_old_config(d) - return d_new - - -#%% - - def downcast_dataframe(df, categories=None, fully_automatic=False): if categories is None: diff --git a/src/metaDMG/fit/serial.py b/src/metaDMG/fit/serial.py index 9f14ba6..fa10d8d 100644 --- a/src/metaDMG/fit/serial.py +++ b/src/metaDMG/fit/serial.py @@ -21,8 +21,8 @@ metadamageError, ) from metaDMG.fit import fits, mismatches, results -from metaDMG.fit.fit_utils import Config from metaDMG.loggers.loggers import setup_logger +from metaDMG.utils import Config #%% diff --git a/src/metaDMG/fit/workflow.py b/src/metaDMG/fit/workflow.py index f94c3da..eb87250 100644 --- a/src/metaDMG/fit/workflow.py +++ b/src/metaDMG/fit/workflow.py @@ -4,19 +4,20 @@ from logger_tt import logger -from metaDMG.fit import fit_utils, serial +from metaDMG.fit.serial import run_single_config +from metaDMG.utils import Configs -def run_workflow(configs: fit_utils.Configs): +def run_workflow(configs: Configs): parallel_samples = min(configs["parallel_samples"], len(configs)) logger.info(f"Running metaDMG on {len(configs)} files in total.") if parallel_samples == 1 or len(configs) == 1: - logger.info(f"Running in seriel (1 core)") + logger.info(f"Running in serial (1 core)") for config in configs: - dfs = serial.run_single_config(config) + dfs = run_single_config(config) # df_mismatches, df_fit_results, df_results = dfs else: @@ -25,6 +26,6 @@ def run_workflow(configs: fit_utils.Configs): with Pool(max_workers=parallel_samples) as pool: # for dfs in pool.imap_unordered(serial.run_single_config, configs): - for dfs in pool.map(serial.run_single_config, configs): + for dfs in pool.map(run_single_config, configs): pass # df_mismatches, df_fit_results, df_results = dfs diff --git a/src/metaDMG/main.py b/src/metaDMG/main.py index a54276f..11b9fef 100644 --- a/src/metaDMG/main.py +++ b/src/metaDMG/main.py @@ -26,9 +26,9 @@ def compute_config( utils.check_metaDMG_fit() - from metaDMG.fit import make_configs, run_workflow + from 
metaDMG.fit import run_workflow - configs = make_configs(config_file=config_file, force=force) + configs = utils.make_configs(config_file=config_file, force=force) run_workflow(configs) diff --git a/src/metaDMG/utils.py b/src/metaDMG/utils.py index 89f98af..0c97c70 100644 --- a/src/metaDMG/utils.py +++ b/src/metaDMG/utils.py @@ -2,25 +2,204 @@ import re import warnings from functools import partial +from itertools import islice from pathlib import Path -from typing import Iterable, Optional, Union +from typing import Iterable, Iterator, Optional, Union import numpy as np import pandas as pd +import psutil import typer import yaml +from logger_tt import logger from scipy.stats import betabinom as sp_betabinom #%% -def _update_old_config(d: dict) -> dict: +class Config(dict): + """Config contains the parameters related to specific alignment file.""" + + pass + + +class Configs(dict): + """Configs contains the parameters related to config file. + Inherits from dict. Implements iterations. + """ + + def __iter__(self) -> Iterator[Config]: + """Iteration + + Yields + ------ + Iterator[Config] + Allow for iteration + """ + dir_lca = self["output_dir"] / "lca" + dir_pmd = self["output_dir"] / "pmd" + samples = self["samples"].keys() + for sample in samples: + config = Config(self) + config["sample"] = sample + config["bam"] = config["samples"][sample] + + config["path_mismatches_txt"] = dir_lca / f"{sample}.mismatches.txt.gz" + + if config["damage_mode"] == "lca": + config["path_mismatches_stat"] = ( + dir_lca / f"{sample}.mismatches.stat.txt.gz" + ) + else: + config["path_mismatches_stat"] = dir_lca / f"{sample}.stat.txt" + + config["path_lca"] = dir_lca / f"{sample}.lca.txt.gz" + config["path_lca_log"] = dir_lca / f"{sample}.log.txt" + config["path_tmp"] = config["output_dir"] / "tmp" / sample + + config["path_pmd"] = dir_pmd / f"{sample}.pmd.txt.gz" + + yield config + + def get_nth(self, n: int) -> Config: + """Gets the n'th config + + Parameters + ---------- + n + The 
index + + Returns + ------- + Config + A single configuration + """ + return next(islice(self, n, None)) + + def get_first(self) -> Config: + """Get the first config + + Returns + ------- + Config + A single configuration + """ + return self.get_nth(n=0) + + def __len__(self) -> int: + """The number of configs + + Returns + ------- + int + The number of configs + """ + return len(self["samples"].keys()) + + def check_number_of_jobs(self) -> None: + """Compare the number of configs to the number of parallel_samples used.""" + + parallel_samples = min(self["parallel_samples"], len(self["samples"])) + cores_per_sample = self["cores_per_sample"] + N_jobs = parallel_samples * cores_per_sample + max_cores = psutil.cpu_count(logical=True) + max_cores_real = psutil.cpu_count(logical=False) + + if N_jobs > max_cores: + logger.warning( + f"The total number of jobs {N_jobs} are higher " + f"than the number of parallel_samples {max_cores}. " + f"Do not do this unless you know what you are doing. " + f"Try decreasing either 'parallel_samples' or 'parallel_samples-per-sample'." + ) + elif N_jobs > max_cores_real: + logger.info( + f"The total number of jobs {N_jobs} are higher " + f"than the real number of parallel_samples {max_cores_real} (non-logical). " + f"This might decrease performance. " + ) + + +def make_configs( + config_file: Optional[Path], + log_port: Optional[int] = None, + log_path: Optional[str] = None, + force: bool = False, +) -> Configs: + """Create an instance of Configs from a config file + + Parameters + ---------- + config_file + The config file to load + log_port + Optional log port, by default None + log_path + Optional log path, by default None + force + Whether or not the computations are force, by default False + + Returns + ------- + An instance of Configs + + Raises + ------ + typer.Abort + If not a proper config file + """ + + if config_file is None: + config_file = Path("config.yaml") + + if not config_file.exists(): + logger.error("Error! 
Please select a proper config file!") + raise typer.Abort() + + logger.info(f"Using {config_file} as config file.") + with open(config_file, "r") as file: + d = yaml.safe_load(file) + + d = update_old_config(d) + + d["log_port"] = log_port + d["log_path"] = log_path + + d.setdefault("forward_only", False) + d.setdefault("cores_per_sample", 1) + d.setdefault("damage_mode", "lca") + d["force"] = force + + paths = ["names", "nodes", "acc2tax", "output_dir", "config_file"] + for path in paths: + d[path] = Path(d[path]) + for key, val in d["samples"].items(): + d["samples"][key] = Path(val) + + for key, val in d.items(): + if isinstance(val, str): + if val.isdigit(): + d[key] = int(val) + + d["custom_database"] = 0 if d["custom_database"] else 1 + + return Configs(d) + + +#%% + + +def update_old_config(d: dict) -> dict: if "version" in d: # new version, not changing anything return d + logger.warning( + "Using an old version of the config file. Please remake the config file." + ) + d_old2new = { "metaDMG-lca": "metaDMG_cpp", "minmapq": "min_mapping_quality", @@ -347,12 +526,9 @@ def get_results_dir( if config_file is None: config_file = Path("config.yaml") - with open(config_file, "r") as file: - d = yaml.safe_load(file) + configs = make_configs(config_file) - d = _update_old_config(d) - - return Path(d["output_dir"]) / "results" + return configs["output_dir"] / "results" #%% @@ -416,3 +592,35 @@ def append_fit_predictions(df_results): #%% + + +def run_PMD(alignment_file: Path, txt_out: Path): + """Run the PMD command from metaDMG-cpp and output the result to the gzipped txt_out + + Parameters + ---------- + alignment_file + Alignment file to compute the PMD scores on + txt_out + The (gzipped) output txt file + """ + + import gzip + import shlex + import subprocess + + txt_out.parent.mkdir(parents=True, exist_ok=True) + + with gzip.open(f"{txt_out}", "wt") as zip_out: + + cpp = subprocess.Popen( + shlex.split(f"./metaDMG-cpp pmd {alignment_file}"), +
stdout=subprocess.PIPE, + ) + + gz = subprocess.Popen( + ["gzip"], + stdin=cpp.stdout, + stdout=zip_out, + ) + gz.communicate()