Skip to content

Commit

Permalink
started implementing (exponential) moving average logic in forecast.py
Browse files Browse the repository at this point in the history
  • Loading branch information
enzbus committed Jan 9, 2024
1 parent b38f3a4 commit 2a7b2ab
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 97 deletions.
192 changes: 96 additions & 96 deletions cvxportfolio/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,82 +94,126 @@ class BaseForecast(Estimator):

_last_time = None

def _agnostic_update(self, t, past_returns):
def __post_init__(self):
raise NotImplementedError

def initialize_estimator( # pylint: disable=arguments-differ
self, **kwargs):
"""Re-initialize whenever universe changes.
:param kwargs: Unused arguments to :meth:`initialize_estimator`.
:type kwargs: dict
"""
self.__post_init__()

def _agnostic_update(self, t, past_returns, **kwargs):
"""Choose whether to make forecast from scratch or update last one."""
if (self._last_time is None) or (
self._last_time != past_returns.index[-1]):
logger.debug(
'%s.values_in_time at time %s is computed from scratch.',
self, t)
self._initial_compute(t=t, past_returns=past_returns)
self._initial_compute(t=t, past_returns=past_returns, **kwargs)
else:
logger.debug(
'%s.values_in_time at time %s is updated from previous value.',
self, t)
self._online_update(t=t, past_returns=past_returns)
self._online_update(t=t, past_returns=past_returns, **kwargs)

def _initial_compute(self, t, past_returns):
def _initial_compute(self, **kwargs):
"""Make forecast from scratch."""
raise NotImplementedError # pragma: no cover

def _online_update(self, t, past_returns):
def _online_update(self, **kwargs):
"""Update forecast from period before."""
raise NotImplementedError # pragma: no cover

def _is_timedelta(value):
    """Tell whether a moving-average window specification is finite.

    :param value: Window (or half-life) specification to validate.
    :type value: pandas.Timedelta or float

    :raises ValueError: If ``value`` is neither a pandas Timedelta nor
        positive infinity.

    :returns: ``True`` if ``value`` is a pandas Timedelta, ``False`` if
        it is positive infinity.
    :rtype: bool
    """
    if isinstance(value, pd.Timedelta):
        return True
    # NumPy floating scalars other than float64 (e.g., np.float32) are not
    # subclasses of the built-in float; accept them so that any spelling
    # of +inf is recognized as "no window".
    if isinstance(value, (float, np.floating)) and np.isposinf(value):
        return False
    raise ValueError(
        '(Exponential) moving average windows can only be'
        ' pandas Timedeltas or np.inf.')

@dataclass(unsafe_hash=True)
class HistoricalMeanReturn(BaseForecast):
r"""Historical mean returns.
class BaseMeanForecast(BaseForecast):
"""Base forecaster for means."""

This ignores both the cash returns column and all missing values.
"""
ema_half_life: pd.Timedelta = np.inf
ma_window: pd.Timedelta = np.inf

def __post_init__(self):
self._last_time = None
self._last_counts = None
self._last_sum = None

def initialize_estimator( # pylint: disable=arguments-differ
self, **kwargs):
"""Re-initialize whenever universe changes.
:param kwargs: Unused arguments to :meth:`initialize_estimator`.
:type kwargs: dict
"""
self.__post_init__()
def _dataframe_selector(self, **kwargs):
"""Return dataframe to compute the historical means of."""
raise NotImplementedError # pragma: no cover

