Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zenodo: add metadata on deposit #1957

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions cap/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -720,10 +720,7 @@ def _(x):

# Zenodo
# ======
# Base URL of the Zenodo API; the APP_ZENODO_SERVER_URL environment variable
# overrides the default.
ZENODO_SERVER_URL = os.environ.get('APP_ZENODO_SERVER_URL',
'https://zenodo.org/api')

# Token read from the APP_ZENODO_ACCESS_TOKEN environment variable;
# 'CHANGE_ME' is only the fallback used when the variable is unset.
ZENODO_ACCESS_TOKEN = os.environ.get('APP_ZENODO_ACCESS_TOKEN', 'CHANGE_ME')
# NOTE(review): ZENODO_SERVER_URL is assigned a second time here with the
# same environment variable and default -- presumably the old and new side
# of the change; confirm only one assignment remains.
ZENODO_SERVER_URL = os.environ.get('APP_ZENODO_SERVER_URL', 'https://zenodo.org/api') # noqa

# Endpoints
# =========
Expand Down
133 changes: 86 additions & 47 deletions cap/modules/deposit/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from sqlalchemy.orm.exc import NoResultFound
from werkzeug.local import LocalProxy

from cap.modules.auth.ext import _fetch_token
from cap.modules.deposit.errors import DisconnectWebhookError, FileUploadError
from cap.modules.deposit.validators import NoRequiredValidator
from cap.modules.experiments.permissions import exp_need_factory
Expand All @@ -60,14 +61,15 @@
from cap.modules.repos.tasks import download_repo, download_repo_file
from cap.modules.repos.utils import (create_webhook, disconnect_subscriber,
parse_git_url)
from cap.modules.services.serializers.zenodo import ZenodoUploadSchema
from cap.modules.schemas.resolvers import (resolve_schema_by_url,
schema_name_to_url)
from cap.modules.user.errors import DoesNotExistInLDAP
from cap.modules.user.utils import (get_existing_or_register_role,
get_existing_or_register_user)

from .errors import (DepositValidationError, UpdateDepositPermissionsError,
ReviewError)
ReviewError, InputValidationError)
from .fetchers import cap_deposit_fetcher
from .minters import cap_deposit_minter
from .permissions import (AdminDepositPermission, CloneDepositPermission,
Expand All @@ -76,6 +78,8 @@
UpdateDepositPermission)

from .review import Reviewable
from .tasks import create_zenodo_upload_tasks
from .utils import create_zenodo_deposit

_datastore = LocalProxy(lambda: current_app.extensions['security'].datastore)

Expand Down Expand Up @@ -254,55 +258,90 @@ def upload(self, pid, *args, **kwargs):
with UpdateDepositPermission(self).require(403):
if request:
_, rec = request.view_args.get('pid_value').data
record_uuid = str(rec.id)
recid = str(rec.id)
data = request.get_json()
webhook = data.get('webhook', False)
event_type = data.get('event_type', 'release')

try:
url = data['url']
except KeyError:
raise FileUploadError('Missing url parameter.')

try:
host, owner, repo, branch, filepath = parse_git_url(url)
api = create_git_api(host, owner, repo, branch,
current_user.id)

if filepath:
if webhook:
raise FileUploadError(
'You cannot create a webhook on a file')

download_repo_file(
record_uuid,
f'repositories/{host}/{owner}/{repo}/{api.branch or api.sha}/{filepath}', # noqa
*api.get_file_download(filepath),
api.auth_headers,
)
elif webhook:
if event_type == 'release':
if branch:
raise FileUploadError(
'You cannot create a release webhook'
' for a specific branch or sha.')
target = data.get('target')

if target == 'zenodo':
# check for token
token = _fetch_token('zenodo')
if not token:
raise FileUploadError(
'Please connect your Zenodo account '
'before creating a deposit.')

files = data.get('files', [])
zenodo_data = data.get('zenodo_data')
input = dict(files=files, data=zenodo_data) \
if zenodo_data else dict(files=files)

if files:
_, errors = ZenodoUploadSchema(recid=recid).load(input)
if errors:
raise InputValidationError(
'Validation error in Zenodo input data.',
errors=errors)

deposit = create_zenodo_deposit(token, zenodo_data)
self.setdefault('_zenodo', []).append(deposit)
self.commit()

# upload files to zenodo deposit
create_zenodo_upload_tasks(files, recid, token,
deposit['id'],
deposit['links']['bucket'])
else:
raise FileUploadError(
'You cannot create an empty Zenodo deposit. '
'Please add some files.')
else:
webhook = data.get('webhook', False)
event_type = data.get('event_type', 'release')

