From c1d7af365a8d51795fff5fdba4ec77153186fdd3 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Mon, 17 May 2021 10:43:40 -0700 Subject: [PATCH 01/12] Remove dask.delayed decorator from single feat fit --- birdman/model_util.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/birdman/model_util.py b/birdman/model_util.py index f3eb64a..5052400 100644 --- a/birdman/model_util.py +++ b/birdman/model_util.py @@ -175,7 +175,7 @@ def multiple_fits_to_inference( inf_list = [] for fit in fits: - single_feat_inf = _single_feature_to_inf( + single_feat_inf = dask.delayed(_single_feature_to_inf)( fit=fit, coords=coords, dims=new_dims, @@ -220,7 +220,6 @@ def multiple_fits_to_inference( return az.concat(*all_group_inferences) -@dask.delayed def _single_feature_to_inf( fit: CmdStanMCMC, coords: dict, From 06470ff5561117cc5cdb8c535a242dadfca45a86 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Mon, 17 May 2021 12:22:18 -0700 Subject: [PATCH 02/12] Preliminary restructure of inference conversion Still need to check that non-concatenation works. --- birdman/default_models.py | 69 ++++------------ birdman/model_base.py | 158 ++++++++++++++++++++----------------- birdman/model_util.py | 80 ++++++++++--------- tests/conftest.py | 3 +- tests/test_custom_model.py | 8 +- tests/test_diagnostics.py | 1 - 6 files changed, 151 insertions(+), 168 deletions(-) diff --git a/birdman/default_models.py b/birdman/default_models.py index 8fc5819..132a2e9 100644 --- a/birdman/default_models.py +++ b/birdman/default_models.py @@ -3,7 +3,6 @@ import arviz as az import biom -import dask_jobqueue import numpy as np import pandas as pd @@ -108,51 +107,24 @@ def __init__( } self.add_parameters(param_dict) - def to_inference_object( - self, - dask_cluster: dask_jobqueue.JobQueueCluster = None, - jobs: int = 4 - ) -> az.InferenceData: - """Convert fitted Stan model into ``arviz`` InferenceData object. - - :param dask_cluster: Dask jobqueue to run parallel jobs (optional) - :type dask_cluster: dask_jobqueue.JobQueueCluster, optional - - :param jobs: Number of jobs to run in parallel, defaults to 4 - :type jobs: int - - :returns: ``arviz`` InferenceData object with selected values - :rtype: az.InferenceData - """ - dims = { + self.specifications["params"] = ["beta", "phi"] + self.specifications["dims"] = { "beta": ["covariate", "feature"], "phi": ["feature"], "log_lhood": ["tbl_sample", "feature"], "y_predict": ["tbl_sample", "feature"] } - coords = { + self.specifications["coords"] = { "covariate": self.colnames, "feature": self.feature_names, "tbl_sample": self.sample_names } + self.specifications["include_observed_data"] = True + self.specifications["posterior_predictive"] = "y_predict" + self.specifications["log_likelihood"] = "log_lhood" - # TODO: May want to allow not passing PP/LL/OD in the future - args = dict() if self.parallelize_across == "chains": - args["alr_params"] = ["beta"] - else: - args["dask_cluster"] = dask_cluster - args["jobs"] = jobs - inf = super().to_inference_object( - params=["beta", "phi"], - dims=dims, - coords=coords, - posterior_predictive="y_predict", - log_likelihood="log_lhood", - include_observed_data=True, - **args - ) - return inf + self.specifications["alr_params"] = ["beta"] class NegativeBinomialLME(RegressionModel): @@ -259,37 +231,26 @@ def __init__( } self.add_parameters(param_dict) - def to_inference_object(self) -> az.InferenceData: - """Convert fitted Stan model into ``arviz`` InferenceData object. - - :returns: ``arviz`` InferenceData object with selected values - :rtype: az.InferenceData - """ - dims = { + self.specifications["params"] = ["beta", "phi"] + self.specifications["dims"] = { "beta": ["covariate", "feature"], "phi": ["feature"], "subj_int": ["group", "feature"], "log_lhood": ["tbl_sample", "feature"], "y_predict": ["tbl_sample", "feature"] } - coords = { + self.specifications["coords"] = { "covariate": self.colnames, "feature": self.feature_names, "tbl_sample": self.sample_names, "group": self.groups } + self.specifications["include_observed_data"] = True + self.specifications["posterior_predictive"] = "y_predict" + self.specifications["log_likelihood"] = "log_lhood" - # TODO: May want to allow not passing PP/LL/OD in the future - inf = super().to_inference_object( - params=["beta", "phi"], - dims=dims, - coords=coords, - posterior_predictive="y_predict", - log_likelihood="log_lhood", - alr_params=["beta", "subj_int"], - include_observed_data=True, - ) - return inf + if self.parallelize_across == "chains": + self.specifications["alr_params"] = ["beta", "subj_int"] class Multinomial(RegressionModel): diff --git a/birdman/model_base.py b/birdman/model_base.py index 62ee34d..74d29a1 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -73,10 +73,64 @@ def __init__( "N": table.shape[1], # number of samples } + self.specifications = dict() + def compile_model(self) -> None: """Compile Stan model.""" self.sm = CmdStanModel(stan_file=self.model_path) + def specify_model( + self, + params: Sequence[str], + coords: dict, + dims: dict, + concatenation_name: str = "feature", + alr_params: Sequence[str] = None, + include_observed_data: bool = False, + posterior_predictive: str = None, + log_likelihood: str = None, + ) -> None: + """Specify coordinates and dimensions of model. + + :param params: Posterior fitted parameters to include + :type params: Sequence[str] + + :param coords: Mapping of entries in dims to labels + :type coords: dict + + :param dims: Dimensions of parameters in the model + :type dims: dict + + :param concatenation_name: Name to aggregate features when combining + multiple fits, defaults to 'feature' + :type concatentation_name: str + + :param alr_params: Parameters to convert from ALR to CLR (this will + be ignored if the model has been parallelized across features) + :type alr_params: Sequence[str], optional + + :param include_observed_data: Whether to include the original feature + table values into the ``arviz`` InferenceData object, default is + False + :type include_observed_data: bool + + :param posterior_predictive: Name of posterior predictive values from + Stan model to include in ``arviz`` InferenceData object + :type posterior_predictive: str, optional + + :param log_likelihood: Name of log likelihood values from Stan model + to include in ``arviz`` InferenceData object + :type log_likelihood: str, optional + """ + self.specifications["params"] = params + self.specifications["coords"] = coords + self.specifications["dims"] = dims + self.specifications["alr_params"] = alr_params + self.specifications["concatenation_name"] = concatenation_name + self.specifications["include_observed_data"] = include_observed_data + self.specifications["posterior_predictive"] = posterior_predictive + self.specifications["log_likelihood"] = log_likelihood + def add_parameters(self, param_dict=None) -> None: """Add parameters from dict to be passed to Stan.""" self.dat.update(param_dict) @@ -185,65 +239,14 @@ def _fit_single(self, values, sampler_args): def to_inference_object( self, - params: Sequence[str], - coords: dict, - dims: dict, - concatenation_name: str = "feature", - alr_params: Sequence[str] = None, - include_observed_data: bool = False, - posterior_predictive: str = None, - log_likelihood: str = None, + combine_individual_fits: bool = True, dask_cluster: dask_jobqueue.JobQueueCluster = None, jobs: int = 4 ) -> az.InferenceData: """Convert fitted Stan model into ``arviz`` InferenceData object. - Example for a simple Negative Binomial model: - - .. code-block:: python - - inf_obj = model.to_inference_object( - params=['beta', 'phi'], - coords={ - 'feature': model.feature_names, - 'covariate': model.colnames - }, - dims={ - 'beta': ['covariate', 'feature'], - 'phi': ['feature'] - }, - alr_params=['beta'] - ) - - :param params: Posterior fitted parameters to include - :type params: Sequence[str] - - :param coords: Mapping of entries in dims to labels - :type coords: dict - - :param dims: Dimensions of parameters in the model - :type dims: dict - - :param concatenation_name: Name to aggregate features when combining - multiple fits, defaults to 'feature' - :type concatentation_name: str - - :param alr_params: Parameters to convert from ALR to CLR (this will - be ignored if the model has been parallelized across features) - :type alr_params: Sequence[str], optional - - :param include_observed_data: Whether to include the original feature - table values into the ``arviz`` InferenceData object, default is - False - :type include_observed_data: bool - - :param posterior_predictive: Name of posterior predictive values from - Stan model to include in ``arviz`` InferenceData object - :type posterior_predictive: str, optional - - :param log_likelihood: Name of log likelihood values from Stan model - to include in ``arviz`` InferenceData object - :type log_likelihood: str, optional + :param combine_invididual_fits: Whether to combine the results of + parallelized feature fits, defaults to True :param dask_cluster: Dask jobqueue to run parallel jobs (optional) :type dask_cluster: dask_jobqueue @@ -258,39 +261,52 @@ def to_inference_object( raise ValueError("Model has not been fit!") args = { - "params": params, - "coords": coords, - "dims": dims, - "posterior_predictive": posterior_predictive, - "log_likelihood": log_likelihood, + k: self.specifications.get(k) + for k in ["params", "coords", "dims", "posterior_predictive", + "log_likelihood"] } if isinstance(self.fit, CmdStanMCMC): fit_to_inference = single_fit_to_inference - args["alr_params"] = alr_params + args["alr_params"] = self.specifications["alr_params"] elif isinstance(self.fit, Sequence): fit_to_inference = multiple_fits_to_inference - args["concatenation_name"] = concatenation_name + if combine_individual_fits: + args["concatenation_name"] = self.specifications.get( + "concatenation_name", "feature" + ) + args["concatenate"] = True args["dask_cluster"] = dask_cluster args["jobs"] = jobs # TODO: Check that dims and concatenation_match - if alr_params is not None: + if self.specifications.get("alr_params") is not None: warnings.warn("ALR to CLR not performed on parallel models.", UserWarning) else: raise ValueError("Unrecognized fit type!") inference = fit_to_inference(self.fit, **args) - if include_observed_data: - obs = az.from_dict( - observed_data={"observed": self.dat["y"]}, - coords={ - "tbl_sample": self.sample_names, - "feature": self.feature_names - }, - dims={"observed": ["tbl_sample", "feature"]} + if self.specifications["include_observed_data"]: + # Can't include observed data in individual fits + include_obs_fail = ( + not combine_individual_fits + and self.parallelize_across == "features" ) - inference = az.concat(inference, obs) + if include_obs_fail: + warnings.warn( + "Cannot include observed data in un-concatenated" + "fits!" + ) + else: + obs = az.from_dict( + observed_data={"observed": self.dat["y"]}, + coords={ + "tbl_sample": self.sample_names, + "feature": self.feature_names + }, + dims={"observed": ["tbl_sample", "feature"]} + ) + inference = az.concat(inference, obs) return inference def diagnose(self): diff --git a/birdman/model_util.py b/birdman/model_util.py index 5052400..a0f306d 100644 --- a/birdman/model_util.py +++ b/birdman/model_util.py @@ -1,5 +1,5 @@ import re -from typing import Sequence +from typing import List, Sequence, Union import arviz as az from cmdstanpy import CmdStanMCMC @@ -111,12 +111,13 @@ def multiple_fits_to_inference( params: Sequence[str], coords: dict, dims: dict, - concatenation_name: str, + concatenate: bool = True, + concatenation_name: str = "feature", posterior_predictive: str = None, log_likelihood: str = None, dask_cluster: dask_jobqueue.JobQueueCluster = None, jobs: int = 4 -) -> az.InferenceData: +) -> Union[az.InferenceData, List[az.InferenceData]]: """Save fitted parameters to xarray DataSet for multiple fits. :param fits: Fitted models for each feature @@ -131,6 +132,10 @@ def multiple_fits_to_inference( :param dims: Dimensions of parameters in the model :type dims: dict + :param concatenate: Whether to concatenate all fits together, defaults to + True + :type concatenate: bool + :param_concatenation_name: Name to aggregate features when combining multiple fits, defaults to 'feature' :type concatentation_name: str, optional @@ -143,18 +148,9 @@ def multiple_fits_to_inference( to include in ``arviz`` InferenceData object :type log_likelihood: str, optional - :param dask_cluster: Dask jobqueue to run parallel jobs (optional) - :type dask_cluster: dask_jobqueue.JobQueueCluster, optional - - :param jobs: Number of jobs to run in parallel, defaults to 4 - :type jobs: int - :returns: ``arviz`` InferenceData object with selected values :rtype: az.InferenceData """ - if dask_cluster is not None: - dask_cluster.scale(jobs=jobs) - # Remove the concatenation dimension from each individual fit new_dims = { k: [dim for dim in v if dim != concatenation_name] @@ -185,12 +181,44 @@ def multiple_fits_to_inference( ) inf_list.append(single_feat_inf) + inf_list = dask.compute(*inf_list) + if not concatenate: # Return list of individual InferenceData objects + return inf_list + else: + return concatenate_inferences(inf_list, coords, concatenation_name) + + +def _single_feature_to_inf( + fit: CmdStanMCMC, + coords: dict, + dims: dict, + vars_to_drop: Sequence[str], + posterior_predictive: str = None, + log_likelihood: str = None, +) -> az.InferenceData: + """Convert single feature fit to InferenceData.""" + feat_inf = az.from_cmdstanpy( + posterior=fit, + posterior_predictive=posterior_predictive, + log_likelihood=log_likelihood, + coords=coords, + dims=dims + ) + feat_inf.posterior = _drop_data(feat_inf.posterior, vars_to_drop) + return feat_inf + + +def concatenate_inferences( + inf_list: List[az.InferenceData], + coords: dict, + concatenation_name: str = "feature" +) -> az.InferenceData: group_list = [] group_list.append(dask.persist(*[x.posterior for x in inf_list])) group_list.append(dask.persist(*[x.sample_stats for x in inf_list])) - if log_likelihood is not None: + if "log_likelihood" in inf_list[0].groups(): group_list.append(dask.persist(*[x.log_likelihood for x in inf_list])) - if posterior_predictive is not None: + if "posterior_predictive" in inf_list[0].groups(): group_list.append( dask.persist(*[x.posterior_predictive for x in inf_list]) ) @@ -200,10 +228,10 @@ def multiple_fits_to_inference( ss_ds = xr.concat(group_list[1], concatenation_name) group_dict = {"posterior": po_ds, "sample_stats": ss_ds} - if log_likelihood is not None: + if "log_likelihood" in inf_list[0].groups(): ll_ds = xr.concat(group_list[2], concatenation_name) group_dict["log_likelihood"] = ll_ds - if posterior_predictive is not None: + if "posterior_predictive" in inf_list[0].groups(): pp_ds = xr.concat(group_list[3], concatenation_name) group_dict["posterior_predictive"] = pp_ds @@ -220,26 +248,6 @@ def multiple_fits_to_inference( return az.concat(*all_group_inferences) -def _single_feature_to_inf( - fit: CmdStanMCMC, - coords: dict, - dims: dict, - vars_to_drop: Sequence[str], - posterior_predictive: str = None, - log_likelihood: str = None, -) -> az.InferenceData: - """Convert single feature fit to InferenceData.""" - feat_inf = az.from_cmdstanpy( - posterior=fit, - posterior_predictive=posterior_predictive, - log_likelihood=log_likelihood, - coords=coords, - dims=dims - ) - feat_inf.posterior = _drop_data(feat_inf.posterior, vars_to_drop) - return feat_inf - - def _drop_data( dataset: xr.Dataset, vars_to_drop: Sequence[str], diff --git a/tests/conftest.py b/tests/conftest.py index 1ee0761..b994e53 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,6 @@ TEST_DATA = resource_filename("tests", "data") TBL_FILE = os.path.join(TEST_DATA, "macaque_tbl.biom") MD_FILE = os.path.join(TEST_DATA, "macaque_metadata.tsv") -print(TBL_FILE) def example_biom(): @@ -91,5 +90,5 @@ def example_inf(): @pytest.fixture(scope="session") def example_parallel_inf(): nb = parallel_model() - inference = nb.to_inference_object() + inference = nb.to_inference_object(combine_individual_fits=True) return inference diff --git a/tests/test_custom_model.py b/tests/test_custom_model.py index a1cccb1..643e251 100644 --- a/tests/test_custom_model.py +++ b/tests/test_custom_model.py @@ -26,10 +26,7 @@ def test_custom_model(table_biom, metadata): "depth": np.log(custom_model.table.sum(axis="sample")), } ) - custom_model.compile_model() - custom_model.fit_model() - - inference = custom_model.to_inference_object( + custom_model.specify_model( params=["beta_var"], coords={ "feature": custom_model.feature_names, @@ -41,6 +38,9 @@ def test_custom_model(table_biom, metadata): }, alr_params=["beta_var"] ) + custom_model.compile_model() + custom_model.fit_model() + inference = custom_model.to_inference_object() assert set(inference.groups()) == {"posterior", "sample_stats"} ds = inference.posterior diff --git a/tests/test_diagnostics.py b/tests/test_diagnostics.py index a486b0f..402dbef 100644 --- a/tests/test_diagnostics.py +++ b/tests/test_diagnostics.py @@ -17,6 +17,5 @@ def test_r2_score(example_inf, example_parallel_inf): def test_loo(example_inf, example_parallel_inf): - print(example_inf) diag.loo(example_inf) diag.loo(example_parallel_inf) From e47b0292f58f5f7a60740b14ae664e136fe4c6ef Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Mon, 17 May 2021 13:22:35 -0700 Subject: [PATCH 03/12] Add auto-conversion to inference of // models --- birdman/model_base.py | 97 ++++++++++++++++++++++++++++++++++------ tests/test_model.py | 24 ++++++++++ tests/test_model_util.py | 21 +++++++++ 3 files changed, 128 insertions(+), 14 deletions(-) diff --git a/birdman/model_base.py b/birdman/model_base.py index 74d29a1..c361035 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -9,7 +9,8 @@ import pandas as pd from patsy import dmatrix -from .model_util import single_fit_to_inference, multiple_fits_to_inference +from .model_util import (single_fit_to_inference, multiple_fits_to_inference, + _single_feature_to_inf) class BaseModel: @@ -138,6 +139,8 @@ def add_parameters(self, param_dict=None) -> None: def fit_model( self, sampler_args: dict = None, + auto_convert_to_inference: bool = False, + combine_individual_fits: bool = False, dask_cluster: dask_jobqueue.JobQueueCluster = None, jobs: int = 4, ) -> None: @@ -147,6 +150,15 @@ def fit_model( sampler (optional) :type sampler_args: dict + :param auto_convert_to_inference: Whether to create individual + InferenceData objects for individual feature fits, defaults to + False + :type auto_convert_to_inference: bool + + :param combine_individual_fits: Whether to combine the results of + parallelized feature fits, defaults to True + :type combine_individual_fits: bool + :param dask_cluster: Dask jobqueue to run parallel jobs if parallelizing across features (optional) :type dask_cluster: dask_jobqueue @@ -156,15 +168,19 @@ def fit_model( :type jobs: int """ if self.parallelize_across == "features": - self.fit = self._fit_parallel(dask_cluster=dask_cluster, jobs=jobs, - sampler_args=sampler_args) + self._fit_parallel( + dask_cluster=dask_cluster, + jobs=jobs, + sampler_args=sampler_args, + auto_convert_to_inference=auto_convert_to_inference + ) elif self.parallelize_across == "chains": if None not in [dask_cluster, jobs]: warnings.warn( "dask_cluster and jobs ignored when parallelizing" " across chains." ) - self.fit = self._fit_serial(sampler_args) + self._fit_serial(sampler_args=sampler_args) else: raise ValueError("parallelize_across must be features or chains!") @@ -178,7 +194,7 @@ def _fit_serial(self, sampler_args: dict = None) -> CmdStanMCMC: if sampler_args is None: sampler_args = dict() - fit = self.sm.sample( + self.fit = self.sm.sample( chains=self.chains, parallel_chains=self.chains, # run all chains in parallel data=self.dat, @@ -187,16 +203,21 @@ def _fit_serial(self, sampler_args: dict = None) -> CmdStanMCMC: seed=self.seed, **sampler_args ) - return fit def _fit_parallel( self, + auto_convert_to_inference: bool = False, dask_cluster: dask_jobqueue.JobQueueCluster = None, jobs: int = 4, sampler_args: dict = None, ) -> List[CmdStanMCMC]: """Fit model by parallelizing across features. + :param auto_convert_to_inference: Whether to create individual + InferenceData objects for individual feature fits, defaults to + False + :type auto_convert_to_inference: bool + :param dask_cluster: Dask jobqueue to run parallel jobs (optional) :type dask_cluster: dask_jobqueue @@ -213,18 +234,48 @@ def _fit_parallel( if dask_cluster is not None: dask_cluster.scale(jobs=jobs) + if auto_convert_to_inference: + if not self.specifications: + raise ValueError( + "Model parameters, coords, and dims have not been ", + "specified!" + ) + concatenation_name = self.specifications.get("concatenation_name", + "feature") + new_dims = { + k: [dim for dim in v if dim != concatenation_name] + for k, v in self.specifications["dims"].items() + } + else: + new_dims = dict() + + _infs = [] _fits = [] for v, i, d in self.table.iter(axis="observation"): - _fit = dask.delayed(self._fit_single)(v, sampler_args) - _fits.append(_fit) + _tmp = dask.delayed(self._fit_single)( + v, + sampler_args, + auto_convert_to_inference, + new_dims + ) + if auto_convert_to_inference: + _fits.append(_tmp[0]) + _infs.append(_tmp[1]) + else: + _fits.append(_tmp) + + if auto_convert_to_inference: + inf_futures = dask.persist(*_infs) + all_infs = dask.compute(inf_futures)[0] + self.inf = all_infs - futures = dask.persist(*_fits) - all_fits = dask.compute(futures)[0] + fit_futures = dask.persist(*_fits) + all_fits = dask.compute(fit_futures)[0] # Set data back to full table self.dat["y"] = self.table.matrix_data.todense().T.astype(int) - return all_fits + self.fit = all_fits - def _fit_single(self, values, sampler_args): + def _fit_single(self, values, sampler_args, auto_convert, new_dims): dat = self.dat dat["y"] = values.astype(int) _fit = self.sm.sample( @@ -235,7 +286,22 @@ def _fit_single(self, values, sampler_args): seed=self.seed, **sampler_args ) - return _fit + if not auto_convert: + return _fit + else: + _inf = _single_feature_to_inf( + fit=_fit, + coords=self.specifications["coords"], + dims=new_dims, + vars_to_drop=[], + posterior_predictive=self.specifications.get( + "posterior_predictive" + ), + log_likelihood=self.specifications.get( + "log_likelihood" + ) + ) + return _fit, _inf def to_inference_object( self, @@ -245,8 +311,9 @@ def to_inference_object( ) -> az.InferenceData: """Convert fitted Stan model into ``arviz`` InferenceData object. - :param combine_invididual_fits: Whether to combine the results of + :param combine_individual_fits: Whether to combine the results of parallelized feature fits, defaults to True + :type combine_individual_fits: bool :param dask_cluster: Dask jobqueue to run parallel jobs (optional) :type dask_cluster: dask_jobqueue @@ -275,6 +342,8 @@ def to_inference_object( "concatenation_name", "feature" ) args["concatenate"] = True + else: + args["concatenate"] = False args["dask_cluster"] = dask_cluster args["jobs"] = jobs # TODO: Check that dims and concatenation_match diff --git a/tests/test_model.py b/tests/test_model.py index 93a39d5..0d8be61 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -85,3 +85,27 @@ def test_parallel_to_inference(self, example_parallel_model): target_groups = {"posterior", "sample_stats", "log_likelihood", "posterior_predictive", "observed_data"} assert set(inference_data.groups()) == target_groups + + def test_parallel_to_inference_no_concat(self, example_parallel_model): + inf = example_parallel_model.to_inference_object( + combine_individual_fits=False + ) + assert len(inf) == 28 + + def test_parallel_auto_inf(self, table_biom, metadata): + nb = NegativeBinomial( + table=table_biom, + formula="host_common_name", + metadata=metadata, + chains=4, + seed=42, + beta_prior=2.0, + cauchy_scale=2.0, + parallelize_across="features" + ) + nb.compile_model() + nb.fit_model( + auto_convert_to_inference=True + ) + assert len(nb.inf) == 28 + assert len(nb.fit) == 28 diff --git a/tests/test_model_util.py b/tests/test_model_util.py index d30cca1..cf5dcd9 100644 --- a/tests/test_model_util.py +++ b/tests/test_model_util.py @@ -61,6 +61,27 @@ def test_parallel_to_inference(self, example_parallel_model): ) self.dataset_comparison(example_parallel_model, inf.posterior) + def test_parallel_to_inference_no_concat(self, example_parallel_model): + inf = mu.multiple_fits_to_inference( + fits=example_parallel_model.fit, + params=["beta", "phi"], + coords={ + "feature": example_parallel_model.feature_names, + "covariate": example_parallel_model.colnames + }, + dims={ + "beta": ["covariate", "feature"], + "phi": ["feature"] + }, + concatenate=False, + log_likelihood="log_lhood", + posterior_predictive="y_predict", + ) + assert len(inf) == 28 + exp_groups = {"sample_stats", "posterior", "log_likelihood", + "posterior_predictive"} + assert set(inf[0].groups()) == exp_groups + def test_parallel_to_inference_wrong_concat(self, example_parallel_model): with pytest.raises(ValueError) as excinfo: mu.multiple_fits_to_inference( From f5cf57d03fb6755d603fc9fc5f60404cc2e2c5f9 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Mon, 17 May 2021 13:42:56 -0700 Subject: [PATCH 04/12] Remove unwanted vars when auto-converting to inf --- birdman/model_base.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/birdman/model_base.py b/birdman/model_base.py index c361035..54e344e 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -289,11 +289,22 @@ def _fit_single(self, values, sampler_args, auto_convert, new_dims): if not auto_convert: return _fit else: + all_vars = _fit.stan_variables().keys() + vars_to_drop = set(all_vars).difference(self.specifications.get( + "params" + )) + ll = self.specifications.get("log_likelihood") + pp = self.specifications.get("posterior_predictive") + if ll is not None: + vars_to_drop.remove(ll) + if pp is not None: + vars_to_drop.remove(pp) + _inf = _single_feature_to_inf( fit=_fit, coords=self.specifications["coords"], dims=new_dims, - vars_to_drop=[], + vars_to_drop=vars_to_drop, posterior_predictive=self.specifications.get( "posterior_predictive" ), From 648573d4dd7ed5411a288c281ecfd48e0958e715 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Mon, 17 May 2021 13:49:21 -0700 Subject: [PATCH 05/12] Update default model specifications --- birdman/default_models.py | 109 +++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 59 deletions(-) diff --git a/birdman/default_models.py b/birdman/default_models.py index 132a2e9..bcc1f51 100644 --- a/birdman/default_models.py +++ b/birdman/default_models.py @@ -1,7 +1,6 @@ import os from pkg_resources import resource_filename -import arviz as az import biom import numpy as np import pandas as pd @@ -107,21 +106,23 @@ def __init__( } self.add_parameters(param_dict) - self.specifications["params"] = ["beta", "phi"] - self.specifications["dims"] = { - "beta": ["covariate", "feature"], - "phi": ["feature"], - "log_lhood": ["tbl_sample", "feature"], - "y_predict": ["tbl_sample", "feature"] - } - self.specifications["coords"] = { - "covariate": self.colnames, - "feature": self.feature_names, - "tbl_sample": self.sample_names - } - self.specifications["include_observed_data"] = True - self.specifications["posterior_predictive"] = "y_predict" - self.specifications["log_likelihood"] = "log_lhood" + self.specify_model( + params=["beta", "phi"], + dims={ + "beta": ["covariate", "feature"], + "phi": ["feature"], + "log_lhood": ["tbl_sample", "feature"], + "y_predict": ["tbl_sample", "feature"] + }, + coords={ + "covariate": self.colnames, + "feature": self.feature_names, + "tbl_sample": self.sample_names + }, + include_observed_data=True, + posterior_predictive="y_predict", + log_likelihood="log_lhood" + ) if self.parallelize_across == "chains": self.specifications["alr_params"] = ["beta"] @@ -231,23 +232,25 @@ def __init__( } self.add_parameters(param_dict) - self.specifications["params"] = ["beta", "phi"] - self.specifications["dims"] = { - "beta": ["covariate", "feature"], - "phi": ["feature"], - "subj_int": ["group", "feature"], - "log_lhood": ["tbl_sample", "feature"], - "y_predict": ["tbl_sample", "feature"] - } - self.specifications["coords"] = { - "covariate": self.colnames, - "feature": self.feature_names, - "tbl_sample": self.sample_names, - "group": self.groups - } - self.specifications["include_observed_data"] = True - self.specifications["posterior_predictive"] = "y_predict" - self.specifications["log_likelihood"] = "log_lhood" + self.specify_model( + params=["beta", "phi", "subj_int"], + dims={ + "beta": ["covariate", "feature"], + "phi": ["feature"], + "subj_int": ["group", "feature"], + "log_lhood": ["tbl_sample", "feature"], + "y_predict": ["tbl_sample", "feature"] + }, + coords={ + "covariate": self.colnames, + "feature": self.feature_names, + "tbl_sample": self.sample_names, + "group": self.groups + }, + include_observed_data=True, + posterior_predictive="y_predict", + log_likelihood="log_lhood" + ) if self.parallelize_across == "chains": self.specifications["alr_params"] = ["beta", "subj_int"] @@ -324,31 +327,19 @@ def __init__( } self.add_parameters(param_dict) - def to_inference_object(self) -> az.InferenceData: - """Convert fitted Stan model into ``arviz`` InferenceData object. - - :returns: ``arviz`` InferenceData object with selected values - :rtype: az.InferenceData - """ - dims = { - "beta": ["covariate", "feature"], - "log_lhood": ["tbl_sample"], - "y_predict": ["tbl_sample", "feature"] - } - coords = { - "covariate": self.colnames, - "feature": self.feature_names, - "tbl_sample": self.sample_names - } - - # TODO: May want to allow not passing PP/LL/OD in the future - inf = super().to_inference_object( - params=["beta", "phi"], - dims=dims, - coords=coords, - alr_params=["beta"], - posterior_predictive="y_predict", - log_likelihood="log_lhood", + self.specify_model( + params=["beta"], + dims={ + "beta": ["covariate", "feature"], + "log_lhood": ["tbl_sample"], + "y_predict": ["tbl_sample", "feature"] + }, + coords={ + "covariate": self.colnames, + "feature": self.feature_names, + "tbl_sample": self.sample_names, + }, include_observed_data=True, + posterior_predictive="y_predict", + log_likelihood="log_lhood" ) - return inf From 294f72637a3e17d7b3d94973673be0dbfe9575ba Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Wed, 19 May 2021 14:30:23 -0700 Subject: [PATCH 06/12] Restructure again with comments --- birdman/model_base.py | 140 ++++++++++++++++++++------------------- birdman/model_util.py | 56 ++++++++++++---- tests/test_model.py | 8 +-- tests/test_model_util.py | 4 +- 4 files changed, 120 insertions(+), 88 deletions(-) diff --git a/birdman/model_base.py b/birdman/model_base.py index 54e344e..45fb9d2 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -1,4 +1,4 @@ -from typing import List, Sequence +from typing import List, Sequence, Union import warnings import arviz as az @@ -6,6 +6,7 @@ from cmdstanpy import CmdStanModel, CmdStanMCMC import dask import dask_jobqueue +import numpy as np import pandas as pd from patsy import dmatrix @@ -132,15 +133,16 @@ def specify_model( self.specifications["posterior_predictive"] = posterior_predictive self.specifications["log_likelihood"] = log_likelihood - def add_parameters(self, param_dict=None) -> None: + def add_parameters(self, param_dict: dict = None) -> None: """Add parameters from dict to be passed to Stan.""" + if param_dict is None: + param_dict = dict() self.dat.update(param_dict) def fit_model( self, sampler_args: dict = None, - auto_convert_to_inference: bool = False, - combine_individual_fits: bool = False, + convert_to_inference: bool = False, dask_cluster: dask_jobqueue.JobQueueCluster = None, jobs: int = 4, ) -> None: @@ -150,14 +152,7 @@ def fit_model( sampler (optional) :type sampler_args: dict - :param auto_convert_to_inference: Whether to create individual - InferenceData objects for individual feature fits, defaults to - False - :type auto_convert_to_inference: bool - - :param combine_individual_fits: Whether to combine the results of - parallelized feature fits, defaults to True - :type combine_individual_fits: bool + : :param dask_cluster: Dask jobqueue to run parallel jobs if parallelizing across features (optional) @@ -168,11 +163,17 @@ def fit_model( :type jobs: int """ if self.parallelize_across == "features": + cn = self.specifications["concatenation_name"] + self.specifications["dims"] = { + k: [dim for dim in v if dim != cn] + for k, v in self.specifications["dims"].items() + + } self._fit_parallel( dask_cluster=dask_cluster, jobs=jobs, sampler_args=sampler_args, - auto_convert_to_inference=auto_convert_to_inference + convert_to_inference=convert_to_inference ) elif self.parallelize_across == "chains": if None not in [dask_cluster, jobs]: @@ -180,11 +181,18 @@ def fit_model( "dask_cluster and jobs ignored when parallelizing" " across chains." ) - self._fit_serial(sampler_args=sampler_args) + self._fit_serial( + sampler_args=sampler_args, + convert_to_inference=convert_to_inference + ) else: raise ValueError("parallelize_across must be features or chains!") - def _fit_serial(self, sampler_args: dict = None) -> CmdStanMCMC: + def _fit_serial( + self, + sampler_args: dict = None, + convert_to_inference: bool = False, + ) -> CmdStanMCMC: """Fit model by parallelizing across chains. :param sampler_args: Additional parameters to pass to CmdStanPy @@ -194,7 +202,7 @@ def _fit_serial(self, sampler_args: dict = None) -> CmdStanMCMC: if sampler_args is None: sampler_args = dict() - self.fit = self.sm.sample( + _fit = self.sm.sample( chains=self.chains, parallel_chains=self.chains, # run all chains in parallel data=self.dat, @@ -203,14 +211,27 @@ def _fit_serial(self, sampler_args: dict = None) -> CmdStanMCMC: seed=self.seed, **sampler_args ) + if convert_to_inference: + _fit = single_fit_to_inference( + fit=_fit, + params=self.specifications.get("params"), + coords=self.specifications.get("coords"), + dims=self.specifications.get("dims"), + alr_params=self.specifications.get("alr_params"), + posterior_predictive=self.specifications.get( + "posterior_predictive" + ), + log_likelihood=self.specifications.get("log_likelihood") + ) + self.fit = _fit def _fit_parallel( self, - auto_convert_to_inference: bool = False, + convert_to_inference: bool = False, dask_cluster: dask_jobqueue.JobQueueCluster = None, jobs: int = 4, sampler_args: dict = None, - ) -> List[CmdStanMCMC]: + ) -> Union[List[CmdStanMCMC], List[az.InferenceData]]: """Fit model by parallelizing across features. :param auto_convert_to_inference: Whether to create individual @@ -234,40 +255,14 @@ def _fit_parallel( if dask_cluster is not None: dask_cluster.scale(jobs=jobs) - if auto_convert_to_inference: - if not self.specifications: - raise ValueError( - "Model parameters, coords, and dims have not been ", - "specified!" - ) - concatenation_name = self.specifications.get("concatenation_name", - "feature") - new_dims = { - k: [dim for dim in v if dim != concatenation_name] - for k, v in self.specifications["dims"].items() - } - else: - new_dims = dict() - - _infs = [] _fits = [] for v, i, d in self.table.iter(axis="observation"): - _tmp = dask.delayed(self._fit_single)( + _fit = dask.delayed(self._fit_single)( v, sampler_args, - auto_convert_to_inference, - new_dims + convert_to_inference, ) - if auto_convert_to_inference: - _fits.append(_tmp[0]) - _infs.append(_tmp[1]) - else: - _fits.append(_tmp) - - if auto_convert_to_inference: - inf_futures = dask.persist(*_infs) - all_infs = dask.compute(inf_futures)[0] - self.inf = all_infs + _fits.append(_fit) fit_futures = dask.persist(*_fits) all_fits = dask.compute(fit_futures)[0] @@ -275,7 +270,12 @@ def _fit_parallel( self.dat["y"] = self.table.matrix_data.todense().T.astype(int) self.fit = all_fits - def _fit_single(self, values, sampler_args, auto_convert, new_dims): + def _fit_single( + self, + values: np.ndarray, + sampler_args: dict = None, + convert_to_inference: bool = False, + ) -> Union[CmdStanMCMC, az.InferenceData]: dat = self.dat dat["y"] = values.astype(int) _fit = self.sm.sample( @@ -286,33 +286,30 @@ def _fit_single(self, values, sampler_args, auto_convert, new_dims): seed=self.seed, **sampler_args ) - if not auto_convert: - return _fit - else: + + if convert_to_inference: all_vars = _fit.stan_variables().keys() - vars_to_drop = set(all_vars).difference(self.specifications.get( - "params" - )) - ll = self.specifications.get("log_likelihood") - pp = self.specifications.get("posterior_predictive") - if ll is not None: - vars_to_drop.remove(ll) - if pp is not None: - vars_to_drop.remove(pp) - - _inf = _single_feature_to_inf( + vars_to_drop = set(all_vars).difference( + self.specifications["params"] + ) + if self.specifications.get("posterior_predictive") is not None: + vars_to_drop.remove( + self.specifications["posterior_predictive"] + ) + if self.specifications.get("log_likelihood") is not None: + vars_to_drop.remove(self.specifications["log_likelihood"]) + + _fit = _single_feature_to_inf( fit=_fit, - coords=self.specifications["coords"], - dims=new_dims, + coords=self.specifications.get("coords"), + dims=self.specifications.get("dims"), vars_to_drop=vars_to_drop, posterior_predictive=self.specifications.get( "posterior_predictive" ), - log_likelihood=self.specifications.get( - "log_likelihood" - ) + log_likelihood=self.specifications.get("log_likelihood") ) - return _fit, _inf + return _fit def to_inference_object( self, @@ -338,6 +335,13 @@ def to_inference_object( if self.fit is None: raise ValueError("Model has not been fit!") + # if already Inference, just return + if isinstance(self.fit, az.InferenceData): + return self.fit + if isinstance(self.fit, list): + if isinstance(self.fit[0], az.InferenceData): + return self.fit + args = { k: self.specifications.get(k) for k in ["params", "coords", "dims", "posterior_predictive", diff --git a/birdman/model_util.py b/birdman/model_util.py index a0f306d..45d8b3c 100644 --- a/birdman/model_util.py +++ b/birdman/model_util.py @@ -58,7 +58,7 @@ def single_fit_to_inference( raise KeyError("Must include dimensions for posterior predictive!") inference = az.from_cmdstanpy( - posterior=fit, + fit, coords=coords, log_likelihood=log_likelihood, posterior_predictive=posterior_predictive, @@ -151,16 +151,13 @@ def multiple_fits_to_inference( :returns: ``arviz`` InferenceData object with selected values :rtype: az.InferenceData """ - # Remove the concatenation dimension from each individual fit + # Remove the concatenation dimension from each individual fit if + # necessary new_dims = { k: [dim for dim in v if dim != concatenation_name] for k, v in dims.items() } - # If dims are unchanged it means the concatenation_name was not found - if new_dims == dims: - raise ValueError("concatenation_name must match dimensions in dims") - # Remove the unnecessary posterior dimensions all_vars = fits[0].stan_variables().keys() vars_to_drop = set(all_vars).difference(params) @@ -196,7 +193,26 @@ def _single_feature_to_inf( posterior_predictive: str = None, log_likelihood: str = None, ) -> az.InferenceData: - """Convert single feature fit to InferenceData.""" + """Convert single feature fit to InferenceData. + + :param fit: Single feature fit with CmdStanPy + :type fit: cmdstanpy.CmdStanMCMC + + :param coords: Coordinates to use for annotating Inference dims + :type coords: dict + + :param dims: Dimensions of parameters in fitted model + :type dims: dict + + :param posterior_predictive: Name of variable holding PP values + :type posterior_predictive: str + + :param log_likelihood: Name of variable holding LL values + :type log_likelihood: str + + :returns: InferenceData object of single feature + :rtype: az.InferenceData + """ feat_inf = az.from_cmdstanpy( posterior=fit, posterior_predictive=posterior_predictive, @@ -213,17 +229,29 @@ def concatenate_inferences( coords: dict, concatenation_name: str = "feature" ) -> az.InferenceData: + """Concatenates multiple single feature fits into one object. + + :param inf_list: List of InferenceData objects for each feature + :type inf_list: List[az.InferenceData] + + :param coords: Coordinates containing concatenation name labels + :type coords: dict + + :param concatenation_name: Name of feature dimension used when + concatenating, defaults to "feature" + :type concatenation_name: str + + :returns: Combined InferenceData object + :rtype: az.InferenceData + """ group_list = [] - group_list.append(dask.persist(*[x.posterior for x in inf_list])) - group_list.append(dask.persist(*[x.sample_stats for x in inf_list])) + group_list.append([x.posterior for x in inf_list]) + group_list.append([x.sample_stats for x in inf_list]) if "log_likelihood" in inf_list[0].groups(): - group_list.append(dask.persist(*[x.log_likelihood for x in inf_list])) + group_list.append([x.log_likelihood for x in inf_list]) if "posterior_predictive" in inf_list[0].groups(): - group_list.append( - dask.persist(*[x.posterior_predictive for x in inf_list]) - ) + group_list.append([x.posterior_predictive for x in inf_list]) - group_list = dask.compute(*group_list) po_ds = xr.concat(group_list[0], concatenation_name) ss_ds = xr.concat(group_list[1], concatenation_name) group_dict = {"posterior": po_ds, "sample_stats": ss_ds} diff --git a/tests/test_model.py b/tests/test_model.py index 0d8be61..a5923df 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -1,6 +1,7 @@ import os from pkg_resources import resource_filename +from arviz import InferenceData import numpy as np from birdman import Multinomial, NegativeBinomial, NegativeBinomialLME @@ -98,14 +99,13 @@ def test_parallel_auto_inf(self, table_biom, metadata): formula="host_common_name", metadata=metadata, chains=4, + num_iter=100, seed=42, beta_prior=2.0, cauchy_scale=2.0, parallelize_across="features" ) nb.compile_model() - nb.fit_model( - auto_convert_to_inference=True - ) - assert len(nb.inf) == 28 + nb.fit_model(convert_to_inference=True) assert len(nb.fit) == 28 + assert isinstance(nb.fit[0], InferenceData) diff --git a/tests/test_model_util.py b/tests/test_model_util.py index cf5dcd9..635e9fd 100644 --- a/tests/test_model_util.py +++ b/tests/test_model_util.py @@ -97,8 +97,8 @@ def test_parallel_to_inference_wrong_concat(self, example_parallel_model): }, concatenation_name="mewtwo", ) - assert str(excinfo.value) == ("concatenation_name must match " - "dimensions in dims") + assert str(excinfo.value) == ("different number of dimensions on " + "data and dims: 3 vs 4") # Posterior predictive & log likelihood From 0eebfbf7704ce7dc6bbf269e8a0d501185ac3988 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Wed, 26 May 2021 08:58:08 -0700 Subject: [PATCH 07/12] Remove dask cluster & jobs args --- birdman/model_base.py | 45 ++----------------------------------------- birdman/model_util.py | 3 --- 2 files changed, 2 insertions(+), 46 deletions(-) diff --git a/birdman/model_base.py b/birdman/model_base.py index 45fb9d2..38a576b 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -5,7 +5,6 @@ import biom from cmdstanpy import CmdStanModel, CmdStanMCMC import dask -import dask_jobqueue import numpy as np import pandas as pd from patsy import dmatrix @@ -143,24 +142,12 @@ def fit_model( self, sampler_args: dict = None, convert_to_inference: bool = False, - dask_cluster: dask_jobqueue.JobQueueCluster = None, - jobs: int = 4, ) -> None: """Fit model according to parallelization configuration. :param sampler_args: Additional parameters to pass to CmdStanPy sampler (optional) :type sampler_args: dict - - : - - :param dask_cluster: Dask jobqueue to run parallel jobs if - parallelizing across features (optional) - :type dask_cluster: dask_jobqueue - - :param jobs: Number of jobs to run in parallel jobs if parallelizing - across features, defaults to 4 - :type jobs: int """ if self.parallelize_across == "features": cn = self.specifications["concatenation_name"] @@ -170,17 +157,10 @@ def fit_model( } self._fit_parallel( - dask_cluster=dask_cluster, - jobs=jobs, sampler_args=sampler_args, convert_to_inference=convert_to_inference ) elif self.parallelize_across == "chains": - if None not in [dask_cluster, jobs]: - warnings.warn( - "dask_cluster and jobs ignored when parallelizing" - " across chains." - ) self._fit_serial( sampler_args=sampler_args, convert_to_inference=convert_to_inference @@ -228,22 +208,14 @@ def _fit_serial( def _fit_parallel( self, convert_to_inference: bool = False, - dask_cluster: dask_jobqueue.JobQueueCluster = None, - jobs: int = 4, sampler_args: dict = None, ) -> Union[List[CmdStanMCMC], List[az.InferenceData]]: """Fit model by parallelizing across features. - :param auto_convert_to_inference: Whether to create individual + :param convert_to_inference: Whether to create individual InferenceData objects for individual feature fits, defaults to False - :type auto_convert_to_inference: bool - - :param dask_cluster: Dask jobqueue to run parallel jobs (optional) - :type dask_cluster: dask_jobqueue - - :param jobs: Number of jobs to run parallel in parallel, defaults to 4 - :type jobs: int + :type convert_to_inference: bool :param sampler_args: Additional parameters to pass to CmdStanPy sampler (optional) @@ -252,9 +224,6 @@ def _fit_parallel( if sampler_args is None: sampler_args = dict() - if dask_cluster is not None: - dask_cluster.scale(jobs=jobs) - _fits = [] for v, i, d in self.table.iter(axis="observation"): _fit = dask.delayed(self._fit_single)( @@ -314,8 +283,6 @@ def _fit_single( def to_inference_object( self, combine_individual_fits: bool = True, - dask_cluster: dask_jobqueue.JobQueueCluster = None, - jobs: int = 4 ) -> az.InferenceData: """Convert fitted Stan model into ``arviz`` InferenceData object. @@ -323,12 +290,6 @@ def to_inference_object( parallelized feature fits, defaults to True :type combine_individual_fits: bool - :param dask_cluster: Dask jobqueue to run parallel jobs (optional) - :type dask_cluster: dask_jobqueue - - :param jobs: Number of jobs to run in parallel, defaults to 4 - :type jobs: int - :returns: ``arviz`` InferenceData object with selected values :rtype: az.InferenceData """ @@ -359,8 +320,6 @@ def to_inference_object( args["concatenate"] = True else: args["concatenate"] = False - args["dask_cluster"] = dask_cluster - args["jobs"] = jobs # TODO: Check that dims and concatenation_match if self.specifications.get("alr_params") is not None: diff --git a/birdman/model_util.py b/birdman/model_util.py index 45d8b3c..f4075de 100644 --- a/birdman/model_util.py +++ b/birdman/model_util.py @@ -4,7 +4,6 @@ import arviz as az from cmdstanpy import CmdStanMCMC import dask -import dask_jobqueue import numpy as np import xarray as xr @@ -115,8 +114,6 @@ def multiple_fits_to_inference( concatenation_name: str = "feature", posterior_predictive: str = None, log_likelihood: str = None, - dask_cluster: dask_jobqueue.JobQueueCluster = None, - jobs: int = 4 ) -> Union[az.InferenceData, List[az.InferenceData]]: """Save fitted parameters to xarray DataSet for multiple fits. From f5f7e011decf172b148ba8063c9585ec5aee93a1 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Wed, 26 May 2021 09:51:24 -0700 Subject: [PATCH 08/12] Concatenate inferences in to_inference_object Now if self.fit is a sequence of InferenceData objects, can concatenate them in to_inference_object. --- birdman/model_base.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/birdman/model_base.py b/birdman/model_base.py index 38a576b..7e7bb21 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -10,7 +10,7 @@ from patsy import dmatrix from .model_util import (single_fit_to_inference, multiple_fits_to_inference, - _single_feature_to_inf) + _single_feature_to_inf, concatenate_inferences) class BaseModel: @@ -299,9 +299,18 @@ def to_inference_object( # if already Inference, just return if isinstance(self.fit, az.InferenceData): return self.fit - if isinstance(self.fit, list): + # if sequence of Inferences, concatenate if specified + if isinstance(self.fit, list) or isinstance(self.fit, tuple): if isinstance(self.fit[0], az.InferenceData): - return self.fit + if combine_individual_fits: + cat_name = self.specifications["concatenation_name"] + return concatenate_inferences( + self.fit, + coords=self.specifications["coords"], + concatenation_name=cat_name + ) + else: + return self.fit args = { k: self.specifications.get(k) From 039e0f9b896891d02435259aab98924f1219ef1b Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Thu, 27 May 2021 08:43:32 -0700 Subject: [PATCH 09/12] Raise error if attempted fit before compilation --- birdman/model_base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/birdman/model_base.py b/birdman/model_base.py index 7e7bb21..8a270cf 100644 --- a/birdman/model_base.py +++ b/birdman/model_base.py @@ -148,7 +148,14 @@ def fit_model( :param sampler_args: Additional parameters to pass to CmdStanPy sampler (optional) :type sampler_args: dict + + :param convert_to_inference: Whether to automatically convert the + fitted model to an az.InferenceData object (defaults to False) + :type convert_to_inference: bool """ + if self.sm is None: + raise ValueError("Model must be compiled first!") + if self.parallelize_across == "features": cn = self.specifications["concatenation_name"] self.specifications["dims"] = { From b20f1af63ce5ae536fd21eaddc6fa8e21ab30312 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Thu, 27 May 2021 08:45:49 -0700 Subject: [PATCH 10/12] Setup changes Remove dask-jobqueue as dependency as that can be handled outside of BIRDMAn. Bump version to 0.0.3. --- environment.yml | 1 - setup.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/environment.yml b/environment.yml index 210ed77..90b231c 100644 --- a/environment.yml +++ b/environment.yml @@ -11,7 +11,6 @@ dependencies: - xarray - patsy - dask - - dask-jobqueue - biom-format - arviz - docutils==0.16 diff --git a/setup.py b/setup.py index 07d72bd..42c4b88 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,7 @@ # Inspired by the EMPress setup.py file from setuptools import find_packages, setup -__version__ = "0.0.3dev" +__version__ = "0.0.3" classifiers = """ Development Status :: 3 - Alpha @@ -39,7 +39,6 @@ "numpy", "cmdstanpy", "dask[complete]", - "dask-jobqueue", "biom-format", "patsy", "xarray", From ddc1ca9d8d6333105beadc95b6dee875c378dfd9 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Thu, 27 May 2021 08:59:13 -0700 Subject: [PATCH 11/12] Update custom model docs page Also addresses #41. --- docs/custom_model.rst | 42 +++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/docs/custom_model.rst b/docs/custom_model.rst index 995799c..5deb347 100644 --- a/docs/custom_model.rst +++ b/docs/custom_model.rst @@ -14,8 +14,8 @@ First we will download the BIOM table and metadata from Qiita. .. code-block:: bash - wget -O data.zip "https://qiita.ucsd.edu/public_artifact_download/?artifact_id=44773" - wget -O metadata.zip "https://qiita.ucsd.edu/public_download/?data=sample_information&study_id=107" + wget -O data.zip "https://qiita.ucsd.edu/public_artifact_download/?artifact_id=94270" + wget -O metadata.zip "https://qiita.ucsd.edu/public_download/?data=sample_information&study_id=11913" unzip data.zip unzip metadata.zip @@ -265,20 +265,6 @@ Now we can add all the necessary parameters to BIRDMAn with the ``add_parameters } nb_lme.add_parameters(param_dict) -Finally, we compile and fit the model. - -.. note:: - - Fitting this model took approximately 6 minutes on my laptop. - -.. code-block:: python - - nb_lme.compile_model() - nb_lme.fit_model() - -Converting to ``InferenceData`` -------------------------------- - With a custom model there is a bit more legwork involved in converting to the ``arviz.InferenceData`` data structure. We will step through each of the parameters in this example. * ``params``: List of parameters you want to include in the posterior draws (must match Stan code). @@ -289,9 +275,11 @@ With a custom model there is a bit more legwork involved in converting to the `` * ``log_likelihood``: Name of variable holding log likelihood values (optional). * ``include_observed_data``: Whether to include the original feature table as a group. This is useful for certain diagnostics. +We pass all these arguments into the ``specify_model`` method of the ``Model`` object. + .. code-block:: python - inference = nb_lme.to_inference_object( + nb_lme.specify_model( params=["beta", "phi", "subj_int"], dims={ "beta": ["covariate", "feature"], @@ -312,4 +300,24 @@ With a custom model there is a bit more legwork involved in converting to the `` include_observed_data=True ) +Finally, we compile and fit the model. + +.. note:: + + Fitting this model took approximately 6 minutes on my laptop. + +.. code-block:: python + + nb_lme.compile_model() + nb_lme.fit_model() + +Converting to ``InferenceData`` +------------------------------- + +When the model has finished fitting, you can convert to an inference data assuming you have specified your model previously. + +.. code-block:: python + + inference = nb_lme.to_inference_object() + With this you can use the rest of the BIRDMAn suite as usual or directly interact with the ``arviz`` library! From 53bd058d4d616a59289c7a88a354460baf6df0f6 Mon Sep 17 00:00:00 2001 From: Gibraan Rahman Date: Thu, 27 May 2021 09:02:05 -0700 Subject: [PATCH 12/12] Update parallelization docs --- docs/parallelization.rst | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/parallelization.rst b/docs/parallelization.rst index 14b3cfc..18fea07 100644 --- a/docs/parallelization.rst +++ b/docs/parallelization.rst @@ -16,6 +16,7 @@ In this example we will assume that we have access to a SLURM cluster on which w import biom from birdman import NegativeBinomial from dask_jobqueue import SLURMCluster + from dask.distributed import Client import pandas as pd table = biom.load_table("example.biom") @@ -28,6 +29,9 @@ In this example we will assume that we have access to a SLURM cluster on which w walltime='01:00:00', local_directory='/scratch' ) + cluster.scale(jobs=8) # 8 microbes at a time + + client = Client(cluster) nb = NegativeBinomial( table=table, @@ -38,12 +42,12 @@ In this example we will assume that we have access to a SLURM cluster on which w parallelize_across="features" # note this argument! ) nb.compile_model() - nb.fit_model(dask_cluster=cluster, jobs=8) # run 8 microbes at a time + nb.fit_model() -This code will run 8 microbes in parallel which will be faster than running the 4 chains in parallel. You can scale this up further depending on your computational resources by increasing the ``jobs`` argument. +This code will run 8 microbes in parallel which will be faster than running the 4 chains in parallel. -When fitting in parallel, CmdStanPy fits a model for each feature in your table. Converting to an ``InferenceData`` object is then just a matter of combining all the individual fits. We can also parallelize this process to speed-up runtime. +When fitting in parallel, CmdStanPy fits a model for each feature in your table. Converting to an ``InferenceData`` object is then just a matter of combining all the individual fits. .. code-block:: python - nb.to_inference_object(dask_cluster=cluster, jobs=8) + nb.to_inference_object(combine_individual_fits=True)