Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add edge segment size to filter out change points that are observed on the data edge #28780

Merged
merged 5 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/python/apache_beam/testing/analyzers/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@
}]

_ANOMALY_MARKER = ' <---- Anomaly'
# Default edge segment size used by is_edge_change_point in
# perf_analysis_utils to decide whether a change point lies at the end of
# the data.
_EDGE_SEGMENT_SIZE = 3
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from apache_beam.testing.analyzers import github_issues_utils
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_edge_change_point
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
from apache_beam.testing.analyzers.perf_analysis_utils import filter_change_points_by_median_threshold
Expand Down Expand Up @@ -242,6 +243,15 @@ def read_csv(path):
metric_values, change_points)
self.assertEqual(len(valid_points), 0)

def test_change_point_on_edge_segment(self):
  """A change point on the last data point is detected but not reported."""
  # A single spike at the very end of an otherwise flat series.
  data = [1] * 50 + [100]
  change_points = find_change_points(data)
  self.assertEqual(change_points, [50])

  # Index 50 of 51 points falls inside the default edge segment.
  self.assertTrue(is_edge_change_point(change_points[0], len(data)))

  # The latest change point is on the edge, so it is suppressed.
  self.assertIsNone(find_latest_change_point_index(data))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
31 changes: 30 additions & 1 deletion sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,22 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]):
if not change_points_indices:
return None
change_points_indices.sort()
return change_points_indices[-1]
# Remove the change points that are at the edges of the data.
# https://github.com/apache/beam/issues/28757
# Remove this workaround once we have a good solution to deal
# with the edge change points.
change_point_index = change_points_indices[-1]
if is_edge_change_point(change_point_index,
len(metric_values),
constants._EDGE_SEGMENT_SIZE):
logging.info(
'The change point %s is located at the edge of the data with an edge '
'segment size of %s. This change point will be ignored for now, '
'awaiting additional data. Should the change point persist after '
'gathering more data, an alert will be raised.' %
(change_point_index, constants._EDGE_SEGMENT_SIZE))
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be considering prior change_points_indices? That is, instead of returning change_points_indices[-1], we return the latest change point that is not in the edge segment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should return the latest change point itself. Even if we ignore it for, say, 3 days, it still gets filed eventually.

We also ignore change points that occurred more than 14 days ago. Most often change_points_indices[-2] lies outside of that window or doesn't exist, so we can just follow the current approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Yes, I think it should get filed eventually.

return change_point_index


def publish_issue_metadata_to_big_query(issue_metadata, table_name):
Expand Down Expand Up @@ -231,6 +246,20 @@ def filter_change_points_by_median_threshold(
return valid_change_points


def is_edge_change_point(
    change_point_index,
    data_size,
    edge_segment_size=constants._EDGE_SEGMENT_SIZE):
  """
  Checks whether a change point lies in the trailing edge of the data.

  Args:
    change_point_index: Index of the change point.
    data_size: Size of the data.
    edge_segment_size: Size of the edge segment.

  Returns:
    True if change_point_index is strictly greater than
    data_size - edge_segment_size, False otherwise.
  """
  edge_threshold = data_size - edge_segment_size
  return change_point_index > edge_threshold


class MetricsFetcher(metaclass=abc.ABCMeta):
@abc.abstractmethod
def fetch_metric_data(
Expand Down
Loading