diff --git a/sdks/python/apache_beam/testing/analyzers/constants.py b/sdks/python/apache_beam/testing/analyzers/constants.py index 8f8bdf13300c..09ab5c595908 100644 --- a/sdks/python/apache_beam/testing/analyzers/constants.py +++ b/sdks/python/apache_beam/testing/analyzers/constants.py @@ -72,3 +72,4 @@ }] _ANOMALY_MARKER = ' <---- Anomaly' +_EDGE_SEGMENT_SIZE = 3 diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py index 9c7921300d9d..15344ab13b3a 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py @@ -34,6 +34,7 @@ from apache_beam.testing.analyzers import github_issues_utils from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window + from apache_beam.testing.analyzers.perf_analysis_utils import is_edge_change_point from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive from apache_beam.testing.analyzers.perf_analysis_utils import filter_change_points_by_median_threshold @@ -242,6 +243,15 @@ def read_csv(path): metric_values, change_points) self.assertEqual(len(valid_points), 0) + def test_change_point_on_edge_segment(self): + data = [1] * 50 + [100] + change_points = find_change_points(data) + self.assertEqual(change_points, [50]) + + self.assertEqual(is_edge_change_point(change_points[0], len(data)), True) + + self.assertEqual(find_latest_change_point_index(data), None) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) diff --git a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py index f9604c490fc0..91c339a766d2 100644 --- a/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py +++ b/sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py @@ -143,7 +143,22 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]): if not change_points_indices: return None change_points_indices.sort() - return change_points_indices[-1] + # Remove the change points that are at the edges of the data. + # https://github.com/apache/beam/issues/28757 + # Remove this workaround once we have a good solution to deal + # with the edge change points. + change_point_index = change_points_indices[-1] + if is_edge_change_point(change_point_index, + len(metric_values), + constants._EDGE_SEGMENT_SIZE): + logging.info( + 'The change point %s is located at the edge of the data with an edge ' + 'segment size of %s. This change point will be ignored for now, ' + 'awaiting additional data. Should the change point persist after ' + 'gathering more data, an alert will be raised.' % + (change_point_index, constants._EDGE_SEGMENT_SIZE)) + return None + return change_point_index def publish_issue_metadata_to_big_query(issue_metadata, table_name): @@ -231,6 +246,20 @@ def filter_change_points_by_median_threshold( return valid_change_points +def is_edge_change_point( + change_point_index, + data_size, + edge_segment_size=constants._EDGE_SEGMENT_SIZE): + """ + Removes the change points that are at the edges of the data. + Args: + change_point_index: Index of the change point. + data_size: Size of the data. + edge_segment_size: Size of the edge segment. + """ + return change_point_index > data_size - edge_segment_size + + class MetricsFetcher(metaclass=abc.ABCMeta): @abc.abstractmethod def fetch_metric_data(