From 3ad04376c132081222739c77938b20f58999293c Mon Sep 17 00:00:00 2001 From: Rohan Gudimetla Date: Fri, 2 Aug 2024 03:07:29 +0530 Subject: [PATCH] rebase-migration: Make it possible to handle chains of migrations. Previously, the rebase_migration command was not able to rebase chains of migrations in a single app. This commit introduces a new flag -- "new" which basically is used for the first migration you create, and it wipes out your migration history in max_migration.txt, and writes up that first migration in the first line. Any further migrations added without the flag are simply added under each other in the max_migration.txt. This would allow the rebase_migration command to access the chain of migrations that need to be rebased in a commit, and will rebase them accordingly. Fixes #27 --- src/django_linear_migrations/apps.py | 45 ++--- .../management/commands/makemigrations.py | 59 ++++-- .../management/commands/rebase_migration.py | 105 +++++++--- tests/test_checks.py | 13 +- tests/test_makemigrations.py | 41 +++- tests/test_rebase_migration.py | 185 ++++++++++++++---- 6 files changed, 316 insertions(+), 132 deletions(-) diff --git a/src/django_linear_migrations/apps.py b/src/django_linear_migrations/apps.py index fc599cd..e66d248 100644 --- a/src/django_linear_migrations/apps.py +++ b/src/django_linear_migrations/apps.py @@ -188,41 +188,28 @@ def check_max_migration_files( ) continue - max_migration_txt_lines = max_migration_txt.read_text().strip().splitlines() - if len(max_migration_txt_lines) > 1: - errors.append( - Error( - id="dlm.E002", - msg=f"{app_label}'s max_migration.txt contains multiple lines.", - hint=( - "This may be the result of a git merge. Fix the file" - + " to contain only the name of the latest migration," - + " or maybe use the 'rebase-migration' command." - ), + migration_txt_lines = max_migration_txt.read_text().strip().splitlines() + for migration_name in migration_txt_lines: + if migration_name not in migration_details.names: + errors.append( + Error( + id="dlm.E003", + msg=( + f"{app_label}'s max_migration.txt points to" + + f" non-existent migration {migration_name!r}." + ), + hint=( + "Edit the max_migration.txt to contain the latest" + + " migration's name." + ), + ) ) - ) - continue - - max_migration_name = max_migration_txt_lines[0] - if max_migration_name not in migration_details.names: - errors.append( - Error( - id="dlm.E003", - msg=( - f"{app_label}'s max_migration.txt points to" - + f" non-existent migration {max_migration_name!r}." - ), - hint=( - "Edit the max_migration.txt to contain the latest" - + " migration's name." - ), - ) - ) continue real_max_migration_name = [ name for gp_app_label, name in graph_plan if gp_app_label == app_label ][-1] + max_migration_name = migration_txt_lines[-1] if max_migration_name != real_max_migration_name: errors.append( Error( diff --git a/src/django_linear_migrations/management/commands/makemigrations.py b/src/django_linear_migrations/management/commands/makemigrations.py index d8f2652..b7c1ed2 100644 --- a/src/django_linear_migrations/management/commands/makemigrations.py +++ b/src/django_linear_migrations/management/commands/makemigrations.py @@ -1,6 +1,9 @@ from __future__ import annotations +from typing import Any + import django +from django.core.management.base import CommandParser from django.core.management.commands.makemigrations import Command as BaseCommand from django.db.migrations import Migration @@ -9,6 +12,18 @@ class Command(BaseCommand): + def add_arguments(self, parser: CommandParser) -> None: + super().add_arguments(parser) + parser.add_argument( + "--new", + action="store_true", + help="Create and register the migration as the first migration of the commit.", + ) + + def handle(self, *app_labels: str, **options: Any) -> None: + self.first_migration = options["new"] + super().handle(*app_labels, **options) + if django.VERSION >= (4, 2): def write_migration_files( @@ -22,7 +37,7 @@ def write_migration_files( changes, update_previous_migration_paths, ) - _post_write_migration_files(self.dry_run, changes) + self._post_write_migration_files(self.dry_run, changes) else: @@ -31,25 +46,33 @@ def write_migration_files( # type: ignore[misc,override] changes: dict[str, list[Migration]], ) -> None: super().write_migration_files(changes) - _post_write_migration_files(self.dry_run, changes) + self._post_write_migration_files(self.dry_run, changes) + def _post_write_migration_files( + self, dry_run: bool, changes: dict[str, list[Migration]] + ) -> None: + if dry_run: + return -def _post_write_migration_files( - dry_run: bool, changes: dict[str, list[Migration]] -) -> None: - if dry_run: - return + first_party_app_labels = { + app_config.label for app_config in first_party_app_configs() + } - first_party_app_labels = { - app_config.label for app_config in first_party_app_configs() - } + for app_label, app_migrations in changes.items(): + if app_label not in first_party_app_labels: + continue - for app_label, app_migrations in changes.items(): - if app_label not in first_party_app_labels: - continue + # Reload required as we've generated changes + migration_details = MigrationDetails(app_label, do_reload=True) + max_migration_name = app_migrations[-1].name + max_migration_txt = migration_details.dir / "max_migration.txt" - # Reload required as we've generated changes - migration_details = MigrationDetails(app_label, do_reload=True) - max_migration_name = app_migrations[-1].name - max_migration_txt = migration_details.dir / "max_migration.txt" - max_migration_txt.write_text(max_migration_name + "\n") + if self.first_migration: + max_migration_txt.write_text(max_migration_name + "\n") + self.first_migration = False + continue + + current_version_migrations = max_migration_txt.read_text() + max_migration_txt.write_text( + current_version_migrations + max_migration_name + "\n" + ) diff --git a/src/django_linear_migrations/management/commands/rebase_migration.py b/src/django_linear_migrations/management/commands/rebase_migration.py index a61b15e..e0bbb9a 100644 --- a/src/django_linear_migrations/management/commands/rebase_migration.py +++ b/src/django_linear_migrations/management/commands/rebase_migration.py @@ -46,27 +46,43 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None: if not max_migration_txt.exists(): raise CommandError(f"{app_label} does not have a max_migration.txt.") - migration_names = find_migration_names( - max_migration_txt.read_text().splitlines() - ) + migration_names = find_migration_names(max_migration_txt.read_text()) if migration_names is None: raise CommandError( f"{app_label}'s max_migration.txt does not seem to contain a" + " merge conflict." ) - merged_migration_name, rebased_migration_name = migration_names - if merged_migration_name not in migration_details.names: - raise CommandError( - f"Parsed {merged_migration_name!r} as the already-merged" - + f" migration name from {app_label}'s max_migration.txt, but" - + " this migration does not exist." - ) - if rebased_migration_name not in migration_details.names: - raise CommandError( - f"Parsed {rebased_migration_name!r} as the rebased migration" - + f" name from {app_label}'s max_migration.txt, but this" - + " migration does not exist." - ) + + merged_migration_names, rebased_migration_names = migration_names + + for merged_migration_name in merged_migration_names: + if merged_migration_name not in migration_details.names: + raise CommandError( + f"Parsed {merged_migration_name!r} as the already-merged" + + f" migration name from {app_label}'s max_migration.txt, but" + + " this migration does not exist." + ) + + for rebased_migration_name in rebased_migration_names: + if rebased_migration_name not in migration_details.names: + raise CommandError( + f"Parsed {rebased_migration_name!r} as the rebased migration" + + f" name from {app_label}'s max_migration.txt, but this" + + " migration does not exist." + ) + + self.last_migration_name = merged_migration_names[-1] + + first_migration = True + for rebased_migration_name in rebased_migration_names: + self.rebase_migration(app_label, rebased_migration_name, first_migration) + first_migration = False + + def rebase_migration( + self, app_label: str, rebased_migration_name: str, first_migration: bool + ) -> None: + migration_details = MigrationDetails(app_label) + max_migration_txt = migration_details.dir / "max_migration.txt" rebased_migration_filename = f"{rebased_migration_name}.py" rebased_migration_path = migration_details.dir / rebased_migration_filename @@ -136,7 +152,7 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None: ast.Tuple( elts=[ ast.Constant(app_label), - ast.Constant(merged_migration_name), + ast.Constant(self.last_migration_name), ] ) ) @@ -152,16 +168,23 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None: new_content = before_deps + ast_unparse(new_dependencies) + after_deps - merged_number, _merged_rest = merged_migration_name.split("_", 1) + last_merged_number, _merged_rest = self.last_migration_name.split("_", 1) _rebased_number, rebased_rest = rebased_migration_name.split("_", 1) - new_number = int(merged_number) + 1 + new_number = int(last_merged_number) + 1 new_name = str(new_number).zfill(4) + "_" + rebased_rest new_path_parts = rebased_migration_path.parts[:-1] + (f"{new_name}.py",) new_path = Path(*new_path_parts) rebased_migration_path.rename(new_path) new_path.write_text(new_content) - max_migration_txt.write_text(f"{new_name}\n") + + if first_migration: + max_migration_txt.write_text(f"{new_name}\n") + else: + current_version_migrations = max_migration_txt.read_text() + max_migration_txt.write_text(current_version_migrations + f"{new_name}\n") + + self.last_migration_name = new_name black_path = shutil.which("black") if black_path: # pragma: no cover @@ -176,19 +199,45 @@ def handle(self, *args: Any, app_label: str, **options: Any) -> None: ) -def find_migration_names(max_migration_lines: list[str]) -> tuple[str, str] | None: - lines = max_migration_lines - if len(lines) <= 1: +def find_migration_names( + current_version_migrations: str, +) -> tuple[list[str], list[str]] | None: + migrations_lines = current_version_migrations.strip().splitlines() + + if len(migrations_lines) <= 1: return None - if not lines[0].startswith("<<<<<<<"): + if not migrations_lines[0].startswith("<<<<<<<"): return None - if not lines[-1].startswith(">>>>>>>"): + if not migrations_lines[-1].startswith(">>>>>>>"): return None - migration_names = (lines[1].strip(), lines[-2].strip()) + + merged_migration_names = [] + rebased_migration_names = [] + + index = 0 + while index < len(migrations_lines): + if migrations_lines[index].startswith("<<<<<<<"): + index += 1 + while not migrations_lines[index].startswith("======="): + if migrations_lines[index] == "|||||||": + while not migrations_lines[index].startswith("======="): + index += 1 + else: + merged_migration_names.append(migrations_lines[index]) + index += 1 + + index += 1 + + else: + while not migrations_lines[index].startswith(">>>>>>>"): + rebased_migration_names.append(migrations_lines[index]) + index += 1 + break + if is_merge_in_progress(): # During the merge 'ours' and 'theirs' are swapped in comparison with rebase - migration_names = (migration_names[1], migration_names[0]) - return migration_names + return (rebased_migration_names, merged_migration_names) + return (merged_migration_names, rebased_migration_names) def is_merge_in_progress() -> bool: diff --git a/tests/test_checks.py b/tests/test_checks.py index 7f1a3f6..fb0db5f 100644 --- a/tests/test_checks.py +++ b/tests/test_checks.py @@ -64,17 +64,6 @@ def test_dlm_E001(self): assert result[0].id == "dlm.E001" assert result[0].msg == "testapp's max_migration.txt does not exist." - def test_dlm_E002(self): - (self.migrations_dir / "__init__.py").touch() - (self.migrations_dir / "0001_initial.py").write_text(empty_migration) - (self.migrations_dir / "max_migration.txt").write_text("line1\nline2\n") - - result = check_max_migration_files() - - assert len(result) == 1 - assert result[0].id == "dlm.E002" - assert result[0].msg == "testapp's max_migration.txt contains multiple lines." - def test_dlm_E003(self): (self.migrations_dir / "__init__.py").touch() (self.migrations_dir / "0001_initial.py").write_text(empty_migration) @@ -82,7 +71,7 @@ def test_dlm_E003(self): result = check_max_migration_files() - assert len(result) == 1 + assert len(result) == 2 assert result[0].id == "dlm.E003" assert result[0].msg == ( "testapp's max_migration.txt points to non-existent migration" diff --git a/tests/test_makemigrations.py b/tests/test_makemigrations.py index 5eaa042..8b35873 100644 --- a/tests/test_makemigrations.py +++ b/tests/test_makemigrations.py @@ -40,7 +40,7 @@ def test_dry_run(self): assert not max_migration_txt.exists() def test_creates_max_migration_txt(self): - out, err, returncode = self.call_command("testapp") + out, err, returncode = self.call_command("testapp", "--new") assert returncode == 0 max_migration_txt = self.migrations_dir / "max_migration.txt" @@ -48,7 +48,7 @@ def test_creates_max_migration_txt(self): @unittest.skipUnless(django.VERSION >= (4, 2), "--update added in Django 4.2") def test_update(self): - self.call_command("testapp") + self.call_command("testapp", "--new") max_migration_txt = self.migrations_dir / "max_migration.txt" assert max_migration_txt.read_text() == "0001_initial\n" @@ -59,10 +59,12 @@ class Meta: out, err, returncode = self.call_command("--update", "testapp") assert returncode == 0 max_migration_txt = self.migrations_dir / "max_migration.txt" - assert max_migration_txt.read_text() == "0001_initial_updated\n" + assert max_migration_txt.read_text() == "0001_initial\n0001_initial_updated\n" def test_creates_max_migration_txt_given_name(self): - out, err, returncode = self.call_command("testapp", "--name", "brand_new") + out, err, returncode = self.call_command( + "testapp", "--name", "brand_new", "--new" + ) assert returncode == 0 max_migration_txt = self.migrations_dir / "max_migration.txt" @@ -89,7 +91,36 @@ class Migration(migrations.Migration): assert returncode == 0 max_migration_txt = self.migrations_dir / "max_migration.txt" - assert max_migration_txt.read_text() == "0002_create_book\n" + assert max_migration_txt.read_text() == "0001_initial\n0002_create_book\n" + + def test_create_max_migration_txt_with_multiple_migrations(self): + max_migration_txt = self.migrations_dir / "max_migration.txt" + (self.migrations_dir / "__init__.py").touch() + + out, err, returncode = self.call_command("testapp", "--name", "first", "--new") + + assert returncode == 0 + assert max_migration_txt.read_text() == "0001_first\n" + + # Creating a second migration on without the `new` flag keeps + # the first migration, while updates the last migration in the + # "max_migration.txt" + out, err, returncode = self.call_command( + "testapp", "--empty", "--name", "second" + ) + + assert returncode == 0 + assert max_migration_txt.read_text() == "0001_first\n0002_second\n" + + # Creating a third migration on without the `new` flag keeps + # the first migration, while updates the last migration in the + # "max_migration.txt" + out, err, returncode = self.call_command( + "testapp", "--empty", "--name", "third" + ) + + assert returncode == 0 + assert max_migration_txt.read_text() == "0001_first\n0002_second\n0003_third\n" @override_settings(FIRST_PARTY_APPS=[]) def test_skips_creating_max_migration_txt_for_non_first_party_app(self): diff --git a/tests/test_rebase_migration.py b/tests/test_rebase_migration.py index 1987c83..98ed746 100644 --- a/tests/test_rebase_migration.py +++ b/tests/test_rebase_migration.py @@ -368,6 +368,66 @@ class Migration(migrations.Migration): """ ) + def test_success_with_rebasing_multiple_migrations(self): + (self.migrations_dir / "__init__.py").touch() + (self.migrations_dir / "0001_initial.py").write_text(empty_migration) + (self.migrations_dir / "0002_second.py").write_text( + dedent( + """\ + from django.db import migrations + + class Migration(migrations.Migration): + dependencies = [ + ('testapp', '0001_initial'), + ] + operations = [] + """ + ) + ) + (self.migrations_dir / "0003_longer_titles.py").write_text( + dedent( + """\ + from django.db import migrations + + class Migration(migrations.Migration): + dependencies = [ + ('testapp', '0002_second'), + ] + operations = [] + """ + ) + ) + (self.migrations_dir / "0002_author_nicknames.py").touch() + max_migration_txt = self.migrations_dir / "max_migration.txt" + max_migration_txt.write_text( + dedent( + """\ + <<<<<<< HEAD + 0002_author_nicknames + ======= + 0002_second + 0003_longer_titles + >>>>>>> 123456789 (Increase Book title length) + """ + ) + ) + + out, err, returncode = self.call_command("testapp") + + assert out == ( + "Renamed 0002_second.py to 0003_second.py," + + " updated its dependencies, and updated max_migration.txt.\n" + + "Renamed 0003_longer_titles.py to 0004_longer_titles.py," + + " updated its dependencies, and updated max_migration.txt.\n" + ) + assert err == "" + assert returncode == 0 + max_migration_txt = self.migrations_dir / "max_migration.txt" + assert max_migration_txt.read_text() == "0003_second\n0004_longer_titles\n" + + assert not (self.migrations_dir / "0002_second.py").exists() + assert not (self.migrations_dir / "0003_longer_titles.py").exists() + def test_success_swappable_dependency(self): (self.migrations_dir / "__init__.py").touch() (self.migrations_dir / "0001_initial.py").write_text(empty_migration) @@ -429,70 +489,115 @@ class Migration(migrations.Migration): class FindMigrationNamesTests(SimpleTestCase): def test_none_when_no_lines(self): - result = module.find_migration_names([]) + result = module.find_migration_names("") assert result is None def test_none_when_no_first_marker(self): - result = module.find_migration_names(["not_a_marker", "0002_author_nicknames"]) + result = module.find_migration_names("not_a_marker\n0002_author_nicknames") assert result is None def test_none_when_no_second_marker(self): - result = module.find_migration_names(["<<<<<<<", "0002_author_nicknames"]) + result = module.find_migration_names("<<<<<<<\n0002_author_nicknames") assert result is None def test_works_with_two_way_merge_during_rebase(self): result = module.find_migration_names( - [ - "<<<<<<<", - "0002_author_nicknames", - "=======", - "0002_longer_titles", - ">>>>>>>", - ] + dedent( + """\ + <<<<<<< + 0002_author_nicknames + ======= + 0002_longer_titles + >>>>>>> + """ + ) + ) + assert result == (["0002_author_nicknames"], ["0002_longer_titles"]) + + def test_works_migration_chain_with_two_way_merge_during_rebase(self): + result = module.find_migration_names( + dedent( + """\ + <<<<<<< HEAD + 0002_author_nicknames + ======= + 0002_second + 0003_longer_titles + >>>>>>> 123456789 (Increase Book title length) + """ + ) + ) + assert result == ( + ["0002_author_nicknames"], + ["0002_second", "0003_longer_titles"], + ) + + def test_works_migration_chain_with_two_way_merge_during_merge(self): + with mock.patch.object(module, "is_merge_in_progress", return_value=True): + result = module.find_migration_names( + dedent( + """\ + <<<<<<< HEAD + 0002_author_nicknames + ======= + 0002_second + 0003_longer_titles + >>>>>>> 123456789 (Increase Book title length) + """ + ) + ) + assert result == ( + ["0002_second", "0003_longer_titles"], + ["0002_author_nicknames"], ) - assert result == ("0002_author_nicknames", "0002_longer_titles") def test_works_with_three_way_merge_during_rebase(self): result = module.find_migration_names( - [ - "<<<<<<<", - "0002_author_nicknames", - "|||||||", - "0001_initial", - "=======", - "0002_longer_titles", - ">>>>>>>", - ] - ) - assert result == ("0002_author_nicknames", "0002_longer_titles") + dedent( + """\ + <<<<<<< + 0002_author_nicknames + ||||||| + 0001_initial + ======= + 0002_longer_titles + >>>>>>> + """ + ) + ) + assert result == (["0002_author_nicknames"], ["0002_longer_titles"]) def test_works_with_two_way_merge_during_merge(self): with mock.patch.object(module, "is_merge_in_progress", return_value=True): result = module.find_migration_names( - [ - "<<<<<<<", - "0002_longer_titles", - "=======", - "0002_author_nicknames", - ">>>>>>>", - ] + dedent( + """\ + <<<<<<< + 0002_longer_titles + ======= + 0002_author_nicknames + >>>>>>> + """ + ) ) - assert result == ("0002_author_nicknames", "0002_longer_titles") + assert result == (["0002_author_nicknames"], ["0002_longer_titles"]) def test_works_with_three_way_merge_during_merge(self): with mock.patch.object(module, "is_merge_in_progress", return_value=True): result = module.find_migration_names( - [ - "<<<<<<<", - "0002_longer_titles", - "|||||||", - "0001_initial", - "=======", - "0002_author_nicknames", - ">>>>>>>", - ] + dedent( + """\ + <<<<<<< + 0002_longer_titles + ||||||| + 0001_initial + ======= + 0002_author_nicknames + >>>>>>> + """ + ) ) - assert result == ("0002_author_nicknames", "0002_longer_titles") + assert result == (["0002_author_nicknames"], ["0002_longer_titles"]) class IsMergeInProgressTests(SimpleTestCase):