Skip to content

Commit

Permalink
Fix an issue with permissions (observer can change annotations) (cvat…
Browse files Browse the repository at this point in the history
…-ai#745)

* Fixed a problem with observer (check_object_permissions method was not called)
* Added a test case to cover issue cvat-ai#712.
  • Loading branch information
nmanovic authored and Chris Lee-Messer committed Mar 5, 2020
1 parent b3b7584 commit c3975b8
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
23 changes: 23 additions & 0 deletions cvat/apps/engine/tests/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,29 @@ def test_api_v1_jobs_id_annotations_user(self):
self._run_api_v1_jobs_id_annotations(self.user, self.assignee,
self.assignee)

def test_api_v1_jobs_id_annotations_observer(self):
_, jobs = self._create_task(self.user, self.assignee)
job = jobs[0]
data = {
"version": 0,
"tags": [],
"shapes": [],
"tracks": []
}

response = self._get_api_v1_jobs_id_data(job["id"], self.observer)
self.assertEqual(response.status_code, status.HTTP_200_OK)

response = self._put_api_v1_jobs_id_data(job["id"], self.observer, data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

response = self._patch_api_v1_jobs_id_data(job["id"], self.observer, "create", data)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)

response = self._delete_api_v1_jobs_id_data(job["id"], self.observer)
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)


def test_api_v1_jobs_id_annotations_no_auth(self):
self._run_api_v1_jobs_id_annotations(self.user, self.assignee, None)

Expand Down
18 changes: 10 additions & 8 deletions cvat/apps/engine/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def get_permissions(self):
permissions.append(auth.TaskAccessPermission)
elif http_method in ["POST"]:
permissions.append(auth.TaskCreatePermission)
elif http_method in ["PATCH", "PUT"]:
elif self.action == 'annotations' or http_method in ["PATCH", "PUT"]:
permissions.append(auth.TaskChangePermission)
elif http_method in ["DELETE"]:
permissions.append(auth.TaskDeletePermission)
Expand All @@ -207,9 +207,9 @@ def perform_destroy(self, instance):
super().perform_destroy(instance)
shutil.rmtree(task_dirname, ignore_errors=True)

@staticmethod
@action(detail=True, methods=['GET'], serializer_class=JobSerializer)
def jobs(request, pk):
def jobs(self, request, pk):
self.get_object() # force to call check_object_permissions
queryset = Job.objects.filter(segment__task_id=pk)
serializer = JobSerializer(queryset, many=True,
context={"request": request})
Expand All @@ -218,7 +218,7 @@ def jobs(request, pk):

@action(detail=True, methods=['POST'], serializer_class=TaskDataSerializer)
def data(self, request, pk):
db_task = self.get_object()
db_task = self.get_object() # call check_object_permissions as well
serializer = TaskDataSerializer(db_task, data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
Expand All @@ -228,6 +228,7 @@ def data(self, request, pk):
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'],
serializer_class=LabeledDataSerializer)
def annotations(self, request, pk):
self.get_object() # force to call check_object_permissions
if request.method == 'GET':
data = annotation.get_task_data(pk, request.user)
serializer = LabeledDataSerializer(data=data)
Expand Down Expand Up @@ -267,7 +268,7 @@ def annotations(self, request, pk):
def dump(self, request, pk, filename):
filename = re.sub(r'[\\/*?:"<>|]', '_', filename)
username = request.user.username
db_task = self.get_object()
db_task = self.get_object() # call check_object_permissions as well
timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
action = request.query_params.get("action")
if action not in [None, "download"]:
Expand Down Expand Up @@ -325,6 +326,7 @@ def dump(self, request, pk, filename):

@action(detail=True, methods=['GET'], serializer_class=RqStatusSerializer)
def status(self, request, pk):
self.get_object() # force to call check_object_permissions
response = self._get_rq_response(queue="default",
job_id="/api/{}/tasks/{}".format(request.version, pk))
serializer = RqStatusSerializer(data=response)
Expand All @@ -350,12 +352,11 @@ def _get_rq_response(queue, job_id):

return response

@staticmethod
@action(detail=True, methods=['GET'], serializer_class=ImageMetaSerializer,
url_path='frames/meta')
def data_info(request, pk):
def data_info(self, request, pk):
try:
db_task = models.Task.objects.get(pk=pk)
db_task = self.get_object() # call check_object_permissions as well
meta_cache_file = open(db_task.get_image_meta_cache_path())
except OSError:
task.make_image_meta_cache(db_task)
Expand Down Expand Up @@ -404,6 +405,7 @@ def get_permissions(self):
@action(detail=True, methods=['GET', 'DELETE', 'PUT', 'PATCH'],
serializer_class=LabeledDataSerializer)
def annotations(self, request, pk):
self.get_object() # force to call check_object_permissions
if request.method == 'GET':
data = annotation.get_job_data(pk, request.user)
return Response(data)
Expand Down

0 comments on commit c3975b8

Please sign in to comment.