Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add edge segment size to filter out change points that are observed on the data edge #28780

Merged
merged 5 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/python/apache_beam/testing/analyzers/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@
}]

_ANOMALY_MARKER = ' <---- Anomaly'
_EDGE_SEGMENT_SIZE = 3
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from apache_beam.testing.analyzers.perf_analysis_utils import find_latest_change_point_index
from apache_beam.testing.analyzers.perf_analysis_utils import get_existing_issues_data
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_edge_change_point
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import publish_issue_metadata_to_big_query
from apache_beam.testing.analyzers.perf_analysis_utils import read_test_config
Expand Down Expand Up @@ -84,6 +85,20 @@ def run_change_point_analysis(params, test_name, big_query_metrics_fetcher):
if not change_point_index:
logging.info("Change point is not detected for the test %s" % test_name)
return False
# Remove the change points that are at the edges of the data.
# https://github.com/apache/beam/issues/28757
# Remove this workaround once we have a good solution to deal
# with the edge change points.
if is_edge_change_point(change_point_index,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this logic into find_latest_change_point_index since there are other considerations in that function to filter out noise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could also skip adding extra param to find_latest_change_point_index until we have a usecase to customize it with a non-default value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

len(metric_values),
constants._EDGE_SEGMENT_SIZE):
logging.info(
'The change point %s is located at the edge of the data with an edge '
'segment size of %s. This change point will be ignored for now, '
'awaiting additional data. Should the change point persist after '
'gathering more data, an alert will be raised.' %
(change_point_index, constants._EDGE_SEGMENT_SIZE))
return None
# since timestamps are ordered in ascending order and
# num_runs_in_change_point_window refers to the latest runs,
# latest_change_point_run can help determine if the change point
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from apache_beam.testing.analyzers import constants
from apache_beam.testing.analyzers import github_issues_utils
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_edge_change_point
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
from apache_beam.testing.analyzers.perf_analysis_utils import filter_change_points_by_median_threshold
Expand Down Expand Up @@ -241,6 +242,13 @@ def read_csv(path):
metric_values, change_points)
self.assertEqual(len(valid_points), 0)

def test_change_point_on_edge_segment(self):
data = [1] * 50 + [100]
change_points = find_change_points(data)
self.assertEqual(change_points, [50])

self.assertEqual(is_edge_change_point(change_points[0], len(data)), True)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,17 @@ def filter_change_points_by_median_threshold(
if relative_change > threshold:
valid_change_points.append(idx)
return valid_change_points


def is_edge_change_point(
change_point_index,
data_size,
edge_segment_size=constants._EDGE_SEGMENT_SIZE):
"""
Removes the change points that are at the edges of the data.
Args:
change_point_index: Index of the change point.
data_size: Size of the data.
edge_segment_size: Size of the edge segment.
"""
return change_point_index > data_size - edge_segment_size
Loading