Skip to content

Commit

Permalink
Merge pull request #384 from wasade/push_metadata_to_qiita
Browse files Browse the repository at this point in the history
Push metadata to qiita
  • Loading branch information
wasade authored Oct 18, 2021
2 parents 6d67f3b + 40f8e74 commit 3fd63e9
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 38 deletions.
16 changes: 3 additions & 13 deletions microsetta_private_api/admin/admin_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
from microsetta_private_api.repo.source_repo import SourceRepo
from microsetta_private_api.repo.transaction import Transaction
from microsetta_private_api.repo.admin_repo import AdminRepo
from microsetta_private_api.repo.metadata_repo import (retrieve_metadata,
drop_private_columns)
from microsetta_private_api.repo.metadata_repo import retrieve_metadata
from microsetta_private_api.tasks import send_email as celery_send_email,\
per_sample_summary as celery_per_sample_summary
from microsetta_private_api.admin.email_templates import EmailMessage
Expand All @@ -30,7 +29,7 @@
from microsetta_private_api.util.melissa import verify_address
from microsetta_private_api.util.query_builder_to_sql import build_condition
from werkzeug.exceptions import Unauthorized
from qiita_client import QiitaClient
from microsetta_private_api.qiita import qclient


def search_barcode(token_info, sample_barcode):
Expand Down Expand Up @@ -114,14 +113,11 @@ def qiita_compatible_metadata(token_info, include_private, body):
# TODO: this call constructs transactions implicitly. It would be
# better for the transaction to be established and passed in,
# similar to how other "repo" objects are managed
df, errors = retrieve_metadata(samples)
df, errors = retrieve_metadata(samples, include_private=include_private)

if errors:
return jsonify(code=404, message=str(errors)), 404

if not include_private:
df = drop_private_columns(df)

return jsonify(df.to_dict(orient='index')), 200


Expand Down Expand Up @@ -657,12 +653,6 @@ def barcode_query(body, token_info):
def qiita_barcode_query(body, token_info):
validate_admin_access(token_info)

qclient = QiitaClient(
SERVER_CONFIG["qiita_endpoint"],
SERVER_CONFIG["qiita_client_id"],
SERVER_CONFIG["qiita_client_secret"]
)

qiita_body = {
'sample_ids': ["10317." + b for b in body["barcodes"]]
}
Expand Down
4 changes: 4 additions & 0 deletions microsetta_private_api/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def __call__(self, *args, **kwargs):
"poll_daklapack_orders": {
"task": "microsetta_private_api.admin.daklapack_polling.poll_dak_orders", # noqa
"schedule": 60 * 60 * 24 # every 24 hours
},
"update_qiita_metadata": {
"task": "microsetta_private_api.tasks.update_qiita_metadata", # noqa
"schedule": 60 * 60 * 24 # every 24 hours
}
}

Expand Down
9 changes: 9 additions & 0 deletions microsetta_private_api/qiita.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from microsetta_private_api.config_manager import SERVER_CONFIG
from qiita_client import QiitaClient


qclient = QiitaClient(
SERVER_CONFIG["qiita_endpoint"],
SERVER_CONFIG["qiita_client_id"],
SERVER_CONFIG["qiita_client_secret"]
)
33 changes: 15 additions & 18 deletions microsetta_private_api/repo/admin_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from microsetta_private_api.repo.kit_repo import KitRepo
from microsetta_private_api.repo.sample_repo import SampleRepo
from microsetta_private_api.repo.source_repo import SourceRepo

from werkzeug.exceptions import NotFound
from hashlib import sha512

Expand Down Expand Up @@ -1111,23 +1110,21 @@ def search_barcode(self, sql_cond, cond_params):
# that are not returned by the select
with self._transaction.cursor() as cur:
cur.execute(
sql.SQL(
"SELECT project_barcode.barcode FROM "
"project_barcode LEFT JOIN "
"ag_kit_barcodes USING (barcode) "
"LEFT JOIN barcodes.barcode_scans "
"USING (barcode) "
"LEFT JOIN ( "
"SELECT barcode, max(scan_timestamp) "
"AS scan_timestamp_latest "
"FROM barcodes.barcode_scans "
"GROUP BY barcode "
") AS latest_scan "
"ON barcode_scans.barcode = latest_scan.barcode "
"AND barcode_scans.scan_timestamp = "
"latest_scan.scan_timestamp_latest "
"WHERE {cond}"
).format(cond=sql_cond),
sql.SQL("""SELECT project_barcode.barcode
FROM project_barcode
LEFT JOIN ag_kit_barcodes USING (barcode)
LEFT JOIN barcodes.barcode_scans USING (barcode)
LEFT JOIN (
SELECT barcode,
max(scan_timestamp)
AS scan_timestamp_latest
FROM barcodes.barcode_scans
GROUP BY barcode
) AS latest_scan
ON barcode_scans.barcode = latest_scan.barcode
AND barcode_scans.scan_timestamp =
latest_scan.scan_timestamp_latest
WHERE {cond}""").format(cond=sql_cond),
cond_params
)
return [r[0] for r in cur.fetchall()]
Expand Down
13 changes: 12 additions & 1 deletion microsetta_private_api/repo/metadata_repo/_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ def drop_private_columns(df):
return df.drop(columns=to_drop, inplace=False)


