Skip to content

Commit

Permalink
Add edge segment size to filter out change points that are observed o…
Browse files Browse the repository at this point in the history
…n the data edge (#28780)

* Add edge_segment_length

* Add issue to remove workaround

* Move is_edge_change_point into find_latest_change_point_index

* Fix python formatting
  • Loading branch information
AnandInguva authored Oct 7, 2023
1 parent b117ac8 commit d5b8fb8
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
1 change: 1 addition & 0 deletions sdks/python/apache_beam/testing/analyzers/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@
}]

_ANOMALY_MARKER = ' <---- Anomaly'
_EDGE_SEGMENT_SIZE = 3
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/testing/analyzers/perf_analysis_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from apache_beam.testing.analyzers import github_issues_utils
from apache_beam.testing.analyzers.perf_analysis_utils import BigQueryMetricsFetcher
from apache_beam.testing.analyzers.perf_analysis_utils import is_change_point_in_valid_window
from apache_beam.testing.analyzers.perf_analysis_utils import is_edge_change_point
from apache_beam.testing.analyzers.perf_analysis_utils import is_perf_alert
from apache_beam.testing.analyzers.perf_analysis_utils import e_divisive
from apache_beam.testing.analyzers.perf_analysis_utils import filter_change_points_by_median_threshold
Expand Down Expand Up @@ -242,6 +243,15 @@ def read_csv(path):
metric_values, change_points)
self.assertEqual(len(valid_points), 0)

def test_change_point_on_edge_segment(self):
    """A change point on the last data point is flagged as an edge point.

    The series is flat except for its final value, so the only detected
    change point sits at the very end of the data. The edge check must
    report it as an edge point, and the latest-change-point lookup must
    return None for this series.
    """
    # Fifty identical readings followed by one spike at index 50.
    metric_values = [1] * 50 + [100]
    detected = find_change_points(metric_values)
    self.assertEqual(detected, [50])

    # Uses the default edge segment size of is_edge_change_point.
    on_edge = is_edge_change_point(detected[0], len(metric_values))
    self.assertEqual(on_edge, True)

    # No change point index is reported for this series.
    latest_index = find_latest_change_point_index(metric_values)
    self.assertEqual(latest_index, None)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
31 changes: 30 additions & 1 deletion sdks/python/apache_beam/testing/analyzers/perf_analysis_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,22 @@ def find_latest_change_point_index(metric_values: List[Union[float, int]]):
if not change_points_indices:
return None
change_points_indices.sort()
return change_points_indices[-1]
# Remove the change points that are at the edges of the data.
# https://github.com/apache/beam/issues/28757
# Remove this workaround once we have a good solution to deal
# with the edge change points.
change_point_index = change_points_indices[-1]
if is_edge_change_point(change_point_index,
len(metric_values),
constants._EDGE_SEGMENT_SIZE):
logging.info(
'The change point %s is located at the edge of the data with an edge '
'segment size of %s. This change point will be ignored for now, '
'awaiting additional data. Should the change point persist after '
'gathering more data, an alert will be raised.' %
(change_point_index, constants._EDGE_SEGMENT_SIZE))
return None
return change_point_index


def publish_issue_metadata_to_big_query(issue_metadata, table_name):
Expand Down Expand Up @@ -231,6 +246,20 @@ def filter_change_points_by_median_threshold(
return valid_change_points


def is_edge_change_point(
    change_point_index,
    data_size,
    edge_segment_size=constants._EDGE_SEGMENT_SIZE):
    """
    Checks whether a change point falls in the trailing edge of the data.

    This is a predicate only: it does not modify or filter anything.

    Args:
      change_point_index: Index of the change point.
      data_size: Size of the data.
      edge_segment_size: Size of the edge segment.

    Returns:
      The result of comparing change_point_index against
      data_size - edge_segment_size with a strict greater-than, i.e. truthy
      when the index lies beyond that threshold.
    """
    # Indices strictly above this threshold are considered edge points.
    edge_threshold = data_size - edge_segment_size
    return change_point_index > edge_threshold


class MetricsFetcher(metaclass=abc.ABCMeta):
@abc.abstractmethod
def fetch_metric_data(
Expand Down

0 comments on commit d5b8fb8

Please sign in to comment.