-
-
Notifications
You must be signed in to change notification settings - Fork 21
/
rebase_migration.py
224 lines (194 loc) · 8.24 KB
/
rebase_migration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
from __future__ import annotations
import argparse
import ast
import re
import shutil
import subprocess
from pathlib import Path
from typing import Any
from django.apps import apps
from django.core.management import BaseCommand
from django.core.management import CommandError
from django.db import DatabaseError
from django.db import connections
from django.db.migrations.recorder import MigrationRecorder
from django_linear_migrations.apps import MigrationDetails
from django_linear_migrations.apps import is_first_party_app_config
class Command(BaseCommand):
help = (
"Fix a conflict in your migration history by rebasing the conflicting"
+ " migration on to the end of the app's migration history."
)
# Checks disabled because the django-linear-migrations' checks would
# prevent us continuing
requires_system_checks: list[str] = []
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
"app_label",
help="Specify the app label to rebase the migration for.",
)
def handle(self, *args: Any, app_label: str, **options: Any) -> None:
app_config = apps.get_app_config(app_label)
if not is_first_party_app_config(app_config):
raise CommandError(f"{app_label!r} is not a first-party app.")
migration_details = MigrationDetails(app_label)
max_migration_txt = migration_details.dir / "max_migration.txt"
if not max_migration_txt.exists():
raise CommandError(f"{app_label} does not have a max_migration.txt.")
migration_names = find_migration_names(
max_migration_txt.read_text().splitlines()
)
if migration_names is None:
raise CommandError(
f"{app_label}'s max_migration.txt does not seem to contain a"
+ " merge conflict."
)
merged_migration_name, rebased_migration_name = migration_names
if merged_migration_name not in migration_details.names:
raise CommandError(
f"Parsed {merged_migration_name!r} as the already-merged"
+ f" migration name from {app_label}'s max_migration.txt, but"
+ " this migration does not exist."
)
if rebased_migration_name not in migration_details.names:
raise CommandError(
f"Parsed {rebased_migration_name!r} as the rebased migration"
+ f" name from {app_label}'s max_migration.txt, but this"
+ " migration does not exist."
)
rebased_migration_filename = f"{rebased_migration_name}.py"
rebased_migration_path = migration_details.dir / rebased_migration_filename
if not rebased_migration_path.exists():
raise CommandError(
f"Detected {rebased_migration_filename!r} as the rebased"
+ " migration filename, but it does not exist."
)
if migration_applied(app_label, rebased_migration_name):
raise CommandError(
f"Detected {rebased_migration_name} as the rebased migration,"
+ " but it is applied to the local database. Undo the rebase,"
+ " reverse the migration, and try again."
)
content = rebased_migration_path.read_text()
split_result = re.split(
r"(?<=dependencies = )(\[.*?\])",
content,
maxsplit=1,
flags=re.DOTALL,
)
if len(split_result) != 3:
raise CommandError(
"Could not find dependencies = [...] in"
+ f" {rebased_migration_filename!r}"
)
before_deps, deps, after_deps = split_result
try:
dependencies_module = ast.parse(deps)
except SyntaxError:
raise CommandError(
f"Encountered a SyntaxError trying to parse 'dependencies = {deps}'."
)
dependencies_node = dependencies_module.body[0]
assert isinstance(dependencies_node, ast.Expr)
dependencies = dependencies_node.value
assert isinstance(dependencies, ast.List)
new_dependencies = ast.List(elts=[])
num_this_app_dependencies = 0
for dependency in dependencies.elts:
# Skip swappable_dependency calls, other dynamically defined
# dependencies, and bad definitions
if (
not isinstance(dependency, (ast.Tuple, ast.List))
or len(dependency.elts) != 2
or not all(
isinstance(el, ast.Constant) and isinstance(el.value, str)
for el in dependency.elts
)
):
new_dependencies.elts.append(dependency)
continue
dependency_app_label_node = dependency.elts[0]
assert isinstance(dependency_app_label_node, ast.Constant)
dependency_app_label = dependency_app_label_node.value
assert isinstance(dependency_app_label, str)
if dependency_app_label == app_label:
num_this_app_dependencies += 1
new_dependencies.elts.append(
ast.Tuple(
elts=[
ast.Constant(app_label),
ast.Constant(merged_migration_name),
]
)
)
else:
new_dependencies.elts.append(dependency)
if num_this_app_dependencies != 1:
raise CommandError(
f"Cannot edit {rebased_migration_filename!r} since it has "
+ f"{num_this_app_dependencies} dependencies within "
+ f"{app_label}."
)
new_content = before_deps + ast.unparse(new_dependencies) + after_deps
merged_number, _merged_rest = merged_migration_name.split("_", 1)
_rebased_number, rebased_rest = rebased_migration_name.split("_", 1)
new_number = int(merged_number) + 1
new_name = str(new_number).zfill(4) + "_" + rebased_rest
new_path_parts = rebased_migration_path.parts[:-1] + (f"{new_name}.py",)
new_path = Path(*new_path_parts)
rebased_migration_path.rename(new_path)
new_path.write_text(new_content)
max_migration_txt.write_text(f"{new_name}\n")
black_path = shutil.which("black")
if black_path: # pragma: no cover
subprocess.run(
[black_path, "--fast", "--", new_path],
capture_output=True,
)
self.stdout.write(
f"Renamed {rebased_migration_path.parts[-1]} to {new_path.parts[-1]},"
+ " updated its dependencies, and updated max_migration.txt."
)
def find_migration_names(max_migration_lines: list[str]) -> tuple[str, str] | None:
lines = max_migration_lines
if len(lines) <= 1:
return None
if not lines[0].startswith("<<<<<<<"):
return None
if not lines[-1].startswith(">>>>>>>"):
return None
migration_names = (lines[1].strip(), lines[-2].strip())
if is_merge_in_progress():
# During the merge 'ours' and 'theirs' are swapped in comparison with rebase
migration_names = (migration_names[1], migration_names[0])
return migration_names
def is_merge_in_progress() -> bool:
try:
subprocess.run(
["git", "rev-parse", "--verify", "MERGE_HEAD"],
capture_output=True,
check=True,
text=True,
)
except (FileNotFoundError, subprocess.SubprocessError):
# Either:
# - `git` is not available, or broken
# - there is no git repository
# - no merge head exists, so assume rebasing
return False
# Merged head exists, we are merging
return True
def migration_applied(app_label: str, migration_name: str) -> bool:
Migration = MigrationRecorder.Migration
for alias in connections:
try:
if (
Migration.objects.using(alias)
.filter(app=app_label, name=migration_name)
.exists()
):
return True
except DatabaseError:
# django_migrations table does not exist -> no migrations applied
pass
return False