Skip to content

Commit

Permalink
Anomaly detection/remove dynamic window (#2078)
Browse files Browse the repository at this point in the history
  • Loading branch information
aayush-se authored and ram-senth committed Mar 5, 2025
1 parent 486425b commit 8b85677
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 123 deletions.
204 changes: 108 additions & 96 deletions src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,31 +96,31 @@ def _batch_detect(
config.sensitivity,
)
prophet_df.ds = prophet_df.ds.astype(int) / 10**9
anomalies_suss = batch_detector.detect(
anomalies = batch_detector.detect(
ts_internal,
config,
algo_config=algo_config,
window_size=window_size,
prophet_df=prophet_df,
time_budget_ms=(
time_budget_ms // 2 if time_budget_ms else None
time_budget_ms if time_budget_ms else None
), # Time budget is split between the two detection calls
)
anomalies_fixed = batch_detector.detect(
convert_external_ts_to_internal(timeseries),
config,
algo_config=algo_config,
window_size=algo_config.mp_fixed_window_size,
time_budget_ms=(
time_budget_ms // 2 if time_budget_ms else None
), # Time budget is split between the two detection calls
prophet_df=prophet_df,
)
anomalies = DbAlertDataAccessor().combine_anomalies(
anomalies_suss, anomalies_fixed, [True] * len(timeseries)
# anomalies_fixed = batch_detector.detect(
# convert_external_ts_to_internal(timeseries),
# config,
# algo_config=algo_config,
# window_size=algo_config.mp_fixed_window_size,
# time_budget_ms=(
# time_budget_ms // 2 if time_budget_ms else None
# ), # Time budget is split between the two detection calls
# prophet_df=prophet_df,
# )
batch_anomalies = DbAlertDataAccessor().combine_anomalies(
anomalies, None, [True] * len(timeseries)
)

return timeseries, anomalies, prophet_df
return timeseries, batch_anomalies, prophet_df

@inject
@sentry_sdk.trace
Expand Down Expand Up @@ -206,64 +206,64 @@ def _online_detect(
# Run stream detection

# SuSS Window
stream_detector_suss = MPStreamAnomalyDetector(
stream_detector = MPStreamAnomalyDetector(
history_timestamps=historic.timeseries.timestamps,
history_values=historic.timeseries.values,
history_mp=anomalies.matrix_profile_suss,
window_size=anomalies.window_size,
original_flags=original_flags,
)
streamed_anomalies_suss = stream_detector_suss.detect(
streamed_anomalies = stream_detector.detect(
convert_external_ts_to_internal(ts_external),
config,
prophet_df=historic.prophet_predictions.as_dataframe(),
)

streamed_anomalies_fixed = None
if not historic.only_suss:
# Fixed Window
stream_detector_fixed = MPStreamAnomalyDetector(
history_timestamps=historic.timeseries.timestamps,
history_values=historic.timeseries.values,
history_mp=anomalies.matrix_profile_fixed,
window_size=10,
original_flags=original_flags,
)
streamed_anomalies_fixed = stream_detector_fixed.detect(
convert_external_ts_to_internal(ts_external),
config,
prophet_df=historic.prophet_predictions.as_dataframe(),
)

# Check if next detection should switch window
use_suss_window = anomalies.use_suss[-1]
if use_suss_window and streamed_anomalies_suss.flags[-1] == "anomaly_higher_confidence":
use_suss_window = False

# If we are using fixed window and we are past the SuSS anomalous region
if (
not use_suss_window
and streamed_anomalies_fixed.flags[-1] == "none"
and streamed_anomalies_suss.flags[-1] == "none"
):
use_suss_window = True

anomalies.use_suss.append(use_suss_window)

else:
anomalies.use_suss.append(True)

num_anomlies = len(streamed_anomalies_suss.flags)
streamed_anomalies = alert_data_accessor.combine_anomalies(
streamed_anomalies_suss, streamed_anomalies_fixed, anomalies.use_suss[-num_anomlies:]
# Commenting out in case we need to include dynamic window logic again
# streamed_anomalies_fixed = None
# if not historic.only_suss:
# # Fixed Window
# stream_detector_fixed = MPStreamAnomalyDetector(
# history_timestamps=historic.timeseries.timestamps,
# history_values=historic.timeseries.values,
# history_mp=anomalies.matrix_profile_fixed,
# window_size=10,
# original_flags=original_flags,
# )
# streamed_anomalies_fixed = stream_detector_fixed.detect(
# convert_external_ts_to_internal(ts_external), config
# )

# # Check if next detection should switch window
# use_suss_window = anomalies.use_suss[-1]
# if use_suss_window and streamed_anomalies_suss.flags[-1] == "anomaly_higher_confidence":
# use_suss_window = False

# # If we are using fixed window and we are past the SuSS anomalous region
# if (
# not use_suss_window
# and streamed_anomalies_fixed.flags[-1] == "none"
# and streamed_anomalies_suss.flags[-1] == "none"
# ):
# use_suss_window = True

# anomalies.use_suss.append(use_suss_window)

# else:
# anomalies.use_suss.append(True)
anomalies.use_suss.append(True)

num_anomlies = len(streamed_anomalies.flags)
streamed_anomalies_online = alert_data_accessor.combine_anomalies(
streamed_anomalies, None, anomalies.use_suss[-num_anomlies:]
)

# Save new data point
alert_data_accessor.save_timepoint(
external_alert_id=alert.id,
timepoint=ts_external[0],
anomaly=streamed_anomalies,
anomaly_algo_data=streamed_anomalies.get_anomaly_algo_data(len(ts_external))[0],
anomaly_algo_data=streamed_anomalies_online.get_anomaly_algo_data(len(ts_external))[0],
)

# Delayed import due to circular imports
Expand Down Expand Up @@ -291,7 +291,7 @@ def _online_detect(
sentry_sdk.capture_exception(e)
logger.exception(e)

return ts_external, streamed_anomalies
return ts_external, streamed_anomalies_online

def _min_required_timesteps(self, time_period, min_num_days=7):
return int(min_num_days * 24 * 60 / time_period)
Expand Down Expand Up @@ -347,76 +347,88 @@ def _combo_detect(

historic = convert_external_ts_to_internal(ts_with_history.history)
# We are doing 4 detection operations, so allocating 1/4th of the time budget for each one.
time_budget_ms = time_budget_ms // 4 if time_budget_ms else None
time_budget_ms = time_budget_ms // 2 if time_budget_ms else None

# Run batch detect on history data
batch_detector = MPBatchAnomalyDetector()
historic_anomalies_suss = batch_detector.detect(
historic_anomalies = batch_detector.detect(
historic,
config,
time_budget_ms=time_budget_ms,
# TODO Compute and pass prophet df?
)
historic_anomalies_fixed = batch_detector.detect(
historic,
config,
window_size=10,
time_budget_ms=time_budget_ms,
# TODO Compute and pass prophet df?
)
# historic_anomalies_fixed = batch_detector.detect(
# historic,
# config,
# window_size=10,
# time_budget_ms=time_budget_ms,
# # TODO Compute and pass prophet df?
# )

# Run stream detection on current data
# SuSS Window
stream_detector_suss = MPStreamAnomalyDetector(
stream_detector = MPStreamAnomalyDetector(
history_timestamps=historic.timestamps,
history_values=historic.values,
history_mp=historic_anomalies_suss.matrix_profile,
window_size=historic_anomalies_suss.window_size,
original_flags=historic_anomalies_suss.original_flags,
history_mp=historic_anomalies.matrix_profile,
window_size=historic_anomalies.window_size,
original_flags=historic_anomalies.original_flags,
)
streamed_anomalies_suss = stream_detector_suss.detect(
streamed_anomalies = stream_detector.detect(
convert_external_ts_to_internal(ts_external),
config,
time_budget_ms=time_budget_ms,
# TODO Compute and pass prophet df?
)

# Fixed Window
stream_detector_fixed = MPStreamAnomalyDetector(
history_timestamps=historic.timestamps,
history_values=historic.values,
history_mp=historic_anomalies_fixed.matrix_profile,
window_size=historic_anomalies_fixed.window_size,
original_flags=historic_anomalies_fixed.original_flags,
)
streamed_anomalies_fixed = stream_detector_fixed.detect(
convert_external_ts_to_internal(ts_external),
config,
time_budget_ms=time_budget_ms,
# TODO Compute and pass prophet df?
)
# stream_detector_fixed = MPStreamAnomalyDetector(
# history_timestamps=historic.timestamps,
# history_values=historic.values,
# history_mp=historic_anomalies_fixed.matrix_profile,
# window_size=historic_anomalies_fixed.window_size,
# original_flags=historic_anomalies_fixed.original_flags,
# )
# streamed_anomalies_fixed = stream_detector_fixed.detect(
# convert_external_ts_to_internal(ts_external),
# config,
# time_budget_ms=time_budget_ms,
# # TODO Compute and pass prophet df?
# )

# Fixed Window
# stream_detector_fixed = MPStreamAnomalyDetector(
# history_timestamps=historic.timestamps,
# history_values=historic.values,
# history_mp=historic_anomalies_fixed.matrix_profile,
# window_size=historic_anomalies_fixed.window_size,
# original_flags=historic_anomalies_fixed.original_flags,
# )
# streamed_anomalies_fixed = stream_detector_fixed.detect(
# convert_external_ts_to_internal(ts_external), config, time_budget_ms=time_budget_ms
# )

if trim_current_by > 0:
ts_external = ts_with_history.history[-trim_current_by:] + ts_external
streamed_anomalies_suss.flags = (
historic_anomalies_suss.flags[-trim_current_by:] + streamed_anomalies_suss.flags
)
streamed_anomalies_suss.scores = (
historic_anomalies_suss.scores[-trim_current_by:] + streamed_anomalies_suss.scores
)
streamed_anomalies_fixed.flags = (
historic_anomalies_fixed.flags[-trim_current_by:] + streamed_anomalies_fixed.flags
streamed_anomalies.flags = (
historic_anomalies.flags[-trim_current_by:] + streamed_anomalies.flags
)
streamed_anomalies_fixed.scores = (
historic_anomalies_fixed.scores[-trim_current_by:] + streamed_anomalies_fixed.scores
streamed_anomalies.scores = (
historic_anomalies.scores[-trim_current_by:] + streamed_anomalies.scores
)
# streamed_anomalies_fixed.flags = (
# historic_anomalies_fixed.flags[-trim_current_by:] + streamed_anomalies_fixed.flags
# )
# streamed_anomalies_fixed.scores = (
# historic_anomalies_fixed.scores[-trim_current_by:] + streamed_anomalies_fixed.scores
# )

anomalies = DbAlertDataAccessor().combine_anomalies(
streamed_anomalies_suss,
streamed_anomalies_fixed,
streamed_anomalies,
None,
[True]
* len(
streamed_anomalies_suss.flags
streamed_anomalies.flags
), # Defaulting to using SuSS window because switching logic is for streaming only
)

Expand Down
20 changes: 10 additions & 10 deletions src/seer/anomaly_detection/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,25 +185,25 @@ def _update_matrix_profiles(
anomalies_suss = MPBatchAnomalyDetector()._compute_matrix_profile(
timeseries=timeseries, ad_config=anomaly_detection_config, algo_config=algo_config
)
anomalies_fixed = MPBatchAnomalyDetector()._compute_matrix_profile(
timeseries=timeseries,
ad_config=anomaly_detection_config,
algo_config=algo_config,
window_size=algo_config.mp_fixed_window_size,
)
# anomalies_fixed = MPBatchAnomalyDetector()._compute_matrix_profile(
# timeseries=timeseries,
# ad_config=anomaly_detection_config,
# algo_config=algo_config,
# window_size=algo_config.mp_fixed_window_size,
# )
anomalies = DbAlertDataAccessor().combine_anomalies(
anomalies_suss, anomalies_fixed, [True] * len(timeseries.timestamps)
anomalies_suss, None, [True] * len(timeseries.timestamps)
)

algo_data_map = dict(
zip(timeseries.timestamps, anomalies.get_anomaly_algo_data(len(timeseries.timestamps)))
)
updateed_timeseries_points = 0
updated_timeseries_points = 0
for timestep in alert.timeseries:
timestep.anomaly_algo_data = algo_data_map[timestep.timestamp.timestamp()]
updateed_timeseries_points += 1
updated_timeseries_points += 1
alert.anomaly_algo_data = {"window_size": anomalies.window_size}
return updateed_timeseries_points
return updated_timeseries_points


@sentry_sdk.trace
Expand Down
10 changes: 6 additions & 4 deletions tests/seer/anomaly_detection/test_anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def test_detect_anomalies_online_switch_window(
window_size=window_size,
thresholds=[],
original_flags=np.array(["none"] * len(ts_timestamps)),
use_suss=np.array([False] * len(ts_timestamps)),
use_suss=np.array([True] * len(ts_timestamps)),
),
prophet_predictions=ProphetPrediction(
timestamps=np.array([]),
Expand All @@ -393,7 +393,7 @@ def test_detect_anomalies_online_switch_window(
yhat_upper=np.array([]),
),
cleanup_predict_config=cleanup_predict_config,
only_suss=False,
only_suss=True,
data_purge_flag=TaskStatus.NOT_QUEUED,
last_queued_at=None,
)
Expand All @@ -420,7 +420,9 @@ def test_detect_anomalies_online_switch_window(
)
response = AnomalyDetection().detect_anomalies(request=request)

assert mock_stream_detector.call_count == 2
assert (
mock_stream_detector.call_count == 1
) # Only called once because the window should not switch
assert isinstance(response, DetectAnomaliesResponse)
assert isinstance(response.timeseries, list)
assert len(response.timeseries) == 1 # Checking just 1 streamed value
Expand Down Expand Up @@ -461,7 +463,7 @@ def test_detect_anomalies_online_switch_window(

response = AnomalyDetection().detect_anomalies(request=request)

assert mock_stream_detector.call_count == 3
assert mock_stream_detector.call_count == 2
assert isinstance(response, DetectAnomaliesResponse)
assert isinstance(response.timeseries, list)
assert len(response.timeseries) == 1 # Checking just 1 streamed value
Expand Down
Loading

0 comments on commit 8b85677

Please sign in to comment.