def values_in_time( # pylint: disable=arguments-differ
self, t, past_returns, **kwargs):
"""Obtain current value of the mean returns.
self, **kwargs):
"""Obtain current value of the historical mean of given dataframe.
:param t: Current time.
:type t: pandas.Timestamp
:param past_returns: Past market returns, including cash.
:type past_returns: pandas.DataFrame
:param kwargs: Unused arguments to :meth:`values_in_time`.
:param kwargs: All arguments to :meth:`values_in_time`.
:type kwargs: dict
:returns: Means of past returns (excluding cash).
:returns: Historical means of given dataframe.
:rtype: numpy.array
"""
self._agnostic_update(t=t, past_returns=past_returns)
self._agnostic_update(**kwargs)
return (self._last_sum / self._last_counts).values

def _initial_compute(self, t, past_returns):
def _initial_compute(self, t, **kwargs):
"""Make forecast from scratch."""
self._last_counts = past_returns.iloc[:, :-1].count()
self._last_sum = past_returns.iloc[:, :-1].sum()
df = self._dataframe_selector(t=t, **kwargs)

# Moving average window logic
if _is_timedelta(self.ma_window):
df = df.loc[df.index >= t-self.ma_window]

self._last_counts = df.count()
if self._last_counts.min() == 0:
raise ForecastError(
f'{self.__class__.__name__} is computing the '
+ 'mean of a column with no values.')
self._last_sum = df.sum()
self._last_time = t

def _online_update(self, t, past_returns):
def _online_update(self, t, **kwargs):
"""Update forecast from period before."""
self._last_counts += ~(past_returns.iloc[-1, :-1].isnull())
self._last_sum += past_returns.iloc[-1, :-1].fillna(0.)
df = self._dataframe_selector(t=t, **kwargs)
last_row = df.iloc[-1]
self._last_counts += ~(last_row.isnull())
self._last_sum += last_row.fillna(0.)

# Moving average window logic
if _is_timedelta(self.ma_window):
skips = df.loc[
(df.index >= (self._last_time - self.ma_window))
& (df.index < (t - self.ma_window))
]
self._last_counts -= skips.count()
self._last_sum -= skips.sum().fillna(0.)

self._last_time = t

@dataclass(unsafe_hash=True)
class HistoricalMeanReturn(BaseMeanForecast):
    r"""Historical mean of the non-cash returns.

    The last column of the past returns (cash) is excluded from the
    dataframe handed to the base mean forecaster.
    """

    def _dataframe_selector(self, past_returns, **kwargs):
        """Select the dataframe whose historical means are computed.

        :param past_returns: Past market returns, including cash.
        :type past_returns: pandas.DataFrame
        :param kwargs: Unused arguments.
        :type kwargs: dict

        :returns: Past returns without the last (cash) column.
        :rtype: pandas.DataFrame
        """
        non_cash_returns = past_returns.iloc[:, :-1]
        return non_cash_returns


@dataclass(unsafe_hash=True)
class HistoricalVariance(BaseForecast):
class HistoricalVariance(BaseMeanForecast):
r"""Historical variances of non-cash returns.
:param kelly: if ``True`` compute :math:`\mathbf{E}[r^2]`, else
Expand All @@ -185,50 +229,26 @@ class HistoricalVariance(BaseForecast):
def __post_init__(self):
if not self.kelly:
self.meanforecaster = HistoricalMeanReturn()
self._last_time = None
self._last_counts = None
self._last_sum = None

def initialize_estimator( # pylint: disable=arguments-differ
self, **kwargs):
"""Re-initialize whenever universe changes.
:param kwargs: Unused arguments to :meth:`initialize_estimator`.
:type kwargs: dict
"""
self.__post_init__()
super().__post_init__()

def values_in_time( # pylint: disable=arguments-differ
self, t, past_returns, **kwargs):
self, **kwargs):
"""Obtain current value either by update or from scratch.
:param t: Current time.
:type t: pandas.Timestamp
:param past_returns: Past market returns, including cash.
:type past_returns: pandas.DataFrame
:param kwargs: Unused arguments to :meth:`values_in_time`.
:param kwargs: All arguments to :meth:`values_in_time`.
:type kwargs: dict
:returns: Variances of past returns (excluding cash).
:rtype: numpy.array
"""
self._agnostic_update(t=t, past_returns=past_returns)
result = (self._last_sum / self._last_counts).values
result = super().values_in_time(**kwargs)
if not self.kelly:
result -= self.meanforecaster.current_value**2
return result

def _initial_compute(self, t, past_returns):
"""Compute from scratch."""
self._last_counts = past_returns.iloc[:, :-1].count()
self._last_sum = (past_returns.iloc[:, :-1]**2).sum()
self._last_time = t

def _online_update(self, t, past_returns):
"""Update from estimate at t-1."""
self._last_counts += ~(past_returns.iloc[-1, :-1].isnull())
self._last_sum += past_returns.iloc[-1, :-1].fillna(0.)**2
self._last_time = t
def _dataframe_selector(self, past_returns, **kwargs):
"""Return dataframe to compute the historical means of."""
return past_returns.iloc[:, :-1]**2


@dataclass(unsafe_hash=True)
Expand All @@ -237,21 +257,17 @@ class HistoricalStandardDeviation(HistoricalVariance):

kelly: bool = True

