Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Weibull CDF and PDF Adstock Transformations #499

Merged
merged 12 commits into from
Jan 29, 2024
105 changes: 105 additions & 0 deletions pymc_marketing/mmm/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import numpy as np
import numpy.typing as npt
import pymc as pm
import pytensor.tensor as pt
from pytensor.tensor.random.utils import params_broadcast_shapes

Expand All @@ -13,6 +14,12 @@ class ConvMode(Enum):
Overlap = "Overlap"


class WeibullType(str, Enum):
# TODO: use StrEnum when we upgrade to python 3.11
PDF = "PDF"
CDF = "CDF"


def batched_convolution(x, w, axis: int = 0, mode: ConvMode = ConvMode.Before):
R"""Apply a 1D convolution in a vectorized way across multiple batch dimensions.

Expand Down Expand Up @@ -260,6 +267,104 @@ def delayed_adstock(
return batched_convolution(x, w, axis=axis)


def weibull_adstock(
x,
lam=1,
k=1,
l_max: int = 12,
axis: int = 0,
type: Union[WeibullType, str] = WeibullType.PDF,
):
R"""Weibull Adstocking Transformation.

This transformation is similar to geometric adstock transformation but has more degrees of freedom, adding more flexibility.

.. plot::
:context: close-figs
abdalazizrashid marked this conversation as resolved.
Show resolved Hide resolved

import matplotlib.pyplot as plt
import numpy as np
import arviz as az
from pymc_marketing.mmm.transformers import WeibullType, weibull_adstock
plt.style.use('arviz-darkgrid')

spend = np.zeros(50)
spend[0] = 1

shapes = [0.5, 1., 1.5, 5.]
scales = [10, 20, 40]
modes = [WeibullType.PDF, WeibullType.CDF]

fig, axes = plt.subplots(
len(shapes), len(modes), figsize=(12, 8), sharex=True, sharey=True
)
fig.suptitle("Effect of Changing Weibull Adstock Parameters", fontsize=16)

for m, mode in enumerate(modes):
axes[0, m].set_title(f"Mode: {mode.value}")

for i, shape in enumerate(shapes):
for j, scale in enumerate(scales):
adstock = weibull_adstock(
spend, lam=scale, k=shape, type=mode, l_max=len(spend)
).eval()

axes[i, m].plot(
np.arange(len(spend)),
adstock,
label=f"Scale={scale}",
linestyle="-",
)

fig.legend(
*axes[0, 0].get_legend_handles_labels(),
loc="center right",
bbox_to_anchor=(1.2, 0.85),
)

plt.tight_layout(rect=[0, 0, 0.9, 1])
plt.show()



Parameters
----------
x : tensor
Input tensor.
lam : float, by default 1.
Scale parameter of the Weibull distribution. Must be positive.
k : float, by default 1.
Shape parameter of the Weibull distribution. Must be positive.
l_max : int, by default 12
Maximum duration of carryover effect.
type : WeibullType, by default WeibullType.PDF
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe good to add string to match with the type hint

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Type of Weibull adstock transformation to be applied (PDF or CDF).

Returns
-------
tensor
Transformed tensor based on Weibull adstock transformation.
"""
lam = pt.as_tensor(lam)[..., None]
k = pt.as_tensor(k)[..., None]
t = pt.arange(l_max, dtype=x.dtype) + 1

if type == WeibullType.PDF:
w = pt.exp(pm.Weibull.logp(t, k, lam))
w = (w - pt.min(w, axis=-1)[..., None]) / (
pt.max(w, axis=-1)[..., None] - pt.min(w, axis=-1)[..., None]
)
elif type == WeibullType.CDF:
w = 1 - pt.exp(pm.Weibull.logcdf(t, k, lam))
shape = (*w.shape[:-1], w.shape[-1] + 1)
padded_w = pt.ones(shape, dtype=w.dtype)
padded_w = pt.set_subtensor(padded_w[..., 1:], w)
w = pt.cumprod(padded_w, axis=-1)
else:
raise ValueError(f"Wrong WeibullType: {type}, expected of WeibullType")
return batched_convolution(x, w, axis=axis)


def logistic_saturation(x, lam: Union[npt.NDArray[np.float_], float] = 0.5):
"""Logistic saturation transformation.

Expand Down
90 changes: 90 additions & 0 deletions tests/mmm/test_transformers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
from contextlib import nullcontext as does_not_raise

