Commit f90ee9e

Add sha256 uniqueness constraint to CollectionVersion

fixes: #1052

gerrod3 committed Jul 22, 2022
1 parent 0e115f3
Showing 9 changed files with 206 additions and 88 deletions.
2 changes: 2 additions & 0 deletions CHANGES/1052.feature
@@ -0,0 +1,2 @@
CollectionVersion global uniqueness constraint is now its sha256 digest. Repository level uniqueness
is still (namespace, name, version).
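
For context, the new global key is the SHA-256 digest of the built collection artifact (the tarball). A minimal sketch of computing that digest outside of Pulp, streaming the file in chunks; the tarball name is a hypothetical placeholder:

import hashlib

def artifact_sha256(path, chunk_size=1024 * 1024):
    """Return the hex SHA-256 of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

print(artifact_sha256("acme-demo-1.0.0.tar.gz"))  # hypothetical file name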
19 changes: 19 additions & 0 deletions pulp_ansible/app/migrations/0043_collectionversion_sha256.py
@@ -0,0 +1,19 @@
# Generated by Django 3.2.14 on 2022-07-15 22:51

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('ansible', '0042_ansiblerepository_gpgkey'),
    ]

    operations = [
        migrations.AddField(
            model_name='collectionversion',
            name='sha256',
            field=models.CharField(default='', max_length=64),
            preserve_default=False,
        ),
    ]
62 changes: 62 additions & 0 deletions pulp_ansible/app/migrations/0044_collectionversion_sha256_migrate.py
@@ -0,0 +1,62 @@
# Generated by Django 3.2.14 on 2022-07-21 22:35

from django.db import migrations, transaction


def add_sha256_to_current_models(apps, schema_editor):
    """Adds the sha256 to current CollectionVersion models."""
    CollectionVersion = apps.get_model('ansible', 'CollectionVersion')
    ContentArtifact = apps.get_model('core', 'ContentArtifact')
    collection_bulk = {}
    collections_to_update = []
    collections_on_demand = []

    def find_and_update_sha256():
        # All content has a ContentArtifact
        content_artifacts = ContentArtifact.objects.filter(
            content__in=collection_bulk.keys()
        ).select_related("artifact", "content")
        for content_artifact in content_artifacts:
            found_collection = collection_bulk[content_artifact.content.pk]
            # The ContentArtifact could point to an Artifact or be on-demand
            if (artifact := getattr(content_artifact, "artifact")) and artifact.sha256 is not None:
                found_collection.sha256 = artifact.sha256
                collections_to_update.append(found_collection)
            else:
                collections_on_demand.append(found_collection)
        collection_bulk.clear()

    for collection_version in CollectionVersion.objects.only("pk", "sha256").iterator():
        if not collection_version.sha256:
            collection_bulk[collection_version.pk] = collection_version
            if len(collection_bulk) == 1024:
                find_and_update_sha256()
        if len(collections_to_update) >= 1024:
            with transaction.atomic():
                CollectionVersion.objects.bulk_update(collections_to_update, ["sha256"])
            collections_to_update.clear()

    # Update remaining collections
    if len(collection_bulk) > 0:
        find_and_update_sha256()
    if len(collections_to_update) > 0:
        with transaction.atomic():
            CollectionVersion.objects.bulk_update(collections_to_update, ["sha256"])

    # If there are on-demand collections then the next migration will fail, so error here with a
    # helpful message on how to fix it. No work will be performed by this migration on a second run.
    if len(collections_on_demand) > 0:
        raise Exception(
            f"On demand collections found. Please remove or upload/sync their data: "
            f"{[c.pk for c in collections_on_demand]}"
        )


class Migration(migrations.Migration):

    atomic = False

    dependencies = [
        ('ansible', '0043_collectionversion_sha256'),
    ]

    operations = [
        migrations.RunPython(add_sha256_to_current_models, migrations.RunPython.noop)
    ]
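
Because this data migration raises on on-demand content, it can help to look for such content before migrating. A pre-check sketch, assuming a Django shell opened against the Pulp database (for example via pulpcore-manager shell); it mirrors the migration's definition of on-demand, i.e. a ContentArtifact with no Artifact:

from pulpcore.plugin.models import ContentArtifact

# CollectionVersions whose ContentArtifact points at no Artifact are the
# on-demand entries that would make add_sha256_to_current_models() raise.
on_demand_pks = ContentArtifact.objects.filter(
    content__pulp_type="ansible.collection_version",
    artifact__isnull=True,
).values_list("content__pk", flat=True)

print(list(on_demand_pks))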
@@ -0,0 +1,22 @@
# Generated by Django 3.2.14 on 2022-07-21 23:05

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('ansible', '0044_collectionversion_sha256_migrate'),
    ]

    operations = [
        migrations.AlterField(
            model_name='collectionversion',
            name='sha256',
            field=models.CharField(db_index=True, max_length=64, null=False),
        ),
        migrations.AlterUniqueTogether(
            name='collectionversion',
            unique_together={('sha256',)},
        ),
    ]
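
Taken together, the three migrations follow the usual safe pattern for introducing a uniqueness constraint: 0043 adds the column with a throwaway default so existing rows are valid, 0044 backfills the real digests from each version's Artifact in non-atomic batches, and only then does this migration index the column and swap unique_together from (namespace, name, version) to (sha256,), so the constraint never fires against unpopulated data.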
7 changes: 6 additions & 1 deletion pulp_ansible/app/models.py
@@ -14,6 +14,7 @@
    SigningService,
    Task,
)
+from pulpcore.plugin.repo_version_utils import remove_duplicates, validate_repo_version
from .downloaders import AnsibleDownloaderFactory

log = getLogger(__name__)
@@ -128,6 +129,7 @@ class CollectionVersion(Content):
"""

TYPE = "collection_version"
repo_key_fields = ("name", "namespace", "version")

# Data Fields
authors = psql_fields.ArrayField(models.CharField(max_length=64), default=list, editable=False)
@@ -146,6 +148,7 @@ class CollectionVersion(Content):
    repository = models.CharField(default="", blank=True, max_length=2000, editable=False)
    version = models.CharField(max_length=128, editable=False)
    requires_ansible = models.CharField(null=True, max_length=255)
+    sha256 = models.CharField(max_length=64, db_index=True, null=False)

    is_highest = models.BooleanField(editable=False, default=False)
@@ -169,7 +172,7 @@ def relative_path(self):

    class Meta:
        default_related_name = "%(app_label)s_%(model_name)s"
-        unique_together = ("namespace", "name", "version")
+        unique_together = ("sha256",)
        constraints = [
            UniqueConstraint(
                fields=("collection", "is_highest"),
@@ -317,6 +320,7 @@ class Meta:

    def finalize_new_version(self, new_version):
        """Finalize repo version."""
+        remove_duplicates(new_version)
        removed_collection_versions = new_version.removed(
            base_version=new_version.base_version
        ).filter(pulp_type=CollectionVersion.get_pulp_type())
@@ -346,6 +350,7 @@ def finalize_new_version(self, new_version):
        )

        new_version.remove_content(deprecations)
+        validate_repo_version(new_version)


class AnsibleDistribution(Distribution):
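
The net effect of the model changes: uniqueness on (namespace, name, version) moves from the database table to the repository level, where remove_duplicates() and validate_repo_version() consult repo_key_fields while a repository version is finalized. An illustrative sketch of the semantics, with hypothetical values:

from pulp_ansible.app.models import CollectionVersion

# Globally, two builds of the "same" collection version can now coexist,
# provided their artifact digests differ.
cv_a = CollectionVersion(namespace="acme", name="demo", version="1.0.0", sha256="a" * 64)
cv_b = CollectionVersion(namespace="acme", name="demo", version="1.0.0", sha256="b" * 64)

# Within a single repository version they still collide on
# repo_key_fields ("name", "namespace", "version"): remove_duplicates()
# evicts the previously present duplicate in favor of the incoming one,
# and validate_repo_version() then asserts that no duplicates remain.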
6 changes: 3 additions & 3 deletions pulp_ansible/app/serializers.py
@@ -381,9 +381,9 @@ def validate(self, data):
        data = self.deferred_validate(data)

        sha256 = data["file"].hashers["sha256"].hexdigest()
-        artifact = Artifact.objects.filter(sha256=sha256).first()
-        if artifact:
-            ValidationError(_("Artifact already exists"))
+        cv = CollectionVersion.objects.filter(sha256=sha256).first()
+        if cv:
+            raise ValidationError(_("Collection Version already exists"))
        temp_file = PulpTemporaryFile.init_and_validate(data.pop("file"))
        temp_file.save()
        data["temp_file_pk"] = str(temp_file.pk)
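
Worth noting about the removed branch: it constructed a ValidationError for a duplicate Artifact but never raised it, so the old check was a no-op. The replacement both keys on the new CollectionVersion.sha256 field and actually raises.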
141 changes: 62 additions & 79 deletions pulp_ansible/app/tasks/collections.py
@@ -87,54 +87,30 @@ async def declarative_content_from_git_repo(remote, url, git_ref=None, metadata_only=False):
    gitrepo = Repo.clone_from(url, uuid4(), depth=1, multi_options=["--recurse-submodules"])
    commit_sha = gitrepo.head.commit.hexsha
    metadata, artifact_path = sync_collection(gitrepo.working_dir, ".")
-    if not metadata_only:
-        artifact = Artifact.init_and_validate(artifact_path)
-        try:
-            await sync_to_async(artifact.save)()
-        except IntegrityError:
-            artifact = Artifact.objects.get(sha256=artifact.sha256)
-        metadata["artifact_url"] = reverse("artifacts-detail", args=[artifact.pk])
-        metadata["artifact"] = artifact
-    else:
-        metadata["artifact"] = None
-        metadata["artifact_url"] = None
-    metadata["remote_artifact_url"] = "{}/commit/{}".format(url.rstrip("/"), commit_sha)
+    artifact = Artifact.init_and_validate(artifact_path)
+    metadata["sha256"] = artifact.sha256
+    remote_artifact_url = "{}/commit/{}".format(url.rstrip("/"), commit_sha)

-    artifact = metadata["artifact"]
-    try:
-        collection_version = await sync_to_async(create_collection_from_importer)(
-            metadata, metadata_only=metadata_only
-        )
-        await sync_to_async(ContentArtifact.objects.get_or_create)(
-            artifact=artifact,
-            content=collection_version,
-            relative_path=collection_version.relative_path,
-        )
-    except ValidationError as e:
-        if e.args[0]["non_field_errors"][0].code == "unique":
-            namespace = metadata["metadata"]["namespace"]
-            name = metadata["metadata"]["name"]
-            version = metadata["metadata"]["version"]
-        else:
-            raise e
-        collection_version = await sync_to_async(CollectionVersion.objects.get)(
-            namespace=namespace, name=name, version=version
-        )
-    if artifact is None:
-        artifact = Artifact()
+    collection_version = await sync_to_async(create_collection_from_importer)(
+        metadata, metadata_only=True
+    )
    d_artifact = DeclarativeArtifact(
        artifact=artifact,
-        url=metadata["remote_artifact_url"],
+        url=remote_artifact_url,
        relative_path=collection_version.relative_path,
        remote=remote,
        deferred_download=metadata_only,
    )

    # TODO: docs blob support??
+    extra_data = {}
+    if metadata_only:
+        extra_data["d_artifact_files"] = {d_artifact: artifact_path}

    d_content = DeclarativeContent(
        content=collection_version,
        d_artifacts=[d_artifact],
+        extra_data=extra_data,
    )
    return d_content
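
After this refactor, the Git code path always builds (but does not save) the Artifact to learn its sha256, and leaves persistence to the declarative pipeline; with metadata_only=True the local tarball path travels in extra_data so _post_save() can still read the manifest from disk. A hedged usage sketch; the remote object and URL are placeholders:

import asyncio

# remote would be an AnsibleGitRemote-like instance; the URL is a placeholder.
d_content = asyncio.run(
    declarative_content_from_git_repo(
        remote, "https://github.com/acme/demo-collection", metadata_only=True
    )
)
print(d_content.content.sha256)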

@@ -314,6 +290,7 @@ def import_collection(
            artifact = Artifact.from_pulp_temporary_file(temp_file)
            temp_file = None
            importer_result["artifact_url"] = reverse("artifacts-detail", args=[artifact.pk])
+            importer_result["sha256"] = artifact.sha256
            collection_version = create_collection_from_importer(importer_result)
            collection_version.manifest = manifest_data
            collection_version.files = files_data
@@ -348,51 +325,53 @@ def import_collection(
def create_collection_from_importer(importer_result, metadata_only=False):
    """
    Process results from importer.
+
+    If ``metadata_only=True``, then this is being called in a sync pipeline; do not perform
+    any database operations, just return an unsaved CollectionVersion.
    """
    collection_info = importer_result["metadata"]
+    tags = collection_info.pop("tags")
+
+    # Remove fields not used by this model
+    collection_info.pop("license_file")
+    collection_info.pop("readme")
+
+    # the importer returns many None values. We need to let the defaults in the model prevail
+    for key in ["description", "documentation", "homepage", "issues", "repository"]:
+        if collection_info[key] is None:
+            collection_info.pop(key)
+
+    collection_version = CollectionVersion(
+        **collection_info,
+        requires_ansible=importer_result.get("requires_ansible"),
+        contents=importer_result["contents"],
+        docs_blob=importer_result["docs_blob"],
+        sha256=importer_result["sha256"],
+    )

-    with transaction.atomic():
-        collection, created = Collection.objects.get_or_create(
-            namespace=collection_info["namespace"], name=collection_info["name"]
-        )
-
-        tags = collection_info.pop("tags")
-
-        # Remove fields not used by this model
-        collection_info.pop("license_file")
-        collection_info.pop("readme")
-
-        # the importer returns many None values. We need to let the defaults in the model prevail
-        for key in ["description", "documentation", "homepage", "issues", "repository"]:
-            if collection_info[key] is None:
-                collection_info.pop(key)
-
-        collection_version = CollectionVersion(
-            collection=collection,
-            **collection_info,
-            requires_ansible=importer_result.get("requires_ansible"),
-            contents=importer_result["contents"],
-            docs_blob=importer_result["docs_blob"],
-        )
-        serializer_fields = CollectionVersionSerializer.Meta.fields
-        data = {k: v for k, v in collection_version.__dict__.items() if k in serializer_fields}
-        data["id"] = collection_version.pulp_id
-        if not metadata_only:
-            data["artifact"] = importer_result["artifact_url"]
-
-        serializer = CollectionVersionSerializer(data=data)
-        serializer.is_valid(raise_exception=True)
-
-        collection_version.save()
-
-        for name in tags:
-            tag, created = Tag.objects.get_or_create(name=name)
-            collection_version.tags.add(tag)
-
-        _update_highest_version(collection_version)
-
-        collection_version.save()  # Save the FK updates
+    serializer_fields = CollectionVersionSerializer.Meta.fields
+    data = {k: v for k, v in collection_version.__dict__.items() if k in serializer_fields}
+    data["id"] = collection_version.pulp_id
+    if not metadata_only:
+        data["artifact"] = importer_result["artifact_url"]
+
+    serializer = CollectionVersionSerializer(data=data)
+    serializer.is_valid(raise_exception=True)
+
+    if not metadata_only:
+        with transaction.atomic():
+            collection, created = Collection.objects.get_or_create(
+                namespace=collection_info["namespace"], name=collection_info["name"]
+            )
+            collection_version.collection = collection
+            collection_version.save()
+
+            for name in tags:
+                tag, created = Tag.objects.get_or_create(name=name)
+                collection_version.tags.add(tag)
+
+            _update_highest_version(collection_version)
+
+            collection_version.save()  # Save the FK updates
    return collection_version
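
The two call patterns for create_collection_from_importer() visible in this diff, as a usage sketch (importer_result and metadata stand for the dicts the callers build):

# Upload path (import_collection): importer_result carries "sha256" and
# "artifact_url"; the CollectionVersion is validated, saved, and tagged.
collection_version = create_collection_from_importer(importer_result)

# Sync and Git paths: metadata_only=True skips all database work and
# returns an unsaved CollectionVersion for the declarative pipeline.
collection_version = create_collection_from_importer(metadata, metadata_only=True)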


@@ -607,6 +586,7 @@ async def _add_collection_version(self, api_version, collection_version_url, metadata):
            setattr(collection_version, attr_name, attr_value)

        artifact = metadata["artifact"]
+        collection_version.sha256 = artifact["sha256"]
        d_artifact = DeclarativeArtifact(
            artifact=Artifact(sha256=artifact["sha256"], size=artifact["size"]),
            url=url,
@@ -1025,10 +1005,8 @@ def _pre_save(self, batch):
            if not isinstance(d_content.content, CollectionVersion):
                continue

-            info = d_content.content.natural_key_dict()
-            collection, created = Collection.objects.get_or_create(
-                namespace=info["namespace"], name=info["name"]
-            )
+            namespace, name = d_content.content.namespace, d_content.content.name
+            collection, created = Collection.objects.get_or_create(namespace=namespace, name=name)

            d_content.content.collection = collection
@@ -1050,12 +1028,17 @@ def _post_save(self, batch):
            docs_blob = d_content.extra_data.get("docs_blob", {})
            if docs_blob:
                collection_version.docs_blob = docs_blob
+            d_artifact_files = d_content.extra_data.get("d_artifact_files", {})

            for d_artifact in d_content.d_artifacts:
                artifact = d_artifact.artifact
-                with artifact.file.open() as artifact_file, tarfile.open(
-                    fileobj=artifact_file, mode="r"
-                ) as tar:
+                # TODO change logic when implementing normal on-demand syncing
+                # Special Case for Git sync w/ metadata_only=True
+                if artifact_file_name := d_artifact_files.get(d_artifact):
+                    artifact_file = open(artifact_file_name, mode="rb")
+                else:
+                    artifact_file = artifact.file.open()
+                with tarfile.open(fileobj=artifact_file, mode="r") as tar:
                    runtime_metadata = get_file_obj_from_tarball(
                        tar, "meta/runtime.yml", artifact.file.name, raise_exc=False
                    )
@@ -1074,7 +1057,7 @@ def _post_save(self, batch):
                    collection_version.manifest = manifest_data
                    collection_version.files = files_data
                    info = manifest_data["collection_info"]
-
+                artifact_file.close()
                # Create the tags
                tags = info.pop("tags")
                for name in tags: