Merge pull request #897 from shankari/fix_big_jump_all_cluster
PR to filter big jumps even if all segments are in clusters
shankari authored Jan 25, 2023
2 parents 2f70994 + 29e78de commit b7749d0
Showing 5 changed files with 247 additions and 65 deletions.
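Review note: the heart of this PR is a new selection step in get_points_to_filter. The default SmoothZigzag filter runs first; its output is validated by recomputing speeds over the surviving points and re-applying the BoxplotOutlier threshold; only when physically implausible speeds remain does the SmoothPosdap backup run. A minimal sketch of that control flow, distilled from the diffs below (select_filtering_algo is a name invented for this note, the algo/threshold interfaces are the project's, and the "no points removed at all" short-circuit is omitted):

    MACH1 = 340.29   # m/s; no plausible surface travel is faster than sound

    def select_filtering_algo(with_speeds_df, outlier_algo, default_algo,
                              backup_algo, recalc_speed):
        # default_algo/backup_algo expose .filter(df) setting .inlier_mask_,
        # outlier_algo exposes .get_threshold(df), as in the project code.
        default_algo.filter(with_speeds_df)
        speeds = recalc_speed(with_speeds_df[default_algo.inlier_mask_])
        if (speeds.speed > outlier_algo.get_threshold(speeds)).sum() == 0:
            return default_algo             # first pass left no outliers
        if backup_algo is None or speeds.speed.max() < MACH1:
            return default_algo             # leftovers are physically plausible
        backup_algo.filter(with_speeds_df)  # e.g. SmoothPosdap(MACH1)
        speeds = recalc_speed(with_speeds_df[backup_algo.inlier_mask_])
        if ((speeds.speed > outlier_algo.get_threshold(speeds)).sum() == 0
                or speeds.speed.max() < MACH1):
            return backup_algo              # backup cleaned up, or is plausible
        return default_algo                 # both failed; keep default's result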
@@ -242,7 +242,8 @@ def filter(self, with_speeds_df):
                      (self.segment_list, len(self.segment_list)))
         if len(self.segment_list) == 1:
             # there were no jumps, so there's nothing to do
-            logging.info("No jumps, nothing to filter")
+            logging.info("No jumps, nothing to filter, early return")
+            self.inlier_mask_ = self.inlier_mask_.to_numpy()
             return
         start_segment_idx = self.find_start_segment(self.segment_list)
         self.segment_list[start_segment_idx].state = Segment.State.GOOD
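The early-return path now also converts inlier_mask_ from a pandas Series to a plain numpy array, so every exit from filter() hands downstream code the same type (the main path gets the same conversion below). A tiny standalone illustration of why that matters for the np.nonzero / np.logical_not logging used later (illustrative only, not from the diff):

    import numpy as np
    import pandas as pd

    mask = pd.Series([True, False, True])
    arr = mask.to_numpy()                    # plain ndarray, index dropped
    print(np.nonzero(np.logical_not(arr)))   # (array([1]),) -> outlier positions
    # With a raw Series the older code needed (mask == False).to_numpy() first;
    # converting once at the source lets every caller use the same idiom.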
@@ -256,21 +257,13 @@ def filter(self, with_speeds_df):
         for segment in bad_segments:
             self.inlier_mask_[segment.start:segment.end] = False
 
-        logging.debug("after setting values, outlier_mask = %s" % np.nonzero((self.inlier_mask_ == False).to_numpy()))
+        self.inlier_mask_ = self.inlier_mask_.to_numpy()
+        logging.debug("after setting values, outlier_mask = %s" % np.nonzero(np.logical_not(self.inlier_mask_)))
+        # logging.debug("point details are %s" % with_speeds_df[np.logical_not(self.inlier_mask_)])
 
-        # TODO: This is not the right place for this - adds too many dependencies
-        # Should do this in the outer class in general so that we can do
-        # multiple passes of any filtering algorithm
-        import emission.analysis.intake.cleaning.cleaning_methods.speed_outlier_detection as cso
-        import emission.analysis.intake.cleaning.location_smoothing as ls
-
-        recomputed_speeds_df = ls.recalc_speed(self.with_speeds_df[self.inlier_mask_])
-        recomputed_threshold = cso.BoxplotOutlier(ignore_zeros = True).get_threshold(recomputed_speeds_df)
-        # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
-        if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] != 0:
-            logging.info("After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
 
 ### Re-implemented from the prior POSDAP algorithm
 ### This does seem to use some kind of max speed
 
 class SmoothPosdap(object):
     def __init__(self, maxSpeed = 100):
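The block deleted here is not gone: per its own TODO, the recompute-and-recheck pass moves out to get_points_to_filter in location_smoothing.py (second file below), where multiple passes of any filtering algorithm can share it. The primitive itself is small; a sketch under the diff's names, with recalc_speed passed in to keep it self-contained:

    def recheck_speeds(with_speeds_df, inlier_mask, outlier_algo, recalc_speed):
        # Recompute speeds over the retained points only: a jump the filter
        # missed shows up as a recomputed speed above the boxplot threshold.
        recomputed = recalc_speed(with_speeds_df[inlier_mask])
        threshold = outlier_algo.get_threshold(recomputed)
        remaining = recomputed[recomputed.speed > threshold]
        return remaining.shape[0] == 0, remaining   # (clean?, surviving outliers)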
@@ -324,8 +317,8 @@ def filter(self, with_speeds_df):
                 logging.info("len(last_segment) = %d, len(curr_segment) = %d, skipping" %
                              (len(last_segment), len(curr_segment)))
                 continue
-            get_coords = lambda i: [with_speeds_df.iloc[i]["mLongitude"], with_speeds_df.iloc[i]["mLatitude"]]
-            get_ts = lambda i: with_speeds_df.iloc[i]["mTime"]
+            get_coords = lambda i: [with_speeds_df.iloc[i]["longitude"], with_speeds_df.iloc[i]["latitude"]]
+            get_ts = lambda i: with_speeds_df.iloc[i]["ts"]
             # I don't know why they would use time instead of distance, but
             # this is what the existing POSDAP code does.
             print("About to compare curr_segment duration %s with last segment duration %s" %
@@ -338,7 +331,7 @@ def filter(self, with_speeds_df):
                 for curr_idx in curr_segment:
                     print("Comparing distance %s with speed %s * time %s = %s" %
                           (math.fabs(ec.calDistance(get_coords(ref_idx), get_coords(curr_idx))),
-                           old_div(self.maxSpeed, 100), abs(get_ts(ref_idx) - get_ts(curr_idx)),
+                           self.maxSpeed / 100, abs(get_ts(ref_idx) - get_ts(curr_idx)),
                            self.maxSpeed / 100 * abs(get_ts(ref_idx) - get_ts(curr_idx))))
 
                     if (math.fabs(ec.calDistance(get_coords(ref_idx), get_coords(curr_idx))) >
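old_div comes from the past.utils compatibility shim and reproduces Python 2 division semantics: floor division for two int operands, true division otherwise. Replacing it with / is a Python 3 cleanup; for the default integer maxSpeed the numeric result here is effectively unchanged. A quick demonstration (requires the future package, which the old code already depended on):

    from past.utils import old_div   # Python 2/3 shim used by the old code

    print(old_div(7, 2))    # 3   (int operands floor-divide, Python 2 style)
    print(7 / 2)            # 3.5 (Python 3 true division, used after this PR)
    print(old_div(7.0, 2))  # 3.5 (floats always true-divide)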
@@ -351,14 +344,16 @@ def filter(self, with_speeds_df):
                 for curr_idx in reversed(last_segment):
                     print("Comparing distance %s with speed %s * time %s = %s" %
                           (math.fabs(ec.calDistance(get_coords(ref_idx), get_coords(curr_idx))),
-                           old_div(self.maxSpeed, 1000) , abs(get_ts(ref_idx) - get_ts(curr_idx)),
+                           self.maxSpeed / 1000, abs(get_ts(ref_idx) - get_ts(curr_idx)),
                            self.maxSpeed / 1000 * abs(get_ts(ref_idx) - get_ts(curr_idx))))
                     if (abs(ec.calDistance(get_coords(ref_idx), get_coords(curr_idx))) >
                         (self.maxSpeed / 1000 * abs(get_ts(ref_idx) - get_ts(curr_idx)))):
                         print("Distance is greater than max speed * time, deleting %s" % curr_idx)
                         self.inlier_mask_[curr_idx] = False
             last_segment = curr_segment
-        logging.info("Filtering complete, removed indices = %s" % np.nonzero(self.inlier_mask_))
+        self.outlier_mask_ = np.logical_not(self.inlier_mask_)
+        logging.info("Filtering complete, retained indices = %s, removed indices = %s" %
+                     (np.nonzero(self.inlier_mask_), np.nonzero(self.outlier_mask_)))
 
 class SmoothPiecewiseRansac(object):
     def __init__(self, maxSpeed = 100):
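For reference, the plausibility test SmoothPosdap applies in both the forward scan (scale 100) and the backward scan (scale 1000) above is a pure max-speed envelope. A self-contained sketch, with the project's ec.calDistance injected as cal_distance (the parameter names are invented for this note):

    import math

    def exceeds_speed_envelope(ref_coords, cur_coords, ref_ts, cur_ts,
                               max_speed, scale, cal_distance):
        # Drop a point when its distance from the reference exceeds what
        # travelling at (max_speed / scale) m/s for the elapsed time allows.
        # The code above uses scale=100 scanning forward, scale=1000 backward.
        dist = math.fabs(cal_distance(ref_coords, cur_coords))   # meters
        dt = abs(ref_ts - cur_ts)                                # seconds
        return dist > (max_speed / scale) * dt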
96 changes: 57 additions & 39 deletions emission/analysis/intake/cleaning/location_smoothing.py
@@ -43,6 +43,7 @@
 
 # This is what we use in the segmentation code to see if the points are "the same"
 DEFAULT_SAME_POINT_DISTANCE = 100
+MACH1 = 340.29
 
 def recalc_speed(points_df):
     """
@@ -139,15 +140,16 @@ def filter_jumps(user_id, section_id):
         logging.debug("Found iOS section, filling in gaps with fake data")
         section_points_df = _ios_fill_fake_data(section_points_df)
     filtering_algo = eaicj.SmoothZigzag(is_ios, DEFAULT_SAME_POINT_DISTANCE)
+    backup_filtering_algo = eaicj.SmoothPosdap(MACH1)
 
     logging.debug("len(section_points_df) = %s" % len(section_points_df))
-    points_to_ignore_df = get_points_to_filter(section_points_df, outlier_algo, filtering_algo)
+    (sel_algo, points_to_ignore_df) = get_points_to_filter(section_points_df, outlier_algo, filtering_algo, backup_filtering_algo)
     if points_to_ignore_df is None:
         # There were no points to delete
         return
     points_to_ignore_df_filtered = points_to_ignore_df._id.dropna()
-    logging.debug("after filtering ignored points, %s -> %s" %
-                  (len(points_to_ignore_df), len(points_to_ignore_df_filtered)))
+    logging.debug("after filtering ignored points, using %s, %s -> %s" %
+                  (sel_algo, len(points_to_ignore_df), len(points_to_ignore_df_filtered)))
     # We shouldn't really filter any fuzzed points because they represent 100m in 60 secs
     # but let's actually check for that
     # assert len(points_to_ignore_df) == len(points_to_ignore_df_filtered)
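get_points_to_filter now returns a (selected algorithm, points dataframe) tuple instead of a bare dataframe, with (None, None) on error or when no filter is configured. A usage sketch mirroring the updated filter_jumps, with the collaborators passed in so it stands alone (apply_smoothing is an invented name):

    def apply_smoothing(section_points_df, outlier_algo, filtering_algo,
                        backup_filtering_algo, get_points_to_filter):
        # Unpack (algo, df); both slots are None when filtering errored out
        # or no filtering algo was supplied.
        (sel_algo, points_to_ignore_df) = get_points_to_filter(
            section_points_df, outlier_algo, filtering_algo, backup_filtering_algo)
        if points_to_ignore_df is None:
            return None                     # nothing to delete for this section
        # drop fake/fuzzed points that have no database _id before deleting;
        # filter_jumps also records sel_algo's class name in the result entry
        return points_to_ignore_df._id.dropna()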
@@ -158,12 +160,12 @@ def filter_jumps(user_id, section_id):
     filter_result.section = section_id
     filter_result.deleted_points = deleted_point_id_list
     filter_result.outlier_algo = "BoxplotOutlier"
-    filter_result.filtering_algo = "SmoothZigzag"
+    filter_result.filtering_algo = sel_algo.__class__.__name__.split(".")[-1]
 
     result_entry = ecwe.Entry.create_entry(user_id, "analysis/smoothing", filter_result)
     ts.insert(result_entry)
 
-def get_points_to_filter(section_points_df, outlier_algo, filtering_algo):
+def get_points_to_filter(section_points_df, outlier_algo, filtering_algo, backup_filtering_algo):
     """
     From the incoming dataframe, filter out large jumps using the specified outlier detection algorithm and
     the specified filtering algorithm.
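Recording sel_algo.__class__.__name__.split(".")[-1] makes the stored filtering_algo string follow whichever algorithm actually won, instead of the hard-coded "SmoothZigzag". Note that __name__ is the bare class name and never contains dots (the module path lives in __module__), so the split is purely defensive:

    class SmoothPosdap: pass

    sel_algo = SmoothPosdap()
    print(sel_algo.__class__.__name__)                 # SmoothPosdap
    print(sel_algo.__class__.__name__.split(".")[-1])  # SmoothPosdap (no-op split)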
@@ -187,43 +189,59 @@ def get_points_to_filter(section_points_df, outlier_algo, filtering_algo):
     if filtering_algo is not None:
         try:
             filtering_algo.filter(with_speeds_df)
-            to_delete_mask = np.logical_not(filtering_algo.inlier_mask_)
-            return with_speeds_df[to_delete_mask]
-        except Exception as e:
-            logging.info("Caught error %s while processing section, skipping..." % e)
-            return None
-    else:
-        logging.debug("no filtering algo specified, returning None")
-        return None
+            outlier_arr = np.nonzero(np.logical_not(filtering_algo.inlier_mask_))
+            logging.debug("After first filter, inliers = %s, outliers = %s of type %s" %
+                          (filtering_algo.inlier_mask_, outlier_arr, type(outlier_arr)))
+            if outlier_arr[0].shape[0] == 0:
+                sel_algo = filtering_algo
+            else:
+                recomputed_speeds_df = recalc_speed(with_speeds_df[filtering_algo.inlier_mask_])
+                recomputed_threshold = outlier_algo.get_threshold(recomputed_speeds_df)
+                logging.info("After first round, recomputed max = %s, recomputed threshold = %s" %
+                             (recomputed_speeds_df.speed.max(), recomputed_threshold))
+                # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
+                if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0:
+                    logging.info("No outliers after first round, default algo worked, to_delete = %s" %
+                                 np.nonzero(np.logical_not(filtering_algo.inlier_mask_)))
+                    sel_algo = filtering_algo
+                else:
+                    logging.info("After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
+                    if backup_filtering_algo is None or recomputed_speeds_df.speed.max() < MACH1:
+                        logging.debug("backup algo is %s, max < MACH1, so returning default algo outliers %s" %
+                                      (backup_filtering_algo, np.nonzero(np.logical_not(filtering_algo.inlier_mask_))))
+                        sel_algo = filtering_algo
+                    else:
+                        backup_filtering_algo.filter(with_speeds_df)
+                        recomputed_speeds_df = recalc_speed(with_speeds_df[backup_filtering_algo.inlier_mask_])
+                        recomputed_threshold = outlier_algo.get_threshold(recomputed_speeds_df)
+                        logging.info("After second round, max = %s, recomputed threshold = %s" %
+                                     (recomputed_speeds_df.speed.max(), recomputed_threshold))
+                        # assert recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0, "After first round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold]
+                        if recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold].shape[0] == 0:
+                            logging.info("After second round, no outliers, returning backup to delete %s" %
+                                         np.nonzero(np.logical_not(backup_filtering_algo.inlier_mask_)))
+                            sel_algo = backup_filtering_algo
+                        else:
+                            logging.info("After second round, still have outliers %s" % recomputed_speeds_df[recomputed_speeds_df.speed > recomputed_threshold])
+                            if recomputed_speeds_df.speed.max() < MACH1:
+                                logging.debug("But they are all < %s, so returning backup to delete %s" %
+                                              (MACH1, np.nonzero(np.logical_not(backup_filtering_algo.inlier_mask_))))
+                                sel_algo = backup_filtering_algo
+                            else:
+                                logging.info("And they are also > %s, backup algo also failed, returning default to delete = %s" %
+                                             (MACH1, np.nonzero(np.logical_not(filtering_algo.inlier_mask_))))
+                                sel_algo = filtering_algo
 
-
-def get_filtered_points(section_df, outlier_algo, filtering_algo):
-    """
-    Filter the points that correspond to the section object that is passed in.
-    The section object is an AttrDict with the startTs and endTs fields.
-    Returns a filtered df with the index after the initial filter for accuracy
-    TODO: Switch this to the section wrapper object going forward
-    TODO: Note that here, we assume that the data has already been chunked into sections.
-    But really, we need to filter (at least for accuracy) before segmenting in
-    order to avoid issues like https://github.com/e-mission/e-mission-data-collection/issues/45
-    """
-    with_speeds_df = add_dist_heading_speed(section_df)
-    # if filtering algo is none, there's nothing that can use the max speed
-    if outlier_algo is not None and filtering_algo is not None:
-        maxSpeed = outlier_algo.get_threshold(with_speeds_df)
-        # TODO: Is this the best way to do this? Or should I pass this in as an argument to filter?
-        # Or create an explicit set_speed() method?
-        # Or pass the outlier_algo as the parameter to the filtering_algo?
-        filtering_algo.maxSpeed = maxSpeed
-    if filtering_algo is not None:
-        try:
-            filtering_algo.filter(with_speeds_df)
-            return with_speeds_df[filtering_algo.inlier_mask_]
+            to_delete_mask = np.logical_not(sel_algo.inlier_mask_)
+            logging.info("After all checks, inlier mask = %s, outlier_mask = %s" %
+                         (np.nonzero(sel_algo.inlier_mask_), np.nonzero(to_delete_mask)))
+            return (sel_algo, with_speeds_df[to_delete_mask])
         except Exception as e:
-            logging.info("Caught error %s while processing section, skipping..." % e)
-            return with_speeds_df
+            logging.exception("Caught error %s while processing section, skipping..." % e)
+            return (None, None)
     else:
-        return with_speeds_df
+        logging.debug("no filtering algo specified, returning None")
+        return (None, None)
 
 def _ios_fill_fake_data(locs_df):
     diff_ts = locs_df.ts.diff()
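A recurring idiom in the hunk above is np.nonzero(np.logical_not(mask)) to turn an inlier mask into the positions that will be deleted, for logging. Standalone example:

    import numpy as np

    inlier_mask = np.array([True, True, False, True, False])
    print(np.nonzero(np.logical_not(inlier_mask))[0])   # [2 4] -> to be deleted
    # equivalent shorthand: np.flatnonzero(~inlier_mask)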