def values_in_time(self, t, past_returns, **kwargs):
def values_in_time(self, **kwargs):
"""Obtain current value either by update or from scratch.
:param t: Current time.
:type t: pandas.Timestamp
:param past_returns: Past market returns, including cash.
:type past_returns: pandas.DataFrame
:param kwargs: Unused arguments to :meth:`values_in_time`.
:param kwargs: All arguments to :meth:`values_in_time`.
:type kwargs: dict
:returns: Standard deviations of past returns (excluding cash).
:rtype: numpy.array
"""
variances = \
super().values_in_time(t=t, past_returns=past_returns, **kwargs)
super().values_in_time(**kwargs)
return np.sqrt(variances)

class HistoricalMeanError(HistoricalVariance):
Expand All @@ -266,22 +282,17 @@ class HistoricalMeanError(HistoricalVariance):
def __init__(self):
super().__init__(kelly=False)

def values_in_time(self, t, past_returns, **kwargs):
def values_in_time(self, **kwargs):
"""Obtain current value either by update or from scratch.
:param t: Current time.
:type t: pandas.Timestamp
:param past_returns: Past market returns, including cash.
:type past_returns: pandas.DataFrame
:param kwargs: Unused arguments to :meth:`values_in_time`.
:param kwargs: All arguments to :meth:`values_in_time`.
:type kwargs: dict
:returns: Standard deviation of the mean of past returns (excluding
cash).
:rtype: numpy.array
"""
variance = super().values_in_time(
t=t, past_returns=past_returns, **kwargs)
variance = super().values_in_time(**kwargs)
return np.sqrt(variance / self._last_counts.values)


Expand Down Expand Up @@ -423,15 +434,6 @@ def __post_init__(self):
self._last_sum_matrix = None
self._joint_mean = None

def initialize_estimator( # pylint: disable=arguments-differ
self, **kwargs):
"""Re-initialize whenever universe changes.
:param kwargs: Unused arguments to :meth:`initialize_estimator`.
:type kwargs: dict
"""
self.__post_init__()

@staticmethod
def _get_count_matrix(past_returns):
r"""We obtain the matrix of non-null joint counts:
Expand All @@ -451,7 +453,7 @@ def _get_initial_joint_mean(past_returns):
tmp = nonnull.T @ past_returns.iloc[:, :-1].fillna(0.)
return tmp # * tmp.T

def _initial_compute(self, t, past_returns):
def _initial_compute(self, t, past_returns, **kwargs):
self._last_counts_matrix = self._get_count_matrix(past_returns).values
filled = past_returns.iloc[:, :-1].fillna(0.).values
self._last_sum_matrix = filled.T @ filled
Expand All @@ -460,7 +462,7 @@ def _initial_compute(self, t, past_returns):

self._last_time = t

def _online_update(self, t, past_returns):
def _online_update(self, t, past_returns, **kwargs):
nonnull = ~(past_returns.iloc[-1, :-1].isnull())
self._last_counts_matrix += np.outer(nonnull, nonnull)
last_ret = past_returns.iloc[-1, :-1].fillna(0.)
Expand All @@ -471,14 +473,12 @@ def _online_update(self, t, past_returns):

@online_cache
def values_in_time( # pylint: disable=arguments-differ
self, t, past_returns, **kwargs):
self, t, **kwargs):
"""Obtain current value of the covariance estimate.
:param t: Current time.
:param t: Current time period (possibly of simulation).
:type t: pandas.Timestamp
:param past_returns: Past market returns (including cash).
:type past_returns: pandas.DataFrame
:param kwargs: Extra arguments passed to :meth:`values_in_time`.
:param kwargs: All arguments passed to :meth:`values_in_time`.
:type kwargs: dict
:raises cvxportfolio.errors.ForecastError: The procedure failed,
Expand All @@ -489,7 +489,7 @@ def values_in_time( # pylint: disable=arguments-differ
:rtype: numpy.array
"""

self._agnostic_update(t=t, past_returns=past_returns)
self._agnostic_update(t=t, **kwargs)
covariance = self._last_sum_matrix / self._last_counts_matrix

if not self.kelly:
Expand Down
2 changes: 1 addition & 1 deletion examples/strategies/strategy_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def backtest_from_day(self, day):
el for el in day_init_holdings.index if not el == self.cash_key]
sim = cvx.StockMarketSimulator(
market_data=cvx.DownloadedMarketData(
day_universe, min_history=pd.Timedelta('0d'),
day_universe, min_history=pd.Timedelta('0d'),
cash_key=self.cash_key))

# This should be done by MarketSimulator, but for safety.
Expand Down

0 comments on commit 2a7b2ab

Please sign in to comment.