Publish to Pub/Sub on upload job success #23

Merged · 1 commit · Jul 25, 2019
2 changes: 2 additions & 0 deletions .env
@@ -18,5 +18,7 @@ POSTGRES_URI='postgresql://cidcdev:1234@localhost:5432/cidc'
# CLOUD_SQL_DB_USER='postgres'
# CLOUD_SQL_DB_NAME='cidc'

GOOGLE_PROJECT_ID='cidc-dfci-staging'
SECRETS_BUCKET_NAME='cidc-secrets-staging'
GOOGLE_UPLOAD_BUCKET='cidc-uploads-staging'
GOOGLE_UPLOAD_TOPIC='uploads'
2 changes: 2 additions & 0 deletions app.prod.yaml
@@ -11,4 +11,6 @@ env_variables:
  CLOUD_SQL_INSTANCE_NAME: 'cidc-dfci:us-central1:cidc-postgres'
  CLOUD_SQL_DB_USER: 'postgres'
  CLOUD_SQL_DB_NAME: 'cidc'
  GOOGLE_PROJECT_ID: 'cidc-dfci'
  GOOGLE_UPLOAD_BUCKET: 'cidc-uploads-prod'
  GOOGLE_UPLOAD_TOPIC: 'uploads'
2 changes: 2 additions & 0 deletions app.staging.yaml
@@ -11,4 +11,6 @@ env_variables:
  CLOUD_SQL_INSTANCE_NAME: 'cidc-dfci-staging:us-central1:cidc-postgres'
  CLOUD_SQL_DB_USER: 'postgres'
  CLOUD_SQL_DB_NAME: 'cidc'
  GOOGLE_PROJECT_ID: 'cidc-dfci-staging'
  GOOGLE_UPLOAD_BUCKET: 'cidc-uploads-staging'
  GOOGLE_UPLOAD_TOPIC: 'uploads'
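
Note: these configs assume an `uploads` Pub/Sub topic already exists in each GCP project. A minimal one-time setup sketch using the same Python client (the project ID and credentials here are assumptions, not part of this PR):

from google.cloud import pubsub

# Create the topic referenced by GOOGLE_UPLOAD_TOPIC (run once per project).
# Assumes application-default credentials with Pub/Sub admin permissions.
publisher = pubsub.PublisherClient()
topic = publisher.topic_path("cidc-dfci-staging", "uploads")
publisher.create_topic(topic)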
84 changes: 84 additions & 0 deletions cidc-api/gcloud_client.py
@@ -0,0 +1,84 @@
"""Utilities for interacting with the Google Cloud Platform APIs."""

import datetime

from google.cloud import storage
from google.cloud import pubsub

from settings import (
GOOGLE_UPLOAD_ROLE,
GOOGLE_UPLOAD_BUCKET,
GOOGLE_UPLOAD_TOPIC,
GOOGLE_PROJECT_ID,
)


def _get_bucket(bucket_name: str) -> storage.Bucket:
"""Get the bucket with name `bucket_name` from GCS."""
client = storage.Client()
bucket = client.get_bucket(bucket_name)
return bucket


def _iam_id(user_email: str) -> str:
"""Append the appropriate IAM account type to a user's email"""
return f"user:{user_email}"


def grant_upload_access(bucket_name: str, user_email: str):
"""
Grant a user upload access to the given bucket. Upload access
means a user can write objects to the bucket but cannot delete,
overwrite, or read objects from this bucket.
"""
bucket = _get_bucket(bucket_name)

# Update the bucket IAM policy to include the user as an uploader.
policy = bucket.get_iam_policy()
policy[GOOGLE_UPLOAD_ROLE].add(_iam_id(user_email))
bucket.set_iam_policy(policy)


def revoke_upload_access(bucket_name: str, user_email: str):
"""
Revoke a user's upload access for the given bucket.
"""
bucket = _get_bucket(bucket_name)

# Update the bucket IAM policy to remove the user's uploader privileges.
policy = bucket.get_iam_policy()
policy[GOOGLE_UPLOAD_ROLE].discard(_iam_id(user_email))
bucket.set_iam_policy(policy)


def get_signed_url(object_name: str, method: str = "PUT", expiry_mins: int = 5) -> str:
"""
Generate a signed URL for `object_name` to give a client temporary access.

See: https://cloud.google.com/storage/docs/access-control/signing-urls-with-helpers
"""
storage_client = storage.Client()
bucket = storage_client.get_bucket(GOOGLE_UPLOAD_BUCKET)
blob = bucket.blob(object_name)

# Generate the signed URL, allowing a client to use `method` for `expiry_mins` minutes
expiration = datetime.timedelta(minutes=expiry_mins)
url = blob.generate_signed_url(version="v4", expiration=expiration, method=method)

return url


def publish_upload_success(job_id: int):
"""Publish to the uploads topic that the upload job with the provided `job_id` succeeded."""
publisher = pubsub.PublisherClient()
topic = publisher.topic_path(GOOGLE_PROJECT_ID, GOOGLE_UPLOAD_TOPIC)

# The Pub/Sub publisher client returns a concurrent.futures.Future
# containing info about whether the publishing was successful.
data = bytes(str(job_id), "utf-8")
report = publisher.publish(topic, data=data)

# For now, just wait till we get a response back from Pub/Sub.
# If there was an error, the below call will throw an exception.
# TODO: evaluate if it's worth trying to leverage asynchrony here.
report.result()
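
For context, a minimal sketch of a consumer for these messages (not part of this PR; the subscription name is an assumption and would need to be created against the `uploads` topic separately):

from google.cloud import pubsub

def handle_upload_success(message):
    # publish_upload_success sends the job ID as a UTF-8 encoded string.
    job_id = int(message.data.decode("utf-8"))
    print(f"Upload job {job_id} succeeded")
    message.ack()

subscriber = pubsub.SubscriberClient()
subscription = subscriber.subscription_path("cidc-dfci-staging", "uploads-subscription")
future = subscriber.subscribe(subscription, callback=handle_upload_success)
future.result()  # Block, dispatching messages to the callback, until interrupted.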
42 changes: 0 additions & 42 deletions cidc-api/gcs_iam.py

This file was deleted.

38 changes: 13 additions & 25 deletions cidc-api/services/ingestion.py
@@ -8,13 +8,12 @@

from werkzeug.exceptions import BadRequest, InternalServerError, NotImplemented

-from google.cloud import storage
from eve import Eve
from eve.auth import requires_auth
from flask import Blueprint, request, Request, Response, jsonify, _request_ctx_stack
from cidc_schemas import constants, validate_xlsx, prism

-import gcs_iam
+import gcloud_client
from models import UploadJobs, STATUSES
from settings import GOOGLE_UPLOAD_BUCKET, HINT_TO_SCHEMA, SCHEMA_TO_HINT

@@ -158,7 +157,7 @@ def upload():
    job = UploadJobs.create(user_email, gcs_uris, metadata_json)

    # Grant the user upload access to the upload bucket
-    gcs_iam.grant_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)
+    gcloud_client.grant_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)

    response = {
        "job_id": job.id,
@@ -171,17 +170,23 @@

def on_post_PATCH_upload_jobs(request: Request, payload: Response):
    """Revoke the user's write access to the objects they've uploaded to."""
-    if not payload.json and not "id" in payload.json:
+    if not payload.json or not "id" in payload.json:
        raise BadRequest("Unexpected payload while updating upload_jobs")

    # TODO: handle the case where the user has more than one upload running,
    # in which case we shouldn't revoke the user's write access until they
-    # have no remaining jobs with status "started". This will require
-    # adding a "created_by" field or similar to the upload_jobs object.
+    # have no remaining jobs with status "started".

    job_id = payload.json["id"]
    status = request.json["status"]

    # If this is a successful upload job, publish this info to Pub/Sub
    if status == "completed":
        gcloud_client.publish_upload_success(job_id)

    # Revoke the user's write access
    user_email = _request_ctx_stack.top.current_user.email
-    gcs_iam.revoke_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)
+    gcloud_client.revoke_upload_access(GOOGLE_UPLOAD_BUCKET, user_email)


@ingestion_api.route("/signed-upload-urls", methods=["POST"])
@@ -230,24 +235,7 @@ def signed_upload_urls():
    for object_name in request.json["object_names"]:
        # Prepend objects with the given directory name
        full_object_name = f"{directory_name}/{object_name}"
-        object_url = get_signed_url(full_object_name)
+        object_url = gcloud_client.get_signed_url(full_object_name)
        object_urls[object_name] = object_url

    return jsonify(object_urls)


-def get_signed_url(object_name: str, method: str = "PUT", expiry_mins: int = 5) -> str:
-    """
-    Generate a signed URL for `object_name` to give a client temporary access.
-
-    See: https://cloud.google.com/storage/docs/access-control/signing-urls-with-helpers
-    """
-    storage_client = storage.Client()
-    bucket = storage_client.get_bucket(GOOGLE_UPLOAD_BUCKET)
-    blob = bucket.blob(object_name)
-
-    # Generate the signed URL, allowing a client to use `method` for `expiry_mins` minutes
-    expiration = datetime.timedelta(minutes=expiry_mins)
-    url = blob.generate_signed_url(version="v4", expiration=expiration, method=method)
-
-    return url
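
The deleted helper above now lives in gcloud_client.py. For illustration, a sketch of the client-side call that triggers the new publish path (the endpoint shape and If-Match header are taken from the tests below; the host, job values, and auth header are assumptions):

import requests

API = "http://localhost:5000"  # assumed local dev server
job_id = 42      # returned by the initial upload request
etag = "abc123"  # the job's current etag, also returned by the API

# PATCHing a job to "completed" causes the API to publish the job ID to Pub/Sub.
res = requests.patch(
    f"{API}/upload_jobs/{job_id}",
    json={"status": "completed"},
    headers={"If-Match": etag, "Authorization": "Bearer <access token>"},
)
assert res.status_code == 200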
7 changes: 4 additions & 3 deletions cidc-api/settings.py
@@ -37,11 +37,12 @@ def get_secrets_manager(is_testing):
ALGORITHMS = ["RS256"]
## End Auth0 config

-## Configure GCS
+## Configure GCP
GOOGLE_PROJECT_ID = environ.get("GOOGLE_PROJECT_ID")
GOOGLE_UPLOAD_BUCKET = environ.get("GOOGLE_UPLOAD_BUCKET")
GOOGLE_UPLOAD_TOPIC = environ.get("GOOGLE_UPLOAD_TOPIC")
GOOGLE_UPLOAD_ROLE = "roles/storage.objectCreator"
# TODO: additional buckets for pipeline data etc.?
-## End GCS config
+## End GCP config

## Configure database
POSTGRES_URI = environ.get("POSTGRES_URI")
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,6 +5,7 @@ python-dotenv==0.10.3
requests==2.22.0
six==1.12.0
google-cloud-storage==1.16.1
google-cloud-pubsub==0.42.1
python-jose==3.0.1
psycopg2-binary==2.8.3
werkzeug==0.15.4
29 changes: 22 additions & 7 deletions tests/services/test_ingestion.py
@@ -106,7 +106,7 @@ def test_upload(app_no_auth, wes_xlsx, test_user, monkeypatch):
    client = app_no_auth.test_client()

    grant_write = MagicMock()
-    monkeypatch.setattr("gcs_iam.grant_upload_access", grant_write)
+    monkeypatch.setattr("gcloud_client.grant_upload_access", grant_write)

    res = client.post(UPLOAD, data=form_data("wes.xlsx", wes_xlsx, "wes"))
    assert res.json
@@ -124,22 +124,37 @@ def test_upload(app_no_auth, wes_xlsx, test_user, monkeypatch):
    assert gcs_object_name.endswith(local_path)

    # Check that we tried to grant IAM upload access to gcs_object_name
-    iam_args = (GOOGLE_UPLOAD_BUCKET, test_user.email)
-    grant_write.assert_called_with(*iam_args)
+    grant_write.assert_called_with(GOOGLE_UPLOAD_BUCKET, test_user.email)

-    # Check that we tried to revoke IAM upload access after updating the
+    # Track whether we revoke IAM upload access after updating the job status
    revoke_write = MagicMock()
-    monkeypatch.setattr("gcs_iam.revoke_upload_access", revoke_write)
+    monkeypatch.setattr("gcloud_client.revoke_upload_access", revoke_write)

    # Track whether we publish an upload success to pub/sub after updating the job status
    publish_success = MagicMock()
    monkeypatch.setattr("gcloud_client.publish_upload_success", publish_success)

    job_id = res.json["job_id"]
    update_url = f"/upload_jobs/{job_id}"

    # Report an upload failure
    res = client.patch(
        update_url,
-        json={"status": "completed"},
+        json={"status": "errored"},
        headers={"If-Match": res.json["job_etag"]},
    )
    assert res.status_code == 200
-    revoke_write.assert_called_with(*iam_args)
+    revoke_write.assert_called_with(GOOGLE_UPLOAD_BUCKET, test_user.email)
    # This was an upload failure, so success shouldn't have been published
    publish_success.assert_not_called()

    # Report an upload success
    res = client.patch(
        update_url,
        json={"status": "completed"},
        headers={"If-Match": res.json["_etag"]},
    )
    publish_success.assert_called_with(job_id)


def test_signed_upload_urls(app_no_auth, monkeypatch):
6 changes: 3 additions & 3 deletions tests/test_gcs_iam.py → tests/test_gcloud_client.py
@@ -1,4 +1,4 @@
-from gcs_iam import grant_upload_access, revoke_upload_access, _iam_id
+from gcloud_client import grant_upload_access, revoke_upload_access, _iam_id
from settings import GOOGLE_UPLOAD_ROLE

EMAIL = "[email protected]"
@@ -17,7 +17,7 @@ def get_iam_policy(self):
        def set_iam_policy(self, policy):
            assert _iam_id(EMAIL) in policy[GOOGLE_UPLOAD_ROLE]

-    monkeypatch.setattr("gcs_iam._get_bucket", GrantBlob)
+    monkeypatch.setattr("gcloud_client._get_bucket", GrantBlob)
    grant_upload_access("foo", EMAIL)


@@ -29,5 +29,5 @@ def get_iam_policy(self):
        def set_iam_policy(self, policy):
            assert _iam_id(EMAIL) not in policy[GOOGLE_UPLOAD_ROLE]

-    monkeypatch.setattr("gcs_iam._get_bucket", RevokeBlob)
+    monkeypatch.setattr("gcloud_client._get_bucket", RevokeBlob)
    revoke_upload_access("foo", EMAIL)