Skip to content

Commit

Permalink
rebase-migration: Make it possible to handle chains of migrations.
Browse files Browse the repository at this point in the history
Previously, the rebase_migration command was not able to rebase
chains of migrations in a single app.

This commit introduces a new flag -- "new" which basically is used
for the first migration you create, and it wipes out your migration
history in max_migration.txt, and writes up that first migration in
the first line. Any further migrations added without the flag are
simply added under each other in the max_migration.txt.

This would allow the rebase_migration command to access the
chain of migrations that need to be rebased in a commit, and
will rebase them accordingly.

Fixes adamchainz#27
  • Loading branch information
roanster007 committed Aug 9, 2024
1 parent ec1f97a commit 3ad0437
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 132 deletions.
45 changes: 16 additions & 29 deletions src/django_linear_migrations/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,41 +188,28 @@ def check_max_migration_files(
)
continue

max_migration_txt_lines = max_migration_txt.read_text().strip().splitlines()
if len(max_migration_txt_lines) > 1:
errors.append(
Error(
id="dlm.E002",
msg=f"{app_label}'s max_migration.txt contains multiple lines.",
hint=(
"This may be the result of a git merge. Fix the file"
+ " to contain only the name of the latest migration,"
+ " or maybe use the 'rebase-migration' command."
),
migration_txt_lines = max_migration_txt.read_text().strip().splitlines()
for migration_name in migration_txt_lines:
if migration_name not in migration_details.names:
errors.append(
Error(
id="dlm.E003",
msg=(
f"{app_label}'s max_migration.txt points to"
+ f" non-existent migration {migration_name!r}."
),
hint=(
"Edit the max_migration.txt to contain the latest"
+ " migration's name."
),
)
)
)
continue

max_migration_name = max_migration_txt_lines[0]
if max_migration_name not in migration_details.names:
errors.append(
Error(
id="dlm.E003",
msg=(
f"{app_label}'s max_migration.txt points to"
+ f" non-existent migration {max_migration_name!r}."
),
hint=(
"Edit the max_migration.txt to contain the latest"
+ " migration's name."
),
)
)
continue

real_max_migration_name = [
name for gp_app_label, name in graph_plan if gp_app_label == app_label
][-1]
max_migration_name = migration_txt_lines[-1]
if max_migration_name != real_max_migration_name:
errors.append(
Error(
Expand Down
59 changes: 41 additions & 18 deletions src/django_linear_migrations/management/commands/makemigrations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

from typing import Any

import django
from django.core.management.base import CommandParser
from django.core.management.commands.makemigrations import Command as BaseCommand
from django.db.migrations import Migration

Expand All @@ -9,6 +12,18 @@


class Command(BaseCommand):
def add_arguments(self, parser: CommandParser) -> None:
super().add_arguments(parser)
parser.add_argument(
"--new",
action="store_true",
help="Create and register the migration as the first migration of the commit.",
)

def handle(self, *app_labels: str, **options: Any) -> None:
self.first_migration = options["new"]
super().handle(*app_labels, **options)

if django.VERSION >= (4, 2):

def write_migration_files(
Expand All @@ -22,7 +37,7 @@ def write_migration_files(
changes,
update_previous_migration_paths,
)
_post_write_migration_files(self.dry_run, changes)
self._post_write_migration_files(self.dry_run, changes)

else:

Expand All @@ -31,25 +46,33 @@ def write_migration_files( # type: ignore[misc,override]
changes: dict[str, list[Migration]],
) -> None:
super().write_migration_files(changes)
_post_write_migration_files(self.dry_run, changes)
self._post_write_migration_files(self.dry_run, changes)

def _post_write_migration_files(
self, dry_run: bool, changes: dict[str, list[Migration]]
) -> None:
if dry_run:
return

def _post_write_migration_files(
dry_run: bool, changes: dict[str, list[Migration]]
) -> None:
if dry_run:
return
first_party_app_labels = {
app_config.label for app_config in first_party_app_configs()
}

first_party_app_labels = {
app_config.label for app_config in first_party_app_configs()
}
for app_label, app_migrations in changes.items():
if app_label not in first_party_app_labels:
continue

for app_label, app_migrations in changes.items():
if app_label not in first_party_app_labels:
continue
# Reload required as we've generated changes
migration_details = MigrationDetails(app_label, do_reload=True)
max_migration_name = app_migrations[-1].name
max_migration_txt = migration_details.dir / "max_migration.txt"

# Reload required as we've generated changes
migration_details = MigrationDetails(app_label, do_reload=True)
max_migration_name = app_migrations[-1].name
max_migration_txt = migration_details.dir / "max_migration.txt"
max_migration_txt.write_text(max_migration_name + "\n")
if self.first_migration:
max_migration_txt.write_text(max_migration_name + "\n")
self.first_migration = False
continue

current_version_migrations = max_migration_txt.read_text()
max_migration_txt.write_text(
current_version_migrations + max_migration_name + "\n"
)
105 changes: 77 additions & 28 deletions src/django_linear_migrations/management/commands/rebase_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,43 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None:
if not max_migration_txt.exists():
raise CommandError(f"{app_label} does not have a max_migration.txt.")

migration_names = find_migration_names(
max_migration_txt.read_text().splitlines()
)
migration_names = find_migration_names(max_migration_txt.read_text())
if migration_names is None:
raise CommandError(
f"{app_label}'s max_migration.txt does not seem to contain a"
+ " merge conflict."
)
merged_migration_name, rebased_migration_name = migration_names
if merged_migration_name not in migration_details.names:
raise CommandError(
f"Parsed {merged_migration_name!r} as the already-merged"
+ f" migration name from {app_label}'s max_migration.txt, but"
+ " this migration does not exist."
)
if rebased_migration_name not in migration_details.names:
raise CommandError(
f"Parsed {rebased_migration_name!r} as the rebased migration"
+ f" name from {app_label}'s max_migration.txt, but this"
+ " migration does not exist."
)

merged_migration_names, rebased_migration_names = migration_names

for merged_migration_name in merged_migration_names:
if merged_migration_name not in migration_details.names:
raise CommandError(
f"Parsed {merged_migration_name!r} as the already-merged"
+ f" migration name from {app_label}'s max_migration.txt, but"
+ " this migration does not exist."
)

for rebased_migration_name in rebased_migration_names:
if rebased_migration_name not in migration_details.names:
raise CommandError(
f"Parsed {rebased_migration_name!r} as the rebased migration"
+ f" name from {app_label}'s max_migration.txt, but this"
+ " migration does not exist."
)

self.last_migration_name = merged_migration_names[-1]

first_migration = True
for rebased_migration_name in rebased_migration_names:
self.rebase_migration(app_label, rebased_migration_name, first_migration)
first_migration = False

def rebase_migration(
self, app_label: str, rebased_migration_name: str, first_migration: bool
) -> None:
migration_details = MigrationDetails(app_label)
max_migration_txt = migration_details.dir / "max_migration.txt"

rebased_migration_filename = f"{rebased_migration_name}.py"
rebased_migration_path = migration_details.dir / rebased_migration_filename
Expand Down Expand Up @@ -136,7 +152,7 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None:
ast.Tuple(
elts=[
ast.Constant(app_label),
ast.Constant(merged_migration_name),
ast.Constant(self.last_migration_name),
]
)
)
Expand All @@ -152,16 +168,23 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None:

new_content = before_deps + ast_unparse(new_dependencies) + after_deps

merged_number, _merged_rest = merged_migration_name.split("_", 1)
last_merged_number, _merged_rest = self.last_migration_name.split("_", 1)
_rebased_number, rebased_rest = rebased_migration_name.split("_", 1)
new_number = int(merged_number) + 1
new_number = int(last_merged_number) + 1
new_name = str(new_number).zfill(4) + "_" + rebased_rest
new_path_parts = rebased_migration_path.parts[:-1] + (f"{new_name}.py",)
new_path = Path(*new_path_parts)

rebased_migration_path.rename(new_path)
new_path.write_text(new_content)
max_migration_txt.write_text(f"{new_name}\n")

if first_migration:
max_migration_txt.write_text(f"{new_name}\n")
else:
current_version_migrations = max_migration_txt.read_text()
max_migration_txt.write_text(current_version_migrations + f"{new_name}\n")

self.last_migration_name = new_name

black_path = shutil.which("black")
if black_path: # pragma: no cover
Expand All @@ -176,19 +199,45 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None:
)


def find_migration_names(max_migration_lines: list[str]) -> tuple[str, str] | None:
lines = max_migration_lines
if len(lines) <= 1:
def find_migration_names(
current_version_migrations: str,
) -> tuple[list[str], list[str]] | None:
migrations_lines = current_version_migrations.strip().splitlines()

if len(migrations_lines) <= 1:
return None
if not lines[0].startswith("<<<<<<<"):
if not migrations_lines[0].startswith("<<<<<<<"):
return None
if not lines[-1].startswith(">>>>>>>"):
if not migrations_lines[-1].startswith(">>>>>>>"):
return None
migration_names = (lines[1].strip(), lines[-2].strip())

merged_migration_names = []
rebased_migration_names = []

index = 0
while index < len(migrations_lines):
if migrations_lines[index].startswith("<<<<<<<"):
index += 1
while not migrations_lines[index].startswith("======="):
if migrations_lines[index] == "|||||||":
while not migrations_lines[index].startswith("======="):
index += 1
else:
merged_migration_names.append(migrations_lines[index])
index += 1

index += 1

else:
while not migrations_lines[index].startswith(">>>>>>>"):
rebased_migration_names.append(migrations_lines[index])
index += 1
break

if is_merge_in_progress():
# During the merge 'ours' and 'theirs' are swapped in comparison with rebase
migration_names = (migration_names[1], migration_names[0])
return migration_names
return (rebased_migration_names, merged_migration_names)
return (merged_migration_names, rebased_migration_names)


def is_merge_in_progress() -> bool:
Expand Down
13 changes: 1 addition & 12 deletions tests/test_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,14 @@ def test_dlm_E001(self):
assert result[0].id == "dlm.E001"
assert result[0].msg == "testapp's max_migration.txt does not exist."

def test_dlm_E002(self):
(self.migrations_dir / "__init__.py").touch()
(self.migrations_dir / "0001_initial.py").write_text(empty_migration)
(self.migrations_dir / "max_migration.txt").write_text("line1\nline2\n")

result = check_max_migration_files()

assert len(result) == 1
assert result[0].id == "dlm.E002"
assert result[0].msg == "testapp's max_migration.txt contains multiple lines."

def test_dlm_E003(self):
(self.migrations_dir / "__init__.py").touch()
(self.migrations_dir / "0001_initial.py").write_text(empty_migration)
(self.migrations_dir / "max_migration.txt").write_text("0001_start\n")

result = check_max_migration_files()

assert len(result) == 1
assert len(result) == 2
assert result[0].id == "dlm.E003"
assert result[0].msg == (
"testapp's max_migration.txt points to non-existent migration"
Expand Down
41 changes: 36 additions & 5 deletions tests/test_makemigrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ def test_dry_run(self):
assert not max_migration_txt.exists()

def test_creates_max_migration_txt(self):
out, err, returncode = self.call_command("testapp")
out, err, returncode = self.call_command("testapp", "--new")

assert returncode == 0
max_migration_txt = self.migrations_dir / "max_migration.txt"
assert max_migration_txt.read_text() == "0001_initial\n"

@unittest.skipUnless(django.VERSION >= (4, 2), "--update added in Django 4.2")
def test_update(self):
self.call_command("testapp")
self.call_command("testapp", "--new")
max_migration_txt = self.migrations_dir / "max_migration.txt"
assert max_migration_txt.read_text() == "0001_initial\n"

Expand All @@ -59,10 +59,12 @@ class Meta:
out, err, returncode = self.call_command("--update", "testapp")
assert returncode == 0
max_migration_txt = self.migrations_dir / "max_migration.txt"
assert max_migration_txt.read_text() == "0001_initial_updated\n"
assert max_migration_txt.read_text() == "0001_initial\n0001_initial_updated\n"

def test_creates_max_migration_txt_given_name(self):
out, err, returncode = self.call_command("testapp", "--name", "brand_new")
out, err, returncode = self.call_command(
"testapp", "--name", "brand_new", "--new"
)

assert returncode == 0
max_migration_txt = self.migrations_dir / "max_migration.txt"
Expand All @@ -89,7 +91,36 @@ class Migration(migrations.Migration):

assert returncode == 0
max_migration_txt = self.migrations_dir / "max_migration.txt"
assert max_migration_txt.read_text() == "0002_create_book\n"
assert max_migration_txt.read_text() == "0001_initial\n0002_create_book\n"

def test_create_max_migration_txt_with_multiple_migrations(self):
max_migration_txt = self.migrations_dir / "max_migration.txt"
(self.migrations_dir / "__init__.py").touch()

out, err, returncode = self.call_command("testapp", "--name", "first", "--new")

assert returncode == 0
assert max_migration_txt.read_text() == "0001_first\n"

# Creating a second migration on without the `new` flag keeps
# the first migration, while updates the last migration in the
# "max_migration.txt"
out, err, returncode = self.call_command(
"testapp", "--empty", "--name", "second"
)

assert returncode == 0
assert max_migration_txt.read_text() == "0001_first\n0002_second\n"

# Creating a third migration on without the `new` flag keeps
# the first migration, while updates the last migration in the
# "max_migration.txt"
out, err, returncode = self.call_command(
"testapp", "--empty", "--name", "third"
)

assert returncode == 0
assert max_migration_txt.read_text() == "0001_first\n0002_second\n0003_third\n"

@override_settings(FIRST_PARTY_APPS=[])
def test_skips_creating_max_migration_txt_for_non_first_party_app(self):
Expand Down
Loading

0 comments on commit 3ad0437

Please sign in to comment.