if event_type == 'push' and \
api.branch is None and api.sha:
raise FileUploadError(
'You cannot create a push webhook'
' for a specific sha.')
try:
url = data['url']
except KeyError:
raise FileUploadError('Missing url parameter.')

create_webhook(record_uuid, api, event_type)
else:
download_repo.delay(
record_uuid,
f'repositories/{host}/{owner}/{repo}/{api.branch or api.sha}.tar.gz', # noqa
api.get_repo_download(),
api.auth_headers)

except GitError as e:
raise FileUploadError(str(e))
try:
host, owner, repo, branch, filepath = parse_git_url(url) # noqa
api = create_git_api(host, owner, repo, branch,
current_user.id)

if filepath:
if webhook:
raise FileUploadError(
'You cannot create a webhook on a file')

download_repo_file(
recid,
f'repositories/{host}/{owner}/{repo}/{api.branch or api.sha}/{filepath}', # noqa
*api.get_file_download(filepath),
api.auth_headers,
)
elif webhook:
if event_type == 'release':
if branch:
raise FileUploadError(
'You cannot create a release webhook'
' for a specific branch or sha.')

if event_type == 'push' and \
api.branch is None and api.sha:
raise FileUploadError(
'You cannot create a push webhook'
' for a specific sha.')

create_webhook(recid, api, event_type)
else:
download_repo.delay(
recid,
f'repositories/{host}/{owner}/{repo}/{api.branch or api.sha}.tar.gz', # noqa
api.get_repo_download(),
api.auth_headers)

except GitError as e:
raise FileUploadError(str(e))

return self

Expand Down
42 changes: 42 additions & 0 deletions cap/modules/deposit/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ def __init__(self, description, **kwargs):
self.description = description or self.description


class AuthorizationError(RESTException):
    """REST error signalling that authorization failed (HTTP 401)."""

    code = 401

    def __init__(self, description, **kwargs):
        """Create the exception.

        :param description: Message describing the failure. A falsy value
            keeps the description already available on the exception.
        :param kwargs: Passed through unchanged to ``RESTException``.
        """
        super().__init__(**kwargs)

        message = description if description else self.description
        self.description = message


class DisconnectWebhookError(RESTException):
"""Exception during disconnecting webhook for analysis."""

Expand Down Expand Up @@ -124,3 +136,33 @@ def __init__(self, description, errors=None, **kwargs):

self.description = description or self.description
self.errors = [FieldError(e[0], e[1]) for e in errors.items()]


class InputValidationError(RESTValidationError):
    """Validation error raised when the supplied input data is invalid."""

    code = 400

    description = "Validation error. Try again with valid data"

    def __init__(self, description, errors=None, **kwargs):
        """Initialize exception.

        :param description: Message describing the failure. A falsy value
            keeps the class-level default description.
        :param errors: Optional mapping whose ``(key, value)`` pairs are
            each wrapped in a ``FieldError``. ``None`` (the default) is
            treated as "no field errors".
        :param kwargs: Passed through unchanged to ``RESTValidationError``.
        """
        super(InputValidationError, self).__init__(**kwargs)

        self.description = description or self.description
        # ``errors`` defaults to None, so guard before calling ``.items()``;
        # otherwise omitting the argument fails with an AttributeError.
        self.errors = [
            FieldError(field, message)
            for field, message in (errors or {}).items()
        ]


class DataValidationError(RESTValidationError):
    """Validation error raised when the data itself fails validation."""

    code = 400

    description = "Validation error. Try again with valid data"

    def __init__(self, description, errors=None, **kwargs):
        """Initialize exception.

        :param description: Message describing the failure. A falsy value
            keeps the class-level default description.
        :param errors: Optional iterable of mappings, each providing a
            ``'field'`` and a ``'message'`` key; every entry is wrapped in
            a ``FieldError``. ``None`` (the default) is treated as
            "no field errors".
        :param kwargs: Passed through unchanged to ``RESTValidationError``.
        """
        super(DataValidationError, self).__init__(**kwargs)

        self.description = description or self.description
        # ``errors`` defaults to None, which is not iterable; fall back to
        # an empty sequence instead of failing with a TypeError.
        self.errors = [
            FieldError(e['field'], e['message']) for e in (errors or [])
        ]
116 changes: 116 additions & 0 deletions cap/modules/deposit/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
#
# This file is part of CERN Analysis Preservation Framework.
# Copyright (C) 2018 CERN.
#
# CERN Analysis Preservation Framework is free software; you can redistribute
# it and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# CERN Analysis Preservation Framework is distributed in the hope that it will
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with CERN Analysis Preservation Framework; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.
"""Zenodo Upload Tasks."""

from __future__ import absolute_import, print_function

import requests
from flask import current_app
from celery import chord, group, current_task, shared_task

from invenio_files_rest.models import FileInstance
from invenio_db import db

from cap.modules.experiments.errors import ExternalAPIException
from .utils import get_zenodo_deposit_from_record


def create_zenodo_upload_tasks(files, recid, token,
                               zenodo_depid, zenodo_bucket_url):
    """Schedule one Zenodo upload per file plus a final bookkeeping step.

    Every name in ``files`` gets its own ``upload`` task; once all of them
    have finished, ``save_results_to_record`` is invoked with their results
    for the deposit ``zenodo_depid`` of record ``recid``.
    """
    current_app.logger.info(
        f'Uploading files to Zenodo {zenodo_depid}: {files}.')

    # A chord is what lets a single task wait for a whole group of other
    # tasks: the uploads form the header, the bookkeeping task the body.
    per_file_uploads = [
        upload.s(filename, recid, token, zenodo_bucket_url)
        for filename in files
    ]
    on_uploads_done = save_results_to_record.s(depid=zenodo_depid,
                                               recid=recid)

    chord(group(per_file_uploads), on_uploads_done).delay()


@shared_task(autoretry_for=(Exception, ),
             retry_kwargs={
                 'max_retries': 5,
                 'countdown': 10
             })
def upload(filename, recid, token, zenodo_bucket_url):
    """Upload file to Zenodo.

    :param filename: Key of the file inside the record's files; also used
        as the object name under ``zenodo_bucket_url``.
    :param recid: Identifier passed to ``CAPDeposit.get_record``.
    :param token: Zenodo access token, sent as the ``access_token`` query
        parameter.
    :param zenodo_bucket_url: URL of the Zenodo bucket receiving the file.
    :returns: ``{'task_id': ..., 'result': {'file', 'status', 'message'}}``
        where ``status`` is the HTTP status code, or ``'FAILED'`` when a
        ``ValueError`` or ``ExternalAPIException`` was raised.
    :raises Exception: Any other error propagates unchanged, so the
        ``autoretry_for`` policy of the task sees the real exception.
    """
    # Imported inside the task: ``cap.modules.deposit.api`` imports from
    # this module -- presumably this avoids a circular import.
    from cap.modules.deposit.api import CAPDeposit
    record = CAPDeposit.get_record(recid)

    file_obj = record.files[filename]
    file_ins = FileInstance.get(file_obj.file_id)
    task_id = current_task.request.id

    with open(file_ins.uri, 'rb') as fp:
        try:
            resp = requests.put(
                url=f'{zenodo_bucket_url}/{filename}',
                data=fp,
                params=dict(access_token=token),
            )

            # Only non-successful responses are worth an error-level entry.
            log = current_app.logger.info if resp.ok \
                else current_app.logger.error
            log(
                f'{task_id}: Zenodo upload of file `{filename}`: {resp.status_code}.')  # noqa

            status = resp.status_code
            msg = resp.json()
        except (ValueError, ExternalAPIException) as err:
            status = 'FAILED'
            msg = str(err)

            current_app.logger.error(
                f'{task_id}: Something went wrong with the task:\n{msg}')

    # Deliberately NOT returned from a ``finally`` clause: doing so discards
    # any in-flight exception and, when ``status``/``msg`` were never bound,
    # replaces the real error with an UnboundLocalError.
    return {
        'task_id': task_id,
        'result': {'file': filename, 'status': status, 'message': msg}
    }


@shared_task(autoretry_for=(Exception, ),
             retry_kwargs=dict(max_retries=5, countdown=10))
def save_results_to_record(tasks, depid, recid):
    """Store the outcome of each upload task on the record.

    ``tasks`` is the list of results produced by the upload tasks; each
    entry provides a ``'task_id'`` and a ``'result'``.
    """
    from cap.modules.deposit.api import CAPDeposit

    deposit_record = CAPDeposit.get_record(recid)

    # Results are stored per Zenodo deposit, keyed by task id, so that a
    # record can carry several deposits side by side.
    zenodo_entry = get_zenodo_deposit_from_record(deposit_record, depid)
    for outcome in tasks:
        result = outcome['result']
        zenodo_entry['tasks'][outcome['task_id']] = result

    deposit_record.commit()
    db.session.commit()

    current_app.logger.info(
        f'COMPLETED: Zenodo {depid} uploads:\n{tasks}')
Loading