From a50e08f1cf1efd3bc92078a6d29a8decc91bc929 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=B4mulo=20Penido?= Date: Tue, 7 Nov 2023 10:30:19 -0300 Subject: [PATCH] refactor: import taxonomy view --- .../core/tagging/import_export/api.py | 20 - .../core/tagging/rest_api/v1/serializers.py | 9 +- .../core/tagging/rest_api/v1/urls.py | 5 - .../core/tagging/rest_api/v1/views.py | 80 +++- .../core/tagging/rest_api/v1/views_import.py | 54 +-- .../core/tagging/test_views.py | 345 ++++++++++++++++++ .../core/tagging/test_views_import.py | 187 ---------- 7 files changed, 431 insertions(+), 269 deletions(-) diff --git a/openedx_tagging/core/tagging/import_export/api.py b/openedx_tagging/core/tagging/import_export/api.py index 7112a3cd..6afa9096 100644 --- a/openedx_tagging/core/tagging/import_export/api.py +++ b/openedx_tagging/core/tagging/import_export/api.py @@ -48,32 +48,12 @@ from django.utils.translation import gettext as _ -from .. import api as taxonomy_api from ..models import TagImportTask, TagImportTaskState, Taxonomy from .exceptions import TagImportError from .import_plan import TagImportPlan, TagImportTask from .parsers import ParserFormat, get_parser -def create_taxonomy_and_import_tags( - taxonomy_name: str, - taxonomy_description: str, - file: BytesIO, - parser_format: ParserFormat -) -> bool: - """ - Create a taxonomy and import the tags from `file` - """ - taxonomy = taxonomy_api.create_taxonomy(taxonomy_name, taxonomy_description) - - import_success = import_tags(taxonomy, file, parser_format) - - if not import_success: - taxonomy.delete() - - return import_success - - def import_tags( taxonomy: Taxonomy, file: BytesIO, diff --git a/openedx_tagging/core/tagging/rest_api/v1/serializers.py b/openedx_tagging/core/tagging/rest_api/v1/serializers.py index a2a0cf4e..0cdb4826 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/serializers.py +++ b/openedx_tagging/core/tagging/rest_api/v1/serializers.py @@ -182,8 +182,6 @@ class TaxonomyImportBodySerializer(serializers.Serializer): # pylint: disable=a """ Serializer of the body for the Taxonomy Import action """ - taxonomy_name = serializers.CharField(required=True) - taxonomy_description = serializers.CharField(default="") file = serializers.FileField(required=True) def validate(self, data): @@ -198,3 +196,10 @@ def validate(self, data): data['parser_format'] = parser_format return data + +class TaxonomyImportNewBodySerializer(TaxonomyImportBodySerializer): # pylint: disable=abstract-method + """ + Serializer of the body for the Taxonomy Create and Import action + """ + taxonomy_name = serializers.CharField(required=True) + taxonomy_description = serializers.CharField(default="") diff --git a/openedx_tagging/core/tagging/rest_api/v1/urls.py b/openedx_tagging/core/tagging/rest_api/v1/urls.py index dc0e7a83..72ff87d0 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/urls.py +++ b/openedx_tagging/core/tagging/rest_api/v1/urls.py @@ -23,9 +23,4 @@ views_import.TemplateView.as_view(), name="taxonomy-import-template", ), - path( - "import/", - views_import.ImportView.as_view(), - name="taxonomy-import", - ), ] diff --git a/openedx_tagging/core/tagging/rest_api/v1/views.py b/openedx_tagging/core/tagging/rest_api/v1/views.py index 951c8e6f..088be7f2 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/views.py +++ b/openedx_tagging/core/tagging/rest_api/v1/views.py @@ -4,11 +4,12 @@ from __future__ import annotations from django.db import models -from django.http import Http404, HttpResponse +from django.http import Http404, HttpResponse, HttpResponseBadRequest from rest_framework import mixins, status from rest_framework.decorators import action from rest_framework.exceptions import MethodNotAllowed, PermissionDenied, ValidationError from rest_framework.generics import ListAPIView, RetrieveUpdateDestroyAPIView +from rest_framework.request import Request from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet, ModelViewSet @@ -24,7 +25,7 @@ update_tag_in_taxonomy, ) from ...data import TagDataQuerySet -from ...import_export.api import export_tags +from ...import_export.api import export_tags, import_tags from ...import_export.parsers import ParserFormat from ...models import Taxonomy from ...rules import ObjectTagPermissionItem @@ -37,6 +38,8 @@ ObjectTagUpdateQueryParamsSerializer, TagDataSerializer, TaxonomyExportQueryParamsSerializer, + TaxonomyImportBodySerializer, + TaxonomyImportNewBodySerializer, TaxonomyListQueryParamsSerializer, TaxonomySerializer, TaxonomyTagCreateBodySerializer, @@ -164,7 +167,29 @@ class TaxonomyView(ModelViewSet): * 400 - Invalid query parameter * 403 - Permission denied + **Import/Create Taxonomy Example Requests** + POST /tagging/rest_api/v1/taxonomy/import/ + { + "taxonomy_name": "Taxonomy Name", + "taxonomy_description": "This is a description", + "file": , + } + + **Import/Create Taxonomy Query Returns** + * 200 - Success + * 400 - Bad request + * 403 - Permission denied + + **Import/Update Taxonomy Example Requests** + PUT /tagging/rest_api/v1/taxonomy/:pk/tags/import/ + { + "file": , + } + **Import/Update Taxonomy Query Returns** + * 200 - Success + * 400 - Bad request + * 403 - Permission denied """ lookup_value_regex = r"\d+" @@ -239,6 +264,57 @@ def export(self, request, **_kwargs) -> HttpResponse: return HttpResponse(tags, content_type=content_type) + @action(detail=False, url_path="import", methods=["post"]) + def create_import(self, request: Request, **_kwargs) -> HttpResponse: + """ + Create a new taxonomy and imports the tags from the uploaded file. + """ + perm = "oel_tagging.import_taxonomy" + if not request.user.has_perm(perm): + raise PermissionDenied("You do not have permission to import taxonomies") + + body = TaxonomyImportNewBodySerializer(data=request.data) + body.is_valid(raise_exception=True) + + taxonomy_name = body.validated_data["taxonomy_name"] + taxonomy_description = body.validated_data["taxonomy_description"] + file = body.validated_data["file"].file + parser_format = body.validated_data["parser_format"] + + taxonomy = create_taxonomy(taxonomy_name, taxonomy_description) + import_success = import_tags(taxonomy, file, parser_format) + + if import_success: + return HttpResponse(status=200) + else: + import_error = "Error importing taxonomy" # ToDo: Get actual error message + taxonomy.delete() + return HttpResponseBadRequest(import_error) + + @action(detail=True, url_path="tags/import", methods=["put"]) + def update_import(self, request: Request, **_kwargs) -> HttpResponse: + """ + Imports tags from the uploaded file to an already created taxonomy. + """ + perm = "oel_tagging.import_taxonomy" + if not request.user.has_perm(perm): + raise PermissionDenied("You do not have permission to import taxonomies") + + body = TaxonomyImportBodySerializer(data=request.data) + body.is_valid(raise_exception=True) + + file = body.validated_data["file"].file + parser_format = body.validated_data["parser_format"] + + taxonomy = self.get_object() + import_success = import_tags(taxonomy, file, parser_format) + + if import_success: + return HttpResponse(status=200) + else: + import_error = "Error importing taxonomy" # ToDo: Get actual error message + return HttpResponseBadRequest(import_error) + @view_auth_classes class ObjectTagView( diff --git a/openedx_tagging/core/tagging/rest_api/v1/views_import.py b/openedx_tagging/core/tagging/rest_api/v1/views_import.py index 80332838..6c3a6339 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/views_import.py +++ b/openedx_tagging/core/tagging/rest_api/v1/views_import.py @@ -5,14 +5,10 @@ import os -from django.http import FileResponse, Http404, HttpResponse, HttpResponseBadRequest -from rest_framework.exceptions import PermissionDenied +from django.http import FileResponse, Http404 from rest_framework.request import Request from rest_framework.views import APIView -from ...import_export import api -from .serializers import TaxonomyImportBodySerializer - class TemplateView(APIView): """ @@ -53,51 +49,3 @@ def get(self, request: Request, file_ext: str, *args, **kwargs) -> FileResponse: response = FileResponse(fh, content_type=content_type) response['Content-Disposition'] = content_disposition return response - - -class ImportView(APIView): - """ - View to import taxonomies - - **Example Requests** - POST /tagging/rest_api/v1/import/ - { - "taxonomy_name": "Taxonomy Name", - "taxonomy_description": "This is a description", - "file": , - } - - **Query Returns** - * 200 - Success - * 400 - Bad request - * 405 - Method not allowed - """ - http_method_names = ['post'] - - def post(self, request: Request, *args, **kwargs) -> HttpResponse: - """ - Imports the taxonomy from the uploaded file. - """ - perm = "oel_tagging.import_taxonomy" - if not request.user.has_perm(perm): - raise PermissionDenied("You do not have permission to import taxonomies") - - body = TaxonomyImportBodySerializer(data=request.data) - body.is_valid(raise_exception=True) - - taxonomy_name = body.validated_data["taxonomy_name"] - taxonomy_description = body.validated_data["taxonomy_description"] - file = body.validated_data["file"].file - parser_format = body.validated_data["parser_format"] - - import_success = api.create_taxonomy_and_import_tags( - taxonomy_name=taxonomy_name, - taxonomy_description=taxonomy_description, - file=file, - parser_format=parser_format, - ) - - if import_success: - return HttpResponse(status=200) - else: - return HttpResponseBadRequest("Error importing taxonomy") diff --git a/tests/openedx_tagging/core/tagging/test_views.py b/tests/openedx_tagging/core/tagging/test_views.py index 6a312546..a5b69c25 100644 --- a/tests/openedx_tagging/core/tagging/test_views.py +++ b/tests/openedx_tagging/core/tagging/test_views.py @@ -3,12 +3,14 @@ """ from __future__ import annotations +import json from urllib.parse import parse_qs, quote, quote_plus, urlparse import ddt # type: ignore[import] # typing support in rules depends on https://github.com/dfunckt/django-rules/pull/177 import rules # type: ignore[import] from django.contrib.auth import get_user_model +from django.core.files.uploadedfile import SimpleUploadedFile from rest_framework import status from rest_framework.test import APITestCase @@ -27,6 +29,8 @@ TAXONOMY_DETAIL_URL = "/tagging/rest_api/v1/taxonomies/{pk}/" TAXONOMY_EXPORT_URL = "/tagging/rest_api/v1/taxonomies/{pk}/export/" TAXONOMY_TAGS_URL = "/tagging/rest_api/v1/taxonomies/{pk}/tags/" +TAXONOMY_TAGS_IMPORT_URL = "/tagging/rest_api/v1/taxonomies/{pk}/tags/import/" +TAXONOMY_CREATE_IMPORT_URL = "/tagging/rest_api/v1/taxonomies/import/" OBJECT_TAGS_RETRIEVE_URL = "/tagging/rest_api/v1/object_tags/{object_id}/" @@ -1856,3 +1860,344 @@ def test_delete_tag_in_taxonomy_without_subtags(self): # Check that Tag no longer exists with self.assertRaises(Tag.DoesNotExist): existing_tag.refresh_from_db() + + +class ImportTaxonomyMixin(TestTaxonomyViewMixin): + """ + Mixin to test importing taxonomies. + """ + def _get_file(self, tags: list, file_format: str) -> SimpleUploadedFile: + """ + Returns a file for the given format. + """ + if file_format == "csv": + csv_data = "id,value" + for tag in tags: + csv_data += f"\n{tag['id']},{tag['value']}" + return SimpleUploadedFile("taxonomy.csv", csv_data.encode(), content_type="text/csv") + else: # json + json_data = {"tags": tags} + return SimpleUploadedFile("taxonomy.json", json.dumps(json_data).encode(), content_type="application/json") + + +@ddt.ddt +class TestCreateImportView(ImportTaxonomyMixin, APITestCase): + """ + Tests the create/import taxonomy action. + """ + @ddt.data( + "csv", + "json", + ) + def test_import(self, file_format: str) -> None: + """ + Tests importing a valid taxonomy file. + """ + url = TAXONOMY_CREATE_IMPORT_URL + new_tags = [ + {"id": "tag_1", "value": "Tag 1"}, + {"id": "tag_2", "value": "Tag 2"}, + {"id": "tag_3", "value": "Tag 3"}, + {"id": "tag_4", "value": "Tag 4"}, + ] + file = self._get_file(new_tags, file_format) + + self.client.force_authenticate(user=self.staff) + response = self.client.post( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_200_OK + + # Check if the taxonomy was created + taxonomy = Taxonomy.objects.get(name="Imported Taxonomy name") + assert taxonomy.description == "Imported Taxonomy description" + + # Check if the tags were created + tags = list(Tag.objects.filter(taxonomy=taxonomy)) + assert len(tags) == len(new_tags) + for i, tag in enumerate(tags): + assert tag.value == new_tags[i]["value"] + + def test_import_no_file(self) -> None: + """ + Tests importing a taxonomy without a file. + """ + url = TAXONOMY_CREATE_IMPORT_URL + self.client.force_authenticate(user=self.staff) + response = self.client.post( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + }, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.data["file"][0] == "No file was submitted." + + # Check if the taxonomy was not created + assert not Taxonomy.objects.filter(name="Imported Taxonomy name").exists() + + @ddt.data( + "csv", + "json", + ) + def test_import_no_name(self, file_format) -> None: + """ + Tests importing a taxonomy without specifing a name. + """ + url = TAXONOMY_CREATE_IMPORT_URL + file = SimpleUploadedFile(f"taxonomy.{file_format}", b"invalid file content") + self.client.force_authenticate(user=self.staff) + response = self.client.post( + url, + { + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.data["taxonomy_name"][0] == "This field is required." + + # Check if the taxonomy was not created + assert not Taxonomy.objects.filter(name="Imported Taxonomy name").exists() + + def test_import_invalid_format(self) -> None: + """ + Tests importing a taxonomy with an invalid file format. + """ + url = TAXONOMY_CREATE_IMPORT_URL + file = SimpleUploadedFile("taxonomy.invalid", b"invalid file content") + self.client.force_authenticate(user=self.staff) + response = self.client.post( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.data["file"][0] == "File type not supported: invalid" + + # Check if the taxonomy was not created + assert not Taxonomy.objects.filter(name="Imported Taxonomy name").exists() + + @ddt.data( + "csv", + "json", + ) + def test_import_invalid_content(self, file_format) -> None: + """ + Tests importing a taxonomy with an invalid file content. + """ + url = TAXONOMY_CREATE_IMPORT_URL + file = SimpleUploadedFile(f"taxonomy.{file_format}", b"invalid file content") + self.client.force_authenticate(user=self.staff) + response = self.client.post( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.content == b"Error importing taxonomy" + + # Check if the taxonomy was not created + assert not Taxonomy.objects.filter(name="Imported Taxonomy name").exists() + + def test_import_no_perm(self) -> None: + """ + Tests importing a taxonomy using a user without permission. + """ + url = TAXONOMY_CREATE_IMPORT_URL + new_tags = [ + {"id": "tag_1", "value": "Tag 1"}, + {"id": "tag_2", "value": "Tag 2"}, + {"id": "tag_3", "value": "Tag 3"}, + {"id": "tag_4", "value": "Tag 4"}, + ] + file = self._get_file(new_tags, "json") + + self.client.force_authenticate(user=self.user) + response = self.client.post( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + # Check if the taxonomy was not created + assert not Taxonomy.objects.filter(name="Imported Taxonomy name").exists() + +@ddt.ddt +class TestImportTagsView(ImportTaxonomyMixin, APITestCase): + """ + Tests the taxonomy import tags action. + """ + def setUp(self): + ImportTaxonomyMixin.setUp(self) + + self.taxonomy = Taxonomy.objects.create( + name="Test import taxonomy", + ) + tag_1 = Tag.objects.create( + taxonomy=self.taxonomy, + external_id="old_tag_1", + value="Old tag 1", + ) + tag_2 = Tag.objects.create( + taxonomy=self.taxonomy, + external_id="old_tag_2", + value="Old tag 2", + ) + self.old_tags = [tag_1, tag_2] + + @ddt.data( + "csv", + "json", + ) + def test_import(self, file_format: str) -> None: + """ + Tests importing a valid taxonomy file. + """ + url = TAXONOMY_TAGS_IMPORT_URL.format(pk=self.taxonomy.id) + new_tags = [ + {"id": "tag_1", "value": "Tag 1"}, + {"id": "tag_2", "value": "Tag 2"}, + {"id": "tag_3", "value": "Tag 3"}, + {"id": "tag_4", "value": "Tag 4"}, + ] + file = self._get_file(new_tags, file_format) + + self.client.force_authenticate(user=self.staff) + response = self.client.put( + url, + { "file": file }, + format="multipart" + ) + assert response.status_code == status.HTTP_200_OK + + # Check if the tags were created + tags = list(Tag.objects.filter(taxonomy=self.taxonomy)) + all_tags = [{"value": tag.value} for tag in self.old_tags] + new_tags + assert len(tags) == len(all_tags) + for i, tag in enumerate(tags): + assert tag.value == all_tags[i]["value"] + + def test_import_no_file(self) -> None: + """ + Tests importing a taxonomy without a file. + """ + url = TAXONOMY_TAGS_IMPORT_URL.format(pk=self.taxonomy.id) + self.client.force_authenticate(user=self.staff) + response = self.client.put( + url, + {}, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.data["file"][0] == "No file was submitted." + + # Check if the taxonomy was not changed + tags = list(Tag.objects.filter(taxonomy=self.taxonomy)) + assert len(tags) == len(self.old_tags) + for i, tag in enumerate(tags): + assert tag.value == self.old_tags[i].value + + def test_import_invalid_format(self) -> None: + """ + Tests importing a taxonomy with an invalid file format. + """ + url = TAXONOMY_TAGS_IMPORT_URL.format(pk=self.taxonomy.id) + file = SimpleUploadedFile("taxonomy.invalid", b"invalid file content") + self.client.force_authenticate(user=self.staff) + response = self.client.put( + url, + { "file": file }, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.data["file"][0] == "File type not supported: invalid" + + # Check if the taxonomy was not changed + tags = list(Tag.objects.filter(taxonomy=self.taxonomy)) + assert len(tags) == len(self.old_tags) + for i, tag in enumerate(tags): + assert tag.value == self.old_tags[i].value + + + @ddt.data( + "csv", + "json", + ) + def test_import_invalid_content(self, file_format) -> None: + """ + Tests importing a taxonomy with an invalid file content. + """ + url = TAXONOMY_TAGS_IMPORT_URL.format(pk=self.taxonomy.id) + file = SimpleUploadedFile(f"taxonomy.{file_format}", b"invalid file content") + self.client.force_authenticate(user=self.staff) + response = self.client.put( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.content == b"Error importing taxonomy" + + # Check if the taxonomy was not changed + tags = list(Tag.objects.filter(taxonomy=self.taxonomy)) + assert len(tags) == len(self.old_tags) + for i, tag in enumerate(tags): + assert tag.value == self.old_tags[i].value + + def test_import_no_perm(self) -> None: + """ + Tests importing a taxonomy using a user without permission. + """ + url = TAXONOMY_TAGS_IMPORT_URL.format(pk=self.taxonomy.id) + new_tags = [ + {"id": "tag_1", "value": "Tag 1"}, + {"id": "tag_2", "value": "Tag 2"}, + {"id": "tag_3", "value": "Tag 3"}, + {"id": "tag_4", "value": "Tag 4"}, + ] + file = self._get_file(new_tags, "json") + + self.client.force_authenticate(user=self.user) + response = self.client.put( + url, + { + "taxonomy_name": "Imported Taxonomy name", + "taxonomy_description": "Imported Taxonomy description", + "file": file, + }, + format="multipart" + ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + # Check if the taxonomy was not changed + tags = list(Tag.objects.filter(taxonomy=self.taxonomy)) + assert len(tags) == len(self.old_tags) + for i, tag in enumerate(tags): + assert tag.value == self.old_tags[i].value diff --git a/tests/openedx_tagging/core/tagging/test_views_import.py b/tests/openedx_tagging/core/tagging/test_views_import.py index 20611128..4c2d94c9 100644 --- a/tests/openedx_tagging/core/tagging/test_views_import.py +++ b/tests/openedx_tagging/core/tagging/test_views_import.py @@ -3,20 +3,11 @@ """ from __future__ import annotations -import json - import ddt # type: ignore[import] -from django.contrib.auth import get_user_model -from django.core.files.uploadedfile import SimpleUploadedFile from rest_framework import status from rest_framework.test import APITestCase -from openedx_tagging.core.tagging.models import Tag, Taxonomy - TAXONOMY_TEMPLATE_URL = "/tagging/rest_api/v1/import/{filename}" -TAXONOMY_IMPORT_URL = "/tagging/rest_api/v1/import/" - -User = get_user_model() @ddt.ddt @@ -46,181 +37,3 @@ def test_download_method_not_allowed(self): url = TAXONOMY_TEMPLATE_URL.format(filename="template.txt") response = self.client.post(url) assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED - - -@ddt.ddt -class TestImportView(APITestCase): - """ - Tests the import taxonomy view. - """ - def setUp(self): - super().setUp() - - self.user = User.objects.create( - username="user", - email="user@example.com", - ) - - self.staff = User.objects.create( - username="staff", - email="staff@example.com", - is_staff=True, - ) - - def _get_file(self, tags: list, file_format: str) -> SimpleUploadedFile: - """ - Returns a file for the given format. - """ - if file_format == "csv": - csv_data = "id,value" - for tag in tags: - csv_data += f"\n{tag['id']},{tag['value']}" - return SimpleUploadedFile("taxonomy.csv", csv_data.encode(), content_type="text/csv") - else: # json - json_data = {"tags": tags} - return SimpleUploadedFile("taxonomy.json", json.dumps(json_data).encode(), content_type="application/json") - - @ddt.data( - "csv", - "json", - ) - def test_import(self, file_format: str) -> None: - """ - Tests importing a valid taxonomy file. - """ - url = TAXONOMY_IMPORT_URL - new_tags = [ - {"id": "tag_1", "value": "Tag 1"}, - {"id": "tag_2", "value": "Tag 2"}, - {"id": "tag_3", "value": "Tag 3"}, - {"id": "tag_4", "value": "Tag 4"}, - ] - file = self._get_file(new_tags, file_format) - - self.client.force_authenticate(user=self.staff) - response = self.client.post( - url, - { - "taxonomy_name": "Imported Taxonomy name", - "taxonomy_description": "Imported Taxonomy description", - "file": file, - }, - format="multipart" - ) - assert response.status_code == status.HTTP_200_OK - - # Check if the taxonomy was created - taxonomy = Taxonomy.objects.get(name="Imported Taxonomy name") - assert taxonomy.description == "Imported Taxonomy description" - - # Check if the tags were created - tags = list(Tag.objects.filter(taxonomy=taxonomy)) - assert len(tags) == len(new_tags) - for i, tag in enumerate(tags): - assert tag.value == new_tags[i]["value"] - - def test_import_no_file(self) -> None: - """ - Tests importing a taxonomy without a file. - """ - url = TAXONOMY_IMPORT_URL - self.client.force_authenticate(user=self.staff) - response = self.client.post( - url, - { - "taxonomy_name": "Imported Taxonomy name", - "taxonomy_description": "Imported Taxonomy description", - }, - format="multipart" - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert response.data["file"][0] == "No file was submitted." - - @ddt.data( - "csv", - "json", - ) - def test_import_no_name(self, file_format) -> None: - """ - Tests importing a taxonomy without specifing a name. - """ - url = TAXONOMY_IMPORT_URL - file = SimpleUploadedFile(f"taxonomy.{file_format}", b"invalid file content") - self.client.force_authenticate(user=self.staff) - response = self.client.post( - url, - { - "taxonomy_description": "Imported Taxonomy description", - "file": file, - }, - format="multipart" - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert response.data["taxonomy_name"][0] == "This field is required." - - def test_import_invalid_format(self) -> None: - """ - Tests importing a taxonomy with an invalid file format. - """ - url = TAXONOMY_IMPORT_URL - file = SimpleUploadedFile("taxonomy.invalid", b"invalid file content") - self.client.force_authenticate(user=self.staff) - response = self.client.post( - url, - { - "taxonomy_name": "Imported Taxonomy name", - "taxonomy_description": "Imported Taxonomy description", - "file": file, - }, - format="multipart" - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert response.data["file"][0] == "File type not supported: invalid" - - @ddt.data( - "csv", - "json", - ) - def test_import_invalid_content(self, file_format) -> None: - """ - Tests importing a taxonomy with an invalid file content. - """ - url = TAXONOMY_IMPORT_URL - file = SimpleUploadedFile(f"taxonomy.{file_format}", b"invalid file content") - self.client.force_authenticate(user=self.staff) - response = self.client.post( - url, - { - "taxonomy_name": "Imported Taxonomy name", - "taxonomy_description": "Imported Taxonomy description", - "file": file, - }, - format="multipart" - ) - assert response.status_code == status.HTTP_400_BAD_REQUEST - assert response.content == b"Error importing taxonomy" - - def test_import_no_perm(self) -> None: - """ - Tests importing a taxonomy using a user without permission. - """ - url = TAXONOMY_IMPORT_URL - new_tags = [ - {"id": "tag_1", "value": "Tag 1"}, - {"id": "tag_2", "value": "Tag 2"}, - {"id": "tag_3", "value": "Tag 3"}, - {"id": "tag_4", "value": "Tag 4"}, - ] - file = self._get_file(new_tags, "json") - - self.client.force_authenticate(user=self.user) - response = self.client.post( - url, - { - "taxonomy_name": "Imported Taxonomy name", - "taxonomy_description": "Imported Taxonomy description", - "file": file, - }, - format="multipart" - ) - assert response.status_code == status.HTTP_403_FORBIDDEN