def retrieve_metadata(sample_barcodes):
def retrieve_metadata(sample_barcodes, include_private=False):
"""Retrieve all sample metadata for the provided barcodes
Parameters
----------
sample_barcodes : Iterable
The barcodes to request
include_private : bool, optional
If true, retain private columns
Returns
-------
Expand Down Expand Up @@ -110,6 +112,9 @@ def retrieve_metadata(sample_barcodes):
else:
df = _to_pandas_dataframe(fetched, survey_templates)

if not include_private:
df = drop_private_columns(df)

return df, error_report


Expand Down Expand Up @@ -237,6 +242,8 @@ def _to_pandas_dataframe(metadatas, survey_templates):
# remap the empty string to null so it is picked up by
# fillna
df.replace("", np.nan, inplace=True)
df.replace(r'\n', ' ', regex=True, inplace=True)
df.replace(r'\r', ' ', regex=True, inplace=True)

# fill in any other nulls that may be present in the frame
# as could happen if not all individuals took all surveys.
Expand Down Expand Up @@ -320,10 +327,14 @@ def _to_pandas_series(metadata, multiselect_map):

sample_detail = metadata['sample']
collection_timestamp = sample_detail.datetime_collected
sample_type = sample_detail.site

if source_type is None:
raise RepoException("Sample is missing a source type")

if sample_type is None and source_type in ('human', 'animal'):
raise RepoException(f"{name} is missing site_sampled")

if source_type == 'human':
sample_type = sample_detail.site
sample_invariants = HUMAN_SITE_INVARIANTS[sample_type]
Expand Down
8 changes: 4 additions & 4 deletions microsetta_private_api/repo/metadata_repo/tests/test_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def setUp(self):
}),
'survey_answers': [
{'template': 1,
'response': {'1': ['DIET_TYPE', '["Vegan"]'],
'response': {'1': ['DIET_TYPE', '["Vegan\nfoo"]'],
'2': ['MULTIVITAMIN', 'Yes'],
'3': ['PROBIOTIC_FREQUENCY', 'Unspecified'],
'4': ['VITAMIN_B_SUPPLEMENT_FREQUENCY',
Expand Down Expand Up @@ -220,9 +220,9 @@ def test_to_pandas_dataframe(self):
'true', 'true', 'false', 'false',
UNSPECIFIED,
'okay', 'No', "2013-10-15T09:30:00", '000004216'],
['XY0004216', 'bar', 'Vegan', 'Yes', 'Unspecified',
'Unspecified', 'Unspecified', 'No',
'false', 'true', 'true', 'false', 'foobar',
['XY0004216', 'bar', 'Vegan foo', 'Yes',
'Unspecified', 'Unspecified', 'Unspecified',
'No', 'false', 'true', 'true', 'false', 'foobar',
UNSPECIFIED,
UNSPECIFIED,
"2013-10-15T09:30:00", 'XY0004216']],
Expand Down
111 changes: 111 additions & 0 deletions microsetta_private_api/repo/qiita_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from microsetta_private_api.repo.base_repo import BaseRepo
from microsetta_private_api.qiita import qclient
from microsetta_private_api.repo.metadata_repo import retrieve_metadata
from microsetta_private_api.repo.metadata_repo._constants import MISSING_VALUE


class QiitaRepo(BaseRepo):
def push_metadata_to_qiita(self, barcodes=None):
"""Attempt to format and push metadata for the set of barcodes
Only barcodes not currently represented in Qiita will be pushed.
Parameters
----------
barcodes : Iterable or None
The list of barcodes to attempt to push. If None, all
"sample-is-valid", as based on their latest scan, will be
used.
Notes
-----
We are NOT capturing exceptions from the QiitaClient. These errors
should all be pathological.
Raises
------
KeyError
If metadata categories from Microsetta are observed to NOT
exist in Qiita.
Returns
-------
int
The number of successfully pushed samples to Qiita
list
Any error detail when constructing metadata
"""
if barcodes is None:
with self._transaction.cursor() as cur:
# obtain all barcodes, which are part of the AG table,
# which report as their latest scan being valid

# staging has site_sampled with "Please select..."
# and some examples of null source IDs. This is weird, so
# ignore for now.
cur.execute("""SELECT ag_kit_barcodes.barcode
FROM ag.ag_kit_barcodes
INNER JOIN barcodes.barcode_scans USING(barcode)
INNER JOIN (
SELECT barcode,
max(scan_timestamp)
AS scan_timestamp_latest
FROM barcodes.barcode_scans
GROUP BY barcode
) AS latest_scan
ON barcode_scans.barcode = latest_scan.barcode
AND barcode_scans.scan_timestamp =
latest_scan.scan_timestamp_latest
WHERE sample_status='sample-is-valid'
AND site_sampled IS NOT NULL
AND site_sampled != 'Please select...'
AND source_id IS NOT NULL""")

barcodes = {r[0] for r in cur.fetchall()}
else:
barcodes = set(barcodes)

# determine what samples are already known in qiita
samples_in_qiita = set(qclient.get('/api/v1/study/10317/samples'))

# throw away the 10317. study prefix
samples_in_qiita = {i.split('.', 1)[1] for i in samples_in_qiita}

# gather the categories currently used in qiita. we have to have parity
# with the categories when pushing
cats_in_qiita = qclient.get('/api/v1/study/10317/samples/info')
cats_in_qiita = set(cats_in_qiita['categories'])

# we will only push samples that are not already present.
# in testing on stating with qiita-rc, it was observed that
# large request bodies failed, so we will artificially limit to
# 1000 samples max per request. We can always use multiple
# calls to this function if and as needed.
to_push = list(barcodes - samples_in_qiita)[:1000]

# short circuit if we do not have anything to push
if len(to_push) == 0:
return 0, []

formatted, error = retrieve_metadata(to_push)
if len(formatted) == 0:
return 0, error

columns = set(formatted.columns)

# the qiita endpoint will not allow for adding new categories
# and we can determine this before we poke qiita.
# TODO: allow adding new columns to Qiita
if not cats_in_qiita.issuperset(columns):
formatted = formatted[cats_in_qiita & columns]

# if there are any categories not represented, remark them as
# missing in the metadata
for c in cats_in_qiita - columns:
formatted[c] = MISSING_VALUE

for_qiita = formatted.to_json(orient='index')
qclient.http_patch('/api/v1/study/10317/samples', data=for_qiita)
n_pushed = len(formatted)

return n_pushed, error
74 changes: 74 additions & 0 deletions microsetta_private_api/repo/tests/test_qiita.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from unittest import TestCase, main
from unittest.mock import patch
from microsetta_private_api.repo.transaction import Transaction
from microsetta_private_api.repo.qiita_repo import QiitaRepo


class FakeColumns:
def __init__(self, columns):
self._columns = columns

def __getitem__(self, thing):
pass

def __iter__(self):
return iter(self._columns)


class FakeFrame:
def __init__(self, columns):
self.columns = FakeColumns(columns)

def to_json(self, *args, **kwargs):
return "[]"

def __len__(self):
return 1


class AdminTests(TestCase):
@patch('microsetta_private_api.qiita.qclient.get')
@patch('microsetta_private_api.qiita.qclient.http_patch')
@patch('microsetta_private_api.repo.qiita_repo.retrieve_metadata')
def test_push_metadata_to_qiita(self, test_retrieve_metadata,
test_http_patch, test_get):
# fake codes
fecal_valid_barcode = '0x0004801'
oral_valid_barcode = '0x0015213'
skin_valid_barcode = '0x0027751'

blank = 'foobarblank'
test_barcodes = [fecal_valid_barcode,
oral_valid_barcode,
skin_valid_barcode]

failure = [{skin_valid_barcode: ("This barcode is not "
"associated with any surveys "
"matching this template id")}, ]
# one inserts, one fails
# using side_effect to change returns
# https://stackoverflow.com/a/24897297
test_get.side_effect = [
['foo.' + blank,
'foo.' + oral_valid_barcode, ], # first .get for samples
{'categories': ['a', 'b', 'c', 'd']}, # second .get for categories
]
test_http_patch.return_value = []
test_retrieve_metadata.return_value = (
FakeFrame(['a', 'b', 'c', 'd']),
failure
)

with Transaction() as t:
qiita_repo = QiitaRepo(t)
success, failed = qiita_repo.push_metadata_to_qiita(test_barcodes)

self.assertEqual(success, 1)
self.assertEqual(failed, [
{skin_valid_barcode: ("This barcode is not "
"associated with any surveys "
"matching this template id")}])


if __name__ == '__main__':
main()
Loading

0 comments on commit 3fd63e9

Please sign in to comment.