From f90ee9e433943fce609db68f89a81a8e399eeff8 Mon Sep 17 00:00:00 2001
From: Gerrod Ubben
Date: Fri, 15 Jul 2022 21:13:19 -0400
Subject: [PATCH] Add sha256 uniqueness constraint to CollectionVersion

fixes: #1052
---
 CHANGES/1052.feature                          |   2 +
 .../0043_collectionversion_sha256.py          |  19 +++
 .../0044_collectionversion_sha256_migrate.py  |  62 ++++++++
 .../0045_collectionversion_unique_sha256.py   |  22 +++
 pulp_ansible/app/models.py                    |   7 +-
 pulp_ansible/app/serializers.py               |   6 +-
 pulp_ansible/app/tasks/collections.py         | 141 ++++++++----------
 pulp_ansible/app/tasks/git.py                 |  31 +++-
 .../test_crud_collection_versions.py          |   4 +-
 9 files changed, 206 insertions(+), 88 deletions(-)
 create mode 100644 CHANGES/1052.feature
 create mode 100644 pulp_ansible/app/migrations/0043_collectionversion_sha256.py
 create mode 100644 pulp_ansible/app/migrations/0044_collectionversion_sha256_migrate.py
 create mode 100644 pulp_ansible/app/migrations/0045_collectionversion_unique_sha256.py

diff --git a/CHANGES/1052.feature b/CHANGES/1052.feature
new file mode 100644
index 000000000..bcab5c4a3
--- /dev/null
+++ b/CHANGES/1052.feature
@@ -0,0 +1,2 @@
+CollectionVersion global uniqueness constraint is now its sha256 digest. Repository level uniqueness
+is still (namespace, name, version).
diff --git a/pulp_ansible/app/migrations/0043_collectionversion_sha256.py b/pulp_ansible/app/migrations/0043_collectionversion_sha256.py
new file mode 100644
index 000000000..ef49a5e68
--- /dev/null
+++ b/pulp_ansible/app/migrations/0043_collectionversion_sha256.py
@@ -0,0 +1,19 @@
+# Generated by Django 3.2.14 on 2022-07-15 22:51
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('ansible', '0042_ansiblerepository_gpgkey'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='collectionversion',
+            name='sha256',
+            field=models.CharField(default='', max_length=64),
+            preserve_default=False,
+        ),
+    ]
diff --git a/pulp_ansible/app/migrations/0044_collectionversion_sha256_migrate.py b/pulp_ansible/app/migrations/0044_collectionversion_sha256_migrate.py
new file mode 100644
index 000000000..5369a95ee
--- /dev/null
+++ b/pulp_ansible/app/migrations/0044_collectionversion_sha256_migrate.py
@@ -0,0 +1,62 @@
+# Generated by Django 3.2.14 on 2022-07-21 22:35
+
+from django.db import migrations, transaction
+
+
+def add_sha256_to_current_models(apps, schema_editor):
+    """Adds the sha256 to current CollectionVersion models."""
+    CollectionVersion = apps.get_model('ansible', 'CollectionVersion')
+    ContentArtifact = apps.get_model('core', 'ContentArtifact')
+    collection_bulk = {}
+    collections_to_update = []
+    collections_on_demand = []
+
+    def find_and_update_sha256():
+        # All content have a ContentArtifact
+        content_artifacts = ContentArtifact.objects.filter(content__in=collection_bulk.keys()).select_related("artifact", "content")
+        for content_artifact in content_artifacts:
+            found_collection = collection_bulk[content_artifact.content.pk]
+            # The ContentArtifact could point to an Artifact or be on-demand
+            if (artifact := getattr(content_artifact, "artifact")) and artifact.sha256 is not None:
+                found_collection.sha256 = artifact.sha256
+                collections_to_update.append(found_collection)
+            else:
+                collections_on_demand.append(found_collection)
+        collection_bulk.clear()
+
+    for collection_version in CollectionVersion.objects.only("pk", "sha256").iterator():
+        if not collection_version.sha256:
+            collection_bulk[collection_version.pk] = collection_version
+            if len(collection_bulk) == 1024:
+                find_and_update_sha256()
+                if len(collections_to_update) >= 1024:
+                    with transaction.atomic():
+                        CollectionVersion.objects.bulk_update(collections_to_update, ["sha256",])
+                    collections_to_update.clear()
+    # Update remaining collections
+    if len(collection_bulk) > 0:
+        find_and_update_sha256()
+    if len(collections_to_update) > 0:
+        with transaction.atomic():
+            CollectionVersion.objects.bulk_update(collections_to_update, ["sha256",])
+
+    # If there are on-demand collections then the next migration will fail, so error here with
+    # helpful message on how to fix. No work will be performed by this migration on a second-run.
+    if len(collections_on_demand) > 0:
+        raise Exception(
+            f"On demand collections found. Please remove or upload/sync their data: "
+            f"{[c.pk for c in collections_on_demand]}"
+        )
+
+
+class Migration(migrations.Migration):
+
+    atomic = False
+
+    dependencies = [
+        ('ansible', '0043_collectionversion_sha256'),
+    ]
+
+    operations = [
+        migrations.RunPython(add_sha256_to_current_models, migrations.RunPython.noop)
+    ]
diff --git a/pulp_ansible/app/migrations/0045_collectionversion_unique_sha256.py b/pulp_ansible/app/migrations/0045_collectionversion_unique_sha256.py
new file mode 100644
index 000000000..086d2f316
--- /dev/null
+++ b/pulp_ansible/app/migrations/0045_collectionversion_unique_sha256.py
@@ -0,0 +1,22 @@
+# Generated by Django 3.2.14 on 2022-07-21 23:05
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('ansible', '0044_collectionversion_sha256_migrate'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='collectionversion',
+            name='sha256',
+            field=models.CharField(db_index=True, max_length=64, null=False),
+        ),
+        migrations.AlterUniqueTogether(
+            name='collectionversion',
+            unique_together={('sha256',)},
+        ),
+    ]
diff --git a/pulp_ansible/app/models.py b/pulp_ansible/app/models.py
index d4d7a10b6..bde8eb3b2 100644
--- a/pulp_ansible/app/models.py
+++ b/pulp_ansible/app/models.py
@@ -14,6 +14,7 @@
     SigningService,
     Task,
 )
+from pulpcore.plugin.repo_version_utils import remove_duplicates, validate_repo_version
 
 from .downloaders import AnsibleDownloaderFactory
 
 log = getLogger(__name__)
@@ -128,6 +129,7 @@ class CollectionVersion(Content):
     """
 
     TYPE = "collection_version"
+    repo_key_fields = ("name", "namespace", "version")
 
     # Data Fields
     authors = psql_fields.ArrayField(models.CharField(max_length=64), default=list, editable=False)
@@ -146,6 +148,7 @@ class CollectionVersion(Content):
     repository = models.CharField(default="", blank=True, max_length=2000, editable=False)
     version = models.CharField(max_length=128, editable=False)
     requires_ansible = models.CharField(null=True, max_length=255)
+    sha256 = models.CharField(max_length=64, db_index=True, null=False)
 
     is_highest = models.BooleanField(editable=False, default=False)
 
@@ -169,7 +172,7 @@ def relative_path(self):
 
     class Meta:
         default_related_name = "%(app_label)s_%(model_name)s"
-        unique_together = ("namespace", "name", "version")
+        unique_together = ("sha256",)
         constraints = [
             UniqueConstraint(
                 fields=("collection", "is_highest"),
@@ -317,6 +320,7 @@ class Meta:
 
     def finalize_new_version(self, new_version):
         """Finalize repo version."""
+        remove_duplicates(new_version)
         removed_collection_versions = new_version.removed(
             base_version=new_version.base_version
         ).filter(pulp_type=CollectionVersion.get_pulp_type())
@@ -346,6 +350,7 @@ def finalize_new_version(self, new_version):
             )
 
             new_version.remove_content(deprecations)
+        validate_repo_version(new_version)
 
 
 class AnsibleDistribution(Distribution):
diff --git a/pulp_ansible/app/serializers.py b/pulp_ansible/app/serializers.py
index 026b7a369..d8ab3b6a2 100644
--- a/pulp_ansible/app/serializers.py
+++ b/pulp_ansible/app/serializers.py
@@ -381,9 +381,9 @@ def validate(self, data):
             data = self.deferred_validate(data)
 
         sha256 = data["file"].hashers["sha256"].hexdigest()
-        artifact = Artifact.objects.filter(sha256=sha256).first()
-        if artifact:
-            ValidationError(_("Artifact already exists"))
+        cv = CollectionVersion.objects.filter(sha256=sha256).first()
+        if cv:
+            raise ValidationError(_("Collection Version already exists"))
         temp_file = PulpTemporaryFile.init_and_validate(data.pop("file"))
         temp_file.save()
         data["temp_file_pk"] = str(temp_file.pk)
diff --git a/pulp_ansible/app/tasks/collections.py b/pulp_ansible/app/tasks/collections.py
index e9aa7dd85..cb94b7522 100644
--- a/pulp_ansible/app/tasks/collections.py
+++ b/pulp_ansible/app/tasks/collections.py
@@ -87,54 +87,30 @@ async def declarative_content_from_git_repo(remote, url, git_ref=None, metadata_
         gitrepo = Repo.clone_from(url, uuid4(), depth=1, multi_options=["--recurse-submodules"])
         commit_sha = gitrepo.head.commit.hexsha
     metadata, artifact_path = sync_collection(gitrepo.working_dir, ".")
-    if not metadata_only:
-        artifact = Artifact.init_and_validate(artifact_path)
-        try:
-            await sync_to_async(artifact.save)()
-        except IntegrityError:
-            artifact = Artifact.objects.get(sha256=artifact.sha256)
-        metadata["artifact_url"] = reverse("artifacts-detail", args=[artifact.pk])
-        metadata["artifact"] = artifact
-    else:
-        metadata["artifact"] = None
-        metadata["artifact_url"] = None
-    metadata["remote_artifact_url"] = "{}/commit/{}".format(url.rstrip("/"), commit_sha)
+    artifact = Artifact.init_and_validate(artifact_path)
+    metadata["sha256"] = artifact.sha256
+    remote_artifact_url = "{}/commit/{}".format(url.rstrip("/"), commit_sha)
 
-    artifact = metadata["artifact"]
-    try:
-        collection_version = await sync_to_async(create_collection_from_importer)(
-            metadata, metadata_only=metadata_only
-        )
-        await sync_to_async(ContentArtifact.objects.get_or_create)(
-            artifact=artifact,
-            content=collection_version,
-            relative_path=collection_version.relative_path,
-        )
-    except ValidationError as e:
-        if e.args[0]["non_field_errors"][0].code == "unique":
-            namespace = metadata["metadata"]["namespace"]
-            name = metadata["metadata"]["name"]
-            version = metadata["metadata"]["version"]
-        else:
-            raise e
-        collection_version = await sync_to_async(CollectionVersion.objects.get)(
-            namespace=namespace, name=name, version=version
-        )
-    if artifact is None:
-        artifact = Artifact()
+    collection_version = await sync_to_async(create_collection_from_importer)(
+        metadata, metadata_only=True
+    )
 
     d_artifact = DeclarativeArtifact(
         artifact=artifact,
-        url=metadata["remote_artifact_url"],
+        url=remote_artifact_url,
        relative_path=collection_version.relative_path,
         remote=remote,
         deferred_download=metadata_only,
     )
 
     # TODO: docs blob support??
+    extra_data = {}
+    if metadata_only:
+        extra_data["d_artifact_files"] = {d_artifact: artifact_path}
 
     d_content = DeclarativeContent(
         content=collection_version,
         d_artifacts=[d_artifact],
+        extra_data=extra_data,
     )
     return d_content
@@ -314,6 +290,7 @@ def import_collection(
             artifact = Artifact.from_pulp_temporary_file(temp_file)
             temp_file = None
             importer_result["artifact_url"] = reverse("artifacts-detail", args=[artifact.pk])
+            importer_result["sha256"] = artifact.sha256
             collection_version = create_collection_from_importer(importer_result)
             collection_version.manifest = manifest_data
             collection_version.files = files_data
@@ -348,51 +325,53 @@ def import_collection(
 def create_collection_from_importer(importer_result, metadata_only=False):
     """
     Process results from importer.
+
+    If ``metadata_only=True``, then this is being called in a sync pipeline, do not perform
+    any database operations, just return an unsaved CollectionVersion.
     """
     collection_info = importer_result["metadata"]
+    tags = collection_info.pop("tags")
+
+    # Remove fields not used by this model
+    collection_info.pop("license_file")
+    collection_info.pop("readme")
+
+    # the importer returns many None values. We need to let the defaults in the model prevail
+    for key in ["description", "documentation", "homepage", "issues", "repository"]:
+        if collection_info[key] is None:
+            collection_info.pop(key)
+
+    collection_version = CollectionVersion(
+        **collection_info,
+        requires_ansible=importer_result.get("requires_ansible"),
+        contents=importer_result["contents"],
+        docs_blob=importer_result["docs_blob"],
+        sha256=importer_result["sha256"],
+    )
 
-    with transaction.atomic():
-        collection, created = Collection.objects.get_or_create(
-            namespace=collection_info["namespace"], name=collection_info["name"]
-        )
-
-        tags = collection_info.pop("tags")
-
-        # Remove fields not used by this model
-        collection_info.pop("license_file")
-        collection_info.pop("readme")
-
-        # the importer returns many None values. We need to let the defaults in the model prevail
-        for key in ["description", "documentation", "homepage", "issues", "repository"]:
-            if collection_info[key] is None:
-                collection_info.pop(key)
-
-        collection_version = CollectionVersion(
-            collection=collection,
-            **collection_info,
-            requires_ansible=importer_result.get("requires_ansible"),
-            contents=importer_result["contents"],
-            docs_blob=importer_result["docs_blob"],
-        )
+    serializer_fields = CollectionVersionSerializer.Meta.fields
+    data = {k: v for k, v in collection_version.__dict__.items() if k in serializer_fields}
+    data["id"] = collection_version.pulp_id
 
-        serializer_fields = CollectionVersionSerializer.Meta.fields
-        data = {k: v for k, v in collection_version.__dict__.items() if k in serializer_fields}
-        data["id"] = collection_version.pulp_id
-        if not metadata_only:
-            data["artifact"] = importer_result["artifact_url"]
+    serializer = CollectionVersionSerializer(data=data)
 
-        serializer = CollectionVersionSerializer(data=data)
+    serializer.is_valid(raise_exception=True)
 
-        serializer.is_valid(raise_exception=True)
-        collection_version.save()
+    if not metadata_only:
+        with transaction.atomic():
+            collection, created = Collection.objects.get_or_create(
+                namespace=collection_info["namespace"], name=collection_info["name"]
+            )
+            collection_version.collection = collection
+            collection_version.save()
 
-        for name in tags:
-            tag, created = Tag.objects.get_or_create(name=name)
-            collection_version.tags.add(tag)
+            for name in tags:
+                tag, created = Tag.objects.get_or_create(name=name)
+                collection_version.tags.add(tag)
 
-        _update_highest_version(collection_version)
+            _update_highest_version(collection_version)
 
-        collection_version.save()  # Save the FK updates
+            collection_version.save()  # Save the FK updates
 
     return collection_version
 
@@ -607,6 +586,7 @@ async def _add_collection_version(self, api_version, collection_version_url, met
             setattr(collection_version, attr_name, attr_value)
 
         artifact = metadata["artifact"]
+        collection_version.sha256 = artifact["sha256"]
         d_artifact = DeclarativeArtifact(
             artifact=Artifact(sha256=artifact["sha256"], size=artifact["size"]),
             url=url,
@@ -1025,10 +1005,8 @@ def _pre_save(self, batch):
             if not isinstance(d_content.content, CollectionVersion):
                 continue
 
-            info = d_content.content.natural_key_dict()
-            collection, created = Collection.objects.get_or_create(
-                namespace=info["namespace"], name=info["name"]
-            )
+            namespace, name = d_content.content.namespace, d_content.content.name
+            collection, created = Collection.objects.get_or_create(namespace=namespace, name=name)
 
             d_content.content.collection = collection
 
@@ -1050,12 +1028,17 @@ def _post_save(self, batch):
             docs_blob = d_content.extra_data.get("docs_blob", {})
             if docs_blob:
                 collection_version.docs_blob = docs_blob
+            d_artifact_files = d_content.extra_data.get("d_artifact_files", {})
 
             for d_artifact in d_content.d_artifacts:
                 artifact = d_artifact.artifact
-                with artifact.file.open() as artifact_file, tarfile.open(
-                    fileobj=artifact_file, mode="r"
-                ) as tar:
+                # TODO change logic when implementing normal on-demand syncing
+                # Special Case for Git sync w/ metadata_only=True
+                if artifact_file_name := d_artifact_files.get(d_artifact):
+                    artifact_file = open(artifact_file_name, mode="rb")
+                else:
+                    artifact_file = artifact.file.open()
+                with tarfile.open(fileobj=artifact_file, mode="r") as tar:
                     runtime_metadata = get_file_obj_from_tarball(
                         tar, "meta/runtime.yml", artifact.file.name, raise_exc=False
                     )
@@ -1074,7 +1057,7 @@ def _post_save(self, batch):
                 collection_version.manifest = manifest_data
                 collection_version.files = files_data
                 info = manifest_data["collection_info"]
-
+                artifact_file.close()
             # Create the tags
             tags = info.pop("tags")
             for name in tags:
diff --git a/pulp_ansible/app/tasks/git.py b/pulp_ansible/app/tasks/git.py
index cfb1474da..a31bb10bf 100644
--- a/pulp_ansible/app/tasks/git.py
+++ b/pulp_ansible/app/tasks/git.py
@@ -7,9 +7,16 @@
 from pulpcore.plugin.stages import (
     DeclarativeVersion,
     Stage,
+    QueryExistingArtifacts,
+    QueryExistingContents,
+    ArtifactSaver,
+    RemoteArtifactSaver,
 )
 
 from pulp_ansible.app.models import AnsibleRepository, GitRemote
-from pulp_ansible.app.tasks.collections import declarative_content_from_git_repo
+from pulp_ansible.app.tasks.collections import (
+    declarative_content_from_git_repo,
+    CollectionContentSaver,
+)
 
 log = logging.getLogger(__name__)
@@ -39,10 +46,30 @@ def synchronize(remote_pk, repository_pk, mirror=False):
         _("Synchronizing: repository=%(r)s remote=%(p)s"), {"r": repository.name, "p": remote.name}
     )
     first_stage = GitFirstStage(remote)
-    d_version = DeclarativeVersion(first_stage, repository, mirror=mirror)
+    d_version = GitDeclarativeVersion(first_stage, repository, mirror=mirror)
     return d_version.create()
 
 
+class GitDeclarativeVersion(DeclarativeVersion):
+    """
+    Subclassed Declarative version creates a custom pipeline for Git sync.
+    """
+
+    def pipeline_stages(self, new_version):
+        """
+        Build a list of stages feeding into the ContentUnitAssociation stage.
+        """
+        return [
+            self.first_stage,
+            QueryExistingArtifacts(),
+            ArtifactSaver(),
+            QueryExistingContents(),
+            # TODO: Use DocsBlobDownloader stage for Docs Blob support?
+            CollectionContentSaver(new_version),
+            RemoteArtifactSaver(),
+        ]
+
+
 class GitFirstStage(Stage):
     """
     The first stage of a pulp_ansible sync pipeline for git repositories.
diff --git a/pulp_ansible/tests/functional/api/collection/test_crud_collection_versions.py b/pulp_ansible/tests/functional/api/collection/test_crud_collection_versions.py
index 43ffc6609..1341d436c 100644
--- a/pulp_ansible/tests/functional/api/collection/test_crud_collection_versions.py
+++ b/pulp_ansible/tests/functional/api/collection/test_crud_collection_versions.py
@@ -175,6 +175,4 @@ def test_05_duplicate_raise_error(self):
         attrs = dict(namespace="pulp", name="squeezer", version="0.0.9")
         with self.assertRaises(ApiException) as ctx:
             self.upload_collection(**attrs)
-        self.assertIn(
-            "The fields namespace, name, version must make a unique set.", ctx.exception.body
-        )
+        self.assertIn("Collection Version already exists", ctx.exception.body)
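
Note on applying this change: the data migration (0044) raises an exception when it finds
CollectionVersions whose content is on-demand, because no local Artifact row carries a sha256
to copy. A minimal pre-flight check, run in a Django shell on the Pulp host, could list the
affected units before upgrading. This snippet is illustrative only (it is not shipped by this
patch); it mirrors the relation names used in the migration above:

    from pulpcore.plugin.models import ContentArtifact
    from pulp_ansible.app.models import CollectionVersion

    # ContentArtifacts with no Artifact row are on-demand content; migration
    # 0044 collects exactly these and raises with their pks.
    on_demand_pks = ContentArtifact.objects.filter(
        content__pulp_type=CollectionVersion.get_pulp_type(),
        artifact__isnull=True,
    ).values_list("content__pk", flat=True)
    print(list(on_demand_pks))

Removing or uploading/syncing the data for the listed pks beforehand lets 0044 and the
uniqueness migration (0045) run cleanly.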