From 6c29103b91d6866117c46d9a735f6b0028f3d1e5 Mon Sep 17 00:00:00 2001 From: Ilias Koutsakis Date: Wed, 25 Nov 2020 17:00:02 +0100 Subject: [PATCH] services: add handling of async results of upload * adds the task results to the record * addresses #1970 Signed-off-by: Ilias Koutsakis --- cap/modules/deposit/api.py | 8 +-- cap/modules/deposit/tasks.py | 84 ++++++++++++++++++++---- cap/modules/deposit/utils.py | 14 ++++ cap/modules/fixtures/schemas/zenodo.json | 69 +++++++++++++++++++ cap/modules/services/views/zenodo.py | 18 +++++ tests/integration/test_zenodo_upload.py | 5 +- 6 files changed, 178 insertions(+), 20 deletions(-) create mode 100644 cap/modules/fixtures/schemas/zenodo.json diff --git a/cap/modules/deposit/api.py b/cap/modules/deposit/api.py index 3477a9f1a9..af2fd55ad9 100644 --- a/cap/modules/deposit/api.py +++ b/cap/modules/deposit/api.py @@ -78,7 +78,7 @@ UpdateDepositPermission) from .review import Reviewable -from .tasks import upload_to_zenodo +from .tasks import create_zenodo_upload_tasks from .utils import create_zenodo_deposit _datastore = LocalProxy(lambda: current_app.extensions['security'].datastore) @@ -287,9 +287,9 @@ def upload(self, pid, *args, **kwargs): self.commit() # upload files to zenodo deposit - upload_to_zenodo.delay(files, recid, token, - deposit['id'], - deposit['links']['bucket']) + create_zenodo_upload_tasks(files, recid, token, + deposit['id'], + deposit['links']['bucket']) else: raise FileUploadError( 'You cannot create an empty Zenodo deposit. ' diff --git a/cap/modules/deposit/tasks.py b/cap/modules/deposit/tasks.py index 954e2b8174..faf43f4a97 100644 --- a/cap/modules/deposit/tasks.py +++ b/cap/modules/deposit/tasks.py @@ -21,14 +21,36 @@ # In applying this license, CERN does not # waive the privileges and immunities granted to it by virtue of its status # as an Intergovernmental Organization or submit itself to any jurisdiction. -"""Tasks.""" +"""Zenodo Upload Tasks.""" from __future__ import absolute_import, print_function import requests from flask import current_app -from celery import shared_task +from celery import chord, group, current_task, shared_task + from invenio_files_rest.models import FileInstance +from invenio_db import db + +from cap.modules.experiments.errors import ExternalAPIException +from .utils import get_zenodo_deposit_from_record + + +def create_zenodo_upload_tasks(files, recid, token, + zenodo_depid, zenodo_bucket_url): + """Create the upload tasks and get the results.""" + current_app.logger.info( + f'Uploading files to Zenodo {zenodo_depid}: {files}.') + + # the only way to have a task that waits for + # other tasks to finish is the `chord` structure + upload_callback = save_results_to_record.s(depid=zenodo_depid, recid=recid) + upload_tasks = group( + upload.s(filename, recid, token, zenodo_bucket_url) + for filename in files + ) + + chord(upload_tasks, upload_callback).delay() @shared_task(autoretry_for=(Exception, ), @@ -36,23 +58,59 @@ 'max_retries': 5, 'countdown': 10 }) -def upload_to_zenodo(files, recid, token, zenodo_depid, zenodo_bucket_url): - """Upload to Zenodo the files the user selected.""" +def upload(filename, recid, token, zenodo_bucket_url): + """Upload file to Zenodo.""" from cap.modules.deposit.api import CAPDeposit - rec = CAPDeposit.get_record(recid) + record = CAPDeposit.get_record(recid) - for filename in files: - file_obj = rec.files[filename] - file_ins = FileInstance.get(file_obj.file_id) + file_obj = record.files[filename] + file_ins = FileInstance.get(file_obj.file_id) + task_id = current_task.request.id - with open(file_ins.uri, 'rb') as fp: + with open(file_ins.uri, 'rb') as fp: + try: resp = requests.put( url=f'{zenodo_bucket_url}/{filename}', data=fp, params=dict(access_token=token), ) - if not resp.ok: - current_app.logger.error( - f'Uploading file {filename} to deposit {zenodo_depid} ' - f'failed with {resp.status_code}.') + current_app.logger.error( + f'{task_id}: Zenodo upload of file `{filename}`: {resp.status_code}.') # noqa + + status = resp.status_code + msg = resp.json() + except (ValueError, ExternalAPIException) as err: + status = 'FAILED' + msg = str(err) + + current_app.logger.error( + f'{task_id}: Something went wrong with the task:\n{msg}') + finally: + return { + 'task_id': task_id, + 'result': {'file': filename, 'status': status, 'message': msg} + } + + +@shared_task(autoretry_for=(Exception, ), + retry_kwargs={ + 'max_retries': 5, + 'countdown': 10 + }) +def save_results_to_record(tasks, depid, recid): + """Save the results of uploading to the record.""" + from cap.modules.deposit.api import CAPDeposit + record = CAPDeposit.get_record(recid) + + # update the tasks of the specified zenodo deposit (filename: status) + # this way we can attach multiple deposits + zenodo = get_zenodo_deposit_from_record(record, depid) + for task in tasks: + zenodo['tasks'][task['task_id']] = task['result'] + + record.commit() + db.session.commit() + + current_app.logger.info( + f'COMPLETED: Zenodo {depid} uploads:\n{tasks}') diff --git a/cap/modules/deposit/utils.py b/cap/modules/deposit/utils.py index 02c34cc2c7..ab57a6d7ca 100644 --- a/cap/modules/deposit/utils.py +++ b/cap/modules/deposit/utils.py @@ -82,6 +82,20 @@ def add_api_to_links(links): return response +def get_zenodo_deposit_from_record(record, pid): + """Get the related Zenodo information from a record.""" + try: + index = [idx for idx, deposit in enumerate(record['_zenodo']) + if deposit['id'] == pid][0] + + # set an empty dict as tasks if there is none + record['_zenodo'][index].setdefault('tasks', {}) + return record['_zenodo'][index] + except IndexError: + raise FileUploadError( + 'The Zenodo pid you provided is not associated with this record.') + + def create_zenodo_deposit(token, data=None): """Create a Zenodo deposit using the logged in user's credentials.""" zenodo_url = current_app.config.get("ZENODO_SERVER_URL") diff --git a/cap/modules/fixtures/schemas/zenodo.json b/cap/modules/fixtures/schemas/zenodo.json new file mode 100644 index 0000000000..d127a12f54 --- /dev/null +++ b/cap/modules/fixtures/schemas/zenodo.json @@ -0,0 +1,69 @@ +{ + "name": "zenodo", + "version": "0.0.1", + "fullname": "", + "experiment": null, + "is_indexed": false, + "use_deposit_as_record": true, + "allow_all": true, + "deposit_schema": { + "additionalProperties": "False", + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "number" + }, + "created": { + "type": "string" + }, + "title": { + "type": "string" + }, + "creator": { + "type": "string" + }, + "links": { + "type": "object", + "properties": { + "self": { + "type": "string" + }, + "html": { + "type": "string" + }, + "publish": { + "type": "string" + }, + "bucket": { + "type": "string" + } + } + }, + "tasks": { + "type": "object", + "patternProperties": { + "^[0-F]{8}-([0-F]{4}-){3}[0-F]{12}$": { + "type": "object", + "properties": { + "file": { + "type": "string" + }, + "status": { + "type": "string" + }, + "message": { + "type": "string" + } + } + } + } + } + }, + "title": "Zenodo Deposit" + } + }, + "deposit_mapping": {}, + "deposit_options": {} +} diff --git a/cap/modules/services/views/zenodo.py b/cap/modules/services/views/zenodo.py index 7830cc957b..e26bec5908 100644 --- a/cap/modules/services/views/zenodo.py +++ b/cap/modules/services/views/zenodo.py @@ -27,8 +27,11 @@ import requests from flask import current_app, jsonify +from invenio_pidstore.resolver import Resolver from . import blueprint +from cap.modules.access.utils import login_required +from cap.modules.deposit.api import CAPDeposit def _get_zenodo_record(zenodo_id): @@ -47,3 +50,18 @@ def get_zenodo_record(zenodo_id): """Get record from zenodo (route).""" resp, status = _get_zenodo_record(zenodo_id) return jsonify(resp), status + + +@blueprint.route('/zenodo/tasks/') +@login_required +def get_zenodo_tasks(depid): + """Get record from zenodo (route).""" + resolver = Resolver(pid_type='depid', + object_type='rec', + getter=lambda x: x) + + _, uuid = resolver.resolve(depid) + record = CAPDeposit.get_record(uuid) + tasks = record.get('_zenodo', {}).get('tasks', []) + + return jsonify(tasks), 200 diff --git a/tests/integration/test_zenodo_upload.py b/tests/integration/test_zenodo_upload.py index 4da17e0fb1..92ae21c517 100644 --- a/tests/integration/test_zenodo_upload.py +++ b/tests/integration/test_zenodo_upload.py @@ -104,7 +104,7 @@ def test_create_and_upload_to_zenodo(mock_token, app, users, deposit_with_file, assert len(record['_zenodo']) == 1 assert record['_zenodo'][0]['id'] == 111 - assert record['_zenodo'][0]['title'] == None + assert record['_zenodo'][0]['title'] is None assert record['_zenodo'][0]['created'] == '2020-11-20T11:49:39.147767+00:00' @@ -382,8 +382,7 @@ def test_zenodo_upload_file_not_uploaded_error(mock_token, app, users, deposit_w assert resp.status_code == 201 captured = capsys.readouterr() - assert 'Uploading file test-file.txt to deposit 111 failed with 500' \ - in captured.err + assert 'Zenodo upload of file `test-file.txt`: 500.' in captured.err @patch('cap.modules.deposit.api._fetch_token', return_value='test-token')