Skip to content

Commit

Permalink
chore(anomaly_detection): surface computed thresholds (#1552)
Browse files Browse the repository at this point in the history
  • Loading branch information
ram-senth authored Dec 12, 2024
1 parent 3e0bc74 commit 4169341
Show file tree
Hide file tree
Showing 22 changed files with 596 additions and 283 deletions.
20 changes: 16 additions & 4 deletions src/seer/anomaly_detection/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,14 +392,26 @@ def combine_anomalies(
combined_flags[i] = anomalies_suss.flags[i]
combined_scores[i] = anomalies_suss.scores[i]
combined_original_flags[i] = anomalies_suss.original_flags[i]
if i < len(anomalies_suss.thresholds):
combined_thresholds[i] = anomalies_suss.thresholds[i]
if (
anomalies_suss.thresholds is not None
and anomalies_fixed.thresholds is not None
and combined_thresholds is not None
):
for j in range(len(anomalies_suss.thresholds)):
if i < len(anomalies_suss.thresholds[j]):
combined_thresholds[j][i] = anomalies_suss.thresholds[j][i]
else:
combined_flags[i] = anomalies_fixed.flags[i]
combined_scores[i] = anomalies_fixed.scores[i]
combined_original_flags[i] = anomalies_fixed.original_flags[i]
if i < len(anomalies_fixed.thresholds):
combined_thresholds[i] = anomalies_fixed.thresholds[i]
if (
anomalies_suss.thresholds is not None
and anomalies_fixed.thresholds is not None
and combined_thresholds is not None
):
for j in range(len(anomalies_suss.thresholds)):
if i < len(anomalies_fixed.thresholds[j]):
combined_thresholds[j][i] = anomalies_fixed.thresholds[j][i]

return MPTimeSeriesAnomalies(
flags=combined_flags,
Expand Down
10 changes: 8 additions & 2 deletions src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ def _batch_detect(
batch_detector = MPBatchAnomalyDetector()

anomalies_suss = batch_detector.detect(
convert_external_ts_to_internal(timeseries), config, window_size=window_size
convert_external_ts_to_internal(timeseries),
config,
algo_config=algo_config,
window_size=window_size,
)
anomalies_fixed = batch_detector.detect(
convert_external_ts_to_internal(timeseries),
config,
algo_config=algo_config,
window_size=algo_config.mp_fixed_window_size,
)
anomalies = DbAlertDataAccessor().combine_anomalies(
Expand Down Expand Up @@ -254,7 +258,9 @@ def _min_required_timesteps(self, time_period, min_num_days=7):

@sentry_sdk.trace
def _combo_detect(
self, ts_with_history: TimeSeriesWithHistory, config: AnomalyDetectionConfig
self,
ts_with_history: TimeSeriesWithHistory,
config: AnomalyDetectionConfig,
) -> Tuple[List[TimeSeriesPoint], MPTimeSeriesAnomalies]:
"""
Stateless online anomaly detection for a part of a time series. This function takes two parts of the time series -
Expand Down
7 changes: 4 additions & 3 deletions src/seer/anomaly_detection/anomaly_detection_di.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ def algoconfig_provider() -> AlgoConfig:
return AlgoConfig(
mp_ignore_trivial=True,
mp_normalize=False,
prophet_uncertainty_samples=1,
prophet_uncertainty_samples=5,
prophet_mcmc_samples=0,
mp_fixed_window_size=10,
return_thresholds=False,
return_predicted_range=False,
return_thresholds=True,
return_predicted_range=True,
)


Expand Down
38 changes: 24 additions & 14 deletions src/seer/anomaly_detection/detectors/anomaly_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
AnomalyDetectionConfig,
AnomalyFlags,
MPTimeSeriesAnomaliesSingleWindow,
Threshold,
TimeSeries,
TimeSeriesAnomalies,
)
Expand All @@ -34,7 +35,9 @@ class AnomalyDetector(BaseModel, abc.ABC):
"""

@abc.abstractmethod
def detect(self, timeseries: TimeSeries, config: AnomalyDetectionConfig) -> TimeSeriesAnomalies:
def detect(
self, timeseries: TimeSeries, ad_config: AnomalyDetectionConfig, algo_config: AlgoConfig
) -> TimeSeriesAnomalies:
return NotImplemented


Expand All @@ -43,9 +46,14 @@ class MPBatchAnomalyDetector(AnomalyDetector):
This class encapsulates the logic for using Matrix Profile for batch anomaly detection.
"""

@inject
@sentry_sdk.trace
def detect(
self, timeseries: TimeSeries, config: AnomalyDetectionConfig, window_size: int | None = None
self,
timeseries: TimeSeries,
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig = injected,
window_size: int | None = None,
) -> MPTimeSeriesAnomaliesSingleWindow:
"""
This method uses matrix profile to detect and score anomalies in the time series.
Expand All @@ -60,17 +68,17 @@ def detect(
Returns:
The input timeseries with anomaly scores and flags added
"""
return self._compute_matrix_profile(timeseries, config, window_size)
return self._compute_matrix_profile(timeseries, ad_config, algo_config, window_size)

@inject
@sentry_sdk.trace
def _compute_matrix_profile(
self,
timeseries: TimeSeries,
config: AnomalyDetectionConfig,
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig,
window_size: int | None = None,
ws_selector: WindowSizeSelector = injected,
algo_config: AlgoConfig = injected,
scorer: MPScorer = injected,
mp_utils: MPUtils = injected,
) -> MPTimeSeriesAnomaliesSingleWindow:
Expand Down Expand Up @@ -113,7 +121,7 @@ def _compute_matrix_profile(
values=ts_values,
timestamps=timeseries.timestamps,
mp_dist=mp_dist,
ad_config=config,
ad_config=ad_config,
window_size=window_size,
)
if flags_and_scores is None:
Expand All @@ -125,7 +133,7 @@ def _compute_matrix_profile(
batch_flag_smoother = MajorityVoteBatchFlagSmoother()
smoothed_flags = batch_flag_smoother.smooth(
flags=flags_and_scores.flags,
ad_config=config,
ad_config=ad_config,
)

# Update the flags in flags_and_scores with the smoothed flags
Expand All @@ -136,7 +144,7 @@ def _compute_matrix_profile(
scores=flags_and_scores.scores,
matrix_profile=mp,
window_size=window_size,
thresholds=flags_and_scores.thresholds,
thresholds=flags_and_scores.thresholds if algo_config.return_thresholds else None,
original_flags=original_flags,
)

Expand Down Expand Up @@ -164,7 +172,8 @@ class MPStreamAnomalyDetector(AnomalyDetector):
def detect(
self,
timeseries: TimeSeries,
config: AnomalyDetectionConfig,
ad_config: AnomalyDetectionConfig,
algo_config: AlgoConfig = injected,
scorer: MPScorer = injected,
mp_utils: MPUtils = injected,
) -> MPTimeSeriesAnomaliesSingleWindow:
Expand Down Expand Up @@ -202,7 +211,7 @@ def detect(
scores: list[float] = []
flags: list[AnomalyFlags] = []
streamed_mp: list[list[float]] = []
thresholds: list[float] = []
thresholds: list[list[Threshold]] = []
for cur_val, cur_timestamp in zip(timeseries.values, timeseries.timestamps):
# Update the stumpi stream processor with new data
stream.update(cur_val)
Expand All @@ -218,7 +227,7 @@ def detect(
history_values=self.history_values,
history_timestamps=self.history_timestamps,
history_mp_dist=mp_dist_baseline,
ad_config=config,
ad_config=ad_config,
window_size=self.window_size,
)
if flags_and_scores is None:
Expand All @@ -231,7 +240,7 @@ def detect(
# Apply stream smoothing to the newest flag based on the previous original flags
smoothed_flags = stream_flag_smoother.smooth(
original_flags=self.original_flags,
ad_config=config,
ad_config=ad_config,
vote_threshold=0.3,
cur_flag=flags_and_scores.flags,
)
Expand All @@ -240,7 +249,8 @@ def detect(

scores.extend(flags_and_scores.scores)
flags.extend(flags_and_scores.flags)
thresholds.extend(flags_and_scores.thresholds)
if flags_and_scores.thresholds is not None:
thresholds.extend(flags_and_scores.thresholds)

# Add new data point as well as its matrix profile to baseline
self.history_timestamps = np.append(self.history_timestamps, cur_timestamp)
Expand All @@ -257,6 +267,6 @@ def detect(
excl_zone_denom=stumpy.config.STUMPY_EXCL_ZONE_DENOM,
),
window_size=self.window_size,
thresholds=thresholds,
thresholds=thresholds if algo_config.return_thresholds else None,
original_flags=self.original_flags,
)
105 changes: 74 additions & 31 deletions src/seer/anomaly_detection/detectors/location_detectors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import abc
import logging
from enum import Enum
from typing import Optional

import numpy as np
Expand All @@ -10,13 +9,16 @@
from prophet import Prophet # type: ignore
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

from seer.anomaly_detection.models import (
AlgoConfig,
PointLocation,
RelativeLocation,
Threshold,
ThresholdType,
)
from seer.dependency_injection import inject, injected

class PointLocation(Enum):
UP = 1
DOWN = 2
NONE = 3
logger = logging.getLogger(__name__)


class LocationDetector(BaseModel, abc.ABC):
Expand All @@ -31,7 +33,7 @@ def detect(
streamed_timestamp: np.float64,
history_values: npt.NDArray[np.float64],
history_timestamps: npt.NDArray[np.float64],
) -> Optional[PointLocation]:
) -> Optional[RelativeLocation]:
return NotImplemented


Expand All @@ -54,7 +56,7 @@ def detect(
streamed_timestamp: np.float64,
history_values: npt.NDArray[np.float64],
history_timestamps: npt.NDArray[np.float64],
) -> Optional[PointLocation]:
) -> Optional[RelativeLocation]:
"""
Detect relative location of the streamed value in the context of recent data points using linear regression.
Expand All @@ -63,7 +65,7 @@ def detect(
history_values (npt.NDArray[np.float64]): Historical time series data
Returns:
PointLocation: The detected relative location of the streamed value as compared to the trend of recent data points (UP, DOWN, or NONE).
RelativeLocation: The detected relative location of the streamed value as compared to the trend of recent data points (UP, DOWN, or NONE).
UP: The streamed value is above the trend of recent data points.
DOWN: The streamed value is below the trend of recent data points.
NONE: The streamed value is within the expected range of recent data points.
Expand All @@ -80,11 +82,20 @@ def detect(
slope, _ = np.polyfit(x, y, 1)

if slope > self.threshold:
return PointLocation.UP
return RelativeLocation(
location=PointLocation.UP,
thresholds=[],
)
elif slope < -self.threshold:
return PointLocation.DOWN
return RelativeLocation(
location=PointLocation.DOWN,
thresholds=[],
)
else:
return PointLocation.NONE
return RelativeLocation(
location=PointLocation.NONE,
thresholds=[],
)


class ProphetLocationDetector(LocationDetector):
Expand All @@ -98,18 +109,16 @@ class ProphetLocationDetector(LocationDetector):
"""

uncertainty_samples: int = Field(
default=25, description="Whether to use uncertainty samples for Prophet"
)

@sentry_sdk.trace
@inject
def detect(
self,
streamed_value: np.float64,
streamed_timestamp: np.float64,
history_values: npt.NDArray[np.float64],
history_timestamps: npt.NDArray[np.float64],
) -> Optional[PointLocation]:
algo_config: AlgoConfig = injected,
) -> Optional[RelativeLocation]:
"""
Detect relative location of the streamed value in the context of recent data points using Prophet.
Expand All @@ -120,13 +129,16 @@ def detect(
history_timestamps (npt.NDArray[np.float64]): Historical time series timestamps
Returns:
PointLocation: The detected relative location of the streamed value as compared to the predicted value (UP, DOWN, or NONE).
RelativeLocation: The detected relative location of the streamed value as compared to the predicted value (UP, DOWN, or NONE).
UP: The streamed value is above the predicted value.
DOWN: The streamed value is below the predicted value.
NONE: The streamed value is within the expected range of recent data points.
"""
# Create Prophet model and fit on historical data
model = Prophet(mcmc_samples=0, uncertainty_samples=self.uncertainty_samples)
model = Prophet(
mcmc_samples=algo_config.prophet_mcmc_samples,
uncertainty_samples=algo_config.prophet_uncertainty_samples,
)
ts = pd.DataFrame({"ds": history_timestamps, "y": history_values})
model.fit(ts)

Expand All @@ -136,20 +148,51 @@ def detect(

# Predict and compare with streamed value
forecast = model.predict(future)
if self.uncertainty_samples > 0:
prophet_yhat_upper = forecast.loc[len(forecast) - 1]["yhat_upper"]
prophet_yhat_lower = forecast.loc[len(forecast) - 1]["yhat_lower"]
if streamed_value > prophet_yhat_upper:
return PointLocation.UP
elif streamed_value < prophet_yhat_lower:
return PointLocation.DOWN
if algo_config.prophet_uncertainty_samples > 0 or algo_config.prophet_mcmc_samples > 0:
streamed_forecast = forecast.loc[len(forecast) - 1]
yhat_upper = streamed_forecast["yhat_upper"]
yhat_lower = streamed_forecast["yhat_lower"]
if algo_config.return_thresholds:
thresholds = [
Threshold(type=ThresholdType.PREDICTION, upper=yhat_upper, lower=yhat_lower),
Threshold(
type=ThresholdType.TREND,
upper=streamed_forecast["trend_upper"],
lower=streamed_forecast["trend_lower"],
),
]
else:
thresholds = []

if streamed_value > yhat_upper:
return RelativeLocation(
location=PointLocation.UP,
thresholds=thresholds,
)
elif streamed_value < yhat_lower:
return RelativeLocation(
location=PointLocation.DOWN,
thresholds=thresholds,
)
else:
return PointLocation.NONE
return RelativeLocation(
location=PointLocation.NONE,
thresholds=thresholds,
)
else:
forecast = forecast.yhat.loc[len(forecast) - 1]
if np.isclose(streamed_value, forecast, rtol=1e-5, atol=1e-8):
return PointLocation.NONE
return RelativeLocation(
location=PointLocation.NONE,
thresholds=[],
)
elif streamed_value > forecast:
return PointLocation.UP
return RelativeLocation(
location=PointLocation.UP,
thresholds=[],
)
else:
return PointLocation.DOWN
return RelativeLocation(
location=PointLocation.DOWN,
thresholds=[],
)
Loading

0 comments on commit 4169341

Please sign in to comment.