Adding PredictiveAnomalyDetection to river/anomaly #1458

Merged — 37 commits, merged Mar 18, 2024

Changes from 32 commits

Commits (37)
b5b698b
Created outline for new estimator + file for manual tests
Nov 8, 2023
a4ca4f9
fixed import order
Nov 8, 2023
b48bced
fixed import order again
Nov 8, 2023
00c9bab
fixed import order again
Nov 8, 2023
50c8528
testing local credential helper
Nov 8, 2023
14da4c4
changed class to base off of supervised AD
Nov 8, 2023
94dd511
basic RAD functionality but yet without docs
Nov 8, 2023
884aa6e
fixed missing type annotation
Nov 8, 2023
35318a6
tested and fixed for ARIMA, added warmup
Nov 9, 2023
1d2af97
changed typing of learn_one2
Nov 9, 2023
c09fafe
added comments for doc purposes
Nov 15, 2023
9b468b0
Merge branch 'online-ml:main' into main
sebiwtt Nov 16, 2023
ce80c93
added docstring
Nov 23, 2023
3498030
corrected syntax error
Nov 23, 2023
99941f3
corrected syntax error
Nov 23, 2023
192a96c
corrected syntax error
Nov 23, 2023
5c23200
corrected syntax error
Nov 23, 2023
7e60f95
corrected syntax error
Nov 23, 2023
2d854c8
corrected syntax error
Nov 23, 2023
5ceb70f
corrected syntax error
Nov 23, 2023
0a44e1d
corrected syntax error
Nov 23, 2023
1b1a16d
corrected syntax error
Nov 23, 2023
9f577eb
corrected syntax error
Nov 23, 2023
49c884d
Merge branch 'online-ml:main' into main
sebiwtt Nov 23, 2023
5b8a8dc
reverted some changes that were made accidentally
Nov 29, 2023
0a86151
fixed the docstring error that prevented unit test from passing
Nov 29, 2023
e9f8d72
Merge branch 'online-ml:main' into main
sebiwtt Dec 5, 2023
8f3150e
changed the updates on mean and var stats to fit into warmup period
Feb 8, 2024
49ead1c
fixed the docstring error that prevented unit test from passing (again)
Feb 8, 2024
83f7176
Refactor PAD.py
hoanganhngo610 Feb 12, 2024
26b4a5b
Update CONTRIBUTING.md
hoanganhngo610 Mar 17, 2024
4463b49
Delete setup.py
hoanganhngo610 Mar 17, 2024
e179d0c
Update base.py
hoanganhngo610 Mar 18, 2024
862f44f
Update lof.py
hoanganhngo610 Mar 18, 2024
9094f82
Update lof.py
hoanganhngo610 Mar 18, 2024
ebd98d8
Update test_lof.py
hoanganhngo610 Mar 18, 2024
c3ea315
Update pad.py
hoanganhngo610 Mar 18, 2024
3 changes: 3 additions & 0 deletions river/anomaly/__init__.py
@@ -11,13 +11,15 @@
model.

"""

from __future__ import annotations

from . import base
from .filter import QuantileFilter, ThresholdFilter
from .gaussian import GaussianScorer
from .hst import HalfSpaceTrees
from .lof import LocalOutlierFactor
from .pad import PredictiveAnomalyDetection
from .sad import StandardAbsoluteDeviation
from .svm import OneClassSVM

@@ -31,4 +33,5 @@
"StandardAbsoluteDeviation",
"ThresholdFilter",
"LocalOutlierFactor",
"PredictiveAnomalyDetection",
]
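With this export in place, the detector is available directly from `river.anomaly`. A minimal sketch of what that looks like, assuming the API as merged in this PR (`learn_one` returns `self`, and the default predictive model is a `MinMaxScaler | LinearRegression` pipeline):

```python
from river import anomaly

# Default predictive model (MinMaxScaler | LinearRegression); warmup_period is set
# here only to illustrate that scores stay at 0.0 during the warm-up phase.
pad = anomaly.PredictiveAnomalyDetection(n_std=3.0, warmup_period=5)

for t in range(50):
    x, y = {"t": float(t)}, float(t % 10)
    score = pad.score_one(x, y)  # 0.0 while still inside the warm-up period
    pad = pad.learn_one(x, y)
```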
4 changes: 3 additions & 1 deletion river/anomaly/base.py
@@ -96,7 +96,9 @@ class AnomalyFilter(base.Wrapper, base.Estimator):

"""

def __init__(self, anomaly_detector: AnomalyDetector, protect_anomaly_detector=True):
def __init__(
self, anomaly_detector: AnomalyDetector, protect_anomaly_detector=True
):
self.anomaly_detector = anomaly_detector
self.protect_anomaly_detector = protect_anomaly_detector

27 changes: 21 additions & 6 deletions river/anomaly/lof.py
@@ -137,7 +137,11 @@ def calc_lof(set_index: set, neighborhoods: dict, local_reach: dict, lof: dict):
"""
for i in set_index:
denominator = len(neighborhoods[i]) * local_reach[i]
lof[i] = sum(local_reach[j] for j in neighborhoods[i]) / denominator if denominator else 0
lof[i] = (
sum(local_reach[j] for j in neighborhoods[i]) / denominator
if denominator
else 0
)
return lof


@@ -389,8 +393,10 @@ def score_one(self, x: dict):
self.lof.copy(),
)

neighborhoods, rev_neighborhoods, k_dist, dist_dict = self._initial_calculations(
x_list_copy, nm, neighborhoods, rev_neighborhoods, k_dist, dist_dict
neighborhoods, rev_neighborhoods, k_dist, dist_dict = (
self._initial_calculations(
x_list_copy, nm, neighborhoods, rev_neighborhoods, k_dist, dist_dict
)
)
(
set_new_points,
@@ -400,7 +406,12 @@
set_upd_lof,
) = define_sets(nm, neighborhoods, rev_neighborhoods)
reach_dist = calc_reach_dist_new_points(
set_new_points, neighborhoods, rev_neighborhoods, reach_dist, dist_dict, k_dist
set_new_points,
neighborhoods,
rev_neighborhoods,
reach_dist,
dist_dict,
k_dist,
)
reach_dist = calc_reach_dist_other_points(
set_rev_neighbors,
@@ -409,7 +420,9 @@
dist_dict,
k_dist,
)
local_reach = calc_local_reach_dist(set_upd_lrd, neighborhoods, reach_dist, local_reach)
local_reach = calc_local_reach_dist(
set_upd_lrd, neighborhoods, reach_dist, local_reach
)
lof = calc_lof(set_upd_lof, neighborhoods, local_reach, lof)
self.x_scores = []

@@ -475,7 +488,9 @@ def _initial_calculations(

# Calculate new k-dist for each particle
for i, inner_dict in enumerate(dist_dict.values()):
k_distances[i] = sorted(inner_dict.values())[min(k, len(inner_dict.values())) - 1]
k_distances[i] = sorted(inner_dict.values())[
min(k, len(inner_dict.values())) - 1
]

# Only keep particles that are neighbors in distance dictionary
dist_dict = {
169 changes: 169 additions & 0 deletions river/anomaly/pad.py
@@ -0,0 +1,169 @@
from __future__ import annotations

import math

from river import anomaly, base, linear_model, preprocessing, stats, time_series

__all__ = ["PredictiveAnomalyDetection"]


class PredictiveAnomalyDetection(anomaly.base.SupervisedAnomalyDetector):
"""Predictive Anomaly Detection.

    This semi-supervised approach to anomaly detection employs a predictive model to learn the normal behavior
    of a dataset. It forecasts future data points and compares these predictions with the actual values to detect
    anomalies. An anomaly score is calculated from the deviation of the prediction from the actual value, with higher
    scores indicating a higher probability of an anomaly.

    The anomaly score is calculated by comparing the squared error to a dynamic threshold. If the error is larger
    than this threshold, the score is 1.0; otherwise, the score is distributed linearly within the range (0.0, 1.0),
    with a higher score indicating a larger squared error relative to the threshold.

Parameters
----------
predictive_model
        The underlying model that learns the normal behavior of the data and makes predictions on future behavior.
        This can be an estimator of any type, depending on the problem (e.g. a forecaster for time-series data).
    horizon
        When a forecaster is used as the predictive model, this is the horizon of its forecasts.
    n_std
        Number of standard deviations used to calculate the threshold. A larger number of standard deviations
        results in a higher threshold, making the model less sensitive.
    warmup_period
        Duration of the model's warm-up period. Since the model starts with zero knowledge, its first predictions
        are poor (high error), which produces very high anomaly scores. A warm-up period is therefore used to
        discard the first instances seen. While the model is within the warm-up period, no score is calculated
        and the score_one method returns 0.0.

Attributes
----------
    dynamic_mae : stats.Mean
        The running mean of the squared prediction errors, used to update the dynamic threshold.
    dynamic_se_variance : stats.Var
        The running variance of the squared prediction errors, used to update the dynamic threshold.
iter : int
        The number of iterations (data points) seen so far.

Examples
--------

>>> from river import datasets
>>> from river import time_series
>>> from river import anomaly
>>> from river import preprocessing
>>> from river import linear_model
>>> from river import optim

>>> period = 12
>>> predictive_model = time_series.SNARIMAX(
... p=period,
... d=1,
... q=period,
... m=period,
... sd=1,
... regressor=(
... preprocessing.StandardScaler()
... | linear_model.LinearRegression(
... optimizer=optim.SGD(0.005),
... )
... ),
... )

>>> PAD = anomaly.PredictiveAnomalyDetection(
... predictive_model,
... horizon=1,
... n_std=3.5,
... warmup_period=15
... )

>>> scores = []

>>> for t, (x, y) in enumerate(datasets.AirlinePassengers()):
... score = PAD.score_one(None, y)
... PAD = PAD.learn_one(None, y)
... scores.append(score)

>>> print(scores[-1])
0.05329236123455621

References
----------
[^1]: Laptev N, Amizadeh S, Flint I. Generic and scalable framework for Automated Time-series Anomaly Detection.
Proceedings of the 21st ACM SIGKDD International Conference on Knowledge Discovery and Data Mining 2015.
doi:10.1145/2783258.2788611.
"""

def __init__(
self,
predictive_model: base.Estimator | None = None,
horizon: int = 1,
n_std: float = 3.0,
warmup_period: int = 0,
):

self.predictive_model = (
predictive_model
if predictive_model is not None
else preprocessing.MinMaxScaler() | linear_model.LinearRegression()
)

self.horizon = horizon
self.n_std = n_std
self.warmup_period = warmup_period

# Initialize necessary statistical measures
self.dynamic_mae = stats.Mean()
self.dynamic_se_variance = stats.Var()

# Initialize necessary values for warm-up procedure
self.iter: int = 0

# This method is called to make the predictive model learn one example
def learn_one(self, x: dict | None, y: base.typing.Target | float):
self.iter += 1

# Check whether the model is a time-series forecasting or regression/classification model
if isinstance(
self.predictive_model, time_series.base.Forecaster
) and isinstance(y, float):
            # When there is no dict of features, only the target is passed to the forecaster;
            # otherwise the features are passed to it as exogenous variables.
if not x:
self.predictive_model.learn_one(y=y)
else:
self.predictive_model.learn_one(y=y, x=x)
else:
self.predictive_model.learn_one(x=x, y=y)
return self

def score_one(self, x: dict, y: base.typing.Target):
        # Obtain the prediction from the predictive model, first checking whether
        # it is a time-series forecaster.
if isinstance(self.predictive_model, time_series.base.Forecaster):
y_pred = self.predictive_model.forecast(self.horizon)[0]
else:
y_pred = self.predictive_model.predict_one(x)

# Calculate the errors necessary for thresholding
squared_error = (y_pred - y) ** 2

# Based on the errors and hyperparameters, calculate threshold
threshold = self.dynamic_mae.get() + (
self.n_std * math.sqrt(self.dynamic_se_variance.get())
)

        # When the warmup_period hyperparameter is used, the anomaly score is only returned once the warm-up
        # period has passed; until then, the default score of 0.0 is returned.
if self.iter < self.warmup_period:
return 0.0

# Updating metrics only when not in warmup
self.dynamic_mae.update(squared_error)
self.dynamic_se_variance.update(squared_error)

        # Every error at or above the threshold is scored as 1.0.
        # Everything below is scaled linearly into the range [0.0, 1.0).
if squared_error >= threshold:
return 1.0
else:
return squared_error / threshold
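For reviewers, here is a small self-contained sketch of the scoring rule described in the docstring above: the running mean and variance of the squared error define the dynamic threshold, errors at or above it score 1.0, and errors below it scale linearly. The helper `pad_score` is hypothetical (not part of the diff) and simply restates what `score_one` does, using the same `river.stats` primitives:

```python
import math

from river import stats


def pad_score(squared_error: float, mean: stats.Mean, var: stats.Var, n_std: float = 3.0) -> float:
    # Dynamic threshold: running mean of squared errors plus n_std standard deviations.
    threshold = mean.get() + n_std * math.sqrt(var.get())
    # Update the running statistics with the newest squared error, as score_one does.
    mean.update(squared_error)
    var.update(squared_error)
    # Errors at or above the threshold score 1.0; smaller errors scale linearly.
    return 1.0 if squared_error >= threshold else squared_error / threshold


mean, var = stats.Mean(), stats.Var()
for se in [0.01, 0.02, 0.015, 0.9]:  # the last error is far above the running threshold
    print(round(pad_score(se, mean, var), 3))
```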
12 changes: 9 additions & 3 deletions river/anomaly/test_lof.py
@@ -28,15 +28,19 @@ def test_incremental_lof_scores():
x_train_dict = [{f"feature_{i + 1}": elem[i] for i in range(2)} for elem in x_train]
ground_truth = np.ones(len(x_train), dtype=int)
ground_truth[-len(x_outliers) :] = -1
df_train = pd.DataFrame({"observations": x_train_dict, "ground_truth": ground_truth})
df_train = pd.DataFrame(
{"observations": x_train_dict, "ground_truth": ground_truth}
)
x_pred = np.random.uniform(low=-5, high=5, size=(30, 2))
x_pred_dict = [{f"feature_{i + 1}": elem[i] for i in range(2)} for elem in x_pred]
incremental_lof = anomaly.LocalOutlierFactor(n_neighbors=20)

for x in df_train["observations"]:
incremental_lof.learn_one(x)

ilof_scores_train = np.array([ilof_score for ilof_score in incremental_lof.lof.values()])
ilof_scores_train = np.array(
[ilof_score for ilof_score in incremental_lof.lof.values()]
)

ilof_scores_pred = []
for x in x_pred_dict:
@@ -46,7 +50,9 @@
lof_sklearn.fit_predict(x_train)
lof_sklearn_scores_train = -lof_sklearn.negative_outlier_factor_

assert np.allclose(ilof_scores_train, lof_sklearn_scores_train, rtol=1e-08, atol=1e-08)
assert np.allclose(
ilof_scores_train, lof_sklearn_scores_train, rtol=1e-08, atol=1e-08
)


def test_batch_lof_scores():