diff --git a/pymc_marketing/mmm/base.py b/pymc_marketing/mmm/base.py index 506157cc3..3de7af127 100644 --- a/pymc_marketing/mmm/base.py +++ b/pymc_marketing/mmm/base.py @@ -233,6 +233,14 @@ def get_target_transformer(self) -> Pipeline: identity_transformer = FunctionTransformer() return Pipeline(steps=[("scaler", identity_transformer)]) + @property + def prior(self) -> Dataset: + if self.idata is None or "prior" not in self.idata: + raise RuntimeError( + "The model hasn't been fit yet, call .sample_prior_predictive() with extend_idata=True first" + ) + return self.idata["prior"] + @property def prior_predictive(self) -> az.InferenceData: if self.idata is None or "prior_predictive" not in self.idata: diff --git a/pymc_marketing/mmm/delayed_saturated_mmm.py b/pymc_marketing/mmm/delayed_saturated_mmm.py index 62ad9ec3e..4df0437f0 100644 --- a/pymc_marketing/mmm/delayed_saturated_mmm.py +++ b/pymc_marketing/mmm/delayed_saturated_mmm.py @@ -10,13 +10,14 @@ import pymc as pm import seaborn as sns from pytensor.tensor import TensorVariable -from xarray import DataArray +from xarray import DataArray, Dataset from pymc_marketing.mmm.base import MMM from pymc_marketing.mmm.preprocessing import MaxAbsScaleChannels, MaxAbsScaleTarget from pymc_marketing.mmm.transformers import geometric_adstock, logistic_saturation from pymc_marketing.mmm.utils import ( - apply_sklearn_transformer_across_date, + apply_sklearn_transformer_across_dim, + create_new_spend_data, generate_fourier_modes, ) from pymc_marketing.mmm.validating import ValidateControlColumns @@ -644,7 +645,6 @@ def identity(x): data: Dict[str, Union[np.ndarray, Any]] = { "channel_data": channel_transformation(new_channel_data) } - if self.control_columns is not None: control_data = X[self.control_columns].to_numpy() control_transformation = ( @@ -850,6 +850,234 @@ def plot_channel_contributions_grid( ) return fig + def new_spend_contributions( + self, + spend: Optional[np.ndarray] = None, + one_time: bool = True, + spend_leading_up: Optional[np.ndarray] = None, + prior: bool = False, + original_scale: bool = True, + **sample_posterior_predictive_kwargs, + ) -> DataArray: + """Return the upcoming contributions for a given spend. + + The spend can be one time or constant over the period. The spend leading up to the + period can also be specified in order account for the lagged effect of the spend. + + Parameters + ---------- + spend : np.ndarray, optional + Array of spend for each channel. If None, the average spend for each channel is used, by default None. + one_time : bool, optional + Whether the spends for each channel are only at the start of the period. + If True, all spends after the initial spend are zero. + If False, all spends after the initial spend are the same as the initial spend. + By default True. + spend_leading_up : np.array, optional + Array of spend for each channel leading up to the spend, by default None or 0 for each channel. + Use this parameter to account for the lagged effect of the spend. + prior : bool, optional + Whether to use the prior or posterior, by default False (posterior) + **sample_posterior_predictive_kwargs + Additional keyword arguments passed to pm.sample_posterior_predictive + + Returns + ------- + DataArray + Upcoming contributions for each channel + + Examples + -------- + Channel contributions from 1 unit on each channel only once. + + .. code-block:: python + + n_channels = len(model.channel_columns) + spend = np.ones(n_channels) + new_spend_contributions = model.new_spend_contributions(spend=spend) + + Channel contributions from continuously spending 1 unit on each channel. + + .. code-block:: python + + n_channels = len(model.channel_columns) + spend = np.ones(n_channels) + new_spend_contributions = model.new_spend_contributions(spend=spend, one_time=False) + + Channel contributions from 1 unit on each channel only once but with 1 unit leading up to the spend. + + .. code-block:: python + + n_channels = len(model.channel_columns) + spend = np.ones(n_channels) + spend_leading_up = np.ones(n_channels) + new_spend_contributions = model.new_spend_contributions(spend=spend, spend_leading_up=spend_leading_up) + """ + if spend is None: + spend = self.X.loc[:, self.channel_columns].mean().to_numpy() # type: ignore + + n_channels = len(self.channel_columns) + if len(spend) != n_channels: + raise ValueError("spend must be the same length as the number of channels") + + new_data = create_new_spend_data( + spend=spend, + adstock_max_lag=self.adstock_max_lag, + one_time=one_time, + spend_leading_up=spend_leading_up, + ) + + new_data = ( + self.channel_transformer.transform(new_data) if not prior else new_data + ) + + idata: Dataset = self.fit_result if not prior else self.prior + + coords = { + "time_since_spend": np.arange( + -self.adstock_max_lag, self.adstock_max_lag + 1 + ), + "channel": self.channel_columns, + } + with pm.Model(coords=coords): + alpha = pm.Uniform("alpha", lower=0, upper=1, dims=("channel",)) + lam = pm.HalfFlat("lam", dims=("channel",)) + beta_channel = pm.HalfFlat("beta_channel", dims=("channel",)) + + # Same as the forward pass of the model + channel_adstock = geometric_adstock( + x=new_data, + alpha=alpha, + l_max=self.adstock_max_lag, + normalize=True, + axis=0, + ) + channel_adstock_saturated = logistic_saturation(x=channel_adstock, lam=lam) + pm.Deterministic( + name="channel_contributions", + var=channel_adstock_saturated * beta_channel, + dims=("time_since_spend", "channel"), + ) + + samples = pm.sample_posterior_predictive( + idata, + var_names=["channel_contributions"], + **sample_posterior_predictive_kwargs, + ) + + channel_contributions = samples.posterior_predictive["channel_contributions"] + + if original_scale: + channel_contributions = apply_sklearn_transformer_across_dim( + data=channel_contributions, + func=self.get_target_transformer().inverse_transform, + dim_name="time_since_spend", + combined=False, + ) + + return channel_contributions + + def plot_new_spend_contributions( + self, + spend_amount: float, + one_time: bool = True, + lower: float = 0.025, + upper: float = 0.975, + ylabel: str = "Sales", + idx: Optional[slice] = None, + channels: Optional[List[str]] = None, + prior: bool = False, + original_scale: bool = True, + ax: Optional[plt.Axes] = None, + **sample_posterior_predictive_kwargs, + ) -> plt.Axes: + """Plot the upcoming sales for a given spend amount. + + Calls the new_spend_contributions method and plots the results. For more + control over the plot, use new_spend_contributions directly. + + Parameters + ---------- + spend_amount : float + The amount of spend for each channel + one_time : bool, optional + Whether the spend are one time (at start of period) or constant (over period), by default True (one time) + lower : float, optional + The lower quantile for the confidence interval, by default 0.025 + upper : float, optional + The upper quantile for the confidence interval, by default 0.975 + ylabel : str, optional + The label for the y-axis, by default "Sales" + idx : slice, optional + The index slice of days to plot, by default None or only the positive days. + More specifically, slice(0, None, None) + channels : List[str], optional + The channels to plot, by default None or all channels + prior : bool, optional + Whether to use the prior or posterior, by default False (posterior) + original_scale : bool, optional + Whether to plot in the original scale of the target variable, by default True + ax : plt.Axes, optional + The axes to plot on, by default None or current axes + **sample_posterior_predictive_kwargs + Additional keyword arguments passed to pm.sample_posterior_predictive + + Returns + ------- + plt.Axes + The plot of upcoming sales for the given spend amount + + """ + for value in [lower, upper]: + if value < 0 or value > 1: + raise ValueError("lower and upper must be between 0 and 1") + if lower > upper: + raise ValueError("lower must be less than or equal to upper") + + ax = ax or plt.gca() + total_channels = len(self.channel_columns) + contributions = self.new_spend_contributions( + np.ones(total_channels) * spend_amount, + one_time=one_time, + spend_leading_up=np.ones(total_channels) * spend_amount, + prior=prior, + original_scale=original_scale, + **sample_posterior_predictive_kwargs, + ) + + contributions_groupby = contributions.to_series().groupby( + level=["time_since_spend", "channel"] + ) + + idx = idx or pd.IndexSlice[0:] + + conf = ( + contributions_groupby.quantile([lower, upper]) + .unstack("channel") + .unstack() + .loc[idx] + ) + + channels = channels or self.channel_columns # type: ignore + for channel in channels: # type: ignore + ax.fill_between( + conf.index, + conf[channel][lower], + conf[channel][upper], + label=f"{channel} {100 * (upper - lower):.0f}% CI", + alpha=0.5, + ) + mean = contributions_groupby.mean().unstack("channel").loc[idx, channels] + color = [f"C{i}" for i in range(len(channels))] # type: ignore + mean.add_suffix(" mean").plot(ax=ax, color=color, alpha=0.75) + ax.legend().set_title("Channel") + ax.set( + xlabel="Time since spend", + ylabel=ylabel, + title=f"Upcoming sales for {spend_amount:.02f} spend", + ) + return ax + def _validate_data(self, X, y=None): return X @@ -910,9 +1138,10 @@ def sample_posterior_predictive( ) if original_scale: - posterior_predictive_samples = apply_sklearn_transformer_across_date( + posterior_predictive_samples = apply_sklearn_transformer_across_dim( data=posterior_predictive_samples, func=self.get_target_transformer().inverse_transform, + dim_name="date", combined=combined, ) diff --git a/pymc_marketing/mmm/utils.py b/pymc_marketing/mmm/utils.py index c9a635031..e8fa633da 100644 --- a/pymc_marketing/mmm/utils.py +++ b/pymc_marketing/mmm/utils.py @@ -1,5 +1,5 @@ import re -from typing import Any, Callable, Dict, List, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import numpy as np import numpy.typing as npt @@ -289,9 +289,10 @@ def standardize_scenarios_dict_keys(d: Dict, keywords: List[str]): break -def apply_sklearn_transformer_across_date( +def apply_sklearn_transformer_across_dim( data: xr.DataArray, func: Callable[[np.ndarray], np.ndarray], + dim_name: str, combined: bool = False, ) -> xr.DataArray: """Helper function in order to use scikit-learn functions with the xarray target. @@ -300,6 +301,7 @@ def apply_sklearn_transformer_across_date( ---------- data : func : scikit-learn method to apply to the data + dim_name : Name of the dimension to apply the function to combined : Flag to indicate if the data coords have been combined or not Returns @@ -318,11 +320,99 @@ def apply_sklearn_transformer_across_date( data = xr.apply_ufunc( func, data.expand_dims(dim={"_": 1}, axis=1), - input_core_dims=[["date", "_"]], - output_core_dims=[["date", "_"]], + input_core_dims=[[dim_name, "_"]], + output_core_dims=[[dim_name, "_"]], vectorize=True, ).squeeze(dim="_") data.attrs = attrs return data + + +def create_new_spend_data( + spend: np.ndarray, + adstock_max_lag: int, + one_time: bool, + spend_leading_up: Optional[np.ndarray] = None, +) -> np.ndarray: + """Create new spend data for the channel forward pass. + + Spends must be the same length as the number of channels. + + .. plot:: + :context: close-figs + + import numpy as np + import matplotlib.pyplot as plt + import arviz as az + + from pymc_marketing.mmm.utils import create_new_spend_data + az.style.use("arviz-white") + + spend = np.array([1, 2]) + adstock_max_lag = 3 + one_time = True + spend_leading_up = np.array([4, 3]) + channel_spend = create_new_spend_data(spend, adstock_max_lag, one_time, spend_leading_up) + + time_since_spend = np.arange(-adstock_max_lag, adstock_max_lag + 1) + + ax = plt.subplot() + ax.plot( + time_since_spend, + channel_spend, + "o", + label=["Channel 1", "Channel 2"] + ) + ax.legend() + ax.set( + xticks=time_since_spend, + yticks=np.arange(0, channel_spend.max() + 1), + xlabel="Time since spend", + ylabel="Spend", + title="One time spend with spends leading up", + ) + plt.show() + + + Parameters + --------- + spend : np.ndarray + The spend data for the channels. + adstock_max_lag : int + The maximum lag for the adstock transformation. + one_time: bool, optional + If the spend is one-time, by default True. + spend_leading_up : np.ndarray, optional + The spend leading up to the first observation, by default None or 0. + + Returns + ------- + np.ndarray + The new spend data for the channel forward pass. + """ + n_channels = len(spend) + + if spend_leading_up is None: + spend_leading_up = np.zeros_like(spend) + + if len(spend_leading_up) != n_channels: + raise ValueError("spend_leading_up must be the same length as the spend") + + spend_leading_up = np.tile(spend_leading_up, adstock_max_lag).reshape( + adstock_max_lag, -1 + ) + + spend = ( + np.vstack([spend, np.zeros((adstock_max_lag, n_channels))]) + if one_time + else np.ones((adstock_max_lag + 1, n_channels)) * spend + ) + + return np.vstack( + [ + spend_leading_up, + spend, + ] + ) diff --git a/pyproject.toml b/pyproject.toml index 7bd7f4e67..70b5aa7fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,11 @@ docs = [ "sphinx-design", ] lint = ["mypy", "pandas-stubs", "pre-commit>=2.19.0", "ruff>=0.1.4"] -test = ["lifetimes==0.11.3", "pytest==7.0.1", "pytest-cov==3.0.0"] +test = [ + "lifetimes==0.11.3", + "pytest==7.0.1", + "pytest-cov==3.0.0", +] [tool.setuptools] packages = [ diff --git a/tests/mmm/test_base.py b/tests/mmm/test_base.py index c96eda4a4..3dc41dc75 100644 --- a/tests/mmm/test_base.py +++ b/tests/mmm/test_base.py @@ -269,3 +269,17 @@ def test_calling_fit_result_before_fit_raises_error(test_mmm, toy_X, toy_y): test_mmm.fit_result assert test_mmm.idata is not None assert "posterior" in test_mmm.idata + + +def test_calling_prior_before_sample_prior_predictive_raises_error( + test_mmm, toy_X, toy_y +): + # Arrange + test_mmm.idata = None + with pytest.raises( + RuntimeError, + match=re.escape( + "The model hasn't been fit yet, call .sample_prior_predictive() with extend_idata=True first" + ), + ): + test_mmm.prior diff --git a/tests/mmm/test_delayed_saturated_mmm.py b/tests/mmm/test_delayed_saturated_mmm.py index ef2e6864c..3b0be9423 100644 --- a/tests/mmm/test_delayed_saturated_mmm.py +++ b/tests/mmm/test_delayed_saturated_mmm.py @@ -6,6 +6,7 @@ import pandas as pd import pymc as pm import pytest +import xarray as xr from matplotlib import pyplot as plt from pymc_marketing.mmm.delayed_saturated_mmm import ( @@ -85,7 +86,7 @@ def toy_y(toy_X: pd.DataFrame) -> pd.Series: return pd.Series(data=rng.integers(low=0, high=100, size=toy_X.shape[0])) -@pytest.fixture(scope="class") +@pytest.fixture(scope="module") def mmm() -> DelayedSaturatedMMM: return DelayedSaturatedMMM( date_column="date", @@ -706,6 +707,7 @@ def test_new_data_predict_method( X_pred = generate_data(new_dates) posterior_predictive_mean = mmm.predict(X_pred=X_pred) + assert isinstance(posterior_predictive_mean, np.ndarray) assert posterior_predictive_mean.shape[0] == new_dates.size # Original scale constraint @@ -761,3 +763,98 @@ def test_create_likelihood_mu_in_top_level_kwargs(mmm): observed=np.random.randn(100), dims="obs_dim", ) + + +def new_contributions_property_checks(new_contributions, X, model): + assert isinstance(new_contributions, xr.DataArray) + + coords = new_contributions.coords + assert coords["channel"].values.tolist() == model.channel_columns + np.testing.assert_allclose( + coords["time_since_spend"].values, + np.arange(-model.adstock_max_lag, model.adstock_max_lag + 1), + ) + + # Channel contributions are non-negative + assert (new_contributions >= 0).all() + + +def test_new_spend_contributions(mmm_fitted) -> None: + new_spend = np.ones(len(mmm_fitted.channel_columns)) + new_contributions = mmm_fitted.new_spend_contributions(new_spend) + + new_contributions_property_checks(new_contributions, mmm_fitted.X, mmm_fitted) + + +def test_new_spend_contributions_prior_error(mmm) -> None: + new_spend = np.ones(len(mmm.channel_columns)) + match = "sample_prior_predictive" + with pytest.raises(RuntimeError, match=match): + mmm.new_spend_contributions(new_spend, prior=True) + + +@pytest.mark.parametrize("original_scale", [True, False]) +def test_new_spend_contributions_prior(original_scale, mmm, toy_X) -> None: + mmm.sample_prior_predictive( + X_pred=toy_X, + extend_idata=True, + ) + + new_spend = np.ones(len(mmm.channel_columns)) + new_contributions = mmm.new_spend_contributions( + new_spend, prior=True, original_scale=original_scale, random_seed=0 + ) + + new_contributions_property_checks(new_contributions, toy_X, mmm) + + +def test_plot_new_spend_contributions_original_scale(mmm_fitted) -> None: + ax = mmm_fitted.plot_new_spend_contributions( + spend_amount=1, original_scale=True, random_seed=0 + ) + + assert isinstance(ax, plt.Axes) + + +@pytest.fixture(scope="module") +def mmm_with_prior(mmm) -> DelayedSaturatedMMM: + n_chains = 1 + n_samples = 100 + + channels = mmm.channel_columns + n_channels = len(channels) + + idata = az.from_dict( + prior={ + # Arbitrary but close to the default parameterization + "alpha": rng.uniform(size=(n_chains, n_samples, n_channels)), + "lam": rng.exponential(size=(n_chains, n_samples, n_channels)), + "beta_channel": np.abs(rng.normal(size=(n_chains, n_samples, n_channels))), + }, + coords={"channel": channels}, + dims={ + "alpha": ["chain", "draw", "channel"], + "lam": ["chain", "draw", "channel"], + "beta_channel": ["chain", "draw", "channel"], + }, + ) + mmm.idata = idata + + return mmm + + +def test_plot_new_spend_contributions_prior(mmm_with_prior) -> None: + ax = mmm_with_prior.plot_new_spend_contributions( + spend_amount=1, prior=True, random_seed=0 + ) + assert isinstance(ax, plt.Axes) + + +def test_plot_new_spend_contributions_prior_select_channels( + mmm_with_prior, +) -> None: + ax = mmm_with_prior.plot_new_spend_contributions( + spend_amount=1, prior=True, channels=["channel_2"], random_seed=0 + ) + + assert isinstance(ax, plt.Axes) diff --git a/tests/mmm/test_utils.py b/tests/mmm/test_utils.py index 8b6f49689..9539e7f9d 100644 --- a/tests/mmm/test_utils.py +++ b/tests/mmm/test_utils.py @@ -4,8 +4,9 @@ import xarray as xr from pymc_marketing.mmm.utils import ( - apply_sklearn_transformer_across_date, + apply_sklearn_transformer_across_dim, compute_sigmoid_second_derivative, + create_new_spend_data, estimate_menten_parameters, estimate_sigmoid_parameters, extense_sigmoid, @@ -234,29 +235,73 @@ def _create_mock_mm_return_data(combined: bool) -> xr.DataArray: @pytest.mark.parametrize("combined", [True, False]) -def test_apply_sklearn_function_across_date( +def test_apply_sklearn_function_across_dim( mock_method, create_mock_mmm_return_data, combined: bool ) -> None: # Data that would be returned from a MMM model data = create_mock_mmm_return_data(combined=combined) - result = apply_sklearn_transformer_across_date( + result = apply_sklearn_transformer_across_dim( data, mock_method, + dim_name="date", combined=combined, ) xr.testing.assert_allclose(result, data * 2) -def test_apply_sklearn_function_across_date_error( +def test_apply_sklearn_function_across_dim_error( mock_method, create_mock_mmm_return_data, ) -> None: data = create_mock_mmm_return_data(combined=False) with pytest.raises(ValueError, match="x must be 2-dimensional"): - apply_sklearn_transformer_across_date( + apply_sklearn_transformer_across_dim( data, mock_method, + dim_name="date", combined=True, ) + + +@pytest.mark.parametrize( + "spend, adstock_max_lag, one_time, spend_leading_up, expected_result", + [ + ( + [1, 2], + 2, + True, + None, + [[0, 0], [0, 0], [1, 2], [0, 0], [0, 0]], + ), + ( + [1, 2], + 2, + False, + None, + [[0, 0], [0, 0], [1, 2], [1, 2], [1, 2]], + ), + ( + [1, 2], + 2, + True, + [3, 4], + [[3, 4], [3, 4], [1, 2], [0, 0], [0, 0]], + ), + ], +) +def test_create_new_spend_data( + spend, adstock_max_lag, one_time, spend_leading_up, expected_result +) -> None: + spend = np.array(spend) + if spend_leading_up is not None: + spend_leading_up = np.array(spend_leading_up) + new_spend_data = create_new_spend_data( + spend, adstock_max_lag, one_time, spend_leading_up + ) + + np.testing.assert_allclose( + new_spend_data, + np.array(expected_result), + )