diff --git a/pymc_marketing/mmm/transformers.py b/pymc_marketing/mmm/transformers.py index f8b9a9516..ff8001af9 100644 --- a/pymc_marketing/mmm/transformers.py +++ b/pymc_marketing/mmm/transformers.py @@ -3,6 +3,7 @@ import numpy as np import numpy.typing as npt +import pymc as pm import pytensor.tensor as pt from pytensor.tensor.random.utils import params_broadcast_shapes @@ -14,6 +15,12 @@ class ConvMode(str, Enum): Overlap = "Overlap" +class WeibullType(str, Enum): + # TODO: use StrEnum when we upgrade to python 3.11 + PDF = "PDF" + CDF = "CDF" + + def batched_convolution( x, w, @@ -266,6 +273,104 @@ def delayed_adstock( return batched_convolution(x, w, axis=axis, mode=ConvMode.After) +def weibull_adstock( + x, + lam=1, + k=1, + l_max: int = 12, + axis: int = 0, + type: Union[WeibullType, str] = WeibullType.PDF, +): + R"""Weibull Adstocking Transformation. + + This transformation is similar to geometric adstock transformation but has more degrees of freedom, adding more flexibility. + + .. plot:: + :context: close-figs + + import matplotlib.pyplot as plt + import numpy as np + import arviz as az + from pymc_marketing.mmm.transformers import WeibullType, weibull_adstock + plt.style.use('arviz-darkgrid') + + spend = np.zeros(50) + spend[0] = 1 + + shapes = [0.5, 1., 1.5, 5.] + scales = [10, 20, 40] + modes = [WeibullType.PDF, WeibullType.CDF] + + fig, axes = plt.subplots( + len(shapes), len(modes), figsize=(12, 8), sharex=True, sharey=True + ) + fig.suptitle("Effect of Changing Weibull Adstock Parameters", fontsize=16) + + for m, mode in enumerate(modes): + axes[0, m].set_title(f"Mode: {mode.value}") + + for i, shape in enumerate(shapes): + for j, scale in enumerate(scales): + adstock = weibull_adstock( + spend, lam=scale, k=shape, type=mode, l_max=len(spend) + ).eval() + + axes[i, m].plot( + np.arange(len(spend)), + adstock, + label=f"Scale={scale}", + linestyle="-", + ) + + fig.legend( + *axes[0, 0].get_legend_handles_labels(), + loc="center right", + bbox_to_anchor=(1.2, 0.85), + ) + + plt.tight_layout(rect=[0, 0, 0.9, 1]) + plt.show() + + + + Parameters + ---------- + x : tensor + Input tensor. + lam : float, by default 1. + Scale parameter of the Weibull distribution. Must be positive. + k : float, by default 1. + Shape parameter of the Weibull distribution. Must be positive. + l_max : int, by default 12 + Maximum duration of carryover effect. + type : WeibullType or str, by default WeibullType.PDF + Type of Weibull adstock transformation to be applied (PDF or CDF). + + Returns + ------- + tensor + Transformed tensor based on Weibull adstock transformation. + """ + lam = pt.as_tensor(lam)[..., None] + k = pt.as_tensor(k)[..., None] + t = pt.arange(l_max, dtype=x.dtype) + 1 + + if type == WeibullType.PDF: + w = pt.exp(pm.Weibull.logp(t, k, lam)) + w = (w - pt.min(w, axis=-1)[..., None]) / ( + pt.max(w, axis=-1)[..., None] - pt.min(w, axis=-1)[..., None] + ) + elif type == WeibullType.CDF: + w = 1 - pt.exp(pm.Weibull.logcdf(t, k, lam)) + shape = (*w.shape[:-1], w.shape[-1] + 1) + padded_w = pt.ones(shape, dtype=w.dtype) + padded_w = pt.set_subtensor(padded_w[..., 1:], w) + w = pt.cumprod(padded_w, axis=-1) + else: + raise ValueError(f"Wrong WeibullType: {type}, expected of WeibullType") + return batched_convolution(x, w, axis=axis) + + def logistic_saturation(x, lam: Union[npt.NDArray[np.float_], float] = 0.5): """Logistic saturation transformation. diff --git a/tests/mmm/test_transformers.py b/tests/mmm/test_transformers.py index 6e559a21a..61595aa5d 100644 --- a/tests/mmm/test_transformers.py +++ b/tests/mmm/test_transformers.py @@ -1,18 +1,23 @@ +from contextlib import nullcontext as does_not_raise + import numpy as np import pytensor import pytensor.tensor as pt import pytest +import scipy as sp from pytensor.tensor.variable import TensorVariable from pymc_marketing.mmm.transformers import ( ConvMode, TanhSaturationParameters, + WeibullType, batched_convolution, delayed_adstock, geometric_adstock, logistic_saturation, tanh_saturation, tanh_saturation_baselined, + weibull_adstock, ) @@ -168,6 +173,91 @@ def test_delayed_adstock_vectorized(self, dummy_design_matrix): assert y.shape == x.shape np.testing.assert_almost_equal(actual=y, desired=ys, decimal=12) + @pytest.mark.parametrize( + "x, lam, k, l_max", + [ + (np.zeros(shape=(100)), 1, 1, 4), + (np.ones(shape=(100)), 0.3, 0.5, 10), + (np.ones(shape=(100)), 0.7, 1, 100), + (np.zeros(shape=(100)), 0.2, 0.2, 5), + (np.ones(shape=(100)), 0.5, 0.8, 7), + (np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1.5, 3), + (np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1, 50), + ], + ) + def test_weibull_pdf_adstock(self, x, lam, k, l_max): + y = weibull_adstock(x=x, lam=lam, k=k, l_max=l_max, type=WeibullType.PDF).eval() + + assert np.all(np.isfinite(y)) + w = sp.stats.weibull_min.pdf(np.arange(l_max) + 1, c=k, scale=lam) + w = (w - np.min(w)) / (np.max(w) - np.min(w)) + sp_y = batched_convolution(x, w).eval() + + np.testing.assert_almost_equal(y, sp_y) + + @pytest.mark.parametrize( + "x, lam, k, l_max", + [ + (np.zeros(shape=(100)), 1, 1, 4), + (np.ones(shape=(100)), 0.3, 0.5, 10), + (np.ones(shape=(100)), 0.7, 1, 100), + (np.zeros(shape=(100)), 0.2, 0.2, 5), + (np.ones(shape=(100)), 0.5, 0.8, 7), + (np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1.5, 3), + (np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1, 50), + ], + ) + def test_weibull_cdf_adsotck(self, x, lam, k, l_max): + y = weibull_adstock(x=x, lam=lam, k=k, l_max=l_max, type=WeibullType.CDF).eval() + + assert np.all(np.isfinite(y)) + w = 1 - sp.stats.weibull_min.cdf(np.arange(l_max) + 1, c=k, scale=lam) + w = np.cumprod(np.concatenate([[1], w])) + sp_y = batched_convolution(x, w).eval() + np.testing.assert_almost_equal(y, sp_y) + + @pytest.mark.parametrize( + "type", + [ + WeibullType.PDF, + WeibullType.CDF, + ], + ) + def test_weibull_adstock_vectorized(self, type, dummy_design_matrix): + x = dummy_design_matrix.copy() + x_tensor = pt.as_tensor_variable(x) + lam = [0.9, 0.33, 0.5, 0.1, 1.0] + lam_tensor = pt.as_tensor_variable(lam) + k = [0.8, 0.2, 0.6, 0.4, 1.0] + k_tensor = pt.as_tensor_variable(k) + y = weibull_adstock( + x=x_tensor, lam=lam_tensor, k=k_tensor, l_max=12, type=type + ).eval() + + y_tensors = [ + weibull_adstock( + x=x_tensor[:, i], lam=lam_tensor[i], k=k_tensor[i], l_max=12, type=type + ) + for i in range(x.shape[1]) + ] + ys = np.concatenate([y_t.eval()[..., None] for y_t in y_tensors], axis=1) + assert y.shape == x.shape + np.testing.assert_almost_equal(actual=y, desired=ys, decimal=12) + + @pytest.mark.parametrize( + "type, expectation", + [ + ("PDF", does_not_raise()), + ("CDF", does_not_raise()), + ("PMF", pytest.raises(ValueError)), + (WeibullType.PDF, does_not_raise()), + (WeibullType.CDF, does_not_raise()), + ], + ) + def test_weibull_adstock_type(self, type, expectation): + with expectation: + weibull_adstock(x=np.ones(shape=(100)), lam=0.5, k=0.5, l_max=10, type=type) + class TestSaturationTransformers: def test_logistic_saturation_lam_zero(self):