Skip to content

Commit

Permalink
feat: Add PMD command and include Configs in core
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristianMichelsen committed Apr 5, 2022
1 parent ecec382 commit 11eab19
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 215 deletions.
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
{
"cSpell.words": [
"editdistmax",
"editdistmin",
"fusobacteria",
"Lactobacillales",
"minmapq",
"ncbi",
"taxas"
]
}
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ PyYAML = "^6.0"
pandas = "^1.4.1"
scipy = "^1.8.0"
pyarrow = "^7.0.0"
logger-tt = "^1.7.0"
psutil = "^5.9.0"


# scipy = { version = "^1.7.3", python = ">=3.9,<3.11" }

# Fit related packages: poetry install --extras fit
logger-tt = {version = "^1.7.0", optional = true}
iminuit = {version = "^2.10.0", optional = true}
numpyro = {version = "^0.9.1", optional = true}
joblib = {version = "^1.1.0", optional = true}
psutil = {version = "^5.9.0", optional = true}
numba = {version = "^0.55.1", optional = true}

# Viz related packages: poetry install --extras viz
Expand Down
30 changes: 22 additions & 8 deletions src/metaDMG/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,18 +270,12 @@ def compute(

utils.check_metaDMG_fit()

from metaDMG.fit import (
get_logger_port_and_path,
make_configs,
run_workflow,
setup_logger,
)
from metaDMG.fit import get_logger_port_and_path, run_workflow, setup_logger

log_port, log_path = get_logger_port_and_path()

setup_logger(log_port=log_port, log_path=log_path)

configs = make_configs(
configs = utils.make_configs(
config_file=config_file,
log_port=log_port,
log_path=log_path,
Expand Down Expand Up @@ -576,6 +570,26 @@ def mismatch_to_mapDamage(
mismatch_to_mapDamage.convert(filename=filename, csv_out=csv_out)


#%%


@cli_app.command("PMD")
def compute_PMD(
    config_file: Path = typer.Argument(
        ...,
        file_okay=True,
        help="Path to the config-file.",
    ),
):
    """Compute the PMD scores for the samples in the config file."""

    # Imported lazily so the CLI starts fast and optional deps are only
    # required when this sub-command actually runs.
    from metaDMG.utils import make_configs, run_PMD

    # One PMD run per sample listed in the config file.
    for cfg in make_configs(config_file):
        run_PMD(cfg["bam"], cfg["path_pmd"])


#%%

# needed for sphinx documentation
Expand Down
1 change: 0 additions & 1 deletion src/metaDMG/fit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
from metaDMG.fit.fit_utils import make_configs
from metaDMG.fit.workflow import run_workflow
from metaDMG.loggers.loggers import get_logger_port_and_path, setup_logger
189 changes: 0 additions & 189 deletions src/metaDMG/fit/fit_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import numpy as np
import pandas as pd
import psutil
import typer
import yaml
from iminuit import describe
Expand All @@ -15,8 +14,6 @@
from scipy.special import erf, erfinv
from scipy.stats import chi2 as sp_chi2

from metaDMG.utils import _update_old_config


#%%

Expand All @@ -30,192 +27,6 @@
#%%


class Config(dict):
    """Config contains the parameters related to a specific alignment file.

    A plain dict subclass; one instance per sample, derived from Configs.
    """

    pass


class Configs(dict):
    """Configs contains the parameters related to the config file.

    Inherits from dict. Iterating yields one Config per sample, with the
    per-sample paths (mismatches, stats, lca, logs, tmp) filled in.
    """

    def __iter__(self) -> Iterator[Config]:
        """Iterate over the per-sample configurations.

        Yields
        ------
        Iterator[Config]
            A Config (shallow copy of self) per entry in self["samples"],
            augmented with the sample name, its BAM path, and derived
            output paths under <output_dir>/lca and <output_dir>/tmp.
        """
        dir_lca = self["output_dir"] / "lca"
        samples = self["samples"].keys()
        for sample in samples:
            config = Config(self)
            config["sample"] = sample
            config["bam"] = config["samples"][sample]

            config["path_mismatches_txt"] = dir_lca / f"{sample}.mismatches.txt.gz"

            # The stats filename depends on whether LCA damage mode is used.
            if config["damage_mode"] == "lca":
                config["path_mismatches_stat"] = (
                    dir_lca / f"{sample}.mismatches.stat.txt.gz"
                )
            else:
                config["path_mismatches_stat"] = dir_lca / f"{sample}.stat.txt"

            config["path_lca"] = dir_lca / f"{sample}.lca.txt.gz"
            config["path_lca_log"] = dir_lca / f"{sample}.log.txt"
            config["path_tmp"] = config["output_dir"] / "tmp" / sample
            yield config

    def get_nth(self, n: int) -> Config:
        """Gets the n'th config.

        Parameters
        ----------
        n
            The index

        Returns
        -------
        Config
            A single configuration
        """
        return next(islice(self, n, None))

    def get_first(self) -> Config:
        """Get the first config.

        Returns
        -------
        Config
            A single configuration
        """
        return self.get_nth(n=0)

    def __len__(self) -> int:
        """The number of configs.

        Returns
        -------
        int
            The number of configs (= number of samples)
        """
        return len(self["samples"].keys())

    def check_number_of_jobs(self) -> None:
        """Compare the number of requested jobs to the available CPU cores."""

        parallel_samples = min(self["parallel_samples"], len(self["samples"]))
        cores_per_sample = self["cores_per_sample"]
        N_jobs = parallel_samples * cores_per_sample
        max_cores = psutil.cpu_count(logical=True)
        max_cores_real = psutil.cpu_count(logical=False)

        # BUGFIX: the old message called max_cores "the number of
        # parallel_samples" and suggested the nonexistent key
        # 'parallel_samples-per-sample'; the real key is 'cores_per_sample'.
        if N_jobs > max_cores:
            logger.warning(
                f"The total number of jobs {N_jobs} is higher "
                f"than the number of available cores {max_cores}. "
                f"Do not do this unless you know what you are doing. "
                f"Try decreasing either 'parallel_samples' or 'cores_per_sample'."
            )
        elif N_jobs > max_cores_real:
            logger.info(
                f"The total number of jobs {N_jobs} is higher "
                f"than the number of physical cores {max_cores_real} (non-logical). "
                f"This might decrease performance. "
            )


def make_configs(
    config_file: Optional[Path],
    log_port: Optional[int] = None,
    log_path: Optional[str] = None,
    force: bool = False,
) -> Configs:
    """Create an instance of Configs from a config file.

    Parameters
    ----------
    config_file
        The config file to load (defaults to ./config.yaml when None)
    log_port
        Optional log port, by default None
    log_path
        Optional log path, by default None
    force
        Whether or not the computations are forced, by default False

    Returns
    -------
    Configs
        An instance of Configs

    Raises
    ------
    typer.Abort
        If not a proper config file
    """

    if config_file is None:
        config_file = Path("config.yaml")

    if not config_file.exists():
        logger.error("Error! Please select a proper config file!")
        raise typer.Abort()

    logger.info(f"Using {config_file} as config file.")
    with open(config_file, "r") as file:
        d = yaml.safe_load(file)

    # Migrate configs written by older metaDMG versions.
    d = update_old_config(d)

    d["log_port"] = log_port
    d["log_path"] = log_path

    d.setdefault("forward_only", False)
    d.setdefault("cores_per_sample", 1)
    d.setdefault("damage_mode", "lca")
    d["force"] = force

    # Convert the path-valued entries (and each sample's BAM path) to Path.
    paths = ["names", "nodes", "acc2tax", "output_dir", "config_file"]
    for path in paths:
        d[path] = Path(d[path])
    for key, val in d["samples"].items():
        d["samples"][key] = Path(val)

    # Convert numeric strings to ints.
    # BUGFIX: was `d[key] = int(key)`, which converted the *key* instead of
    # the value and would crash on any non-numeric key.
    for key, val in d.items():
        if isinstance(val, str):
            if val.isdigit():
                d[key] = int(val)

    # NOTE(review): this inverts truthiness (truthy -> 0, falsy -> 1);
    # presumably an external tool's flag convention — confirm before changing.
    d["custom_database"] = 0 if d["custom_database"] else 1

    return Configs(d)


#%%


def update_old_config(d: dict) -> dict:
    """Return the config dict, migrated to the current schema if needed.

    A config that already carries a "version" key is considered current
    and is returned unchanged; anything else is treated as the legacy
    format and converted via _update_old_config.
    """
    if "version" not in d:
        # Legacy config file: warn the user, then migrate it.
        logger.warning(
            "Using an old version of the config file. Please remake the config file."
        )
        return _update_old_config(d)

    # New version, not changing anything.
    return d


#%%


def downcast_dataframe(df, categories=None, fully_automatic=False):

if categories is None:
Expand Down
2 changes: 1 addition & 1 deletion src/metaDMG/fit/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
metadamageError,
)
from metaDMG.fit import fits, mismatches, results
from metaDMG.fit.fit_utils import Config
from metaDMG.loggers.loggers import setup_logger
from metaDMG.utils import Config


#%%
Expand Down
11 changes: 6 additions & 5 deletions src/metaDMG/fit/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

from logger_tt import logger

from metaDMG.fit import fit_utils, serial
from metaDMG.fit.serial import run_single_config
from metaDMG.utils import Configs


def run_workflow(configs: fit_utils.Configs):
def run_workflow(configs: Configs):

parallel_samples = min(configs["parallel_samples"], len(configs))

logger.info(f"Running metaDMG on {len(configs)} files in total.")

if parallel_samples == 1 or len(configs) == 1:
logger.info(f"Running in seriel (1 core)")
logger.info(f"Running in serial (1 core)")
for config in configs:
dfs = serial.run_single_config(config)
dfs = run_single_config(config)
# df_mismatches, df_fit_results, df_results = dfs

else:
Expand All @@ -25,6 +26,6 @@ def run_workflow(configs: fit_utils.Configs):

with Pool(max_workers=parallel_samples) as pool:
# for dfs in pool.imap_unordered(serial.run_single_config, configs):
for dfs in pool.map(serial.run_single_config, configs):
for dfs in pool.map(run_single_config, configs):
pass
# df_mismatches, df_fit_results, df_results = dfs
4 changes: 2 additions & 2 deletions src/metaDMG/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ def compute_config(

utils.check_metaDMG_fit()

from metaDMG.fit import make_configs, run_workflow
from metaDMG.fit import run_workflow

configs = make_configs(config_file=config_file, force=force)
configs = utils.make_configs(config_file=config_file, force=force)

run_workflow(configs)

Expand Down
Loading

0 comments on commit 11eab19

Please sign in to comment.