Skip to content

Commit

Permalink
Fix for timestamp datatype mismatch
Browse files Browse the repository at this point in the history
  • Loading branch information
ram-senth committed Mar 4, 2025
1 parent 62f6a83 commit aa95433
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 22 deletions.
14 changes: 6 additions & 8 deletions src/seer/anomaly_detection/accessors.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ def _hydrate_alert(
use_suss = [True] * n_points

n_predictions = len(db_alert.prophet_predictions)
prophet_timestamps = np.array([None] * n_predictions)
prophet_ys = np.array([None] * n_predictions)
prophet_yhats = np.array([None] * n_predictions)
prophet_yhat_lowers = np.array([None] * n_predictions)
prophet_yhat_uppers = np.array([None] * n_predictions)
prophet_timestamps = np.full(n_predictions, None)
prophet_ys = np.full(n_predictions, None)
prophet_yhats = np.full(n_predictions, None)
prophet_yhat_lowers = np.full(n_predictions, None)
prophet_yhat_uppers = np.full(n_predictions, None)

# If the timeseries does not have both matrix profiles, then we only use the suss window
only_suss = len(timeseries) > 0 and any(
Expand Down Expand Up @@ -483,15 +483,14 @@ def store_prophet_predictions(self, alert_id: int, predictions: ProphetPredictio
prediction_values = [
{
"dynamic_alert_id": alert_id,
"timestamp": datetime.fromtimestamp(predictions.timestamps[i]),
"timestamp": predictions.timestamps[i],
"yhat": predictions.yhat[i],
"yhat_lower": predictions.yhat_lower[i],
"yhat_upper": predictions.yhat_upper[i],
}
for i in range(len(predictions.timestamps))
]
stmt = insert(DbProphetAlertTimeSeries).values(prediction_values)

update_stmt = stmt.on_conflict_do_update(
index_elements=["dynamic_alert_id", "timestamp"],
set_={
Expand All @@ -500,6 +499,5 @@ def store_prophet_predictions(self, alert_id: int, predictions: ProphetPredictio
"yhat_upper": stmt.excluded.yhat_upper,
},
)

session.execute(update_stmt)
session.commit()
4 changes: 1 addition & 3 deletions src/seer/anomaly_detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def _batch_detect(
config.time_period,
config.sensitivity,
)
prophet_df.ds = prophet_df.ds.astype(int) / 10**9
anomalies_suss = batch_detector.detect(
ts_internal,
config,
Expand Down Expand Up @@ -191,8 +192,6 @@ def _online_detect(
)
anomalies: MPTimeSeriesAnomalies = historic.anomalies

# TODO: Need to check the time gap between historic data and the new datapoint against the alert configuration

# Get the original flags from the historic data
original_flags = anomalies.original_flags
if original_flags is not None and len(original_flags) != len(
Expand Down Expand Up @@ -540,7 +539,6 @@ def store_data(
alert_data_accessor.store_prophet_predictions(
saved_alert_id, ProphetPrediction.from_prophet_df(prophet_df)
)

return StoreDataResponse(success=True)

@inject
Expand Down
34 changes: 27 additions & 7 deletions src/seer/anomaly_detection/detectors/anomaly_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def stream_score(
row_exists = (prophet_df["ds"] == streamed_timestamp).any()
if not row_exists:
logger.warning(
"Row for timestamp not found in prophet_df, skipping prophet scoring",
"Timestamp not found in prophet_df, skipping prophet scoring",
extra={"streamed_timestamp": streamed_timestamp},
)
return mp_flags_and_scores
Expand Down Expand Up @@ -225,12 +225,14 @@ def _merge_prophet_mp_results(
# todo: return prophet thresholds
def merge(timestamps, mp_flags, prophet_map):
flags = []
missing = 0
found = 0
previous_mp_flag: AnomalyFlags = "none"
missing_timestamps = []
for timestamp, mp_flag in zip(timestamps, mp_flags):
if pd.to_datetime(timestamp) in prophet_map["flag"]:
pd_dt = float(timestamp)
if pd_dt in prophet_map["flag"]:
found += 1
pd_dt = pd.to_datetime(timestamp)
prophet_flag = prophet_map["flag"][pd_dt]
prophet_score = prophet_map["score"][pd_dt]
prophet_flag = self._adjust_prophet_flag_for_location(
Expand All @@ -253,17 +255,35 @@ def merge(timestamps, mp_flags, prophet_map):
else:
flags.append("none")
else:
missing += 1
missing_timestamps.append(timestamp)
flags.append(mp_flag)
previous_mp_flag = mp_flag
# todo: publish metrics for found/total
# if debug:
# print(f"found {found} out of {len(timestamps)}")

if missing > 0:
logger.warning(
"Some of the MP flags did not have corresponding prophet flags",
extra={
"total mp flags": len(timestamps),
"missing prophet flags": missing,
"matching prophet flags": found,
"upto 5 missing timestamps": (
missing_timestamps[0:5]
if len(missing_timestamps) > 5
else missing_timestamps
),
"upto 5 prophet timestamps": (
list(prophet_map["flag"].keys())[0:5]
if len(prophet_map["flag"].keys()) > 5
else list(prophet_map["flag"].keys())
),
},
)
return flags

prophet_predictions_map = prophet_predictions.set_index("ds")[
["flag", "score", "y", "yhat", "yhat_lower", "yhat_upper"]
].to_dict()

flags = merge(timestamps, mp_flags_and_scores.flags, prophet_predictions_map)

return FlagsAndScores(
Expand Down
3 changes: 2 additions & 1 deletion src/seer/anomaly_detection/models/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ def as_dataframe(self) -> pd.DataFrame:

@staticmethod
def from_prophet_df(prophet_df: pd.DataFrame) -> "ProphetPrediction":

return ProphetPrediction(
timestamps=np.array([date.timestamp() for date in prophet_df.ds], dtype=np.float64),
timestamps=np.array([pd.to_datetime(date, unit="s") for date in prophet_df.ds]),
y=np.array(prophet_df.y),
yhat=np.array(prophet_df.yhat),
yhat_lower=np.array(prophet_df.yhat_lower),
Expand Down
6 changes: 3 additions & 3 deletions tests/seer/anomaly_detection/detectors/test_anomaly_scorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def test_prophet_negative_overrides_mp(

prophet_predictions = pd.DataFrame(
{
"ds": pd.to_datetime(timestamps),
"ds": timestamps,
"flag": ["none", "none", "none"],
"score": [0.5, 2.5, 0.3],
"y": [10.0, 20.0, 15.0],
Expand Down Expand Up @@ -456,7 +456,7 @@ def test_mp_and_prophet_both_high_confidence_anomaly(

prophet_predictions = pd.DataFrame(
{
"ds": pd.to_datetime(timestamps),
"ds": timestamps,
"flag": ["none", "anomaly_higher_confidence", "none"],
"score": [0.5, 2.5, 0.3],
"y": [10.0, 20.0, 15.0],
Expand Down Expand Up @@ -512,7 +512,7 @@ def test_prophet_high_confidence_anomaly(

prophet_predictions = pd.DataFrame(
{
"ds": pd.to_datetime(timestamps),
"ds": timestamps,
"flag": ["none", "anomaly_higher_confidence", "none"],
"score": [0.5, 2.5, 0.3],
"y": [10.0, 20.0, 15.0],
Expand Down

0 comments on commit aa95433

Please sign in to comment.