Skip to content

Commit

Permalink
Make optimization based on the entire posterior and not on the margin…
Browse files Browse the repository at this point in the history
…al mean parameters. (#1151)

* New Budget Optimizer with Risk Assessment

* Final notebooks

* Notebook

* Notebook index

* Updating tests

* Title change

* Modify text

* Notebook and adjustments

Co-Authored-By: Will Dean <[email protected]>

* Hints

* Unit tests

* pre-commit

* notebook possible issue

* seed

* pre commit

* Changes based on feedback

* Solving unit tests

* Final notebook

* Last notebook adjustment to utility

* pre-commit

* Updating notebooks

* support for newer pydantic message

* support for newer pydantic message for adstock

* Requested changes

* Restoring version from main example notebook

---------

Co-authored-by: Juan Orduz <[email protected]>
Co-authored-by: Will Dean <[email protected]>
Co-authored-by: Will Dean <[email protected]>
  • Loading branch information
4 people authored Dec 11, 2024
1 parent 7a7cf1d commit f4d5030
Show file tree
Hide file tree
Showing 11 changed files with 4,539 additions and 1,126 deletions.
1 change: 1 addition & 0 deletions docs/source/notebooks/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Here you will find a collection of examples and how-to guides for using PyMC-Mar

mmm/mmm_example
mmm/mmm_budget_allocation_example
mmm/mmm_allocation_assessment
mmm/mmm_lift_test
mmm/mmm_counterfactuals
mmm/mmm_tvp_example
Expand Down
1,351 changes: 1,351 additions & 0 deletions docs/source/notebooks/mmm/mmm_allocation_assessment.ipynb

Large diffs are not rendered by default.

314 changes: 185 additions & 129 deletions docs/source/notebooks/mmm/mmm_budget_allocation_example.ipynb

Large diffs are not rendered by default.

2,483 changes: 1,587 additions & 896 deletions docs/source/notebooks/mmm/mmm_case_study.ipynb

Large diffs are not rendered by default.

Binary file modified docs/source/notebooks/mmm/model.nc
Binary file not shown.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ dependencies:
- pytest-cov==3.0.0
- pytest-mock
- mlflow
- hatch
178 changes: 136 additions & 42 deletions pymc_marketing/mmm/budget_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Budget optimization module."""

import warnings
from typing import Any
from typing import Any, ClassVar

import numpy as np
import pytensor.tensor as pt
from pydantic import BaseModel, ConfigDict, Field
from pytensor import function
from scipy.optimize import minimize

from pymc_marketing.mmm.components.adstock import AdstockTransformation
from pymc_marketing.mmm.components.saturation import SaturationTransformation
from pymc_marketing.mmm.utility import UtilityFunctionType, average_response


class MinimizeException(Exception):
Expand Down Expand Up @@ -53,9 +57,15 @@ class BudgetOptimizer(BaseModel):
The number of time units.
parameters : dict
A dictionary of parameters for each channel.
scales : np.ndarray
The scale parameter for each channel variable.
response_scaler : float, optional
The scaling factor for the target response variable. Default is 1.
adstock_first : bool, optional
Whether to apply adstock transformation first or saturation transformation first.
Default is True.
utility_function : UtilityFunctionType, optional
The utility function to maximize. Default is the mean of the response distribution.
"""

Expand All @@ -70,19 +80,82 @@ class BudgetOptimizer(BaseModel):
gt=0,
description="The number of time units at time granularity which the budget is to be allocated.",
)
parameters: dict[str, dict[str, dict[str, float]]] = Field(
parameters: dict[str, Any] = Field(
..., description="A dictionary of parameters for each channel."
)
scales: np.ndarray = Field(
..., description="The scale parameter for each channel variable"
)
response_scaler: float = Field(
default=1.0,
description="Scaling factor for the target response variable. Defaults to 1.",
)
adstock_first: bool = Field(
True,
description="Whether to apply adstock transformation first or saturation transformation first.",
)
model_config = ConfigDict(arbitrary_types_allowed=True)

def objective(self, budgets: list[float]) -> float:
response_scaler_sym: pt.TensorVariable = Field(
default=None,
exclude=True,
repr=False,
description="Response scaler tensor variable.",
)

utility_function: UtilityFunctionType = Field(
default=average_response,
description="Utility function to maximize.",
arbitrary_types_allowed=True,
)

DEFAULT_MINIMIZE_KWARGS: ClassVar[dict] = {
"method": "SLSQP",
"options": {"ftol": 1e-9, "maxiter": 1_000},
}

def __init__(self, **data):
super().__init__(**data)
self.response_scaler_sym = pt.as_tensor_variable(self.response_scaler)
self._compiled_functions = {}
self._compile_objective_and_grad()

def _compile_objective_and_grad(self):
"""Compile the objective function and its gradient using symbolic computation."""
budgets_sym = pt.vector("budgets")

_response_distribution = self._estimate_response(budgets=budgets_sym)

response_distribution = _response_distribution.sum(axis=(2, 3)).flatten()

objective_value = -self.utility_function(
samples=response_distribution, budgets=budgets_sym
)

# Compute gradient symbolically
grad_obj = pt.grad(objective_value, budgets_sym)

# Compile the functions
utility_func = function([budgets_sym], objective_value)
grad_func = function([budgets_sym], grad_obj)

# Cache the compiled functions
self._compiled_functions[self.utility_function] = {
"objective": utility_func,
"gradient": grad_func,
}

def _objective(self, budgets: pt.TensorVariable) -> float:
"""Objective function for the budget optimization."""
return self._compiled_functions[self.utility_function]["objective"](
budgets
).item()

def _gradient(self, budgets: pt.TensorVariable) -> pt.TensorVariable:
"""Gradient of the objective function."""
return self._compiled_functions[self.utility_function]["gradient"](budgets)

def _estimate_response(self, budgets: list[float]) -> np.ndarray:
"""Calculate the total response during a period of time given the budgets.
It considers the saturation and adstock transformations.
Expand All @@ -94,36 +167,54 @@ def objective(self, budgets: list[float]) -> float:
Returns
-------
float
The negative total response value.
np.ndarray
The estimated response distribution.
"""
total_response = 0
first_transform, second_transform = (
(self.adstock, self.saturation)
if self.adstock_first
else (self.saturation, self.adstock)
)
for idx, (_channel, params) in enumerate(self.parameters.items()):
budget = budgets[idx] / self.scales[idx]
first_params = (
params["adstock_params"]
if self.adstock_first
else params["saturation_params"]
)
second_params = (
params["saturation_params"]
if self.adstock_first
else params["adstock_params"]
)
spend = np.full(self.num_periods, budget)
spend_extended = np.concatenate([spend, np.zeros(self.adstock.l_max)])
transformed_spend = second_transform.function(
x=first_transform.function(x=spend_extended, **first_params),
**second_params,
).eval()
total_response += np.sum(transformed_spend)
return -total_response

# Convert scales to a tensor variable when needed
budget = budgets / pt.as_tensor_variable(self.scales)

# Convert parameters to tensor variables if necessary
def convert_params(params):
return {
k: (pt.as_tensor_variable(v) if isinstance(v, np.ndarray) else v)
for k, v in params.items()
}

first_params = convert_params(
self.parameters["adstock_params"]
if self.adstock_first
else self.parameters["saturation_params"]
)
second_params = convert_params(
self.parameters["saturation_params"]
if self.adstock_first
else self.parameters["adstock_params"]
)

spend = pt.tile(budget, (self.num_periods, 1))
spend_extended = pt.concatenate(
[spend, pt.zeros((self.adstock.l_max, spend.shape[1]))], axis=0
)

_response = first_transform.function(x=spend_extended, **first_params)

for param_name, param_value in second_params.items():
if isinstance(param_value, pt.TensorVariable) and param_value.ndim == 3:
param_value = param_value.dimshuffle(0, 1, "x", 2)
second_params[param_name] = param_value

# Multiply by the response_scaler_sym
return (
second_transform.function(x=_response, **second_params)
* self.response_scaler_sym
)

def allocate_budget(
self,
Expand All @@ -136,16 +227,13 @@ def allocate_budget(
The default budget bounds are (0, total_budget) for each channel.
The default constraint is the sum of all budgets should be equal to the total budget.
The default constraint ensures the sum of all budgets equals the total budget.
The optimization is done using the Sequential Least Squares Quadratic Programming (SLSQP) method
and it's constrained such that:
1. The sum of budgets across all channels equals the total available budget.
2. The budget allocated to each individual channel lies within its specified range.
The purpose is to maximize the total expected objective based on the inequality
and equality constraints.
Parameters
----------
total_budget : float
Expand All @@ -161,18 +249,21 @@ def allocate_budget(
Returns
-------
tuple[dict[str, float], float]
The optimal budgets for each channel and the negative total response value.
The optimal budgets for each channel and the optimization result object.
Raises
------
Exception
MinimizeException
If the optimization fails, an exception is raised with the reason for the failure.
"""
if budget_bounds is None:
budget_bounds = {channel: (0, total_budget) for channel in self.parameters}
budget_bounds = {
channel: (0, total_budget) for channel in self.parameters["channels"]
}
warnings.warn(
"No budget bounds provided. Using default bounds (0, total_budget) for each channel.",
UserWarning,
stacklevel=2,
)
elif not isinstance(budget_bounds, dict):
Expand All @@ -182,43 +273,46 @@ def allocate_budget(
constraints = {"type": "eq", "fun": lambda x: np.sum(x) - total_budget}
warnings.warn(
"Using default equality constraint: The sum of all budgets should be equal to the total budget.",
UserWarning,
stacklevel=2,
)
elif not isinstance(custom_constraints, dict):
raise TypeError("`custom_constraints` should be a dictionary.")
else:
constraints = custom_constraints

num_channels = len(self.parameters.keys())
num_channels = len(self.parameters["channels"])
initial_guess = np.ones(num_channels) * total_budget / num_channels
bounds = [
(
(budget_bounds[channel][0], budget_bounds[channel][1])
if channel in budget_bounds
else (0, total_budget)
)
for channel in self.parameters
for channel in self.parameters["channels"]
]

if minimize_kwargs is None:
minimize_kwargs = {
"method": "SLSQP",
"options": {"ftol": 1e-9, "maxiter": 1_000},
}
minimize_kwargs = self.DEFAULT_MINIMIZE_KWARGS.copy()
else:
minimize_kwargs = {**self.DEFAULT_MINIMIZE_KWARGS, **minimize_kwargs}

result = minimize(
fun=self.objective,
fun=self._objective,
x0=initial_guess,
bounds=bounds,
constraints=constraints,
jac=self._gradient,
**minimize_kwargs,
)

if result.success:
optimal_budgets = {
name: budget
for name, budget in zip(self.parameters.keys(), result.x, strict=False)
for name, budget in zip(
self.parameters["channels"], result.x, strict=False
)
}
return optimal_budgets, -result.fun
return optimal_budgets, result
else:
raise MinimizeException(f"Optimization failed: {result.message}")
Loading

0 comments on commit f4d5030

Please sign in to comment.