Commit

Merge branch 'main' into lbh-huudo-morehelpers
HuuHieuDo authored Sep 13, 2023
2 parents 64bfdef + 944a0f8 commit c9268b3
Showing 10 changed files with 200 additions and 102 deletions.
93 changes: 53 additions & 40 deletions scripts/helpers/time_series_helpers.py
@@ -2,16 +2,19 @@
Functions to support development of time series analytics work.
"""
import datetime
from typing import Tuple, Union

import io
import boto3
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
from pmdarima import auto_arima
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics

import pyspark.pandas as ps
from sklearn.metrics import mean_squared_error
from statsmodels.tsa.exponential_smoothing.ets import ETSModel
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.statespace.sarimax import SARIMAX

@@ -21,9 +24,7 @@

from statsmodels.tsa.holtwinters import ExponentialSmoothing



def get_train_test_subsets(time_series: ps.DataFrame, periods: int) -> Tuple[ps.DataFrame, ps.DataFrame]:
def get_train_test_subsets(time_series: ps.DataFrame, periods: int) -> tuple[ps.DataFrame, ps.DataFrame]:
""" Splits dataset into train and test datasets. Test subset is determined by periods which is the number
periods to test the model with. Returned dataframes contain unique rows, with no overlap.
@@ -36,13 +37,13 @@ def get_train_test_subsets(time_series: ps.DataFrame, periods: int) -> Tuple[ps.
train (Dataframe): Dataframe with most recent n periods removed
test (Dataframe): Dataframe with most recent n periods only.
"""
train = time_series[: -periods]
test = time_series[-periods:]
train = time_series[: -int(periods)]
test = time_series[-int(periods):]
return train, test
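
A minimal usage sketch, assuming the helper above is in scope; plain pandas is used for brevity (the annotation is for pyspark.pandas) and the data is hypothetical:

import pandas as pd

sales = pd.DataFrame({"y": range(48)},
                     index=pd.date_range("2019-01-31", periods=48, freq="M"))
train, test = get_train_test_subsets(sales, periods=12)
# train holds the first 36 rows, test the most recent 12, with no overlap.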


def get_best_arima_model(y: ps.DataFrame, start_q: int, start_p: int, max_iter: int, m: int,
d=None, D=None, **kwargs: dict) -> Tuple[tuple, tuple]:
d=None, D=None, **kwargs: dict) -> tuple[tuple, tuple]:
"""
Uses the auto_arima algorithm from pmdarima to determine the best parameters for the order and
seasonal order in autoregressive models. The function expects a time series dataset with dates set as the index, and target
@@ -75,10 +76,10 @@ def get_best_arima_model(y: ps.DataFrame, start_q: int, start_p: int, max_iter:


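The body of get_best_arima_model is collapsed in this view; as a hedged sketch of how such a pmdarima search is typically written (hypothetical data and settings, not the repository's exact call):

import pandas as pd
from pmdarima import auto_arima

y = pd.Series(range(60), index=pd.date_range("2018-01-31", periods=60, freq="M"))
model = auto_arima(y, start_p=1, start_q=1, m=12, d=None, D=None, seasonal=True,
                   maxiter=50, stepwise=True, suppress_warnings=True)
order, seasonal_order = model.order, model.seasonal_order  # e.g. (p, d, q) and (P, D, Q, m)
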
def test_sarimax(train: ps.DataFrame, test: ps.DataFrame, order: tuple,
seasonal_order: tuple, exog=None) -> Tuple[dict, ps.DataFrame]:
seasonal_order: tuple, exog=None) -> tuple[dict, ps.DataFrame]:
"""
Function that fits a SARIMAX model and calculates evaluation metrics RMSE, MAE, AIC and BIC.
See https://www.statsmodels.org/dev/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html#statsmodels.tsa.statespace.sarimax.SARIMAX
See https://www.statsmodels.org/dev/generated/statsmodels.tsa.statespace.sarimax.SARIMAX.html
for full documentation.
Args:
@@ -114,7 +115,7 @@ def test_sarimax(train: ps.DataFrame, test: ps.DataFrame, order: tuple,
return sarimax_metrics, predictions
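
A hedged sketch of the fit/evaluate/forecast pattern these SARIMAX helpers wrap, using hypothetical monthly data (statsmodels expects pandas or numpy input):

import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from statsmodels.tsa.statespace.sarimax import SARIMAX

y = pd.Series(np.sin(np.arange(60) / 6.0) + 0.05 * np.arange(60),
              index=pd.date_range("2018-01-31", periods=60, freq="M"))
train, test = y[:-12], y[-12:]

res = SARIMAX(train, order=(1, 1, 1), seasonal_order=(1, 0, 1, 12)).fit(disp=False)
predictions = res.predict(start=test.index[0], end=test.index[-1])
metrics = {"RMSE": np.sqrt(mean_squared_error(test, predictions)),
           "MAE": float(np.mean(np.abs(test - predictions))),
           "AIC": res.aic, "BIC": res.bic}
forecast = res.forecast(steps=12)  # forecast_with_sarimax refits on the full dataset before this step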


def forecast_with_sarimax(train: ps.DataFrame, order: tuple, seasonal_order: tuple, steps: tuple[int, str, datetime],
def forecast_with_sarimax(train: ps.DataFrame, order: tuple, seasonal_order: tuple, steps: "int|str|datetime",
exog=None) -> ps.Series:
"""
Trains a SARIMAX model on the full dataset and produces a forecast for the number of periods (steps) specified.
@@ -149,6 +150,7 @@ def reshape_time_series_data(pdf: ps.DataFrame, date_col: str, var_cols: list, d
pdf (Dataframe): Dataframe containing date columns and features
date_col (str): Name of the column containing datetime
var_cols (list): List of features to keep within the reshaped dataframe
dateformat (str): Format of date using strftime format codes e.g. '%d/%m/%y'
Returns:
Reshaped dataframe
@@ -161,6 +163,7 @@ def reshape_time_series_data(pdf: ps.DataFrame, date_col: str, var_cols: list, d
pdf = pdf.set_index('ds').sort_index()
if len(var_cols) == 1:
pdf = pdf.rename(columns={var_cols[0]: 'y'})
pdf = pdf.astype({'y': float})
elif len(var_cols) > 1:
for v_col in var_cols:
counter = 0
@@ -171,32 +174,29 @@ def reshape_time_series_data(pdf: ps.DataFrame, date_col: str, var_cols: list, d
return pdf
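
A usage sketch for the reshaping helper, assuming it is in scope; the column names, values and date format below are hypothetical, and plain pandas is used for brevity:

import pandas as pd

raw = pd.DataFrame({"snapshot_date": ["01/01/23", "01/02/23", "01/03/23"],
                    "households": [120, 125, 131]})
reshaped = reshape_time_series_data(raw, date_col="snapshot_date",
                                    var_cols=["households"], dateformat="%d/%m/%y")
# reshaped is indexed by 'ds' with a float 'y' column.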


def get_seasonal_decomposition(x: ps.DataFrame, model: str, period: int) -> Tuple[ps.DataFrame, ps.Series,
ps.Series, ps.Series]:
def get_seasonal_decomposition(x: np.array, model: str, period: int) -> tuple[ps.Series, ps.Series, ps.Series]:
"""
Extracts the trend, seasonality and residuals from the dataset.
Args:
x (Dataframe): Dataframe indexed by date.
x (array): Array containing the time series values.
model (str): Either of “additive” or “multiplicative”. Type of seasonal component.
period (int): Number of periods within the season e.g. 52 weeks in a year (the season).
Returns:
x_reshaped (Dataframe): reshaped dataframe with NAs dropped.
trend (Series): The upward and/or downward change in the values in the dataset.
seasonal (Series) Short term cyclical repeating pattern in data.
residual (Series) Random variation in the data once trend and seasonality removed.
"""
x_reshaped = x.dropna
decompose = seasonal_decompose(x=x_reshaped, model=model, period=period)
decompose = seasonal_decompose(x=x, model=model, period=int(period))
trend = decompose.trend
seasonal = decompose.seasonal
seasonality = decompose.seasonal
residual = decompose.resid
return x_reshaped, trend, seasonal, residual
return trend, seasonality, residual
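
A brief usage sketch with a hypothetical weekly series (period=52 for a yearly season), assuming the helper above is in scope:

import numpy as np

weeks = np.arange(208)
series = 10 + 0.05 * weeks + 2 * np.sin(2 * np.pi * weeks / 52)
trend, seasonality, residual = get_seasonal_decomposition(series, model="additive", period=52)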


def plot_seasonal_decomposition(x: ps.DataFrame, trend: ps.Series, seasonal: ps.Series, residual: ps.Series,
fname=None, show=False) -> None:
"""Options to plot seasonal decomposition data as well as save a PNG to file.
def plot_seasonal_decomposition(x: np.array, trend: ps.Series, seasonal: ps.Series, residual: ps.Series,
bucket: str, fname=None, show=False) -> None:
"""Options to plot seasonal decomposition data as well as write a PNG to file.
If fname=None, PNG will not be saved. Returns None.
"""
label_loc = 'upper left'
@@ -208,28 +208,33 @@ def plot_seasonal_decomposition(x: ps.DataFrame, trend: ps.Series, seasonal: ps.
axes[0].legend(loc=label_loc)
axes[1].plot(trend, label='Trend')
axes[1].legend(loc=label_loc)
axes[2].plot(seasonal, label='Cyclic')
axes[2].plot(seasonal, label='Seasonality')
axes[2].legend(loc=label_loc)
axes[3].plot(residual, label='Residuals')
axes[3].legend(loc=label_loc)
if fname:
plt.tight_layout()
plt.savefig(fname=fname)
img_data = io.BytesIO()
plt.savefig(img_data)
img_data.seek(0)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=fname, Body=img_data)
if show:
plt.show()
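
Continuing the decomposition sketch above, a hypothetical call that writes the figure to S3 (the bucket and key names are made up, and AWS credentials are assumed to be configured):

plot_seasonal_decomposition(series, trend, seasonality, residual,
                            bucket="my-analytics-bucket",
                            fname="plots/seasonal_decomposition.png")

Plotting into an in-memory BytesIO buffer and uploading with put_object avoids writing a temporary file to local disk before the upload.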


def plot_time_series_data(x: ps.DataFrame, var_dict: dict, title: str, xlabel: str, ylabel: str,
fname=None, show=False) -> None:
"""Options to plot time series data as well as save a PNG to file.
bucket: str, fname=None, show=False) -> None:
"""Options to plot time series data as well as write a PNG to file.
Args:
x (Dataframe): Dataframe containing time series data, indexed by datetime.
var_dict (dict): dict containing label and variable names e.g. {'Number people': 'y'}
title (str): Title of plot.
xlabel (str): Label of x axis.
ylabel (str): label of y axis.
fname (str): Filename of PNG. If None, PNG will not be saved.
bucket (str): Name of the S3 bucket to write to.
fname (str): Path and filename of PNG. If None, PNG will not be saved.
show (bool): Option to show plot in console.
Returns:
@@ -245,14 +250,18 @@ def plot_time_series_data(x: ps.DataFrame, var_dict: dict, title: str, xlabel: s
plt.legend()
if fname:
plt.tight_layout()
plt.savefig(fname=fname)
img_data = io.BytesIO()
plt.savefig(img_data)
img_data.seek(0)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=fname, Body=img_data)
if show:
plt.show()
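
A hypothetical call, assuming a dataframe indexed by datetime with a 'y' column; the bucket and key names are made up:

import pandas as pd

ts = pd.DataFrame({"y": [120.0, 125.0, 131.0, 128.0]},
                  index=pd.date_range("2023-01-01", periods=4, freq="MS"))
plot_time_series_data(ts, var_dict={"Number people": "y"},
                      title="People over time", xlabel="Date", ylabel="Number of people",
                      bucket="my-analytics-bucket", fname="plots/people_over_time.png")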


def plot_pred_forecast(train: ps.DataFrame, test: ps.DataFrame, predictions: ps.DataFrame, forecast: ps.Series,
train_label: str, test_label: str, title: str, suptitle: str, ylabel: str, xlabel: str,
metrics: dict, fname=None, show=False) -> None:
metrics: dict, bucket: str, fname=None, show=False) -> None:
"""
Args:
@@ -267,7 +276,8 @@ def plot_pred_forecast(train: ps.DataFrame, test: ps.DataFrame, predictions: ps.
ylabel (str): y axis label.
xlabel (str): x axis label.
metrics (dict): Dictionary containing model performance metrics. Any length.
fname (str): Default is None. If not empty, then will be the name of the file.
bucket (str): Name of bucket to write to.
fname (str): Default is None. If provided, will be used as the folder and file path.
show (bool): Boolean option to show chart in console.
Returns:
@@ -286,7 +296,11 @@ def plot_pred_forecast(train: ps.DataFrame, test: ps.DataFrame, predictions: ps.
plt.xlabel(xlabel)
if fname:
plt.tight_layout()
plt.savefig(fname=fname)
img_data = io.BytesIO()
plt.savefig(img_data)
img_data.seek(0)
s3 = boto3.client('s3')
s3.put_object(Bucket=bucket, Key=fname, Body=img_data)
if show:
plt.show()

@@ -296,7 +310,7 @@ def apply_prophet(df, periods, horizon):
"""
m = Prophet()
m.fit(df)
future = m.make_future_dataframe(periods=periods)
future = m.make_future_dataframe(periods=int(periods))
future.tail()
forecast = m.predict(future)
cross_val = cross_validation(m, horizon=horizon)
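
The rest of apply_prophet is collapsed in this view; a hedged sketch of the standard Prophet fit/forecast/cross-validation workflow it appears to follow, with hypothetical daily data:

import pandas as pd
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics

df = pd.DataFrame({"ds": pd.date_range("2020-01-01", periods=730, freq="D"),
                   "y": range(730)})
m = Prophet()
m.fit(df)
future = m.make_future_dataframe(periods=90)
forecast = m.predict(future)
cross_val = cross_validation(m, horizon="30 days")
metrics = performance_metrics(cross_val)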
@@ -308,7 +322,7 @@ def get_start_end_date(dataframe, period, forecast_count):
"""
Args:
Dataframe (Dataframe): Dataframe containing training timeseries dataset.
dataframe (Dataframe): Dataframe containing training timeseries dataset.
period (string): Description of the period, e.g. "M" for monthly.
forecast_count (int): Number of data points to forecast.
@@ -317,7 +331,6 @@ def get_start_end_date(dataframe, period, forecast_count):
End Date (Datetime)
"""

max_index = dataframe.index.max()

date_maker = {
@@ -330,20 +343,21 @@

start_date = date_maker.get(period)[0]
end_date = date_maker.get(period)[1]

return start_date, end_date
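
A hypothetical call, assuming the helper is in scope; the date_maker mapping itself is collapsed above, so only the interface is shown, and the "M" period follows the docstring's example:

import pandas as pd

history = pd.DataFrame({"y": range(36)},
                       index=pd.date_range("2020-01-31", periods=36, freq="M"))
start_date, end_date = get_start_end_date(history, period="M", forecast_count=12)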


def forecast_ets(dataframe, start_date, end_date, trend="add", seasonal=None, damped_trend=False, seasonal_periods=None):
"""
Args:
Dataframe (Dataframe): Dataframe containing training timeseries dataset.
dataframe (Dataframe): Dataframe containing training timeseries dataset.
start_date (string): Start date of the forecast.
end_date (string): End date of the forecast.
trend (string): Trend component model. "add" (default), "mul" or None.
seasonal (string): Seasonal component model. Optional. "add", "mul" or None (default).
damped_trend (Bool): Whether or not an included trend component is damped. Default is False
seasonal_periods (int): The number of periods in a complete seasonal cycle for seasonal (Holt-Winters) models. For example, 4 for quarterly data with an annual cycle or 7 for daily data with a weekly cycle. Required if seasonal is not None.
damped_trend (Bool): Dampen the trend component. Default is False
seasonal_periods (int): The number of periods in a complete seasonal cycle for seasonal
(Holt-Winters) models. For example, 4 for quarterly data with an annual cycle
or 7 for daily data with a weekly cycle. Required if seasonal is not None.
Returns:
Forecast Results (Dataframe),
@@ -357,14 +371,13 @@ def forecast_ets(dataframe, start_date, end_date, trend="add", seasonal=None, da
trend=trend,
seasonal=seasonal,
damped_trend=damped_trend,
seasonal_periods=seasonal_periods,
)
fit = model.fit()

fit = model.fit()
pred = fit.get_prediction(start=start_date, end=end_date)

df = pred.summary_frame(alpha=0.05)

return df
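
Continuing the previous sketch, a hedged usage example; the frame returned by summary_frame contains the point forecast and 95% prediction intervals (alpha=0.05 above):

results = forecast_ets(history, start_date, end_date,
                       trend="add", seasonal="add", seasonal_periods=12)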

def holt_winters(dataframe, forecast_count, seasonal_periods=None, seasonal=None, use_boxcox=None, initialization_method="estimated"):
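
The body of holt_winters is collapsed in this view; a minimal sketch, assuming it follows the standard statsmodels ExponentialSmoothing pattern suggested by its signature (data and settings below are hypothetical):

import numpy as np
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing

y = pd.Series(10 + 0.1 * np.arange(60) + np.sin(np.arange(60) / 6.0),
              index=pd.date_range("2018-01-31", periods=60, freq="M"))
model = ExponentialSmoothing(y, trend="add", seasonal="add", seasonal_periods=12,
                             use_boxcox=None, initialization_method="estimated")
fit = model.fit()
forecast = fit.forecast(12)  # hypothetical forecast_count periods ahead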
