BigQuery email exports example #474

Merged: 32 commits, Dec 7, 2020

Conversation

nehanene15
Contributor

This PR contains an example of automated scheduling of BigQuery exports to email(s). Main features include:

  • Source code for a Cloud Function that uses the BigQuery and Storage APIs to execute the query and export the results
  • A script to deploy the pipeline, including creation of service accounts, Cloud Scheduler, Pub/Sub, and the Cloud Functions code.

@pull-request-size bot added the size/L label (Denotes a PR that changes 100-499 lines) on May 7, 2020
@googlebot

All (the pull request submitter and all commit authors) CLAs are signed, but one or more commits were authored or co-authored by someone other than the pull request submitter.

We need to confirm that all authors are ok with their commits being contributed to this project. Please have them confirm that by leaving a comment that contains only @googlebot I consent. in this pull request.

Note to project maintainer: There may be cases where the author cannot leave a comment, or the comment is not properly detected as consent. In those cases, you can manually confirm consent of the commit author(s), and set the cla label to yes (if enabled on your project).


@googlebot added the cla: no label (All committers have NOT signed a CLA) on May 7, 2020
@ishitashah24
Contributor

@googlebot I consent

@bharathkkb
Member

@googlebot i consent

@googlebot

We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for all the commit author(s) or Co-authors. If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google.
In order to pass this check, please resolve this problem and then comment @googlebot I fixed it.. If the bot doesn't comment, it means it doesn't think anything has changed.


@ishitashah24
Contributor

@googlebot

@ishitashah24
Contributor

@googlebot I fixed it.

@googlebot

CLAs look good, thanks!


@googlebot added the cla: yes label (All committers have signed a CLA) and removed the cla: no label on May 7, 2020
export_compression = "NONE"
export_destination_fmt = "NEWLINE_DELIMETED_JSON"
export_use_avro = False
export_field_delimeter = ","
These should be read from env vars.

compression=export_compression,
destination_format=export_destination_fmt,
field_delimeter=export_field_delimeter,
use_avro_logical_types=export_use_avro)
This is a leaky abstraction: use_avro_logical_types is more specific than export_use_avro.
In fact, whether the export uses Avro at all would be controlled by export_destination_fmt.
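
For illustration, a minimal sketch of what the reviewer is describing, assuming the google-cloud-bigquery client library (the EXPORT_* env var names are illustrative, not the PR's actual variables): the format is chosen solely by destination_format, and the Avro flag only qualifies Avro exports.

import os
from google.cloud import bigquery

# destination_format alone selects the export format:
# "CSV", "NEWLINE_DELIMITED_JSON", or "AVRO".
job_config = bigquery.job.ExtractJobConfig(
    compression=os.environ.get("EXPORT_COMPRESSION", "NONE"),
    destination_format=os.environ.get("EXPORT_FORMAT", "NEWLINE_DELIMITED_JSON"),
    field_delimiter=os.environ.get("EXPORT_FIELD_DELIMITER", ","),  # only used for CSV
    # Only meaningful when destination_format == "AVRO".
    use_avro_logical_types=os.environ.get("EXPORT_USE_AVRO_LOGICAL_TYPES", "False") == "True",
)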


# Set variables
signed_url_expiration_hrs = 24
from_email = "[email protected]"
This should be read from an env var.

# Set variables
signed_url_expiration_hrs = 24
from_email = "[email protected]"
to_email = "[email protected]"
These should be read from env vars.

default = 1
}

variable "function_bucket_1" {
All 3 functions can probably just put their source in a single "cloud_functions_source_bucket". What's the value of a bucket for each function?

Contributor Author

This is because Terraform bundles all of the files inside the provided GCS bucket into one Cloud Function. Giving each function its own bucket lets its respective main.py and requirements.txt map to a unique Cloud Function.

description = "GCS bucket for function 3 code that sends email."
}

variable "function_name_1" {
more descriptive name?
perhaps run_query_function_name

default = "bq-email-run-query"
}

variable "function_name_2" {
more descriptive name?
perhaps export_results_function_name

}

variable "function_bucket" {
description = "Bucket for function code."
variable "function_name_3" {
more descriptive name?
perhaps email_results_function_name


@jaketf left a comment

This is architecturally so much better. Thank you for the massive refactor!
I think you can make this solution much more reusable by having your Cloud Functions accept configuration (e.g. query, bucket, format, to/from email) as environment variables.
That way a user can reuse the exact same Python source and redeploy this solution for many queries just by setting env vars.
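
As a rough sketch of this suggestion (the variable names are assumptions, not the PR's final choices), configuration could be pulled from the environment at module load so misconfiguration fails fast:

import os

def require_env(name):
    """Return the env var value or fail fast with a clear error."""
    value = os.environ.get(name)
    if not value:
        raise KeyError(f"Required environment variable {name} is not set")
    return value

QUERY = require_env("QUERY")
BUCKET_NAME = require_env("BUCKET_NAME")
EXPORT_FORMAT = os.environ.get("EXPORT_FORMAT", "NEWLINE_DELIMITED_JSON")
FROM_EMAIL = require_env("FROM_EMAIL")
TO_EMAIL = require_env("TO_EMAIL")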

@nehanene15 requested a review from jaketf on October 16, 2020 17:12

@jaketf left a comment

The Cloud Functions look nice now. Thank you for the updates!
Can you please add some unit tests (e.g. mock the API calls and assert that you're making the expected calls for a given set of env vars)? Ideally we'd have integration tests for each function and the end-to-end flow, but I'm happy to leave those to a future PR. We are trying to have cloudbuild files run the tests for each asset in this repo.
As an example of tests for a similar function using GCS and BQ, you can look at this

@jaketf

jaketf commented Nov 5, 2020

We've updated the default branch for this repo to "main":
https://sfconservancy.org/news/2020/jun/23/gitbranchname/
https://github.com/github/renaming
Please change the target branch to main.


The functional steps are listed here:

**Cloud Scheduler:** A [Cloud Scheduler](https://cloud.google.com/scheduler) job defines the time frame between periodic email exports.

@jaketf Nov 16, 2020

Not to move the goal post again, and perhaps you've already ruled this out (or built this a while ago), but why not use BigQuery Data Transfer Service scheduled queries (native to BQ, with no Cloud Scheduler dependency)? https://cloud.google.com/bigquery/docs/scheduling-queries#configuration_options

These can be set up with Pub/Sub notifications out of the box:
https://cloud.google.com/bigquery-transfer/docs/transfer-run-notifications

This would get rid of Cloud Scheduler, Pub/Sub topic #1, and Cloud Function #1, and would instead use BQ scheduled queries to write directly to a Pub/Sub topic. There would also be no need for the logging sink / filter, because we can handle this more explicitly.

I think this is a much cleaner solution (and one that will eliminate your VPC-SC caveat!).
It should be a small change, since you can reuse Cloud Function #2 as-is and just remove the Terraform for the extra Pub/Sub topics / Cloud Function / logging sink.
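
For reference, a hedged sketch of how a scheduled query with a Pub/Sub notification could be created with the google-cloud-bigquery-datatransfer client (project, dataset, topic, and query values are placeholders, not the PR's actual configuration):

from google.cloud import bigquery_datatransfer

client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="my_dataset",
    display_name="bq-email-export-query",
    data_source_id="scheduled_query",
    params={
        "query": "SELECT * FROM `my_project.my_dataset.my_table`",
        "destination_table_name_template": "email_export_results",
        "write_disposition": "WRITE_TRUNCATE",
    },
    schedule="every 24 hours",
    # Each completed run is published here, triggering the export Cloud Function.
    notification_pubsub_topic="projects/my_project/topics/bq-email-export",
)

client.create_transfer_config(
    parent="projects/my_project/locations/us",
    transfer_config=transfer_config,
)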

Contributor Author

Definitely! I have implemented this in the most recent commit.

@nehanene15 requested a review from jaketf on November 18, 2020 16:41
@nehanene15 changed the base branch from master to main on November 19, 2020 19:31
blob_path = log_entry['protoPayload']['serviceData'][
'jobCompletedEvent']['job']['jobConfiguration']['extract'][
'destinationUris'][0]
bucket_name = get_bucket(blob_path)
It would be cleaner to use storage.Blob.from_string(), since you only use this string parsing to construct the blob object anyway. That will do away with the need for those unit tests.

Kudos to @danieldeleo for teaching me about this function recently in another code review!
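
A small sketch of the suggested call, assuming the google-cloud-storage library (the URI is a placeholder):

from google.cloud import storage

storage_client = storage.Client()
# Parses "gs://bucket/path/to/object" directly into a Blob, so no manual
# string splitting (or unit tests for it) is needed.
blob = storage.Blob.from_string("gs://my-export-bucket/exports/results.json",
                                client=storage_client)
print(blob.bucket.name, blob.name)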


def get_destination_uri():
"""Returns destination GCS URI for export"""
return f"gs://{os.environ.get('BUCKET_NAME')}/{os.environ.get('OBJECT_NAME')}"
Note that os.environ.get will silently return None if the env var is not present; this should raise if these environment variables are not set.

nit: This function is also essentially just a global constant (these env vars are not expected to change throughout the lifetime of the function).

I would suggest refactoring this to be a global constant at the top of the file:

DESTINATION_URI = f"gs://{os.environ['BUCKET_NAME']}/{os.environ['OBJECT_NAME']}"

This will raise a KeyError if these env vars don't exist and "fail faster", instantly on module load, rather than waiting until you call this function (notably after instantiating a BigQuery client that is doomed never to be usefully used if this destination path is not properly formed).

Also, this seems like it will overwrite the same file on each scheduled run. Should we instead encode a timestamp in the GCS destination path to distinguish between different scheduled exports?

import time

DESTINATION_URI = f"gs://{os.environ['BUCKET_NAME']}/{time.monotonic()}/{os.environ['OBJECT_NAME']}"

Contributor Author

Good point. I'll read all the env vars using os.environ['BUCKET_NAME'] so it fails faster. I'll also add a function to read the env vars for unit testing. Since I'll be reading them via a function, a module-level constant can't easily use it, so I'll most likely keep the get_destination_uri function, but I'm open to other suggestions.

That's fine.

"""Entrypoint for Cloud Function"""

data = base64.b64decode(event['data'])
pubsub_message = json.loads(data)
Naming nit: event is a Pub/Sub message (attributes plus a binary payload). Once you base64-decode it and json.loads it, it's a dict that represents the BigQuery DTS TransferRun object.

Suggestion:
s/pubsub_message/upstream_bq_dts_transfer_run/g
or
s/pubsub_message/scheduled_query_transfer_run/g

Contributor Author

I'm unsure of what you mean by the suggestions. I can update the pubsub_message var to reflect that it's a dict by renaming to transfer_run_dict. Would that address this comment or am I misunderstanding?

Yes that would work.

table_name = pubsub_message['params'][
'destination_table_name_template']

bq_client = bigquery.Client()
Could you set client_info (like this) so that API calls made by this tool can be tracked by user agent?
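
A minimal sketch of what that could look like (the user-agent string is an illustrative placeholder):

from google.api_core.client_info import ClientInfo
from google.cloud import bigquery

# A custom user agent lets API usage from this tool be attributed in logs.
bq_client = bigquery.Client(
    client_info=ClientInfo(user_agent="google-pso-example/bq-email-exports"))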

bq_client = bigquery.Client()

destination_uri = get_destination_uri()
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
This is safer in case dataset_id includes a project, e.g. project_id.dataset_id or project_id:dataset_id.

Suggested change:
- dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
+ dataset_ref = bigquery.DatasetReference.from_string(dataset_id, default_project=project_id)

pubsub_message = json.loads(data)
error = pubsub_message.get('errorStatus')
if error:
logging.error(RuntimeError(f"Error in upstream query job:{error}"))
nit: also log job id for full context.

Contributor Author

The built-in error message from the object already includes the job ID. The error block looks like this:
{'code': 5, 'message': 'Not found: Dataset nehanene-dev:tf_test; JobID: nehanene-dev:5fb3f4cc-0000-2d86-97ed-94eb2c042718'}

I'll print out just the error message instead of the whole error block for prettiness, though.

else:
blob_path = log_entry['protoPayload']['serviceData'][
'jobCompletedEvent']['job']['jobConfiguration']['extract'][
'destinationUris'][0]
It seems like you should check whether destinationUris has multiple entries and log a warning that you're only returning the first one.
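
A small sketch of that guard (the URIs shown are placeholders):

import logging

destination_uris = ["gs://my-bucket/part-000.json", "gs://my-bucket/part-001.json"]

if len(destination_uris) > 1:
    logging.warning("Export produced %d destination URIs; only the first (%s) is used.",
                    len(destination_uris), destination_uris[0])
blob_path = destination_uris[0]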

bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(object_name)

# Cloud Functions service account must have Service Account Token Creator role
What happens if this permission is missing?
Can you add a try/except block to catch the error and remind the user that os.getenv("FUNCTION_IDENTITY") needs roles/iam.serviceAccountTokenCreator?
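
A hedged sketch of such a guard (the URI is a placeholder and the signing call only loosely mirrors the PR's; the broad except is deliberate, since the exact exception raised for a denied signing call can vary):

import os
from datetime import timedelta
from google.cloud import storage

blob = storage.Blob.from_string("gs://my-export-bucket/exports/results.json",
                                client=storage.Client())
try:
    url = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=24),
        service_account_email=os.getenv("FUNCTION_IDENTITY"))
except Exception as err:
    raise RuntimeError(
        f"Could not sign URL. Ensure {os.getenv('FUNCTION_IDENTITY')} has "
        "roles/iam.serviceAccountTokenCreator.") from err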

service_account_email=os.getenv("FUNCTION_IDENTITY"),
)

url = blob.generate_signed_url(
It'd be nice if this solution could also send unsigned URLs (based on an env var). That way you could email a URL to a GCS object that is only accessible by authenticated users (e.g. they would log in with their GCP identity and then download):
https://storage.cloud.google.com/{bucket_id}/{object_id}
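
A tiny sketch of building such an authenticated-browser URL (the helper name and URI are illustrative; the PR's later code toggles this behind a SIGNED_URL env var, as shown further down):

def get_auth_url(blob_path):
    """Authenticated URL; the recipient must sign in with a GCP identity to download."""
    return "https://storage.cloud.google.com/" + blob_path.replace("gs://", "", 1)

print(get_auth_url("gs://my-export-bucket/exports/results.json"))
# -> https://storage.cloud.google.com/my-export-bucket/exports/results.json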



@pytest.fixture
def mock_env(monkeypatch):
There are several other env vars you read to construct the export request; you can unit test those as well.

It's perhaps most important to unit test the behavior when these env vars are not properly set. You can assert that the expected exception is raised or that you fall back to reasonable defaults.
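
A sketch of that kind of test using the monkeypatch fixture (the function under test is a stand-in here, assuming env vars are read with os.environ[...] so a KeyError is expected when one is missing):

import os
import pytest

def get_destination_uri():
    """Stand-in for the function under test."""
    return f"gs://{os.environ['BUCKET_NAME']}/{os.environ['OBJECT_NAME']}"

def test_destination_uri(monkeypatch):
    monkeypatch.setenv("BUCKET_NAME", "my-bucket")
    monkeypatch.setenv("OBJECT_NAME", "results.json")
    assert get_destination_uri() == "gs://my-bucket/results.json"

def test_missing_env_var_raises(monkeypatch):
    monkeypatch.delenv("BUCKET_NAME", raising=False)
    monkeypatch.setenv("OBJECT_NAME", "results.json")
    with pytest.raises(KeyError):
        get_destination_uri()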

@nehanene15 requested a review from jaketf on November 24, 2020 15:41
@@ -60,4 +66,10 @@ def main(event, context):

def get_destination_uri():
"""Returns destination GCS URI for export"""
return f"gs://{os.environ.get('BUCKET_NAME')}/{os.environ.get('OBJECT_NAME')}"
return (f"gs://{get_env('BUCKET_NAME')}/"
f"{time.strftime('%Y%m%d-%H%M%S')}/{get_env('OBJECT_NAME')}")
Nit: this is not a very common way to represent a timestamp (https://docs.python.org/3/library/datetime.html#datetime.datetime.timestamp). I'd suggest just using a Unix timestamp, which is most explicit, or something like yyyy=%Y/mm=%m/dd=%d/hr=%H/, which will be familiar to Hadoop users.
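
For illustration, the two suggested path styles (bucket and object names are placeholders):

import time

BUCKET = "my-export-bucket"
OBJECT = "results.json"

# Unix timestamp: unambiguous and sortable.
unix_path = f"gs://{BUCKET}/{int(time.time())}/{OBJECT}"

# Hive/Hadoop-style partition path.
partition_path = f"gs://{BUCKET}/{time.strftime('yyyy=%Y/mm=%m/dd=%d/hr=%H')}/{OBJECT}"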

Also, currently this is the Cloud Function execution time. Would it make more sense to use the scheduled time of the query (read from the transfer run)?

Contributor Author

Yeah, I changed it to use the schedule time from DTS in ISO format.

blob_path = destination_uris[0]
blob = storage.Blob.from_string(blob_path)
url = generate_signed_url(blob) if get_env(
'SIGNED_URL') == 'True' else get_auth_url(blob_path)
Nitty nit: I like to use strtobool for this sort of thing, as it accepts more possible user inputs and does the right thing.

https://docs.python.org/3/distutils/apiref.html#distutils.util.strtobool
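
A minimal sketch of that suggestion (note that distutils is deprecated as of Python 3.10 and removed in 3.12, so a small local helper is a reasonable alternative on newer runtimes):

import os
from distutils.util import strtobool  # accepts "y"/"yes"/"true"/"1"/"on", etc.

use_signed_url = bool(strtobool(os.environ.get("SIGNED_URL", "True")))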

@nehanene15 requested a review from jaketf on December 2, 2020 17:47

@jaketf left a comment

LGTM!

@jaketf merged commit 5f2606d into GoogleCloudPlatform:main on Dec 7, 2020
rosmo pushed a commit to rosmo/professional-services that referenced this pull request Feb 1, 2021
* initial commit

* config files added

* new signing func

* Consolidated deploy script and minor edits to main.py

* created readme

* Updated to generalize variable names

* Added exception catching, cleaned up scripts, and updated README

* Quick fixes to README

* Ran shellcheck

* Added comments to main.py

* updated the license

* Updated code format and licenses

* Moved files into directory

* Moving gitignore files into dir

* Revert "Moving gitignore files into dir"

This reverts commit 23f9993.

* Revert "Moved files into directory"

This reverts commit ea33ceb.

* Moved files to examples/ and updated README

* lint

* yapf

* Added Terraform for provisioning, updated Scheduler to take a payload with configs, and updated syntax in main.py

* lint

* Updated TF code to set APIs, archive files, and templatize payload. Added query and export config options in payload. Set timeouts for query and export jobs.

* Major refactor of code to microservices-driven architecture

* pep8 style reformat

* Added env vars for reproducibility, renamed folders/vars for specificity

* added unit tests and DTS for scheduled queries

* added support for unsigned urls and better error catching

* added schedule time to GCS path, strtobool for signed URL env var

Co-authored-by: ishitashah <[email protected]>
Co-authored-by: bharathkkb <[email protected]>
Co-authored-by: ishitashah24 <[email protected]>
Co-authored-by: Jacob Ferriero <[email protected]>
rosmo pushed a commit to rosmo/professional-services that referenced this pull request Mar 17, 2022
Labels: cla: yes (All committers have signed a CLA), size/XL (Denotes a PR that changes 500-999 lines)
7 participants