From 85b4d0a81507ef224ca4e907713d64b611a3ea5b Mon Sep 17 00:00:00 2001
From: Jacob Lurye
Date: Wed, 24 Jul 2019 12:59:47 -0400
Subject: [PATCH] Publish to Pub/Sub on upload job success

---
 .env                                          |  2 +
 app.prod.yaml                                 |  2 +
 app.staging.yaml                              |  2 +
 cidc-api/gcloud_client.py                     | 84 +++++++++++++++++++
 cidc-api/gcs_iam.py                           | 42 ----------
 cidc-api/services/ingestion.py                | 38 +++------
 cidc-api/settings.py                          |  7 +-
 requirements.txt                              |  1 +
 tests/services/test_ingestion.py              | 29 +++++--
 ...{test_gcs_iam.py => test_gcloud_client.py} |  6 +-
 10 files changed, 133 insertions(+), 80 deletions(-)
 create mode 100644 cidc-api/gcloud_client.py
 delete mode 100644 cidc-api/gcs_iam.py
 rename tests/{test_gcs_iam.py => test_gcloud_client.py} (78%)

diff --git a/.env b/.env
index 1ac099bb..e9da00c3 100644
--- a/.env
+++ b/.env
@@ -18,5 +18,7 @@ POSTGRES_URI='postgresql://cidcdev:1234@localhost:5432/cidc'
 # CLOUD_SQL_DB_USER='postgres'
 # CLOUD_SQL_DB_NAME='cidc'
 
+GOOGLE_PROJECT_ID='cidc-dfci-staging'
 SECRETS_BUCKET_NAME='cidc-secrets-staging'
 GOOGLE_UPLOAD_BUCKET='cidc-uploads-staging'
+GOOGLE_UPLOAD_TOPIC='uploads'
diff --git a/app.prod.yaml b/app.prod.yaml
index 2cc748e1..a25cf081 100644
--- a/app.prod.yaml
+++ b/app.prod.yaml
@@ -11,4 +11,6 @@ env_variables:
   CLOUD_SQL_INSTANCE_NAME: 'cidc-dfci:us-central1:cidc-postgres'
   CLOUD_SQL_DB_USER: 'postgres'
   CLOUD_SQL_DB_NAME: 'cidc'
+  GOOGLE_PROJECT_ID: 'cidc-dfci'
   GOOGLE_UPLOAD_BUCKET: 'cidc-uploads-prod'
+  GOOGLE_UPLOAD_TOPIC: 'uploads'
diff --git a/app.staging.yaml b/app.staging.yaml
index 39b429d2..328ee67b 100644
--- a/app.staging.yaml
+++ b/app.staging.yaml
@@ -11,4 +11,6 @@ env_variables:
   CLOUD_SQL_INSTANCE_NAME: 'cidc-dfci-staging:us-central1:cidc-postgres'
   CLOUD_SQL_DB_USER: 'postgres'
   CLOUD_SQL_DB_NAME: 'cidc'
+  GOOGLE_PROJECT_ID: 'cidc-dfci-staging'
   GOOGLE_UPLOAD_BUCKET: 'cidc-uploads-staging'
+  GOOGLE_UPLOAD_TOPIC: 'uploads'
diff --git a/cidc-api/gcloud_client.py b/cidc-api/gcloud_client.py
new file mode 100644
index 00000000..0a888c4b
--- /dev/null
+++ b/cidc-api/gcloud_client.py
@@ -0,0 +1,84 @@
+"""Utilities for interacting with the Google Cloud Platform APIs."""
+
+import datetime
+
+from google.cloud import storage
+from google.cloud import pubsub
+
+from settings import (
+    GOOGLE_UPLOAD_ROLE,
+    GOOGLE_UPLOAD_BUCKET,
+    GOOGLE_UPLOAD_TOPIC,
+    GOOGLE_PROJECT_ID,
+)
+
+
+def _get_bucket(bucket_name: str) -> storage.Bucket:
+    """Get the bucket with name `bucket_name` from GCS."""
+    client = storage.Client()
+    bucket = client.get_bucket(bucket_name)
+    return bucket
+
+
+def _iam_id(user_email: str) -> str:
+    """Prepend the appropriate IAM member type to a user's email."""
+    return f"user:{user_email}"
+
+
+def grant_upload_access(bucket_name: str, user_email: str):
+    """
+    Grant a user upload access to the given bucket. Upload access
+    means a user can write objects to the bucket but cannot delete,
+    overwrite, or read objects from this bucket.
+    """
+    bucket = _get_bucket(bucket_name)
+
+    # Update the bucket IAM policy to include the user as an uploader.
+    policy = bucket.get_iam_policy()
+    policy[GOOGLE_UPLOAD_ROLE].add(_iam_id(user_email))
+    bucket.set_iam_policy(policy)
+
+
+def revoke_upload_access(bucket_name: str, user_email: str):
+    """
+    Revoke a user's upload access for the given bucket.
+    """
+    bucket = _get_bucket(bucket_name)
+
+    # Update the bucket IAM policy to remove the user's uploader privileges.
+    policy = bucket.get_iam_policy()
+    policy[GOOGLE_UPLOAD_ROLE].discard(_iam_id(user_email))
+    bucket.set_iam_policy(policy)
+
+
+def get_signed_url(object_name: str, method: str = "PUT", expiry_mins: int = 5) -> str:
+    """
+    Generate a signed URL for `object_name` to give a client temporary access.
+
+    See: https://cloud.google.com/storage/docs/access-control/signing-urls-with-helpers
+    """
+    storage_client = storage.Client()
+    bucket = storage_client.get_bucket(GOOGLE_UPLOAD_BUCKET)
+    blob = bucket.blob(object_name)
+
+    # Generate the signed URL, allowing a client to use `method` for `expiry_mins` minutes.
+    expiration = datetime.timedelta(minutes=expiry_mins)
+    url = blob.generate_signed_url(version="v4", expiration=expiration, method=method)
+
+    return url
+
+
+def publish_upload_success(job_id: int):
+    """Publish to the uploads topic that the upload job with the provided `job_id` succeeded."""
+    publisher = pubsub.PublisherClient()
+    topic = publisher.topic_path(GOOGLE_PROJECT_ID, GOOGLE_UPLOAD_TOPIC)
+
+    # The Pub/Sub publisher client returns a concurrent.futures.Future
+    # containing info about whether the publishing was successful.
+    data = bytes(str(job_id), "utf-8")
+    report = publisher.publish(topic, data=data)
+
+    # For now, just wait until we get a response back from Pub/Sub.
+    # If there was an error, the call below will raise an exception.
+    # TODO: evaluate if it's worth trying to leverage asynchrony here.
+    report.result()
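
The module above only adds the publishing half. A downstream worker would
consume these messages with a Pub/Sub subscriber. A minimal sketch of such a
consumer, assuming a subscription named "uploads-subscription" has been
attached to the topic (the subscription name is hypothetical; this patch only
configures the topic):

    from google.cloud import pubsub

    subscriber = pubsub.SubscriberClient()
    # The project ID matches GOOGLE_PROJECT_ID; the subscription name is an assumption.
    subscription = subscriber.subscription_path("cidc-dfci-staging", "uploads-subscription")

    def on_upload_success(message):
        # publish_upload_success sends the stringified job ID as the message body.
        job_id = int(message.data.decode("utf-8"))
        print(f"Upload job {job_id} succeeded")
        message.ack()

    # Block and process messages until the streaming pull future is cancelled.
    subscriber.subscribe(subscription, callback=on_upload_success).result()
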
diff --git a/cidc-api/gcs_iam.py b/cidc-api/gcs_iam.py
deleted file mode 100644
index 5d40c7a7..00000000
--- a/cidc-api/gcs_iam.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Utilities for managing access to GCS objects."""
-
-from google.cloud import storage
-from settings import GOOGLE_UPLOAD_ROLE
-
-
-def _get_bucket(bucket_name: str) -> storage.Bucket:
-    """Get the bucket with name `bucket_name` from GCS."""
-    client = storage.Client()
-    bucket = client.get_bucket(bucket_name)
-    return bucket
-
-
-def _iam_id(user_email: str) -> str:
-    """Append the appropriate IAM account type to a user's email"""
-    return f"user:{user_email}"
-
-
-def grant_upload_access(bucket_name: str, user_email: str):
-    """
-    Grant a user upload access to the given bucket. Upload access
-    means a user can write objects to the bucket but cannot delete,
-    overwrite, or read objects from this bucket.
-    """
-    bucket = _get_bucket(bucket_name)
-
-    # Update the bucket IAM policy to include the user as an uploader.
-    policy = bucket.get_iam_policy()
-    policy[GOOGLE_UPLOAD_ROLE].add(_iam_id(user_email))
-    bucket.set_iam_policy(policy)
-
-
-def revoke_upload_access(bucket_name: str, user_email: str):
-    """
-    Revoke a user's upload access for the given bucket.
-    """
-    bucket = _get_bucket(bucket_name)
-
-    # Update the bucket IAM policy to remove the user's uploader privileges.
-    policy = bucket.get_iam_policy()
-    policy[GOOGLE_UPLOAD_ROLE].discard(_iam_id(user_email))
-    bucket.set_iam_policy(policy)
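
The IAM helpers move into gcloud_client.py unchanged. For a concrete picture
of what grant_upload_access does to a bucket, the resulting binding can be
inspected with the same client library. A sketch, assuming the staging bucket
name from .env:

    from google.cloud import storage

    bucket = storage.Client().get_bucket("cidc-uploads-staging")
    policy = bucket.get_iam_policy()

    # The policy maps role names to sets of member IDs, so each uploader
    # appears as a "user:<email>" entry under the objectCreator role.
    for member in policy["roles/storage.objectCreator"]:
        print(member)
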
diff --git a/cidc-api/services/ingestion.py b/cidc-api/services/ingestion.py
index f9ba481b..c2143a39 100644
--- a/cidc-api/services/ingestion.py
+++ b/cidc-api/services/ingestion.py
@@ -8,13 +8,12 @@
 from werkzeug.exceptions import BadRequest, InternalServerError, NotImplemented
 
-from google.cloud import storage
 from eve import Eve
 from eve.auth import requires_auth
 from flask import Blueprint, request, Request, Response, jsonify, _request_ctx_stack
 
 from cidc_schemas import constants, validate_xlsx, prism
 
-import gcs_iam
+import gcloud_client
 from models import UploadJobs, STATUSES
 from settings import GOOGLE_UPLOAD_BUCKET, HINT_TO_SCHEMA, SCHEMA_TO_HINT
 
@@ -158,7 +157,7 @@
     job = UploadJobs.create(user_email, gcs_uris, metadata_json)
 
     # Grant the user upload access to the upload bucket
-    gcs_iam.grant_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)
+    gcloud_client.grant_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)
 
     response = {
         "job_id": job.id,
@@ -171,17 +170,23 @@
 def on_post_PATCH_upload_jobs(request: Request, payload: Response):
     """Revoke the user's write access to the objects they've uploaded to."""
-    if not payload.json and not "id" in payload.json:
+    if not payload.json or "id" not in payload.json:
         raise BadRequest("Unexpected payload while updating upload_jobs")
 
     # TODO: handle the case where the user has more than one upload running,
     # in which case we shouldn't revoke the user's write access until they
-    # have no remaining jobs with status "started". This will require
-    # adding a "created_by" field or similar to the upload_jobs object.
+    # have no remaining jobs with status "started".
+
+    job_id = payload.json["id"]
+    status = request.json["status"]
+
+    # If this is a successful upload job, publish this info to Pub/Sub
+    if status == "completed":
+        gcloud_client.publish_upload_success(job_id)
 
     # Revoke the user's write access
     user_email = _request_ctx_stack.top.current_user.email
-    gcs_iam.revoke_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)
+    gcloud_client.revoke_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)
 
 
 @ingestion_api.route("/signed-upload-urls", methods=["POST"])
@@ -230,24 +235,7 @@ def signed_upload_urls():
     for object_name in request.json["object_names"]:
         # Prepend objects with the given directory name
         full_object_name = f"{directory_name}/{object_name}"
-        object_url = get_signed_url(full_object_name)
+        object_url = gcloud_client.get_signed_url(full_object_name)
         object_urls[object_name] = object_url
 
     return jsonify(object_urls)
-
-
-def get_signed_url(object_name: str, method: str = "PUT", expiry_mins: int = 5) -> str:
-    """
-    Generate a signed URL for `object_name` to give a client temporary access.
-
-    See: https://cloud.google.com/storage/docs/access-control/signing-urls-with-helpers
-    """
-    storage_client = storage.Client()
-    bucket = storage_client.get_bucket(GOOGLE_UPLOAD_BUCKET)
-    blob = bucket.blob(object_name)
-
-    # Generate the signed URL, allowing a client to use `method` for `expiry_mins` minutes
-    expiration = datetime.timedelta(minutes=expiry_mins)
-    url = blob.generate_signed_url(version="v4", expiration=expiration, method=method)
-
-    return url
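
Eve fires an on_post_PATCH_<resource> event after every PATCH to a resource,
passing the request and the response payload, which is why the hook above has
the (request, payload) signature. The registration itself lives in the app
factory, which this diff doesn't touch; it presumably looks something like
this sketch (the settings path and module layout are assumptions):

    from eve import Eve

    from services.ingestion import ingestion_api, on_post_PATCH_upload_jobs

    app = Eve(settings="settings.py")  # assumption: where the Eve app is built
    app.register_blueprint(ingestion_api)

    # Run the hook after each PATCH to /upload_jobs.
    app.on_post_PATCH_upload_jobs += on_post_PATCH_upload_jobs
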
diff --git a/cidc-api/settings.py b/cidc-api/settings.py
index e37ca8fa..d7258ab5 100644
--- a/cidc-api/settings.py
+++ b/cidc-api/settings.py
@@ -37,11 +37,12 @@ def get_secrets_manager(is_testing):
 ALGORITHMS = ["RS256"]
 ## End Auth0 config
 
-## Configure GCS
+## Configure GCP
+GOOGLE_PROJECT_ID = environ.get("GOOGLE_PROJECT_ID")
 GOOGLE_UPLOAD_BUCKET = environ.get("GOOGLE_UPLOAD_BUCKET")
+GOOGLE_UPLOAD_TOPIC = environ.get("GOOGLE_UPLOAD_TOPIC")
 GOOGLE_UPLOAD_ROLE = "roles/storage.objectCreator"
-# TODO: additional buckets for pipeline data etc.?
-## End GCS config
+## End GCP config
 
 ## Configure database
 POSTGRES_URI = environ.get("POSTGRES_URI")
diff --git a/requirements.txt b/requirements.txt
index ea54d01a..f5aa7659 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@ python-dotenv==0.10.3
 requests==2.22.0
 six==1.12.0
 google-cloud-storage==1.16.1
+google-cloud-pubsub==0.42.1
 python-jose==3.0.1
 psycopg2-binary==2.8.3
 werkzeug==0.15.4
diff --git a/tests/services/test_ingestion.py b/tests/services/test_ingestion.py
index a54e6e59..2147737b 100644
--- a/tests/services/test_ingestion.py
+++ b/tests/services/test_ingestion.py
@@ -106,7 +106,7 @@ def test_upload(app_no_auth, wes_xlsx, test_user, monkeypatch):
     client = app_no_auth.test_client()
 
     grant_write = MagicMock()
-    monkeypatch.setattr("gcs_iam.grant_upload_access", grant_write)
+    monkeypatch.setattr("gcloud_client.grant_upload_access", grant_write)
 
     res = client.post(UPLOAD, data=form_data("wes.xlsx", wes_xlsx, "wes"))
     assert res.json
@@ -124,22 +124,37 @@
     assert gcs_object_name.endswith(local_path)
 
     # Check that we tried to grant IAM upload access to gcs_object_name
-    iam_args = (GOOGLE_UPLOAD_BUCKET, test_user.email)
-    grant_write.assert_called_with(*iam_args)
+    grant_write.assert_called_with(GOOGLE_UPLOAD_BUCKET, test_user.email)
 
-    # Check that we tried to revoke IAM upload access after updating the
+    # Track whether we revoke IAM upload access after updating the job status
     revoke_write = MagicMock()
-    monkeypatch.setattr("gcs_iam.revoke_upload_access", revoke_write)
+    monkeypatch.setattr("gcloud_client.revoke_upload_access", revoke_write)
+
+    # Track whether we publish an upload success to pub/sub after updating the job status
+    publish_success = MagicMock()
+    monkeypatch.setattr("gcloud_client.publish_upload_success", publish_success)
 
     job_id = res.json["job_id"]
     update_url = f"/upload_jobs/{job_id}"
+
+    # Report an upload failure
     res = client.patch(
         update_url,
-        json={"status": "completed"},
+        json={"status": "errored"},
         headers={"If-Match": res.json["job_etag"]},
     )
     assert res.status_code == 200
-    revoke_write.assert_called_with(*iam_args)
+    revoke_write.assert_called_with(GOOGLE_UPLOAD_BUCKET, test_user.email)
+    # This was an upload failure, so success shouldn't have been published
+    publish_success.assert_not_called()
+
+    # Report an upload success
+    res = client.patch(
+        update_url,
+        json={"status": "completed"},
+        headers={"If-Match": res.json["_etag"]},
+    )
+    publish_success.assert_called_with(job_id)
 
 
 def test_signed_upload_urls(app_no_auth, monkeypatch):
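
From a client's perspective, the flow exercised by test_upload looks roughly
like the sketch below: request signed URLs, upload with plain HTTP PUTs, then
report the job's status. The host, auth token, job ID, etag, and blueprint
mount point are assumptions; the endpoints and payload shapes come from this
diff.

    import requests

    API = "http://localhost:5000"  # assumption: local dev server
    AUTH = {"Authorization": "Bearer <access token>"}

    # Request signed upload URLs for a set of objects.
    res = requests.post(
        f"{API}/ingestion/signed-upload-urls",  # assumes the blueprint is mounted at /ingestion
        headers=AUTH,
        json={"directory_name": "my-trial", "object_names": ["sample1.fastq"]},
    )

    # PUT each file to its signed URL; the URL itself carries the credentials.
    for object_name, signed_url in res.json().items():
        with open(object_name, "rb") as f:
            requests.put(signed_url, data=f)

    # Marking the job "completed" fires on_post_PATCH_upload_jobs, which
    # publishes to Pub/Sub and revokes the uploader's bucket access.
    requests.patch(
        f"{API}/upload_jobs/42",  # the job ID and etag come from the upload job creation response
        headers={**AUTH, "If-Match": "<job etag>"},
        json={"status": "completed"},
    )
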
diff --git a/tests/test_gcs_iam.py b/tests/test_gcloud_client.py
similarity index 78%
rename from tests/test_gcs_iam.py
rename to tests/test_gcloud_client.py
index ab6866bb..e6a7a672 100644
--- a/tests/test_gcs_iam.py
+++ b/tests/test_gcloud_client.py
@@ -1,4 +1,4 @@
-from gcs_iam import grant_upload_access, revoke_upload_access, _iam_id
+from gcloud_client import grant_upload_access, revoke_upload_access, _iam_id
 from settings import GOOGLE_UPLOAD_ROLE
 
 EMAIL = "test@email.com"
@@ -17,7 +17,7 @@ def get_iam_policy(self):
        def set_iam_policy(self, policy):
            assert _iam_id(EMAIL) in policy[GOOGLE_UPLOAD_ROLE]
 
-    monkeypatch.setattr("gcs_iam._get_bucket", GrantBlob)
+    monkeypatch.setattr("gcloud_client._get_bucket", GrantBlob)
 
     grant_upload_access("foo", EMAIL)
 
@@ -29,5 +29,5 @@ def get_iam_policy(self):
        def set_iam_policy(self, policy):
            assert _iam_id(EMAIL) not in policy[GOOGLE_UPLOAD_ROLE]
 
-    monkeypatch.setattr("gcs_iam._get_bucket", RevokeBlob)
+    monkeypatch.setattr("gcloud_client._get_bucket", RevokeBlob)
     revoke_upload_access("foo", EMAIL)
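
One deployment note: the patch reads GOOGLE_UPLOAD_TOPIC from the environment
but never creates the topic, so an "uploads" topic must already exist in each
GCP project before publish_upload_success can succeed. A one-time setup
sketch, using the staging project ID:

    from google.cloud import pubsub

    publisher = pubsub.PublisherClient()
    topic = publisher.topic_path("cidc-dfci-staging", "uploads")
    publisher.create_topic(topic)
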