-
Notifications
You must be signed in to change notification settings - Fork 14.5k
/
update_example_dags_paths.py
executable file
·118 lines (102 loc) · 4.86 KB
/
update_example_dags_paths.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import re
from pathlib import Path
import requests
from rich.console import Console
from rich.progress import Progress
# Refuse to be imported: this file only works when run directly as a script.
# "__mp_main__" is also accepted; it is the name the multiprocessing module
# gives to the main module when it is re-imported in a spawned child process.
if __name__ not in ("__main__", "__mp_main__"):
    raise SystemExit(
        # The trailing space keeps the two sentences from running together
        # ("...as a module.To run...") once the literals are concatenated.
        "This file is intended to be executed as an executable program. You cannot use it as a module. "
        f"To run this script, run the ./{__file__} command [FILE] ..."
    )
# Single rich console shared by every message this script prints; it uses the
# "standard" colour system and a fixed width of 200 columns.
console = Console(width=200, color_system="standard")

# Directory three levels above the one containing this file, made absolute.
# NOTE(review): presumably the root of the Airflow checkout - confirm against
# the script's location in the repository.
AIRFLOW_SOURCES_ROOT = Path(__file__).parents[3].resolve()
# Both patterns match a whole line that contains a GitHub "tree" URL followed
# by a double quote, and expose the same capture groups:
#   1 - everything on the line before the URL
#   2 - the complete URL
#   3 - the branch/tag part of the URL (the text right after "/tree/")
#   4 - the provider-specific path inside the URL
#   5 - the rest of the line, starting at an optional "/" and the closing quote

# Links pointing at a provider's "example_dags" folder.
EXAMPLE_DAGS_URL_MATCHER = re.compile(
    r"^(.*)(https://github.com/apache/airflow/tree/(.*)/providers/src/airflow/providers/(.*)/example_dags)(/?\".*)$"
)

# Links pointing somewhere below the providers' system tests folder.
SYSTEM_TESTS_URL_MATCHER = re.compile(
    r"^(.*)(https://github.com/apache/airflow/tree/(.*)/providers/tests/system/(.*))(/?\".*)$"
)
def check_if_url_exists(url: str, *, verify: bool = False) -> bool:
    """
    Tell whether ``url`` points at an existing page.

    Verification is switched off by default: every URL is then reported as
    existing and no network request is made. Pass ``verify=True`` to really
    probe the URL.

    :param url: the URL to probe
    :param verify: when true, send an HTTP HEAD request (following redirects)
        instead of assuming that the URL exists
    :return: ``False`` when the server answers 404, ``True`` when it answers
        200 or any other non-error status
    :raises requests.HTTPError: when the server answers with an error status
        other than 404
    """
    if not verify:
        return True
    # A timeout keeps one unresponsive server from hanging the whole run.
    response = requests.head(url, allow_redirects=True, timeout=30)
    if response.status_code == 404:
        return False
    if response.status_code != 200:
        console.print(f"[red]Unexpected error received: {response.status_code}[/]")
        response.raise_for_status()
    # Reached for 200 and for unexpected statuses that are not errors, so the
    # function always returns a bool instead of falling through to None.
    return response.ok
def replace_match(file: str, line: str, provider: str, version: str) -> str:
    """
    Point an example-dags or system-tests link in ``line`` at the provider's release tag.

    The line is tried against ``EXAMPLE_DAGS_URL_MATCHER`` and then against
    ``SYSTEM_TESTS_URL_MATCHER``. For a match whose branch part is not already
    ``providers-.../<version>``, the URL is rebuilt on top of
    ``https://github.com/apache/airflow/tree/providers-<provider>/<version>``,
    keeping the provider-specific path captured from the original URL.

    :param file: path of the file the line comes from (used in messages only)
    :param line: a single line of the file, including its line ending
    :param provider: provider id used to build the tag name
    :param version: provider version used to build the tag name
    :return: the rewritten line, or ``line`` unchanged when nothing matches,
        the link is already correct, or the new URL does not exist
    """
    tag_url = f"https://github.com/apache/airflow/tree/providers-{provider}/{version}"
    for index, matcher in enumerate([EXAMPLE_DAGS_URL_MATCHER, SYSTEM_TESTS_URL_MATCHER]):
        match = matcher.match(line)
        if not match:
            continue
        branch = match.group(3)
        url_path_to_dir = match.group(4)
        if branch.startswith("providers-") and branch.endswith(f"/{version}"):
            console.print(f"[green]Already corrected[/]: {provider}:{version}")
            continue
        # Build only the URL that belongs to the matcher that fired, so the
        # existence check never probes the other, irrelevant URL.
        if index == 0:
            new_url = f"{tag_url}/providers/src/airflow/providers/{url_path_to_dir}/example_dags"
        else:
            new_url = f"{tag_url}/providers/tests/system/{url_path_to_dir}"
        if not check_if_url_exists(new_url):
            console.print(
                f"[yellow] Neither example dags nor system tests folder"
                f" exists for {provider}:{version} -> skipping:[/]"
            )
            console.print(line)
            return line
        # Assemble the line from the captured pieces instead of going through
        # an re.sub() replacement template: a backslash in the captured path
        # would be treated as an escape sequence there. The match starts at
        # position 0, so only the text after it (the line ending) is appended.
        new_line = match.group(1) + new_url + match.group(5) + line[match.end() :]
        if line != new_line:
            console.print(f"[yellow] Replacing in {file}[/]\n{line.strip()}\n{new_line.strip()}")
            return new_line
    return line
def find_matches(_file: Path, provider: str, version: str) -> None:
    """
    Rewrite the example-dags and system-tests links found in one file.

    Each line of the file is passed through ``replace_match``. The file is
    written back only when at least one line changed, so untouched files keep
    their content and modification time.

    :param _file: file to process; read and written as UTF-8
    :param provider: provider id forwarded to ``replace_match``
    :param version: provider version forwarded to ``replace_match``
    """
    # An explicit encoding keeps the result independent of the machine's locale.
    original = _file.read_text(encoding="utf-8")
    new_lines = []
    for line in original.splitlines(keepends=True):
        new_line = replace_match(str(_file), line, provider, version)
        if new_line:
            new_lines.append(new_line)
    updated = "".join(new_lines)
    if updated != original:
        _file.write_text(updated, encoding="utf-8")
if __name__ == "__main__":
    # Walk <current dir>/apache-airflow-providers-<provider>/<version>/ and fix
    # the links in every HTML file found below each version folder.
    provider_dir_prefix = "apache-airflow-providers-"
    curdir: Path = Path(os.curdir).resolve()
    dirs: list[Path] = [entry for entry in curdir.iterdir() if entry.is_dir()]
    with Progress(console=console) as progress:
        task = progress.add_task(f"Updating {len(dirs)}", total=len(dirs))
        for directory in dirs:
            if directory.name.startswith(provider_dir_prefix):
                # The provider id is whatever follows the common prefix.
                provider = directory.name[len(provider_dir_prefix) :]
                console.print(f"[bright_blue] Processing {directory}")
                version_dirs = (child for child in directory.iterdir() if child.is_dir())
                for version_dir in version_dirs:
                    console.print(version_dir.name)
                    for candidate_file in version_dir.rglob("*.html"):
                        if not candidate_file.exists():
                            continue
                        find_matches(candidate_file, provider, version_dir.name)
            # One step per directory of the current folder, provider or not.
            progress.advance(task)