Skip to content

Commit

Permalink
Added holt winters to helpers (#1438)
Browse files Browse the repository at this point in the history
* Added holt winters

* Edited helper functions to have more functionality

* Edited helper functions to have more functionality

* Reformat Code

---------

Co-authored-by: annajgibson <[email protected]>
  • Loading branch information
HuuHieuDo and annajgibson authored Oct 6, 2023
1 parent 7cfeb61 commit f44b002
Showing 1 changed file with 119 additions and 66 deletions.
185 changes: 119 additions & 66 deletions scripts/helpers/time_series_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,27 @@
Functions to support development of time series analytics work.
"""
import datetime
from typing import Tuple, Union

import io
import boto3
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
from pmdarima import auto_arima
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics

import pyspark.pandas as ps
from sklearn.metrics import mean_squared_error
from statsmodels.tsa.exponential_smoothing.ets import ETSModel
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.statespace.sarimax import SARIMAX

import pandas as pd

from statsmodels.tsa.exponential_smoothing.ets import ETSModel

from statsmodels.tsa.holtwinters import ExponentialSmoothing


def get_train_test_subsets(time_series: ps.DataFrame, periods: int) -> tuple[ps.DataFrame, ps.DataFrame]:
def get_train_test_subsets(time_series: ps.DataFrame, periods: int) -> Tuple[ps.DataFrame, ps.DataFrame]:
""" Splits dataset into train and test datasets. Test subset is determined by periods which is the number
periods to test the model with. Returned dataframes contain unique rows, with no overlap.
Expand All @@ -32,13 +35,13 @@ def get_train_test_subsets(time_series: ps.DataFrame, periods: int) -> tuple[ps.
train (Dataframe): Dataframe with most recent n periods removed
test (Dataframe): Dataframe with most recent n periods only.
"""
train = time_series[: -int(periods)]
test = time_series[-int(periods):]
train = time_series[: -periods]
test = time_series[-periods:]
return train, test


def get_best_arima_model(y: ps.DataFrame, start_q: int, start_p: int, max_iter: int, m: int,
d=None, D=None, **kwargs: dict) -> tuple[tuple, tuple]:
d=None, D=None, **kwargs: dict) -> Tuple[tuple, tuple]:
"""
Uses the Auto Arima algorithm from pmdarima to determine the best parameters for order and
seasonal order in Auto Regression models. Function expects time series dataset with dates set as index, and target
Expand Down Expand Up @@ -71,10 +74,10 @@ def get_best_arima_model(y: ps.DataFrame, start_q: int, start_p: int, max_iter:


def test_sarimax(train: ps.DataFrame, test: ps.DataFrame, order: tuple,
seasonal_order: tuple, exog=None) -> tuple[dict, ps.DataFrame]:
seasonal_order: tuple, exog=None) -> Tuple[dict, ps.DataFrame]:
"""
Function that fits a SARIMAX model and calculates evaluation metrics RMSE, MAE, AIC and BIC.
See https://www.statsmodels.org/dev/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html
See https://www.statsmodels.org/dev/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html#statsmodels.tsa.statespace.sarimax.SARIMAX
for full documentation.
Args:
Expand Down Expand Up @@ -110,7 +113,7 @@ def test_sarimax(train: ps.DataFrame, test: ps.DataFrame, order: tuple,
return sarimax_metrics, predictions


def forecast_with_sarimax(train: ps.DataFrame, order: tuple, seasonal_order: tuple, steps: "int|str|datetime",
def forecast_with_sarimax(train: ps.DataFrame, order: tuple, seasonal_order: tuple, steps: tuple[int, str, datetime],
exog=None) -> ps.Series:
"""
Trains SARIMAX model with full dataset and produces a forcast for number periods (steps) specified.
Expand Down Expand Up @@ -145,7 +148,6 @@ def reshape_time_series_data(pdf: ps.DataFrame, date_col: str, var_cols: list, d
pdf (Dataframe): Dataframe containing date columns and features
date_col (str): Name of the column containing datetime
var_cols (list): List of features to keep within the reshaped dataframe
dateformat (str): Format of date using strftime format codes e.g. '%d/%m/%y'
Returns:
Reshaped dataframe
Expand All @@ -158,7 +160,6 @@ def reshape_time_series_data(pdf: ps.DataFrame, date_col: str, var_cols: list, d
pdf = pdf.set_index('ds').sort_index()
if len(var_cols) == 1:
pdf = pdf.rename(columns={var_cols[0]: 'y'})
pdf = pdf.astype({'y': float})
elif len(var_cols) > 1:
for v_col in var_cols:
counter = 0
Expand All @@ -169,29 +170,32 @@ def reshape_time_series_data(pdf: ps.DataFrame, date_col: str, var_cols: list, d
return pdf


def get_seasonal_decomposition(x: np.array, model: str, period: int) -> tuple[ps.Series, ps.Series, ps.Series]:
def get_seasonal_decomposition(x: ps.DataFrame, model: str, period: int) -> Tuple[ps.DataFrame, ps.Series,
ps.Series, ps.Series]:
"""
Drops any NAs from input dataframe. Extracts the trend, seasonality and residuals from dataset.
Args:
x (array): Array containing time series and variable.
x (Dataframe): Dataframe indexed by date.
model (str): Either of “additive” or “multiplicative”. Type of seasonal component.
period (int): Number of periods within the season e.g. 52 weeks in a year (the season).
Returns:
x_reshaped (Dataframe): reshaped dataframe with NAs dropped.
trend (Series): The upward and/or downward change in the values in the dataset.
seasonal (Series) Short term cyclical repeating pattern in data.
residual (Series) Random variation in the data once trend and seasonality removed.
"""
decompose = seasonal_decompose(x=x, model=model, period=int(period))
x_reshaped = x.dropna
decompose = seasonal_decompose(x=x_reshaped, model=model, period=period)
trend = decompose.trend
seasonality = decompose.seasonal
seasonal = decompose.seasonal
residual = decompose.resid
return trend, seasonality, residual
return x_reshaped, trend, seasonal, residual


def plot_seasonal_decomposition(x: np.array, trend: ps.Series, seasonal: ps.Series, residual: ps.Series,
bucket: str, fname=None, show=False) -> None:
"""Options to plot seasonal decomposition data as well as write a PNG to file.
def plot_seasonal_decomposition(x: ps.DataFrame, trend: ps.Series, seasonal: ps.Series, residual: ps.Series,
fname=None, show=False) -> None:
"""Options to plot seasonal decomposition data as well as save a PNG to file.
If fname=None, PNG will not be saved. Returns None.
"""
label_loc = 'upper left'
Expand All @@ -203,33 +207,28 @@ def plot_seasonal_decomposition(x: np.array, trend: ps.Series, seasonal: ps.Seri
axes[0].legend(loc=label_loc)
axes[1].plot(trend, label='Trend')
axes[1].legend(loc=label_loc)
axes[2].plot(seasonal, label='Seasonality')
axes[2].plot(seasonal, label='Cyclic')
axes[2].legend(loc=label_loc)
axes[3].plot(residual, label='Residuals')
axes[3].legend(loc=label_loc)
if fname:
plt.tight_layout()
img_data = io.BytesIO()
plt.savefig(img_data)
img_data.seek(0)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=fname, Body=img_data)
plt.savefig(fname=fname)
if show:
plt.show()


def plot_time_series_data(x: ps.DataFrame, var_dict: dict, title: str, xlabel: str, ylabel: str,
bucket: str, fname=None, show=False) -> None:
"""Options to plot time series data as well as write a PNG to file.
fname=None, show=False) -> None:
"""Options to plot time series data as well as save a PNG to file.
Args:
x (Dataframe): Dataframe containing time series data, indexed by datetime.
var_dict (dict): dict containing label and variable names e.g. {'Number people': 'y'}
title (str): Title of plot.
xlabel (str): Label of x axis.
ylabel (str): label of y axis.
bucket (): Name of bucket to write to.
fname (str): Path and filename of PNG. If None, PNG will not be saved.
fname (str): Filename of PNG. If None, PNG will not be saved.
show (bool): Option to show plot in console.
Returns:
Expand All @@ -245,18 +244,14 @@ def plot_time_series_data(x: ps.DataFrame, var_dict: dict, title: str, xlabel: s
plt.legend()
if fname:
plt.tight_layout()
img_data = io.BytesIO()
plt.savefig(img_data)
img_data.seek(0)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=fname, Body=img_data)
plt.savefig(fname=fname)
if show:
plt.show()


def plot_pred_forecast(train: ps.DataFrame, test: ps.DataFrame, predictions: ps.DataFrame, forecast: ps.Series,
train_label: str, test_label: str, title: str, suptitle: str, ylabel: str, xlabel: str,
metrics: dict, bucket: str, fname=None, show=False, ) -> None:
metrics: dict, fname=None, show=False) -> None:
"""
Args:
Expand All @@ -271,8 +266,7 @@ def plot_pred_forecast(train: ps.DataFrame, test: ps.DataFrame, predictions: ps.
ylabel (str): y axis label.
xlabel (str): x axis label.
metrics (dict): Dictionary containing model performance metrics. Any length.
bucket (str): Name of bucket to write to.
fname (str): Default is None. If not empty, then will be the folder and file path.
fname (str): Default is None. If not empty, then will be the name of the file.
show (bool): Boolean option to show chart in console.
Returns:
Expand All @@ -291,11 +285,7 @@ def plot_pred_forecast(train: ps.DataFrame, test: ps.DataFrame, predictions: ps.
plt.xlabel(xlabel)
if fname:
plt.tight_layout()
img_data = io.BytesIO()
plt.savefig(img_data)
img_data.seek(0)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=fname, Body=img_data)
plt.savefig(fname=fname)
if show:
plt.show()

Expand All @@ -305,29 +295,32 @@ def apply_prophet(df, periods, horizon):
"""
m = Prophet()
m.fit(df)
future = m.make_future_dataframe(periods=int(periods))
future = m.make_future_dataframe(periods=periods)
future.tail()
forecast = m.predict(future)
cross_val = cross_validation(m, horizon=horizon)
metrics = performance_metrics(cross_val)
return forecast, cross_val, metrics


def get_start_end_date(dataframe, period, forecast_count):
def get_start_end_date(dataframe: pd.DataFrame, period: str, forecast_count: int) -> datetime:
"""
Args:
dataframe (Dataframe): Dataframe containing training timeseries dataset.
Dataframe (Dataframe): Dataframe containing training timeseries dataset.
period (string): Description of the Period. "M" for example,
forecast_count (Int): Amount of data points you want to forecast for
Returns:
Start Date (Datetime),
End Date (Datetime)
Start Date (Date),
End Date (Date)
"""

max_index = dataframe.index.max()

print(max_index)

date_maker = {
"M": [max_index + pd.DateOffset(months=1), max_index + pd.DateOffset(months=forecast_count)],
"D": [max_index + pd.DateOffset(months=1), max_index + pd.DateOffset(months=forecast_count)],
Expand All @@ -338,36 +331,96 @@ def get_start_end_date(dataframe, period, forecast_count):

start_date = date_maker.get(period)[0]
end_date = date_maker.get(period)[1]

return start_date, end_date


def forecast_ets(dataframe, start_date, end_date, seasonal=None, damped_trend=False, seasonal_periods=None):
def forecast_ets(dataframe: pd.DataFrame, start_date: str, end_date: str, seasonal_period: str = "none",
error: str = "add", trend: str = "add", damped_trend: bool = False) -> pd.DateFrame:
"""
Args:
dataframe (Dataframe): Dataframe containing training timeseries dataset.
Dataframe (dataframe): Dataframe containing training timeseries dataset.
start_date (string): Start date of the Forecast,
end_date (string): End date of the Forecast
seasonal (String): Trend Component model. Optional. "Add", "mul" or None (default)
damped_trend (Bool): Dampen the trend component. Default is False
seasonal_periods (int): The number of periods in a complete seasonal cycle for seasonal
(Holt-Winters) models. For example, 4 for quarterly data with an annual cycle
or 7 for daily data with a weekly cycle. Required if seasonal is not None.
seasonal_period (String) Optional: Period for the data. "M" / "W" / "Q" this will fill in the parameters for the model based on very commonly used values, not including days. Default is "none", use "none" if the data is not seasonal
error (String): The error model. “add” (default) or “mul”.
trend (String): The trend component model. “add” (default), “mul”, or None.
damped_trend (Bool): Whether an included trend component is damped. Default is False.
Returns:
Forecast Results (Dataframe),
pdf (dataframe): Pandas Dataframe containing the forecast results
"""
https://www.statsmodels.org/dev/generated/statsmodels.tsa.exponential_smoothing.ets.ETSModel.html
"""

print(f'Get Prediction with: {start_date} to {end_date}')
period_dictionary = {
"M": [12, "add"],
"W": [52, "add"],
"Q": [4, "add"],
"none": [None, None]
}

seasonal = period_dictionary.get(seasonal_period)[1]
seasonal_periods = period_dictionary.get(seasonal_period)[0]

model = ETSModel(
dataframe,
error="add",
trend="add",
seasonal=seasonal,
error=error,
trend=trend,
damped_trend=damped_trend,
seasonal_periods=seasonal_periods)
seasonal=seasonal,
seasonal_periods=seasonal_periods,
)
fit = model.fit()
pred = fit.get_prediction(start=start_date, end=end_date)
df = pred.summary_frame(alpha=0.05)
return df

pred = fit.get_prediction(start=start_date, end=end_date).summary_frame()

return pred


def forecast_holt_winters_ets(dataframe: pd.DataFrame, forecast_count: int, seasonal_period: str = "none",
trend: str = "add", use_boxcox: bool = False,
initialization_method: str = "estimated") -> pd.DateFrame:
"""
Args:
Dataframe (dataframe): Dataframe containing training timeseries dataset. Must be equally spaced with Date in the Index
forecast_count (int): Amount of data points to forecast
seasonal_period (String): Period for the data. "M" / "W" / "Q" this will fill in the parameters for the model based on very commonly used values, not including days. Default is "none", use "none" if the data is not seasonal
trend (String): The trend component model. “add” (default), “mul”, or None.
use_boxcox (bool): Should the Box-Cox transform be applied to the data first? If ‘log’ then apply the log. If float then use the value as lambda. Defaults to False
initialization_method (String): Method for initialize the recursions. One of:None,‘estimated’ (Default),‘heuristic’,‘legacy-heuristic’,‘known’
Returns:
pdf (dataframe): Pandas Dataframe containing the forecast results
Delivers a generic "commonly used" forecast for the selected period
https://www.statsmodels.org/stable/generated/statsmodels.tsa.holtwinters.ExponentialSmoothing.html
"""

period_dictionary = {
"M": [12, "add"],
"W": [52, "add"],
"Q": [4, "add"],
"none": [None, None]
}

seasonal = period_dictionary.get(seasonal_period)[1]
seasonal_periods = period_dictionary.get(seasonal_period)[0]

model = ExponentialSmoothing(
dataframe,
seasonal_periods=seasonal_periods,
trend=trend,
seasonal=seasonal,
use_boxcox=use_boxcox,
initialization_method=initialization_method
).fit()

forecast = model.forecast(forecast_count)

pdf = pd.DataFrame()
pdf["mean"] = forecast

return pdf

0 comments on commit f44b002

Please sign in to comment.