diff --git a/birdman/default_models.py b/birdman/default_models.py
index 8fc5819..bcc1f51 100644
--- a/birdman/default_models.py
+++ b/birdman/default_models.py
@@ -1,9 +1,7 @@
 import os
 from pkg_resources import resource_filename
 
-import arviz as az
 import biom
-import dask_jobqueue
 import numpy as np
 import pandas as pd
 
@@ -108,51 +106,26 @@ def __init__(
         }
         self.add_parameters(param_dict)
 
-    def to_inference_object(
-        self,
-        dask_cluster: dask_jobqueue.JobQueueCluster = None,
-        jobs: int = 4
-    ) -> az.InferenceData:
-        """Convert fitted Stan model into ``arviz`` InferenceData object.
-
-        :param dask_cluster: Dask jobqueue to run parallel jobs (optional)
-        :type dask_cluster: dask_jobqueue.JobQueueCluster, optional
-
-        :param jobs: Number of jobs to run in parallel, defaults to 4
-        :type jobs: int
-
-        :returns: ``arviz`` InferenceData object with selected values
-        :rtype: az.InferenceData
-        """
-        dims = {
-            "beta": ["covariate", "feature"],
-            "phi": ["feature"],
-            "log_lhood": ["tbl_sample", "feature"],
-            "y_predict": ["tbl_sample", "feature"]
-        }
-        coords = {
-            "covariate": self.colnames,
-            "feature": self.feature_names,
-            "tbl_sample": self.sample_names
-        }
-
-        # TODO: May want to allow not passing PP/LL/OD in the future
-        args = dict()
-        if self.parallelize_across == "chains":
-            args["alr_params"] = ["beta"]
-        else:
-            args["dask_cluster"] = dask_cluster
-            args["jobs"] = jobs
-        inf = super().to_inference_object(
+        self.specify_model(
             params=["beta", "phi"],
-            dims=dims,
-            coords=coords,
-            posterior_predictive="y_predict",
-            log_likelihood="log_lhood",
+            dims={
+                "beta": ["covariate", "feature"],
+                "phi": ["feature"],
+                "log_lhood": ["tbl_sample", "feature"],
+                "y_predict": ["tbl_sample", "feature"]
+            },
+            coords={
+                "covariate": self.colnames,
+                "feature": self.feature_names,
+                "tbl_sample": self.sample_names
+            },
             include_observed_data=True,
-            **args
+            posterior_predictive="y_predict",
+            log_likelihood="log_lhood"
         )
-        return inf
+
+        if self.parallelize_across == "chains":
+            self.specifications["alr_params"] = ["beta"]
 
 
 class NegativeBinomialLME(RegressionModel):
@@ -259,37 +232,28 @@ def __init__(
         }
         self.add_parameters(param_dict)
 
-    def to_inference_object(self) -> az.InferenceData:
-        """Convert fitted Stan model into ``arviz`` InferenceData object.
-
-        :returns: ``arviz`` InferenceData object with selected values
-        :rtype: az.InferenceData
-        """
-        dims = {
-            "beta": ["covariate", "feature"],
-            "phi": ["feature"],
-            "subj_int": ["group", "feature"],
-            "log_lhood": ["tbl_sample", "feature"],
-            "y_predict": ["tbl_sample", "feature"]
-        }
-        coords = {
-            "covariate": self.colnames,
-            "feature": self.feature_names,
-            "tbl_sample": self.sample_names,
-            "group": self.groups
-        }
-
-        # TODO: May want to allow not passing PP/LL/OD in the future
-        inf = super().to_inference_object(
-            params=["beta", "phi"],
-            dims=dims,
-            coords=coords,
-            posterior_predictive="y_predict",
-            log_likelihood="log_lhood",
-            alr_params=["beta", "subj_int"],
+        self.specify_model(
+            params=["beta", "phi", "subj_int"],
+            dims={
+                "beta": ["covariate", "feature"],
+                "phi": ["feature"],
+                "subj_int": ["group", "feature"],
+                "log_lhood": ["tbl_sample", "feature"],
+                "y_predict": ["tbl_sample", "feature"]
+            },
+            coords={
+                "covariate": self.colnames,
+                "feature": self.feature_names,
+                "tbl_sample": self.sample_names,
+                "group": self.groups
+            },
             include_observed_data=True,
+            posterior_predictive="y_predict",
+            log_likelihood="log_lhood"
         )
-        return inf
+
+        if self.parallelize_across == "chains":
+            self.specifications["alr_params"] = ["beta", "subj_int"]
 
 
 class Multinomial(RegressionModel):
@@ -363,31 +327,19 @@ def __init__(
         }
         self.add_parameters(param_dict)
 
-    def to_inference_object(self) -> az.InferenceData:
-        """Convert fitted Stan model into ``arviz`` InferenceData object.
-
-        :returns: ``arviz`` InferenceData object with selected values
-        :rtype: az.InferenceData
-        """
-        dims = {
-            "beta": ["covariate", "feature"],
-            "log_lhood": ["tbl_sample"],
-            "y_predict": ["tbl_sample", "feature"]
-        }
-        coords = {
-            "covariate": self.colnames,
-            "feature": self.feature_names,
-            "tbl_sample": self.sample_names
-        }
-
-        # TODO: May want to allow not passing PP/LL/OD in the future
-        inf = super().to_inference_object(
-            params=["beta", "phi"],
-            dims=dims,
-            coords=coords,
-            alr_params=["beta"],
-            posterior_predictive="y_predict",
-            log_likelihood="log_lhood",
+        self.specify_model(
+            params=["beta"],
+            dims={
+                "beta": ["covariate", "feature"],
+                "log_lhood": ["tbl_sample"],
+                "y_predict": ["tbl_sample", "feature"]
+            },
+            coords={
+                "covariate": self.colnames,
+                "feature": self.feature_names,
+                "tbl_sample": self.sample_names,
+            },
             include_observed_data=True,
+            posterior_predictive="y_predict",
+            log_likelihood="log_lhood"
         )
-        return inf
diff --git a/birdman/model_base.py b/birdman/model_base.py
index 62ee34d..8a270cf 100644
--- a/birdman/model_base.py
+++ b/birdman/model_base.py
@@ -1,15 +1,16 @@
-from typing import List, Sequence
+from typing import List, Sequence, Union
 import warnings
 
 import arviz as az
 import biom
 from cmdstanpy import CmdStanModel, CmdStanMCMC
 import dask
-import dask_jobqueue
+import numpy as np
 import pandas as pd
 from patsy import dmatrix
 
-from .model_util import single_fit_to_inference, multiple_fits_to_inference
+from .model_util import (single_fit_to_inference, multiple_fits_to_inference,
+                         _single_feature_to_inf, concatenate_inferences)
 
 
 class BaseModel:
@@ -73,19 +74,74 @@ def __init__(
             "N": table.shape[1],  # number of samples
         }
 
+        self.specifications = dict()
+
     def compile_model(self) -> None:
         """Compile Stan model."""
         self.sm = CmdStanModel(stan_file=self.model_path)
 
-    def add_parameters(self, param_dict=None) -> None:
+    def specify_model(
+        self,
+        params: Sequence[str],
+        coords: dict,
+        dims: dict,
+        concatenation_name: str = "feature",
+        alr_params: Sequence[str] = None,
+        include_observed_data: bool = False,
+        posterior_predictive: str = None,
+        log_likelihood: str = None,
+    ) -> None:
+        """Specify coordinates and dimensions of model.
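+
+        Example for a simple Negative Binomial model (adapting the example
+        formerly given for ``to_inference_object``):
+
+        .. code-block:: python
+
+            model.specify_model(
+                params=['beta', 'phi'],
+                coords={
+                    'feature': model.feature_names,
+                    'covariate': model.colnames
+                },
+                dims={
+                    'beta': ['covariate', 'feature'],
+                    'phi': ['feature']
+                },
+                alr_params=['beta']
+            )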
+
+        :param params: Posterior fitted parameters to include
+        :type params: Sequence[str]
+
+        :param coords: Mapping of entries in dims to labels
+        :type coords: dict
+
+        :param dims: Dimensions of parameters in the model
+        :type dims: dict
+
+        :param concatenation_name: Name to aggregate features when combining
+            multiple fits, defaults to 'feature'
+        :type concatenation_name: str
+
+        :param alr_params: Parameters to convert from ALR to CLR (this will
+            be ignored if the model has been parallelized across features)
+        :type alr_params: Sequence[str], optional
+
+        :param include_observed_data: Whether to include the original feature
+            table values in the ``arviz`` InferenceData object, default is
+            False
+        :type include_observed_data: bool
+
+        :param posterior_predictive: Name of posterior predictive values from
+            Stan model to include in ``arviz`` InferenceData object
+        :type posterior_predictive: str, optional
+
+        :param log_likelihood: Name of log likelihood values from Stan model
+            to include in ``arviz`` InferenceData object
+        :type log_likelihood: str, optional
+        """
+        self.specifications["params"] = params
+        self.specifications["coords"] = coords
+        self.specifications["dims"] = dims
+        self.specifications["alr_params"] = alr_params
+        self.specifications["concatenation_name"] = concatenation_name
+        self.specifications["include_observed_data"] = include_observed_data
+        self.specifications["posterior_predictive"] = posterior_predictive
+        self.specifications["log_likelihood"] = log_likelihood
+
+    def add_parameters(self, param_dict: dict = None) -> None:
         """Add parameters from dict to be passed to Stan."""
+        if param_dict is None:
+            param_dict = dict()
         self.dat.update(param_dict)
 
     def fit_model(
         self,
         sampler_args: dict = None,
-        dask_cluster: dask_jobqueue.JobQueueCluster = None,
-        jobs: int = 4,
+        convert_to_inference: bool = False,
     ) -> None:
         """Fit model according to parallelization configuration.
 
@@ -93,28 +149,37 @@ def fit_model(
             sampler (optional)
         :type sampler_args: dict
 
-        :param dask_cluster: Dask jobqueue to run parallel jobs if
-            parallelizing across features (optional)
-        :type dask_cluster: dask_jobqueue
-
-        :param jobs: Number of jobs to run in parallel jobs if parallelizing
-            across features, defaults to 4
-        :type jobs: int
+        :param convert_to_inference: Whether to automatically convert the
+            fitted model to an az.InferenceData object (defaults to False)
+        :type convert_to_inference: bool
         """
+        if self.sm is None:
+            raise ValueError("Model must be compiled first!")
+
         if self.parallelize_across == "features":
-            self.fit = self._fit_parallel(dask_cluster=dask_cluster, jobs=jobs,
-                                          sampler_args=sampler_args)
+            cn = self.specifications["concatenation_name"]
+            self.specifications["dims"] = {
+                k: [dim for dim in v if dim != cn]
+                for k, v in self.specifications["dims"].items()
+            }
+            self._fit_parallel(
+                sampler_args=sampler_args,
+                convert_to_inference=convert_to_inference
+            )
        elif self.parallelize_across == "chains":
-            if None not in [dask_cluster, jobs]:
-                warnings.warn(
-                    "dask_cluster and jobs ignored when parallelizing"
-                    " across chains."
-                )
-            self.fit = self._fit_serial(sampler_args)
+            self._fit_serial(
+                sampler_args=sampler_args,
+                convert_to_inference=convert_to_inference
+            )
         else:
             raise ValueError("parallelize_across must be features or chains!")
 
-    def _fit_serial(self, sampler_args: dict = None) -> CmdStanMCMC:
+    def _fit_serial(
+        self,
+        sampler_args: dict = None,
+        convert_to_inference: bool = False,
+    ) -> None:
         """Fit model by parallelizing across chains.
 
         :param sampler_args: Additional parameters to pass to CmdStanPy
@@ -124,7 +189,7 @@
         if sampler_args is None:
             sampler_args = dict()
 
-        fit = self.sm.sample(
+        _fit = self.sm.sample(
             chains=self.chains,
             parallel_chains=self.chains,  # run all chains in parallel
             data=self.dat,
@@ -133,21 +198,31 @@
             seed=self.seed,
             **sampler_args
         )
-        return fit
+        if convert_to_inference:
+            _fit = single_fit_to_inference(
+                fit=_fit,
+                params=self.specifications.get("params"),
+                coords=self.specifications.get("coords"),
+                dims=self.specifications.get("dims"),
+                alr_params=self.specifications.get("alr_params"),
+                posterior_predictive=self.specifications.get(
+                    "posterior_predictive"
+                ),
+                log_likelihood=self.specifications.get("log_likelihood")
+            )
+        self.fit = _fit
 
     def _fit_parallel(
         self,
-        dask_cluster: dask_jobqueue.JobQueueCluster = None,
-        jobs: int = 4,
+        convert_to_inference: bool = False,
         sampler_args: dict = None,
-    ) -> List[CmdStanMCMC]:
+    ) -> None:
         """Fit model by parallelizing across features.
 
-        :param dask_cluster: Dask jobqueue to run parallel jobs (optional)
-        :type dask_cluster: dask_jobqueue
-
-        :param jobs: Number of jobs to run parallel in parallel, defaults to 4
-        :type jobs: int
+        :param convert_to_inference: Whether to create an individual
+            InferenceData object for each feature fit, defaults to False
+        :type convert_to_inference: bool
 
         :param sampler_args: Additional parameters to pass to CmdStanPy
             sampler (optional)
@@ -156,21 +231,27 @@
         if sampler_args is None:
             sampler_args = dict()
 
-        if dask_cluster is not None:
-            dask_cluster.scale(jobs=jobs)
-
         _fits = []
         for v, i, d in self.table.iter(axis="observation"):
-            _fit = dask.delayed(self._fit_single)(v, sampler_args)
+            _fit = dask.delayed(self._fit_single)(
+                v,
+                sampler_args,
+                convert_to_inference,
+            )
             _fits.append(_fit)
 
-        futures = dask.persist(*_fits)
-        all_fits = dask.compute(futures)[0]
+        fit_futures = dask.persist(*_fits)
+        all_fits = dask.compute(fit_futures)[0]
         # Set data back to full table
         self.dat["y"] = self.table.matrix_data.todense().T.astype(int)
-        return all_fits
+        self.fit = all_fits
 
-    def _fit_single(self, values, sampler_args):
+    def _fit_single(
+        self,
+        values: np.ndarray,
+        sampler_args: dict = None,
+        convert_to_inference: bool = False,
+    ) -> Union[CmdStanMCMC, az.InferenceData]:
         dat = self.dat
         dat["y"] = values.astype(int)
         _fit = self.sm.sample(
@@ -181,75 +262,40 @@
             seed=self.seed,
             **sampler_args
         )
+
+        if convert_to_inference:
+            all_vars = _fit.stan_variables().keys()
+            vars_to_drop = set(all_vars).difference(
+                self.specifications["params"]
+            )
+            if self.specifications.get("posterior_predictive") is not None:
+                vars_to_drop.remove(
+                    self.specifications["posterior_predictive"]
+                )
+            if self.specifications.get("log_likelihood") is not None:
+                vars_to_drop.remove(self.specifications["log_likelihood"])
+
+            _fit = _single_feature_to_inf(
+                fit=_fit,
+                coords=self.specifications.get("coords"),
+                dims=self.specifications.get("dims"),
+                vars_to_drop=vars_to_drop,
+                posterior_predictive=self.specifications.get(
+                    "posterior_predictive"
+                ),
+                log_likelihood=self.specifications.get("log_likelihood")
+            )
         return _fit
 
     def to_inference_object(
         self,
-        params: Sequence[str],
-        coords: dict,
-        dims: dict,
-        concatenation_name: str = "feature",
-        alr_params: Sequence[str] = None,
-        include_observed_data: bool = False,
-        posterior_predictive: str = None,
-        log_likelihood: str = None,
-        dask_cluster: dask_jobqueue.JobQueueCluster = None,
-        jobs: int = 4
-    ) -> az.InferenceData:
+        combine_individual_fits: bool = True,
+    ) -> Union[az.InferenceData, List[az.InferenceData]]:
         """Convert fitted Stan model into ``arviz`` InferenceData object.
 
-        Example for a simple Negative Binomial model:
-
-        .. code-block:: python
-
-            inf_obj = model.to_inference_object(
-                params=['beta', 'phi'],
-                coords={
-                    'feature': model.feature_names,
-                    'covariate': model.colnames
-                },
-                dims={
-                    'beta': ['covariate', 'feature'],
-                    'phi': ['feature']
-                },
-                alr_params=['beta']
-            )
-
-        :param params: Posterior fitted parameters to include
-        :type params: Sequence[str]
-
-        :param coords: Mapping of entries in dims to labels
-        :type coords: dict
-
-        :param dims: Dimensions of parameters in the model
-        :type dims: dict
-
-        :param concatenation_name: Name to aggregate features when combining
-            multiple fits, defaults to 'feature'
-        :type concatentation_name: str
-
-        :param alr_params: Parameters to convert from ALR to CLR (this will
-            be ignored if the model has been parallelized across features)
-        :type alr_params: Sequence[str], optional
-
-        :param include_observed_data: Whether to include the original feature
-            table values into the ``arviz`` InferenceData object, default is
-            False
-        :type include_observed_data: bool
-
-        :param posterior_predictive: Name of posterior predictive values from
-            Stan model to include in ``arviz`` InferenceData object
-        :type posterior_predictive: str, optional
-
-        :param log_likelihood: Name of log likelihood values from Stan model
-            to include in ``arviz`` InferenceData object
-        :type log_likelihood: str, optional
-
-        :param dask_cluster: Dask jobqueue to run parallel jobs (optional)
-        :type dask_cluster: dask_jobqueue
-
-        :param jobs: Number of jobs to run in parallel, defaults to 4
-        :type jobs: int
+        :param combine_individual_fits: Whether to combine the results of
+            parallelized feature fits, defaults to True
+        :type combine_individual_fits: bool
 
         :returns: ``arviz`` InferenceData object with selected values
         :rtype: az.InferenceData
@@ -257,40 +303,69 @@ def to_inference_object(
         if self.fit is None:
             raise ValueError("Model has not been fit!")
 
+        # if already Inference, just return
+        if isinstance(self.fit, az.InferenceData):
+            return self.fit
+        # if sequence of Inferences, concatenate if specified
+        if isinstance(self.fit, (list, tuple)):
+            if isinstance(self.fit[0], az.InferenceData):
+                if combine_individual_fits:
+                    cat_name = self.specifications["concatenation_name"]
+                    return concatenate_inferences(
+                        self.fit,
+                        coords=self.specifications["coords"],
+                        concatenation_name=cat_name
+                    )
+                else:
+                    return self.fit
+
         args = {
-            "params": params,
-            "coords": coords,
-            "dims": dims,
-            "posterior_predictive": posterior_predictive,
-            "log_likelihood": log_likelihood,
+            k: self.specifications.get(k)
+            for k in ["params", "coords", "dims", "posterior_predictive",
+                      "log_likelihood"]
         }
         if isinstance(self.fit, CmdStanMCMC):
             fit_to_inference = single_fit_to_inference
-            args["alr_params"] = alr_params
args["alr_params"] = alr_params + args["alr_params"] = self.specifications["alr_params"] elif isinstance(self.fit, Sequence): fit_to_inference = multiple_fits_to_inference - args["concatenation_name"] = concatenation_name - args["dask_cluster"] = dask_cluster - args["jobs"] = jobs + if combine_individual_fits: + args["concatenation_name"] = self.specifications.get( + "concatenation_name", "feature" + ) + args["concatenate"] = True + else: + args["concatenate"] = False # TODO: Check that dims and concatenation_match - if alr_params is not None: + if self.specifications.get("alr_params") is not None: warnings.warn("ALR to CLR not performed on parallel models.", UserWarning) else: raise ValueError("Unrecognized fit type!") inference = fit_to_inference(self.fit, **args) - if include_observed_data: - obs = az.from_dict( - observed_data={"observed": self.dat["y"]}, - coords={ - "tbl_sample": self.sample_names, - "feature": self.feature_names - }, - dims={"observed": ["tbl_sample", "feature"]} + if self.specifications["include_observed_data"]: + # Can't include observed data in individual fits + include_obs_fail = ( + not combine_individual_fits + and self.parallelize_across == "features" ) - inference = az.concat(inference, obs) + if include_obs_fail: + warnings.warn( + "Cannot include observed data in un-concatenated" + "fits!" + ) + else: + obs = az.from_dict( + observed_data={"observed": self.dat["y"]}, + coords={ + "tbl_sample": self.sample_names, + "feature": self.feature_names + }, + dims={"observed": ["tbl_sample", "feature"]} + ) + inference = az.concat(inference, obs) return inference def diagnose(self): diff --git a/birdman/model_util.py b/birdman/model_util.py index f3eb64a..f4075de 100644 --- a/birdman/model_util.py +++ b/birdman/model_util.py @@ -1,10 +1,9 @@ import re -from typing import Sequence +from typing import List, Sequence, Union import arviz as az from cmdstanpy import CmdStanMCMC import dask -import dask_jobqueue import numpy as np import xarray as xr @@ -58,7 +57,7 @@ def single_fit_to_inference( raise KeyError("Must include dimensions for posterior predictive!") inference = az.from_cmdstanpy( - posterior=fit, + fit, coords=coords, log_likelihood=log_likelihood, posterior_predictive=posterior_predictive, @@ -111,12 +110,11 @@ def multiple_fits_to_inference( params: Sequence[str], coords: dict, dims: dict, - concatenation_name: str, + concatenate: bool = True, + concatenation_name: str = "feature", posterior_predictive: str = None, log_likelihood: str = None, - dask_cluster: dask_jobqueue.JobQueueCluster = None, - jobs: int = 4 -) -> az.InferenceData: +) -> Union[az.InferenceData, List[az.InferenceData]]: """Save fitted parameters to xarray DataSet for multiple fits. 
 
     :param fits: Fitted models for each feature
@@ -131,6 +129,10 @@
     :param dims: Dimensions of parameters in the model
     :type dims: dict
 
+    :param concatenate: Whether to concatenate all fits together, defaults to
+        True
+    :type concatenate: bool
+
     :param_concatenation_name: Name to aggregate features when combining
         multiple fits, defaults to 'feature'
     :type concatentation_name: str, optional
 
@@ -143,28 +145,16 @@
         to include in ``arviz`` InferenceData object
     :type log_likelihood: str, optional
 
-    :param dask_cluster: Dask jobqueue to run parallel jobs (optional)
-    :type dask_cluster: dask_jobqueue.JobQueueCluster, optional
-
-    :param jobs: Number of jobs to run in parallel, defaults to 4
-    :type jobs: int
-
     :returns: ``arviz`` InferenceData object with selected values
    :rtype: az.InferenceData
    """
-    if dask_cluster is not None:
-        dask_cluster.scale(jobs=jobs)
-
-    # Remove the concatenation dimension from each individual fit
+    # Remove the concatenation dimension from each individual fit if
+    # necessary
     new_dims = {
         k: [dim for dim in v if dim != concatenation_name]
         for k, v in dims.items()
     }
 
-    # If dims are unchanged it means the concatenation_name was not found
-    if new_dims == dims:
-        raise ValueError("concatenation_name must match dimensions in dims")
-
     # Remove the unnecessary posterior dimensions
     all_vars = fits[0].stan_variables().keys()
     vars_to_drop = set(all_vars).difference(params)
@@ -175,7 +165,7 @@
 
     inf_list = []
     for fit in fits:
-        single_feat_inf = _single_feature_to_inf(
+        single_feat_inf = dask.delayed(_single_feature_to_inf)(
             fit=fit,
             coords=coords,
             dims=new_dims,
@@ -185,25 +175,88 @@
         )
         inf_list.append(single_feat_inf)
 
+    inf_list = dask.compute(*inf_list)
+    if not concatenate:  # Return list of individual InferenceData objects
+        return inf_list
+    else:
+        return concatenate_inferences(inf_list, coords, concatenation_name)
+
+
+def _single_feature_to_inf(
+    fit: CmdStanMCMC,
+    coords: dict,
+    dims: dict,
+    vars_to_drop: Sequence[str],
+    posterior_predictive: str = None,
+    log_likelihood: str = None,
+) -> az.InferenceData:
+    """Convert single feature fit to InferenceData.
+
+    :param fit: Single feature fit with CmdStanPy
+    :type fit: cmdstanpy.CmdStanMCMC
+
+    :param coords: Coordinates to use for annotating Inference dims
+    :type coords: dict
+
+    :param dims: Dimensions of parameters in fitted model
+    :type dims: dict
+
+    :param vars_to_drop: Variables to drop from the posterior group
+    :type vars_to_drop: Sequence[str]
+
+    :param posterior_predictive: Name of variable holding PP values
+    :type posterior_predictive: str
+
+    :param log_likelihood: Name of variable holding LL values
+    :type log_likelihood: str
+
+    :returns: InferenceData object of single feature
+    :rtype: az.InferenceData
+    """
+    feat_inf = az.from_cmdstanpy(
+        posterior=fit,
+        posterior_predictive=posterior_predictive,
+        log_likelihood=log_likelihood,
+        coords=coords,
+        dims=dims
+    )
+    feat_inf.posterior = _drop_data(feat_inf.posterior, vars_to_drop)
+    return feat_inf
+
+
+def concatenate_inferences(
+    inf_list: List[az.InferenceData],
+    coords: dict,
+    concatenation_name: str = "feature"
+) -> az.InferenceData:
+    """Concatenates multiple single feature fits into one object.
+
+    :param inf_list: List of InferenceData objects for each feature
+    :type inf_list: List[az.InferenceData]
+
+    :param coords: Coordinates containing concatenation name labels
+    :type coords: dict
+
+    :param concatenation_name: Name of feature dimension used when
+        concatenating, defaults to "feature"
+    :type concatenation_name: str
+
+    :returns: Combined InferenceData object
+    :rtype: az.InferenceData
+    """
     group_list = []
-    group_list.append(dask.persist(*[x.posterior for x in inf_list]))
-    group_list.append(dask.persist(*[x.sample_stats for x in inf_list]))
-    if log_likelihood is not None:
-        group_list.append(dask.persist(*[x.log_likelihood for x in inf_list]))
-    if posterior_predictive is not None:
-        group_list.append(
-            dask.persist(*[x.posterior_predictive for x in inf_list])
-        )
+    group_list.append([x.posterior for x in inf_list])
+    group_list.append([x.sample_stats for x in inf_list])
+    if "log_likelihood" in inf_list[0].groups():
+        group_list.append([x.log_likelihood for x in inf_list])
+    if "posterior_predictive" in inf_list[0].groups():
+        group_list.append([x.posterior_predictive for x in inf_list])
 
-    group_list = dask.compute(*group_list)
     po_ds = xr.concat(group_list[0], concatenation_name)
     ss_ds = xr.concat(group_list[1], concatenation_name)
     group_dict = {"posterior": po_ds, "sample_stats": ss_ds}
 
-    if log_likelihood is not None:
+    if "log_likelihood" in inf_list[0].groups():
         ll_ds = xr.concat(group_list[2], concatenation_name)
         group_dict["log_likelihood"] = ll_ds
 
-    if posterior_predictive is not None:
+    if "posterior_predictive" in inf_list[0].groups():
         pp_ds = xr.concat(group_list[3], concatenation_name)
         group_dict["posterior_predictive"] = pp_ds
 
@@ -220,27 +273,6 @@
     return az.concat(*all_group_inferences)
 
 
-@dask.delayed
-def _single_feature_to_inf(
-    fit: CmdStanMCMC,
-    coords: dict,
-    dims: dict,
-    vars_to_drop: Sequence[str],
-    posterior_predictive: str = None,
-    log_likelihood: str = None,
-) -> az.InferenceData:
-    """Convert single feature fit to InferenceData."""
-    feat_inf = az.from_cmdstanpy(
-        posterior=fit,
-        posterior_predictive=posterior_predictive,
-        log_likelihood=log_likelihood,
-        coords=coords,
-        dims=dims
-    )
-    feat_inf.posterior = _drop_data(feat_inf.posterior, vars_to_drop)
-    return feat_inf
-
-
 def _drop_data(
     dataset: xr.Dataset,
     vars_to_drop: Sequence[str],
diff --git a/docs/custom_model.rst b/docs/custom_model.rst
index 995799c..5deb347 100644
--- a/docs/custom_model.rst
+++ b/docs/custom_model.rst
@@ -14,8 +14,8 @@ First we will download the BIOM table and metadata from Qiita.
 
 .. code-block:: bash
 
-    wget -O data.zip "https://qiita.ucsd.edu/public_artifact_download/?artifact_id=44773"
-    wget -O metadata.zip "https://qiita.ucsd.edu/public_download/?data=sample_information&study_id=107"
+    wget -O data.zip "https://qiita.ucsd.edu/public_artifact_download/?artifact_id=94270"
+    wget -O metadata.zip "https://qiita.ucsd.edu/public_download/?data=sample_information&study_id=11913"
     unzip data.zip
     unzip metadata.zip
 
@@ -265,20 +265,6 @@ Now we can add all the necessary parameters to BIRDMAn with the ``add_parameters
     }
     nb_lme.add_parameters(param_dict)
 
-Finally, we compile and fit the model.
-
-.. note::
-
-    Fitting this model took approximately 6 minutes on my laptop.
-
-.. code-block:: python
-
-    nb_lme.compile_model()
-    nb_lme.fit_model()
-
-Converting to ``InferenceData``
--------------------------------
-
 With a custom model there is a bit more legwork involved in converting to the ``arviz.InferenceData`` data structure.
 We will step through each of the parameters in this example.
 
 * ``params``: List of parameters you want to include in the posterior draws (must match Stan code).
@@ -289,9 +275,11 @@ With a custom model there is a bit more legwork involved in converting to the ``
 * ``log_likelihood``: Name of variable holding log likelihood values (optional).
 * ``include_observed_data``: Whether to include the original feature table as a group. This is useful for certain diagnostics.
 
+We pass all these arguments into the ``specify_model`` method of the ``Model`` object.
+
 .. code-block:: python
 
-    inference = nb_lme.to_inference_object(
+    nb_lme.specify_model(
         params=["beta", "phi", "subj_int"],
         dims={
             "beta": ["covariate", "feature"],
@@ -312,4 +300,24 @@ With a custom model there is a bit more legwork involved in converting to the ``
         include_observed_data=True
     )
 
+Finally, we compile and fit the model.
+
+.. note::
+
+    Fitting this model took approximately 6 minutes on my laptop.
+
+.. code-block:: python
+
+    nb_lme.compile_model()
+    nb_lme.fit_model()
+
+Converting to ``InferenceData``
+-------------------------------
+
+Once the model has finished fitting, you can convert it to an ``InferenceData`` object, provided you specified the model beforehand.
+
+.. code-block:: python
+
+    inference = nb_lme.to_inference_object()
+
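+From here the object works with any ``arviz`` function that accepts ``InferenceData``. As a quick sketch, you could summarize the posterior draws of ``beta``:
+
+.. code-block:: python
+
+    import arviz as az
+
+    az.summary(inference, var_names=["beta"])
+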
 With this you can use the rest of the BIRDMAn suite as usual or directly interact with the ``arviz`` library!
diff --git a/docs/parallelization.rst b/docs/parallelization.rst
index 14b3cfc..18fea07 100644
--- a/docs/parallelization.rst
+++ b/docs/parallelization.rst
@@ -16,6 +16,7 @@ In this example we will assume that we have access to a SLURM cluster on which w
     import biom
     from birdman import NegativeBinomial
     from dask_jobqueue import SLURMCluster
+    from dask.distributed import Client
     import pandas as pd
 
     table = biom.load_table("example.biom")
@@ -28,6 +29,9 @@ In this example we will assume that we have access to a SLURM cluster on which w
         walltime='01:00:00',
         local_directory='/scratch'
     )
+    cluster.scale(jobs=8)  # 8 microbes at a time
+
+    client = Client(cluster)
 
     nb = NegativeBinomial(
         table=table,
@@ -38,12 +42,12 @@
         parallelize_across="features"  # note this argument!
     )
     nb.compile_model()
-    nb.fit_model(dask_cluster=cluster, jobs=8)  # run 8 microbes at a time
+    nb.fit_model()
 
-This code will run 8 microbes in parallel which will be faster than running the 4 chains in parallel. You can scale this up further depending on your computational resources by increasing the ``jobs`` argument.
+This code will run 8 microbes in parallel, which will be faster than running the 4 chains in parallel.
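+
+If you want each feature's fit converted to an ``arviz.InferenceData`` object as part of fitting, you can pass ``convert_to_inference=True``; ``nb.fit`` will then hold a list of ``InferenceData`` objects rather than CmdStanPy fits.
+
+.. code-block:: python
+
+    nb.fit_model(convert_to_inference=True)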
 
-When fitting in parallel, CmdStanPy fits a model for each feature in your table. Converting to an ``InferenceData`` object is then just a matter of combining all the individual fits. We can also parallelize this process to speed-up runtime.
+When fitting in parallel, CmdStanPy fits a model for each feature in your table. Converting to an ``InferenceData`` object is then just a matter of combining all the individual fits.
 
 .. code-block:: python
 
-    nb.to_inference_object(dask_cluster=cluster, jobs=8)
+    nb.to_inference_object(combine_individual_fits=True)
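+
+If you would rather keep the per-feature fits separate, you can skip the concatenation step, which returns a list of ``InferenceData`` objects (one per feature) instead:
+
+.. code-block:: python
+
+    inference_list = nb.to_inference_object(combine_individual_fits=False)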
diff --git a/environment.yml b/environment.yml
index 210ed77..90b231c 100644
--- a/environment.yml
+++ b/environment.yml
@@ -11,7 +11,6 @@ dependencies:
   - xarray
   - patsy
   - dask
-  - dask-jobqueue
   - biom-format
   - arviz
   - docutils==0.16
diff --git a/setup.py b/setup.py
index 07d72bd..42c4b88 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
 # Inspired by the EMPress setup.py file
 from setuptools import find_packages, setup
 
-__version__ = "0.0.3dev"
+__version__ = "0.0.3"
 
 classifiers = """
 Development Status :: 3 - Alpha
@@ -39,7 +39,6 @@
         "numpy",
         "cmdstanpy",
         "dask[complete]",
-        "dask-jobqueue",
         "biom-format",
         "patsy",
         "xarray",
diff --git a/tests/conftest.py b/tests/conftest.py
index 1ee0761..b994e53 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -10,7 +10,6 @@
 TEST_DATA = resource_filename("tests", "data")
 TBL_FILE = os.path.join(TEST_DATA, "macaque_tbl.biom")
 MD_FILE = os.path.join(TEST_DATA, "macaque_metadata.tsv")
-print(TBL_FILE)
 
 
 def example_biom():
@@ -91,5 +90,5 @@ def example_inf():
 @pytest.fixture(scope="session")
 def example_parallel_inf():
     nb = parallel_model()
-    inference = nb.to_inference_object()
+    inference = nb.to_inference_object(combine_individual_fits=True)
     return inference
diff --git a/tests/test_custom_model.py b/tests/test_custom_model.py
index a1cccb1..643e251 100644
--- a/tests/test_custom_model.py
+++ b/tests/test_custom_model.py
@@ -26,10 +26,7 @@ def test_custom_model(table_biom, metadata):
             "depth": np.log(custom_model.table.sum(axis="sample")),
         }
     )
-    custom_model.compile_model()
-    custom_model.fit_model()
-
-    inference = custom_model.to_inference_object(
+    custom_model.specify_model(
         params=["beta_var"],
         coords={
             "feature": custom_model.feature_names,
             "covariate": custom_model.colnames
         },
         dims={
             "beta_var": ["covariate", "feature"]
         },
         alr_params=["beta_var"]
     )
+    custom_model.compile_model()
+    custom_model.fit_model()
+    inference = custom_model.to_inference_object()
 
     assert set(inference.groups()) == {"posterior", "sample_stats"}
     ds = inference.posterior
diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py
index a486b0f..402dbef 100644
--- a/tests/test_diagnostics.py
+++ b/tests/test_diagnostics.py
@@ -17,6 +17,5 @@ def test_r2_score(example_inf, example_parallel_inf):
 
 
 def test_loo(example_inf, example_parallel_inf):
-    print(example_inf)
     diag.loo(example_inf)
     diag.loo(example_parallel_inf)
diff --git a/tests/test_model.py b/tests/test_model.py
index 93a39d5..a5923df 100644
--- a/tests/test_model.py
+++ b/tests/test_model.py
@@ -1,6 +1,7 @@
 import os
 from pkg_resources import resource_filename
 
+from arviz import InferenceData
 import numpy as np
 
 from birdman import Multinomial, NegativeBinomial, NegativeBinomialLME
@@ -85,3 +86,26 @@ def test_parallel_to_inference(self, example_parallel_model):
         target_groups = {"posterior", "sample_stats", "log_likelihood",
                          "posterior_predictive", "observed_data"}
         assert set(inference_data.groups()) == target_groups
+
+    def test_parallel_to_inference_no_concat(self, example_parallel_model):
+        inf = example_parallel_model.to_inference_object(
+            combine_individual_fits=False
+        )
+        assert len(inf) == 28
+
+    def test_parallel_auto_inf(self, table_biom, metadata):
+        nb = NegativeBinomial(
+            table=table_biom,
+            formula="host_common_name",
+            metadata=metadata,
+            chains=4,
+            num_iter=100,
+            seed=42,
+            beta_prior=2.0,
+            cauchy_scale=2.0,
+            parallelize_across="features"
+        )
+        nb.compile_model()
+        nb.fit_model(convert_to_inference=True)
+        assert len(nb.fit) == 28
+        assert isinstance(nb.fit[0], InferenceData)
diff --git a/tests/test_model_util.py b/tests/test_model_util.py
index d30cca1..635e9fd 100644
--- a/tests/test_model_util.py
+++ b/tests/test_model_util.py
@@ -61,6 +61,27 @@
         )
         self.dataset_comparison(example_parallel_model, inf.posterior)
 
+    def test_parallel_to_inference_no_concat(self, example_parallel_model):
+        inf = mu.multiple_fits_to_inference(
+            fits=example_parallel_model.fit,
+            params=["beta", "phi"],
+            coords={
+                "feature": example_parallel_model.feature_names,
+                "covariate": example_parallel_model.colnames
+            },
+            dims={
+                "beta": ["covariate", "feature"],
+                "phi": ["feature"]
+            },
+            concatenate=False,
+            log_likelihood="log_lhood",
+            posterior_predictive="y_predict",
+        )
+        assert len(inf) == 28
+        exp_groups = {"sample_stats", "posterior", "log_likelihood",
+                      "posterior_predictive"}
+        assert set(inf[0].groups()) == exp_groups
+
     def test_parallel_to_inference_wrong_concat(self, example_parallel_model):
         with pytest.raises(ValueError) as excinfo:
             mu.multiple_fits_to_inference(
@@ -76,8 +97,8 @@
                 },
                 concatenation_name="mewtwo",
             )
-        assert str(excinfo.value) == ("concatenation_name must match "
-                                      "dimensions in dims")
+        assert str(excinfo.value) == ("different number of dimensions on "
+                                      "data and dims: 3 vs 4")
 
 
 # Posterior predictive & log likelihood