import numpy as np
import pytensor
import pytensor.tensor as pt
import pytest
import scipy as sp
from pytensor.tensor.var import TensorVariable

from pymc_marketing.mmm.transformers import (
ConvMode,
WeibullType,
batched_convolution,
delayed_adstock,
geometric_adstock,
logistic_saturation,
tanh_saturation,
weibull_adstock,
)


Expand Down Expand Up @@ -166,6 +171,91 @@ def test_delayed_adstock_vectorized(self, dummy_design_matrix):
assert y.shape == x.shape
np.testing.assert_almost_equal(actual=y, desired=ys, decimal=12)

@pytest.mark.parametrize(
"x, lam, k, l_max",
[
(np.zeros(shape=(100)), 1, 1, 4),
(np.ones(shape=(100)), 0.3, 0.5, 10),
(np.ones(shape=(100)), 0.7, 1, 100),
(np.zeros(shape=(100)), 0.2, 0.2, 5),
(np.ones(shape=(100)), 0.5, 0.8, 7),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1.5, 3),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1, 50),
],
)
def test_weibull_pdf_adstock(self, x, lam, k, l_max):
y = weibull_adstock(x=x, lam=lam, k=k, l_max=l_max, type=WeibullType.PDF).eval()

assert np.all(np.isfinite(y))
w = sp.stats.weibull_min.pdf(np.arange(l_max) + 1, c=k, scale=lam)
w = (w - np.min(w)) / (np.max(w) - np.min(w))
sp_y = batched_convolution(x, w).eval()

np.testing.assert_almost_equal(y, sp_y)

@pytest.mark.parametrize(
"x, lam, k, l_max",
[
(np.zeros(shape=(100)), 1, 1, 4),
(np.ones(shape=(100)), 0.3, 0.5, 10),
(np.ones(shape=(100)), 0.7, 1, 100),
(np.zeros(shape=(100)), 0.2, 0.2, 5),
(np.ones(shape=(100)), 0.5, 0.8, 7),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1.5, 3),
(np.linspace(start=0.0, stop=1.0, num=50), 0.8, 1, 50),
],
)
def test_weibull_cdf_adsotck(self, x, lam, k, l_max):
y = weibull_adstock(x=x, lam=lam, k=k, l_max=l_max, type=WeibullType.CDF).eval()

assert np.all(np.isfinite(y))
w = 1 - sp.stats.weibull_min.cdf(np.arange(l_max) + 1, c=k, scale=lam)
w = sp.cumprod(np.concatenate([[1], w]))
sp_y = batched_convolution(x, w).eval()
np.testing.assert_almost_equal(y, sp_y)

@pytest.mark.parametrize(
"type",
[
WeibullType.PDF,
WeibullType.CDF,
],
)
def test_weibull_adstock_vectorized(self, type, dummy_design_matrix):
x = dummy_design_matrix.copy()
x_tensor = pt.as_tensor_variable(x)
lam = [0.9, 0.33, 0.5, 0.1, 1.0]
lam_tensor = pt.as_tensor_variable(lam)
k = [0.8, 0.2, 0.6, 0.4, 1.0]
k_tensor = pt.as_tensor_variable(k)
y = weibull_adstock(
x=x_tensor, lam=lam_tensor, k=k_tensor, l_max=12, type=type
).eval()

y_tensors = [
weibull_adstock(
x=x_tensor[:, i], lam=lam_tensor[i], k=k_tensor[i], l_max=12, type=type
)
for i in range(x.shape[1])
]
ys = np.concatenate([y_t.eval()[..., None] for y_t in y_tensors], axis=1)
assert y.shape == x.shape
np.testing.assert_almost_equal(actual=y, desired=ys, decimal=12)

@pytest.mark.parametrize(
"type, expectation",
[
("PDF", does_not_raise()),
("CDF", does_not_raise()),
("PMF", pytest.raises(ValueError)),
(WeibullType.PDF, does_not_raise()),
(WeibullType.CDF, does_not_raise()),
],
)
def test_weibull_adstock_type(self, type, expectation):
with expectation:
weibull_adstock(x=np.ones(shape=(100)), lam=0.5, k=0.5, l_max=10, type=type)


class TestSaturationTransformers:
def test_logistic_saturation_lam_zero(self):
Expand Down