diff --git a/.env b/.env
new file mode 100644
index 0000000..158d79f
--- /dev/null
+++ b/.env
@@ -0,0 +1,6 @@
+# Environment variable configs for local testing
+POSTGRES_URI='postgresql://cidcdev:1234@localhost:5432/cidctest'
+GOOGLE_SECRETS_BUCKET='cidc-secrets-staging'
+GOOGLE_CLOUD_PROJECT='cidc-dfci-staging'
+GOOGLE_UPLOAD_BUCKET='cidc-uploads-staging'
+GOOGLE_UPLOAD_TOPIC='uploads'
diff --git a/.gitignore b/.gitignore
index 548c487..2d3a7ff 100644
--- a/.gitignore
+++ b/.gitignore
@@ -85,7 +85,6 @@ celerybeat-schedule
 *.sage.py
 
 # Environments
-.env
 .venv
 env/
 venv/
diff --git a/.travis.yml b/.travis.yml
index d192c1a..c96df24 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,6 +4,10 @@ python:
 cache:
   directories:
     - $HOME/google-cloud-sdk/
+services:
+  - postgresql
+addons:
+  postgresql: "9.6"
 env:
   global:
     - PYTHONPATH=$PYTHONPATH:$(pwd)/functions
@@ -34,6 +38,10 @@ before_install:
   - gcloud auth activate-service-account --key-file $GOOGLE_APPLICATION_CREDENTIALS
 install:
   - pip install -r requirements.txt -r requirements.dev.txt
+before_script:
+  - psql -c "create user cidcdev with password '1234'"
+  - psql -c "create database cidctest"
+  - psql -c "grant all privileges on database cidctest to cidcdev"
 script:
   - pytest
   - black --check functions main.py --target-version=py36
diff --git a/functions/settings.py b/functions/settings.py
index 0227622..c5f0829 100644
--- a/functions/settings.py
+++ b/functions/settings.py
@@ -3,7 +3,10 @@
 
 from cidc_api.config import get_sqlalchemy_database_uri
 
-GOOGLE_CLOUD_PROJECT = os.environ.get("GOOGLE_CLOUD_PROJECT")
+# Cloud Functions provides the current GCP project id
+# in the environment variable GCP_PROJECT.
+# See: https://cloud.google.com/functions/docs/env-var
+GOOGLE_CLOUD_PROJECT = os.environ.get("GCP_PROJECT")
 
 if not GOOGLE_CLOUD_PROJECT:
     from dotenv import load_dotenv
diff --git a/functions/uploads.py b/functions/uploads.py
index 970565a..794debd 100644
--- a/functions/uploads.py
+++ b/functions/uploads.py
@@ -1,7 +1,10 @@
 """Pub/sub-triggered functions that respond to data upload events"""
 import base64
 
-from .util import BackgroundContext, extract_pubsub_data
+from flask import jsonify
+from cidc_api.models import UploadJobs
+
+from .util import BackgroundContext, extract_pubsub_data, get_db_session
 
 
 def ingest_upload(event: dict, context: BackgroundContext):
@@ -10,8 +13,38 @@ def ingest_upload(event: dict, context: BackgroundContext):
     with the upload job into the download bucket and merge the upload metadata
     into the appropriate clinical trial JSON.
 
-    TODO: actually implement the above functionality. Right now, the function
-    just logs the ID of the upload job.
+    TODO: actually implement the above functionality.
     """
-    job_id = extract_pubsub_data(event)
-    print(f"Received upload success event for Job {job_id}")
+    job_id = int(extract_pubsub_data(event))
+    session = get_db_session()
+
+    job: UploadJobs = UploadJobs.find_by_id(job_id, session=session)
+
+    print("Detected completed upload job for user %s" % job.uploader_email)
+
+    study_id_field = "lead_organization_study_id"
+    if study_id_field not in job.metadata_json_patch:
+        # TODO: improve this error reporting...
+        raise Exception("Cannot find study ID in metadata. Ingestion impossible.")
+
+    # TODO: actually merge the metadata into the clinical trial JSON
+    study_id = job.metadata_json_patch[study_id_field]
+    print(
+        "(DRY RUN) merging metadata from upload %d into trial %s" % (job.id, study_id)
+    )
+
+    url_mapping = {}
+    for upload_url in job.gcs_file_uris:
+        # We expect URIs in the upload bucket to have a structure like
+        # [trial id]/[patient id]/[sample id]/[aliquot id]/[timestamp]/[local file].
+        # We strip the /[timestamp]/[local file] suffix from the upload URL,
+        # since we don't care when the file was uploaded or where it lived
+        # on the uploader's computer.
+        target_url = "/".join(upload_url.split("/")[:-2])
+        url_mapping[upload_url] = target_url
+
+        print(f"(DRY RUN) copying {upload_url} to {target_url}")
+
+    # Google won't actually do anything with this response; it's
+    # provided for testing purposes only.
+    return jsonify(url_mapping)
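Note: the copy loop in ingest_upload above is still a dry run. A minimal sketch of what the real transfer step might look like with the google-cloud-storage client follows; GOOGLE_DOWNLOAD_BUCKET is a hypothetical name (only GOOGLE_UPLOAD_BUCKET appears in this diff), and production code would also need error handling and retries.

    # Sketch only: the TODO above leaves the real GCS copy unimplemented.
    import os

    from google.cloud import storage

    def copy_upload_to_download_bucket(upload_url: str, target_url: str):
        """Copy one uploaded object to its permanent download location."""
        client = storage.Client()
        source_bucket = client.bucket(os.environ["GOOGLE_UPLOAD_BUCKET"])
        # GOOGLE_DOWNLOAD_BUCKET is an assumption; this diff only defines
        # GOOGLE_UPLOAD_BUCKET.
        target_bucket = client.bucket(os.environ["GOOGLE_DOWNLOAD_BUCKET"])
        blob = source_bucket.blob(upload_url)
        source_bucket.copy_blob(blob, target_bucket, new_name=target_url)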
diff --git a/functions/util.py b/functions/util.py
index 37a9ebc..1c15df5 100644
--- a/functions/util.py
+++ b/functions/util.py
@@ -2,6 +2,24 @@
 import base64
 from typing import NamedTuple
 
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+from .settings import SQLALCHEMY_DATABASE_URI
+
+_session = None
+
+
+def get_db_session():
+    """Get the shared SQLAlchemy session, creating it on first use."""
+    global _session
+
+    if not _session:
+        engine = create_engine(SQLALCHEMY_DATABASE_URI)
+        _session = sessionmaker(bind=engine)()
+
+    return _session
+
 
 def extract_pubsub_data(event: dict):
     """Pull out and decode data from a pub/sub event."""
diff --git a/requirements.txt b/requirements.txt
index 3bb4643..b52dc73 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,5 @@
-# The cidc_api_models package
+flask==1.1.1
+flask-sqlalchemy==2.4.0
+psycopg2-binary==2.8.3
+# The cidc_api_modules package
 git+https://github.com/CIMAC-CIDC/cidc-api-gae
\ No newline at end of file
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..140a645
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,10 @@
+import pytest
+
+from functions.util import get_db_session
+
+
+# TODO: set up database migrations for this project
+# so that tests can actually modify the test database instance.
+@pytest.fixture
+def db_session():
+    return get_db_session()
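Note: caching the session in the module-level _session means a warm Cloud Functions instance reuses its database connection across invocations instead of reconnecting on every pub/sub event. A quick sketch of that memoization property (it assumes POSTGRES_URI is set so settings can build SQLALCHEMY_DATABASE_URI; no query is issued, so no live database is needed):

    from functions.util import get_db_session

    # Repeated calls return the identical session object, because
    # get_db_session stores the first one in the module-level _session.
    assert get_db_session() is get_db_session()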
diff --git a/tests/functions/test_uploads.py b/tests/functions/test_uploads.py
index c115794..4ead26f 100644
--- a/tests/functions/test_uploads.py
+++ b/tests/functions/test_uploads.py
@@ -1,9 +1,39 @@
-from tests.util import make_pubsub_event
+from unittest.mock import MagicMock
+
+from cidc_api.models import UploadJobs
+
+from tests.util import make_pubsub_event, with_app_context
 from functions.uploads import ingest_upload
 
 
-def test_ingest_upload():
-    """Test stub event-processing functionality"""
-    job_id = "1"
-    successful_upload_event = make_pubsub_event(job_id)
-    ingest_upload(successful_upload_event, None)
+@with_app_context
+def test_ingest_upload(db_session, monkeypatch):
+    """Test upload data transfer functionality"""
+
+    JOB_ID = 1
+    URI1 = "/path/to/file1"
+    URI2 = "/path/to/deeper/file2"
+    TS_AND_PATH = "/1234/local_path1.txt"
+    FILE_URIS = [URI1 + TS_AND_PATH, URI2 + TS_AND_PATH]
+
+    job = UploadJobs(
+        id=JOB_ID,
+        uploader_email="test@email.com",
+        gcs_file_uris=FILE_URIS,
+        metadata_json_patch={"lead_organization_study_id": "CIMAC-12345"},
+        status="completed",
+    )
+
+    # Since the test database isn't yet set up with migrations,
+    # it won't have the correct relations in it, so we can't actually
+    # store or retrieve data from it.
+    find_by_id = MagicMock()
+    find_by_id.return_value = job
+    monkeypatch.setattr(UploadJobs, "find_by_id", find_by_id)
+
+    successful_upload_event = make_pubsub_event(str(job.id))
+    response = ingest_upload(successful_upload_event, None)
+
+    assert response.json[URI1 + TS_AND_PATH] == URI1
+    assert response.json[URI2 + TS_AND_PATH] == URI2
+    find_by_id.assert_called_once_with(JOB_ID, session=db_session)
diff --git a/tests/util.py b/tests/util.py
index 9d2f638..9329eac 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,4 +1,7 @@
 import base64
+from functools import wraps
+
+from flask import Flask
 
 
 def make_pubsub_event(data: str) -> dict:
@@ -6,3 +9,14 @@ def make_pubsub_event(data: str) -> dict:
     b64data = base64.encodebytes(bytes(data, "utf-8"))
 
     return {"data": b64data}
+
+def with_app_context(f):
+    """Run `f` inside a default Flask app context"""
+
+    @wraps(f)
+    def wrapped(*args, **kwargs):
+        app = Flask("test-app")
+        with app.app_context():
+            return f(*args, **kwargs)
+
+    return wrapped
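Note: make_pubsub_event builds the same envelope that extract_pubsub_data unpacks, so the two should round-trip. A sketch of that property, assuming extract_pubsub_data base64-decodes event["data"] back to a string (as its docstring and the int(...) call in ingest_upload imply):

    from functions.util import extract_pubsub_data
    from tests.util import make_pubsub_event

    # encodebytes on the test side, base64 decoding in the function under test:
    event = make_pubsub_event("1")
    assert extract_pubsub_data(event) == "1"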