From aa954330e1dfb1bb6cacb9f2980c96f15f2c3437 Mon Sep 17 00:00:00 2001 From: Ram Senthamarai Date: Mon, 3 Mar 2025 19:16:57 -0800 Subject: [PATCH] Fix for timestamp datatype mismatch --- src/seer/anomaly_detection/accessors.py | 14 ++++---- .../anomaly_detection/anomaly_detection.py | 4 +-- .../detectors/anomaly_scorer.py | 34 +++++++++++++++---- .../anomaly_detection/models/timeseries.py | 3 +- .../detectors/test_anomaly_scorer.py | 6 ++-- 5 files changed, 39 insertions(+), 22 deletions(-) diff --git a/src/seer/anomaly_detection/accessors.py b/src/seer/anomaly_detection/accessors.py index 69635c3cf..5c98802b3 100644 --- a/src/seer/anomaly_detection/accessors.py +++ b/src/seer/anomaly_detection/accessors.py @@ -118,11 +118,11 @@ def _hydrate_alert( use_suss = [True] * n_points n_predictions = len(db_alert.prophet_predictions) - prophet_timestamps = np.array([None] * n_predictions) - prophet_ys = np.array([None] * n_predictions) - prophet_yhats = np.array([None] * n_predictions) - prophet_yhat_lowers = np.array([None] * n_predictions) - prophet_yhat_uppers = np.array([None] * n_predictions) + prophet_timestamps = np.full(n_predictions, None) + prophet_ys = np.full(n_predictions, None) + prophet_yhats = np.full(n_predictions, None) + prophet_yhat_lowers = np.full(n_predictions, None) + prophet_yhat_uppers = np.full(n_predictions, None) # If the timeseries does not have both matrix profiles, then we only use the suss window only_suss = len(timeseries) > 0 and any( @@ -483,7 +483,7 @@ def store_prophet_predictions(self, alert_id: int, predictions: ProphetPredictio prediction_values = [ { "dynamic_alert_id": alert_id, - "timestamp": datetime.fromtimestamp(predictions.timestamps[i]), + "timestamp": predictions.timestamps[i], "yhat": predictions.yhat[i], "yhat_lower": predictions.yhat_lower[i], "yhat_upper": predictions.yhat_upper[i], @@ -491,7 +491,6 @@ def store_prophet_predictions(self, alert_id: int, predictions: ProphetPredictio for i in range(len(predictions.timestamps)) ] stmt = insert(DbProphetAlertTimeSeries).values(prediction_values) - update_stmt = stmt.on_conflict_do_update( index_elements=["dynamic_alert_id", "timestamp"], set_={ @@ -500,6 +499,5 @@ def store_prophet_predictions(self, alert_id: int, predictions: ProphetPredictio "yhat_upper": stmt.excluded.yhat_upper, }, ) - session.execute(update_stmt) session.commit() diff --git a/src/seer/anomaly_detection/anomaly_detection.py b/src/seer/anomaly_detection/anomaly_detection.py index 46a2f3486..c7d7f8b20 100644 --- a/src/seer/anomaly_detection/anomaly_detection.py +++ b/src/seer/anomaly_detection/anomaly_detection.py @@ -95,6 +95,7 @@ def _batch_detect( config.time_period, config.sensitivity, ) + prophet_df.ds = prophet_df.ds.astype(int) / 10**9 anomalies_suss = batch_detector.detect( ts_internal, config, @@ -191,8 +192,6 @@ def _online_detect( ) anomalies: MPTimeSeriesAnomalies = historic.anomalies - # TODO: Need to check the time gap between historic data and the new datapoint against the alert configuration - # Get the original flags from the historic data original_flags = anomalies.original_flags if original_flags is not None and len(original_flags) != len( @@ -540,7 +539,6 @@ def store_data( alert_data_accessor.store_prophet_predictions( saved_alert_id, ProphetPrediction.from_prophet_df(prophet_df) ) - return StoreDataResponse(success=True) @inject diff --git a/src/seer/anomaly_detection/detectors/anomaly_scorer.py b/src/seer/anomaly_detection/detectors/anomaly_scorer.py index 7c8ea8b0d..7c70f3af5 100644 --- a/src/seer/anomaly_detection/detectors/anomaly_scorer.py +++ b/src/seer/anomaly_detection/detectors/anomaly_scorer.py @@ -174,7 +174,7 @@ def stream_score( row_exists = (prophet_df["ds"] == streamed_timestamp).any() if not row_exists: logger.warning( - "Row for timestamp not found in prophet_df, skipping prophet scoring", + "Timestamp not found in prophet_df, skipping prophet scoring", extra={"streamed_timestamp": streamed_timestamp}, ) return mp_flags_and_scores @@ -225,12 +225,14 @@ def _merge_prophet_mp_results( # todo: return prophet thresholds def merge(timestamps, mp_flags, prophet_map): flags = [] + missing = 0 found = 0 previous_mp_flag: AnomalyFlags = "none" + missing_timestamps = [] for timestamp, mp_flag in zip(timestamps, mp_flags): - if pd.to_datetime(timestamp) in prophet_map["flag"]: + pd_dt = float(timestamp) + if pd_dt in prophet_map["flag"]: found += 1 - pd_dt = pd.to_datetime(timestamp) prophet_flag = prophet_map["flag"][pd_dt] prophet_score = prophet_map["score"][pd_dt] prophet_flag = self._adjust_prophet_flag_for_location( @@ -253,17 +255,35 @@ def merge(timestamps, mp_flags, prophet_map): else: flags.append("none") else: + missing += 1 + missing_timestamps.append(timestamp) flags.append(mp_flag) previous_mp_flag = mp_flag - # todo: publish metrics for found/total - # if debug: - # print(f"found {found} out of {len(timestamps)}") + + if missing > 0: + logger.warning( + "Some of the MP flags did not have corresponding prophet flags", + extra={ + "total mp flags": len(timestamps), + "missing prophet flags": missing, + "matching prophet flags": found, + "upto 5 missing timestamps": ( + missing_timestamps[0:5] + if len(missing_timestamps) > 5 + else missing_timestamps + ), + "upto 5 prophet timestamps": ( + list(prophet_map["flag"].keys())[0:5] + if len(prophet_map["flag"].keys()) > 5 + else list(prophet_map["flag"].keys()) + ), + }, + ) return flags prophet_predictions_map = prophet_predictions.set_index("ds")[ ["flag", "score", "y", "yhat", "yhat_lower", "yhat_upper"] ].to_dict() - flags = merge(timestamps, mp_flags_and_scores.flags, prophet_predictions_map) return FlagsAndScores( diff --git a/src/seer/anomaly_detection/models/timeseries.py b/src/seer/anomaly_detection/models/timeseries.py index 6738cae8f..c964f578c 100644 --- a/src/seer/anomaly_detection/models/timeseries.py +++ b/src/seer/anomaly_detection/models/timeseries.py @@ -50,8 +50,9 @@ def as_dataframe(self) -> pd.DataFrame: @staticmethod def from_prophet_df(prophet_df: pd.DataFrame) -> "ProphetPrediction": + return ProphetPrediction( - timestamps=np.array([date.timestamp() for date in prophet_df.ds], dtype=np.float64), + timestamps=np.array([pd.to_datetime(date, unit="s") for date in prophet_df.ds]), y=np.array(prophet_df.y), yhat=np.array(prophet_df.yhat), yhat_lower=np.array(prophet_df.yhat_lower), diff --git a/tests/seer/anomaly_detection/detectors/test_anomaly_scorer.py b/tests/seer/anomaly_detection/detectors/test_anomaly_scorer.py index 5195a3e9e..eff4e102a 100644 --- a/tests/seer/anomaly_detection/detectors/test_anomaly_scorer.py +++ b/tests/seer/anomaly_detection/detectors/test_anomaly_scorer.py @@ -401,7 +401,7 @@ def test_prophet_negative_overrides_mp( prophet_predictions = pd.DataFrame( { - "ds": pd.to_datetime(timestamps), + "ds": timestamps, "flag": ["none", "none", "none"], "score": [0.5, 2.5, 0.3], "y": [10.0, 20.0, 15.0], @@ -456,7 +456,7 @@ def test_mp_and_prophet_both_high_confidence_anomaly( prophet_predictions = pd.DataFrame( { - "ds": pd.to_datetime(timestamps), + "ds": timestamps, "flag": ["none", "anomaly_higher_confidence", "none"], "score": [0.5, 2.5, 0.3], "y": [10.0, 20.0, 15.0], @@ -512,7 +512,7 @@ def test_prophet_high_confidence_anomaly( prophet_predictions = pd.DataFrame( { - "ds": pd.to_datetime(timestamps), + "ds": timestamps, "flag": ["none", "anomaly_higher_confidence", "none"], "score": [0.5, 2.5, 0.3], "y": [10.0, 20.0, 15.0],