Skip to content

Commit

Permalink
support filter flag for buildkite (skypilot-org#4534)
Browse files Browse the repository at this point in the history
* support filter flag

* default value

* support filter flag from env

* change AST to pytest --collect-only

* fix logic

* regex fix
  • Loading branch information
zpoint authored Jan 9, 2025
1 parent 379711b commit 9b1ec55
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 87 deletions.
167 changes: 80 additions & 87 deletions .buildkite/generate_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
clouds are not supported yet, smoke tests for those clouds are not generated.
"""

import ast
import os
import random
import re
import subprocess
from typing import Any, Dict, List, Optional

import click
from conftest import cloud_to_pytest_keyword
from conftest import default_clouds_to_run
import yaml
Expand Down Expand Up @@ -60,18 +62,8 @@
'edit directly.\n')


def _get_full_decorator_path(decorator: ast.AST) -> str:
"""Recursively get the full path of a decorator."""
if isinstance(decorator, ast.Attribute):
return f'{_get_full_decorator_path(decorator.value)}.{decorator.attr}'
elif isinstance(decorator, ast.Name):
return decorator.id
elif isinstance(decorator, ast.Call):
return _get_full_decorator_path(decorator.func)
raise ValueError(f'Unknown decorator type: {type(decorator)}')


def _extract_marked_tests(file_path: str) -> Dict[str, List[str]]:
def _extract_marked_tests(file_path: str,
filter_marks: List[str]) -> Dict[str, List[str]]:
"""Extract test functions and filter clouds using pytest.mark
from a Python test file.
Expand All @@ -85,80 +77,69 @@ def _extract_marked_tests(file_path: str) -> Dict[str, List[str]]:
rerun failures. Additionally, the parallelism would be controlled by pytest
instead of the buildkite job queue.
"""
with open(file_path, 'r', encoding='utf-8') as file:
tree = ast.parse(file.read(), filename=file_path)

for node in ast.walk(tree):
for child in ast.iter_child_nodes(node):
setattr(child, 'parent', node)

cmd = f'pytest {file_path} --collect-only'
output = subprocess.run(cmd, shell=True, capture_output=True, text=True)
matches = re.findall('Collected .+?\.py::(.+?) with marks: \[(.*?)\]',
output.stdout)
function_name_marks_map = {}
for function_name, marks in matches:
function_name = re.sub(r'\[.*?\]', '', function_name)
marks = marks.replace('\'', '').split(',')
marks = [i.strip() for i in marks]
if function_name not in function_name_marks_map:
function_name_marks_map[function_name] = set(marks)
else:
function_name_marks_map[function_name].update(marks)
function_cloud_map = {}
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name.startswith('test_'):
class_name = None
if hasattr(node, 'parent') and isinstance(node.parent,
ast.ClassDef):
class_name = node.parent.name

clouds_to_include = []
clouds_to_exclude = []
is_serve_test = False
for decorator in node.decorator_list:
if isinstance(decorator, ast.Call):
# We only need to consider the decorator with no arguments
# to extract clouds.
filter_marks = set(filter_marks)
for function_name, marks in function_name_marks_map.items():
if filter_marks and not filter_marks & marks:
continue
clouds_to_include = []
clouds_to_exclude = []
is_serve_test = 'serve' in marks
for mark in marks:
if mark.startswith('no_'):
clouds_to_exclude.append(mark[3:])
else:
if mark not in PYTEST_TO_CLOUD_KEYWORD:
# This mark does not specify a cloud, so we skip it.
continue
full_path = _get_full_decorator_path(decorator)
if full_path.startswith('pytest.mark.'):
assert isinstance(decorator, ast.Attribute)
suffix = decorator.attr
if suffix.startswith('no_'):
clouds_to_exclude.append(suffix[3:])
else:
if suffix == 'serve':
is_serve_test = True
continue
if suffix not in PYTEST_TO_CLOUD_KEYWORD:
# This mark does not specify a cloud, so we skip it.
continue
clouds_to_include.append(
PYTEST_TO_CLOUD_KEYWORD[suffix])
clouds_to_include = (clouds_to_include if clouds_to_include else
DEFAULT_CLOUDS_TO_RUN)
clouds_to_include = [
cloud for cloud in clouds_to_include
if cloud not in clouds_to_exclude
]
cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
final_clouds_to_include = [
cloud for cloud in clouds_to_include if cloud in cloud_queue_map
]
if clouds_to_include and not final_clouds_to_include:
print(f'Warning: {file_path}:{node.name} '
f'is marked to run on {clouds_to_include}, '
f'but we do not have credentials for those clouds. '
f'Skipped.')
continue
if clouds_to_include != final_clouds_to_include:
excluded_clouds = set(clouds_to_include) - set(
final_clouds_to_include)
print(
f'Warning: {file_path}:{node.name} '
f'is marked to run on {clouds_to_include}, '
f'but we only have credentials for {final_clouds_to_include}. '
f'clouds {excluded_clouds} are skipped.')
function_name = (f'{class_name}::{node.name}'
if class_name else node.name)
function_cloud_map[function_name] = (final_clouds_to_include, [
cloud_queue_map[cloud] for cloud in final_clouds_to_include
])
clouds_to_include.append(PYTEST_TO_CLOUD_KEYWORD[mark])

clouds_to_include = (clouds_to_include
if clouds_to_include else DEFAULT_CLOUDS_TO_RUN)
clouds_to_include = [
cloud for cloud in clouds_to_include
if cloud not in clouds_to_exclude
]
cloud_queue_map = SERVE_CLOUD_QUEUE_MAP if is_serve_test else CLOUD_QUEUE_MAP
final_clouds_to_include = [
cloud for cloud in clouds_to_include if cloud in cloud_queue_map
]
if clouds_to_include and not final_clouds_to_include:
print(
f'Warning: {function_name} is marked to run on {clouds_to_include}, '
f'but we do not have credentials for those clouds. Skipped.')
continue
if clouds_to_include != final_clouds_to_include:
excluded_clouds = set(clouds_to_include) - set(
final_clouds_to_include)
print(
f'Warning: {function_name} is marked to run on {clouds_to_include}, '
f'but we only have credentials for {final_clouds_to_include}. '
f'clouds {excluded_clouds} are skipped.')
function_cloud_map[function_name] = (final_clouds_to_include, [
cloud_queue_map[cloud] for cloud in final_clouds_to_include
])
return function_cloud_map


def _generate_pipeline(test_file: str) -> Dict[str, Any]:
def _generate_pipeline(test_file: str,
filter_marks: List[str]) -> Dict[str, Any]:
"""Generate a Buildkite pipeline from test files."""
steps = []
function_cloud_map = _extract_marked_tests(test_file)
function_cloud_map = _extract_marked_tests(test_file, filter_marks)
for test_function, clouds_and_queues in function_cloud_map.items():
for cloud, queue in zip(*clouds_and_queues):
step = {
Expand Down Expand Up @@ -194,12 +175,12 @@ def _dump_pipeline_to_file(yaml_file_path: str,
yaml.dump(final_pipeline, file, default_flow_style=False)


def _convert_release(test_files: List[str]):
def _convert_release(test_files: List[str], filter_marks: List[str]):
yaml_file_path = '.buildkite/pipeline_smoke_tests_release.yaml'
output_file_pipelines = []
for test_file in test_files:
print(f'Converting {test_file} to {yaml_file_path}')
pipeline = _generate_pipeline(test_file)
pipeline = _generate_pipeline(test_file, filter_marks)
output_file_pipelines.append(pipeline)
print(f'Converted {test_file} to {yaml_file_path}\n\n')
# Enable all clouds by default for release pipeline.
Expand All @@ -208,15 +189,15 @@ def _convert_release(test_files: List[str]):
extra_env={cloud: '1' for cloud in CLOUD_QUEUE_MAP})


def _convert_quick_tests_core(test_files: List[str]):
def _convert_quick_tests_core(test_files: List[str], filter_marks: List[str]):
yaml_file_path = '.buildkite/pipeline_smoke_tests_quick_tests_core.yaml'
output_file_pipelines = []
for test_file in test_files:
print(f'Converting {test_file} to {yaml_file_path}')
# We want enable all clouds by default for each test function
# for pre-merge. And let the author controls which clouds
# to run by parameter.
pipeline = _generate_pipeline(test_file)
pipeline = _generate_pipeline(test_file, filter_marks)
pipeline['steps'].append({
'label': 'Backward compatibility test',
'command': 'bash tests/backward_compatibility_tests.sh',
Expand All @@ -231,7 +212,12 @@ def _convert_quick_tests_core(test_files: List[str]):
extra_env={'SKYPILOT_SUPPRESS_SENSITIVE_LOG': '1'})


def main():
@click.command()
@click.option(
'--filter-marks',
type=str,
help='Filter to include only a subset of pytest marks, e.g., managed_jobs')
def main(filter_marks):
test_files = os.listdir('tests/smoke_tests')
release_files = []
quick_tests_core_files = []
Expand All @@ -244,8 +230,15 @@ def main():
else:
release_files.append(test_file_path)

_convert_release(release_files)
_convert_quick_tests_core(quick_tests_core_files)
filter_marks = filter_marks or os.getenv('FILTER_MARKS')
if filter_marks:
filter_marks = filter_marks.split(',')
print(f'Filter marks: {filter_marks}')
else:
filter_marks = []

_convert_release(release_files, filter_marks)
_convert_quick_tests_core(quick_tests_core_files, filter_marks)


if __name__ == '__main__':
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,11 @@ def aws_config_region(monkeypatch: pytest.MonkeyPatch) -> str:
if isinstance(ssh_proxy_command, dict) and ssh_proxy_command:
region = list(ssh_proxy_command.keys())[0]
return region


def pytest_collection_modifyitems(config, items):
if config.option.collectonly:
for item in items:
full_name = item.nodeid
marks = [mark.name for mark in item.iter_markers()]
print(f"Collected {full_name} with marks: {marks}")

0 comments on commit 9b1ec55

Please sign in to comment.