Move the first round check and the backup algo code to the location smoothing file

This addresses a long-standing TODO:
https://github.com/e-mission/e-mission-server/blob/master/emission/analysis/intake/cleaning/cleaning_methods/jump_smoothing.py#L262

It also:
- ensures that the individual algorithms are clean and modular and don't depend on one another
- lets us swap in any algorithm as the backup (see the interface sketch after the changed-files summary below)
- leaves room for more complex backup strategies in the future

Testing done:
- modified the test to pass in the backup algo
- tests pass
shankari committed Jan 24, 2023
1 parent 67f5c86 commit cebb81f
Showing 3 changed files with 47 additions and 40 deletions.
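
The "swap in any algorithm" goal above rests on a duck-typed contract rather than a shared base class: anything that implements filter(with_speeds_df) and then exposes a boolean inlier_mask_ aligned with the dataframe's rows can serve as the backup. A minimal sketch of such a drop-in, assuming only that contract (the class and its name are hypothetical, not part of this commit):

import pandas as pd

class CapSpeedFilter:
    """Toy backup filter: mark every point whose incoming speed exceeds
    a fixed cap as an outlier. Same contract as SmoothZigzag/SmoothPosdap:
    filter() sets self.inlier_mask_."""
    def __init__(self, max_speed):
        self.max_speed = max_speed
        self.inlier_mask_ = None

    def filter(self, with_speeds_df):
        # Boolean Series aligned with the input rows: True = keep the point.
        self.inlier_mask_ = with_speeds_df.speed <= self.max_speed

# Hypothetical usage, mirroring the new call in filter_jumps below:
#   get_points_to_filter(section_points_df, outlier_algo, filtering_algo,
#                        CapSpeedFilter(MACH1))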
35 changes: 0 additions & 35 deletions emission/analysis/intake/cleaning/cleaning_methods/jump_smoothing.py
@@ -259,41 +259,6 @@ def filter(self, with_speeds_df):
logging.debug("after setting values, outlier_mask = %s" % np.nonzero((self.inlier_mask_ == False).to_numpy()))
# logging.debug("point details are %s" % with_speeds_df[np.logical_not(self.inlier_mask_)])

- # TODO: This is not the right place for this - adds too many dependencies
- # Should do this in the outer class in general so that we can do
- # multiple passes of any filtering algorithm
- import emission.analysis.intake.cleaning.cleaning_methods.speed_outlier_detection as cso
- import emission.analysis.intake.cleaning.location_smoothing as ls
-
- recomputed_speeds_df = ls.recalc_speed(self.with_speeds_df[self.inlier_mask_])
- recomputed_threshold = cso.BoxplotOutlier(ignore_zeros = True).get_threshold(recomputed_speeds_df)
- logging.info("After first round, recomputed max = %s, recomputed threshold = %s" %
-         (recomputed_speeds_df.speed.max(), recomputed_threshold))
- # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
- if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] != 0:
-     logging.info("After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
-     MACH1 = 340.29
-     if recomputed_speeds_df.speed.max() > MACH1:
-         backup_filtering_algo = SmoothPosdap(MACH1)
-         backup_filtering_algo.filter(with_speeds_df)
-
-         recomputed_speeds_df = ls.recalc_speed(self.with_speeds_df[backup_filtering_algo.inlier_mask_])
-         recomputed_threshold = cso.BoxplotOutlier(ignore_zeros = True).get_threshold(recomputed_speeds_df)
-         logging.info("After second round, max = %s, recomputed threshold = %s" %
-                 (recomputed_speeds_df.speed.max(), recomputed_threshold))
-         # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
-         if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0:
-             logging.info("After second round, no outliers, returning backup mask %s" % backup_filtering_algo.inlier_mask_)
-             self.inlier_mask_ = backup_filtering_algo.inlier_mask_
-         else:
-             logging.info("After second round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
-             if recomputed_speeds_df.speed.max() > MACH1:
-                 logging.info("And they are also > %s, backup algo also failed" % MACH1)
-             else:
-                 logging.debug("But they are all < %s, so returning outliers %s" %
-                         (MACH1, np.nonzero(np.logical_not(backup_filtering_algo.inlier_mask_))))
-                 self.inlier_mask_ = backup_filtering_algo.inlier_mask_
-

### Re-implemented from the prior POSDAP algorithm
### This does seem to use some kind of max speed
47 changes: 44 additions & 3 deletions emission/analysis/intake/cleaning/location_smoothing.py
@@ -43,6 +43,7 @@

# This is what we use in the segmentation code to see if the points are "the same"
DEFAULT_SAME_POINT_DISTANCE = 100
+ MACH1 = 340.29  # speed of sound at sea level, in m/s

def recalc_speed(points_df):
"""
@@ -139,9 +140,10 @@ def filter_jumps(user_id, section_id):
logging.debug("Found iOS section, filling in gaps with fake data")
section_points_df = _ios_fill_fake_data(section_points_df)
filtering_algo = eaicj.SmoothZigzag(is_ios, DEFAULT_SAME_POINT_DISTANCE)
+ backup_filtering_algo = eaicj.SmoothPosdap(MACH1)

logging.debug("len(section_points_df) = %s" % len(section_points_df))
- points_to_ignore_df = get_points_to_filter(section_points_df, outlier_algo, filtering_algo)
+ points_to_ignore_df = get_points_to_filter(section_points_df, outlier_algo, filtering_algo, backup_filtering_algo)
if points_to_ignore_df is None:
# There were no points to delete
return
@@ -163,7 +165,7 @@ def filter_jumps(user_id, section_id):
result_entry = ecwe.Entry.create_entry(user_id, "analysis/smoothing", filter_result)
ts.insert(result_entry)

- def get_points_to_filter(section_points_df, outlier_algo, filtering_algo):
+ def get_points_to_filter(section_points_df, outlier_algo, filtering_algo, backup_filtering_algo):
"""
From the incoming dataframe, filter out large jumps using the specified outlier detection algorithm and
the specified filtering algorithm.
@@ -187,7 +189,46 @@ def get_points_to_filter(section_points_df, outlier_algo, filtering_algo):
if filtering_algo is not None:
try:
filtering_algo.filter(with_speeds_df)
- to_delete_mask = np.logical_not(filtering_algo.inlier_mask_)
+ recomputed_speeds_df = recalc_speed(with_speeds_df[filtering_algo.inlier_mask_])
+ recomputed_threshold = outlier_algo.get_threshold(recomputed_speeds_df)
+ logging.info("After first round, recomputed max = %s, recomputed threshold = %s" %
+         (recomputed_speeds_df.speed.max(), recomputed_threshold))
+ # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
+ if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0:
+     logging.info("No outliers after first round, default algo worked, to_delete = %s" %
+             np.nonzero(np.logical_not(filtering_algo.inlier_mask_)))
+     sel_inlier_mask_ = filtering_algo.inlier_mask_
+ else:
+     logging.info("After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
+     if backup_filtering_algo is None or recomputed_speeds_df.speed.max() < MACH1:
+         logging.debug("backup algo is %s, max < MACH1 (%s), so returning default algo outliers %s" %
+                 (backup_filtering_algo, MACH1, np.nonzero(np.logical_not(filtering_algo.inlier_mask_))))
+         sel_inlier_mask_ = filtering_algo.inlier_mask_
+     else:
+         backup_filtering_algo.filter(with_speeds_df)
+         recomputed_speeds_df = recalc_speed(with_speeds_df[backup_filtering_algo.inlier_mask_])
+         recomputed_threshold = outlier_algo.get_threshold(recomputed_speeds_df)
+         logging.info("After second round, max = %s, recomputed threshold = %s" %
+                 (recomputed_speeds_df.speed.max(), recomputed_threshold))
+         # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After second round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
+         if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0:
+             logging.info("After second round, no outliers, returning backup to delete %s" %
+                     np.nonzero(np.logical_not(backup_filtering_algo.inlier_mask_)))
+             sel_inlier_mask_ = backup_filtering_algo.inlier_mask_
+         else:
+             logging.info("After second round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
+             if recomputed_speeds_df.speed.max() < MACH1:
+                 logging.debug("But they are all < %s, so returning backup to delete %s" %
+                         (MACH1, np.nonzero(np.logical_not(backup_filtering_algo.inlier_mask_))))
+                 sel_inlier_mask_ = backup_filtering_algo.inlier_mask_
+             else:
+                 logging.info("And they are also > %s, backup algo also failed, returning default to delete = %s" %
+                         (MACH1, np.nonzero(np.logical_not(filtering_algo.inlier_mask_))))
+                 sel_inlier_mask_ = filtering_algo.inlier_mask_
+
+ to_delete_mask = np.logical_not(sel_inlier_mask_)
+ logging.info("After all checks, inlier mask = %s, outlier_mask = %s" %
+         (np.nonzero(sel_inlier_mask_), np.nonzero(np.logical_not(sel_inlier_mask_))))
return with_speeds_df[to_delete_mask]
except Exception as e:
logging.exception("Caught error %s while processing section, skipping..." % e)
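
Distilled, the selection policy that the new code above implements is a two-round escalation. A compact restatement with a hypothetical helper name (_select_inlier_mask is not in the commit; recalc_speed and MACH1 come from this module):

def _select_inlier_mask(with_speeds_df, outlier_algo, default_algo, backup_algo):
    # Round 1: run the default filter, then re-check speeds over its inliers.
    default_algo.filter(with_speeds_df)
    speeds = recalc_speed(with_speeds_df[default_algo.inlier_mask_])
    clean = (speeds.speed > outlier_algo.get_threshold(speeds)).sum() == 0
    if clean or backup_algo is None or speeds.speed.max() < MACH1:
        # Clean, no backup available, or dirty but physically plausible:
        # keep the default mask.
        return default_algo.inlier_mask_
    # Round 2: escalate to the backup filter and repeat the same check.
    backup_algo.filter(with_speeds_df)
    speeds = recalc_speed(with_speeds_df[backup_algo.inlier_mask_])
    clean = (speeds.speed > outlier_algo.get_threshold(speeds)).sum() == 0
    if clean or speeds.speed.max() < MACH1:
        return backup_algo.inlier_mask_
    # Both rounds still show supersonic speeds: fall back to the default mask.
    return default_algo.inlier_mask_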
5 changes: 3 additions & 2 deletions
@@ -160,20 +160,21 @@ def testFilterAllClusters(self):

outlier_algo = eaics.BoxplotOutlier()
jump_algo = eaicj.SmoothZigzag(False, 100)
+ backup_algo = eaicj.SmoothPosdap(eaicl.MACH1)

# US to ocean jump: case 1 of https://github.com/e-mission/e-mission-docs/issues/843
with_speeds_df = pd.read_csv("emission/tests/data/smoothing_data/all_cluster_case_1.csv", index_col=0)
with_speeds_df.drop(["distance", "speed", "heading"], axis="columns", inplace=True)
with_speeds_df["loc"] = with_speeds_df["loc"].apply(lambda lstr: json.loads(lstr.replace("'", '"')))
- filtered_points = eaicl.get_points_to_filter(with_speeds_df, outlier_algo, jump_algo)
+ filtered_points = eaicl.get_points_to_filter(with_speeds_df, outlier_algo, jump_algo, backup_algo)
expected_result_idx = list(range(16, 21))
self.assertEqual(list(filtered_points.dropna().index), expected_result_idx)

# PR to pakistan jump: case 2 of https://github.com/e-mission/e-mission-docs/issues/843
with_speeds_df = pd.read_csv("emission/tests/data/smoothing_data/all_cluster_case_2.csv")
with_speeds_df.drop(["distance", "speed", "heading"], axis="columns", inplace=True)
with_speeds_df["loc"] = with_speeds_df["loc"].apply(lambda lstr: json.loads(lstr.replace("'", '"')))
- filtered_points = eaicl.get_points_to_filter(with_speeds_df, outlier_algo, jump_algo)
+ filtered_points = eaicl.get_points_to_filter(with_speeds_df, outlier_algo, jump_algo, backup_algo)
expected_result_idx = [11]
self.assertEqual(list(filtered_points.dropna().index), expected_result_idx)

