BigQuery email exports example (GoogleCloudPlatform#474)
* initial commit

* config files added

* new signing func

* Consolidated deploy script and minor edits to main.py

* created readme

* Updated to generalize variable names

* Added exception catching, cleaned up scripts, and updated README

* Quick fixes to README

* Ran shellcheck

* Added comments to main.py

* updated the license

* Updated code format and licenses

* Moved files into directory

* Moving gitignore files into dir

* Revert "Moving gitignore files into dir"

This reverts commit 23f9993.

* Revert "Moved files into directory"

This reverts commit ea33ceb.

* Moved files to examples/ and updated README

* lint

* yapf

* Added Terraform for provisioning, updated Scheduler to take a payload with configs, and updated syntax in main.py

* lint

* Updated TF code to set APIs, archive files, and templatize payload. Added query and export config options in payload. Set timeouts for query and export jobs.

* Major refactor of code to microservices-driven architecture

* pep8 style reformat

* Added env vars for reproducibility, renamed folders/vars for specificity

* added unit tests and DTS for scheduled queries

* added support for unsigned urls and better error catching

* added schedule time to GCS path, strtobool for signed URL env var

Co-authored-by: ishitashah <[email protected]>
Co-authored-by: bharathkkb <[email protected]>
Co-authored-by: ishitashah24 <[email protected]>
Co-authored-by: Jacob Ferriero <[email protected]>
5 people authored and rosmo committed Feb 1, 2021
1 parent a333394 commit ae1787d
Showing 18 changed files with 843 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -9,6 +9,7 @@ The examples folder contains example solutions across a variety of Google Cloud

* [Audio Content Profiling](examples/ml-audio-content-profiling) - A tool that builds a pipeline to scale the process of moderating audio files for inappropriate content using machine learning APIs.
* [BigQuery Audit Log Dashboard](examples/bigquery-audit-log) - Solution to help audit BigQuery usage using Data Studio for visualization and a sample SQL script to query the back-end data source consisting of audit logs.
* [BigQuery Automated Email Exports](examples/bq-email-exports) - Serverless solution to automate sending BigQuery export results via email on a scheduled interval. The email contains a signed or unsigned URL through which the recipient can view the query results as a JSON, CSV, or Avro file.
* [BigQuery Billing Dashboard](examples/bigquery-billing-dashboard) - Solution to help display billing info using Data Studio for visualization and a sample SQL script to query the back-end billing export table in BigQuery.
* [BigQuery Cross Project Slot Monitoring](examples/bigquery-cross-project-slot-monitoring) - Solution to help monitor slot utilization across multiple projects, while breaking down allocation per project.
* [BigQuery Group Sync For Row Level Access](examples/bigquery-row-access-groups) - Sample code to synchronize group membership from G Suite/Cloud Identity into BigQuery and join that with your data to control access at row level.
21 changes: 21 additions & 0 deletions examples/bq-email-exports/.gitignore
@@ -0,0 +1,21 @@
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# macOS
**/.DS_Store

# Terraform
.terraform
terraform.tfstate
terraform.tfstate.*

# Unit tests
.cache
.pytest_cache
__pycache__/
46 changes: 46 additions & 0 deletions examples/bq-email-exports/README.md
@@ -0,0 +1,46 @@
# Automated BigQuery Exports via Email

This serverless solution enables users to regularly send BigQuery export results via email. End users receive a scheduled email containing either a Google Cloud Storage [signed URL](https://cloud.google.com/storage/docs/access-control/signed-urls) or an [unsigned URL](https://cloud.google.com/storage/docs/request-endpoints#cookieauth), from which they can view the query results as a JSON, CSV, or Avro file.

A [signed URL](https://cloud.google.com/storage/docs/access-control/signed-urls) allows anyone with the link to download the file for a limited time. An [unsigned URL](https://cloud.google.com/storage/docs/request-endpoints#cookieauth) requires cookie-based authentication: the recipient is asked to sign in to their Google account and must have the appropriate [IAM permissions](https://cloud.google.com/storage/docs/access-control) to access the object in Google Cloud Storage.
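As a quick illustration of the difference (not part of this solution), the sketch below builds both URL styles for a hypothetical object `results.json` in a hypothetical bucket `my-bucket`; generating the signed URL this way assumes the caller's credentials can sign, for example a service account key:

```python
import datetime

from google.cloud import storage

client = storage.Client()
blob = client.bucket("my-bucket").blob("results.json")  # hypothetical names

# Signed URL: anyone holding the link can download the object until it expires.
signed_url = blob.generate_signed_url(
    version="v4", expiration=datetime.timedelta(hours=24), method="GET")

# Unsigned URL: the recipient must sign in and have IAM access to the object.
unsigned_url = f"https://storage.cloud.google.com/{blob.bucket.name}/{blob.name}"

print(signed_url)
print(unsigned_url)
```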

The functional steps are listed here:

**BigQuery Scheduled Query:** A [scheduled query](https://cloud.google.com/bigquery/docs/scheduling-queries#bq_1) is set up in BigQuery.

**Pub/Sub #1:** A Pub/Sub topic is triggered by every successful scheduled query run.

**Cloud Function #1:** A [Cloud Function](https://cloud.google.com/functions) subscribes to the above Pub/Sub topic and exports the query results to GCS with a job ID prefix of `email_export`. Each run writes its output under a path keyed by the query's schedule time (see `get_destination_uri` in `export_query_results_function/main.py`).

**Pub/Sub #2:** A second topic is triggered by a logging sink that filters for completed export jobs carrying the `email_export` job ID prefix.

**Cloud Function #2:** A second function subscribes to the above Pub/Sub topic and sends the email via the SendGrid API, including the signed or unsigned URL of the exported file.

**SendGrid API:** The [SendGrid API](https://sendgrid.com/) is a web-based API that delivers the email containing the URL to the recipients.
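The exact logging sink filter is created by the Terraform code and is not shown in this commit; the sketch below is only an assumption of what such a filter and sink could look like, inferred from the `email_export` job ID prefix and the audit-log fields read in `send_email_function/main.py`. The sink and topic names are the Terraform defaults, and the project ID is a placeholder.

```python
from google.cloud import logging as cloud_logging

# Hypothetical advanced log filter: match completed BigQuery jobs whose job ID
# carries the prefix set by the export function.
EXPORT_COMPLETED_FILTER = (
    'resource.type="bigquery_resource" '
    'protoPayload.methodName="jobservice.jobcompleted" '
    'protoPayload.serviceData.jobCompletedEvent.job.jobName.jobId:"email_export_"')

# Route matching entries to the second Pub/Sub topic.
client = cloud_logging.Client()
sink = client.sink(
    "bq-email-export-completed",
    filter_=EXPORT_COMPLETED_FILTER,
    destination="pubsub.googleapis.com/projects/my-project/topics/bq-email-send-email")
sink.create()
```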

To implement this solution, follow the steps below:

## Set Up
1. Generate a SendGrid API key by creating a free tier [SendGrid account](https://signup.sendgrid.com/). An optional smoke test for the key is sketched after these steps.

2. Deploy with Terraform.
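
An optional way to smoke-test the SendGrid key from step 1 before deploying is sketched below; the addresses are placeholders and the sender address must be verified in your SendGrid account:

```python
import os

from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

# Assumes the key generated in step 1 is exported as SENDGRID_API_KEY.
message = Mail(
    from_email="sender@example.com",
    to_emails="recipient@example.com",
    subject="SendGrid key smoke test",
    html_content="<p>The key works.</p>")
response = SendGridAPIClient(os.environ["SENDGRID_API_KEY"]).send(message)
print(response.status_code)  # 202 means SendGrid accepted the message
```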

## Deploying the pipeline

To deploy the pipeline, run:
```bash
cd terraform
terraform init
terraform apply
```
The Terraform code archives each function's source directory (containing its `main.py` and `requirements.txt`) and uses the resulting archives as the source code for the Cloud Functions.

## Caveats and Considerations
1. BigQuery can export up to 1 GB of data to a single file. If your query results exceed 1 GB, you must export the data to multiple files in GCS, which this solution does not support. One option is to use [GCS Compose](https://cloud.google.com/storage/docs/composite-objects) to concatenate the exported objects so that only one file is emailed (see the sketch below).

2. Signed URLs can be a data exfiltration risk. Consider the security implications of distributing data through a signed URL before enabling this option.
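
A rough sketch of that compose approach, assuming the export was sharded into hypothetical objects under an `email_export/part-` prefix in the same bucket (GCS composes at most 32 source objects per call):

```python
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("my-export-bucket")  # hypothetical bucket name

# Collect the sharded export files and compose them into a single object.
shards = sorted(bucket.list_blobs(prefix="email_export/part-"),
                key=lambda blob: blob.name)
combined = bucket.blob("email_export/combined.json")
combined.compose(shards)
print(f"Composed {len(shards)} shards into gs://{bucket.name}/{combined.name}")
```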

If these constraints do not fit your use case, another option is to use a [Cloud Composer workflow](https://cloud.google.com/composer/docs/how-to/using/writing-dags) to execute the pipeline. If you are a G Suite user, this solution can also be implemented with a scheduled [Apps Script](https://developers.google.com/apps-script) using the [BigQuery Service](https://developers.google.com/apps-script/advanced/bigquery) and exporting data to a [Google Sheet](https://developers.google.com/apps-script/reference/spreadsheet).

## Troubleshooting
If there is an issue with the upstream query or export job, the corresponding Cloud Function logs the error. If an expected email was not sent, check the Cloud Functions and BigQuery logs.
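
For example, a minimal sketch for pulling recent errors from both functions with the Cloud Logging client library; the function names below are the Terraform defaults and may differ in your deployment:

```python
import itertools

from google.cloud import logging as cloud_logging

client = cloud_logging.Client()
log_filter = (
    'resource.type="cloud_function" '
    'resource.labels.function_name=("bq-email-export-gcs" OR "bq-email-send-email") '
    'severity>=ERROR')

# Print the 20 most recent error entries from either Cloud Function.
entries = client.list_entries(filter_=log_filter,
                              order_by=cloud_logging.DESCENDING)
for entry in itertools.islice(entries, 20):
    print(entry.timestamp, entry.severity, entry.payload)
```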
Empty file.
Empty file.
75 changes: 75 additions & 0 deletions examples/bq-email-exports/export_query_results_function/main.py
@@ -0,0 +1,75 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cloud Function for exporting BigQuery results from an anonymous table to GCS.
Triggered after BigQuery query is complete.
"""

import base64
import json
import logging
import os

import google.api_core.client_info
from google.cloud import bigquery

CLIENT_INFO = google.api_core.client_info.ClientInfo(
    user_agent="google-pso-example/bq-email-exports")


def main(event, context):
    """Entrypoint for Cloud Function"""

    data = base64.b64decode(event['data'])
    upstream_bq_dts_obj = json.loads(data)
    error = upstream_bq_dts_obj.get('errorStatus')
    if error:
        logging.error(
            RuntimeError(f"Error in upstream query job: {error['message']}."))
    else:
        project_id = get_env('PROJECT_ID')
        dataset_id = upstream_bq_dts_obj['destinationDatasetId']
        table_name = upstream_bq_dts_obj['params'][
            'destination_table_name_template']
        schedule_time = upstream_bq_dts_obj['scheduleTime']

        bq_client = bigquery.Client(client_info=CLIENT_INFO)

        dataset_ref = bigquery.DatasetReference.from_string(
            dataset_id, default_project=project_id)
        table_ref = dataset_ref.table(table_name)
        destination_uri = get_destination_uri(schedule_time)
        extract_config = bigquery.ExtractJobConfig(
            compression=get_env('COMPRESSION'),
            destination_format=get_env('DEST_FMT'),
            field_delimiter=get_env('FIELD_DELIMITER'),
            use_avro_logical_types=get_env('USE_AVRO_TYPES'))
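        # extract_table only starts the export job; completion is detected by
        # the logging sink that filters on the "email_export_" job ID prefix.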
        bq_client.extract_table(table_ref,
                                destination_uri,
                                job_id_prefix="email_export_",
                                job_config=extract_config)
        print(
            f"Exporting {project_id}:{dataset_id}.{table_name} to {destination_uri}"
        )


def get_destination_uri(schedule_time):
    """Returns destination GCS URI for export"""
    return (f"gs://{get_env('BUCKET_NAME')}/"
            f"{schedule_time}/{get_env('OBJECT_NAME')}")


def get_env(name):
    """Returns environment variable"""
    return os.environ[name]
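
For reference, the function can be exercised locally with a hand-built Pub/Sub event along the lines of the sketch below; the resource names are placeholders, and application-default credentials with access to the dataset and bucket are assumed:

```python
import base64
import json
import os

import main  # the export_query_results_function module above

# Environment variables the function reads; the values here are placeholders.
os.environ.update({
    "PROJECT_ID": "my-project",
    "BUCKET_NAME": "my-export-bucket",
    "OBJECT_NAME": "results.json",
    "COMPRESSION": "NONE",
    "DEST_FMT": "NEWLINE_DELIMITED_JSON",
    "FIELD_DELIMITER": ",",
    "USE_AVRO_TYPES": "False",
})

# Shape of the Pub/Sub message emitted for a successful scheduled-query run.
payload = {
    "destinationDatasetId": "my_dataset",
    "params": {"destination_table_name_template": "my_table"},
    "scheduleTime": "2021-02-01T12:00:00Z",
}
event = {"data": base64.b64encode(json.dumps(payload).encode())}
main.main(event, None)
```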
18 changes: 18 additions & 0 deletions examples/bq-email-exports/export_query_results_function/requirements.txt
@@ -0,0 +1,18 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# BigQuery client library used by the export Cloud Function

google-cloud-bigquery==1.25.0
Empty file.
120 changes: 120 additions & 0 deletions examples/bq-email-exports/send_email_function/main.py
@@ -0,0 +1,120 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cloud Function for sending the email with a signed URL link.
Triggered after export job from BigQuery to GCS is complete.
Uses https://github.com/sendgrid/sendgrid-python
"""

import base64
import datetime
import distutils.util
import json
import logging
import os

from google.auth import default, exceptions, iam
from google.auth.transport import requests
from google.cloud import storage
from google.oauth2 import service_account
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail


def main(event, context):
    """Entrypoint for Cloud Function"""

    data = base64.b64decode(event['data'])
    log_entry = json.loads(data)
    status = log_entry['severity']
    if status == "ERROR":
        code = log_entry['protoPayload']['status']['code']
        message = log_entry['protoPayload']['status']['message']
        job_id = log_entry['protoPayload']['serviceData']['jobCompletedEvent'][
            'job']['jobName']['jobId']
        logging.error(
            RuntimeError(
                f"Error in upstream export job with ID {job_id}. Code {code}: {message}"
            ))
    else:
        destination_uris = log_entry['protoPayload']['serviceData'][
            'jobCompletedEvent']['job']['jobConfiguration']['extract'][
                'destinationUris']
        if len(destination_uris) > 1:
            logging.warning(
                "Multiple GCS URIs found from BigQuery export. Only the first "
                "file will be linked in the email.")

        blob_path = destination_uris[0]
        blob = storage.Blob.from_string(blob_path)
        url = generate_signed_url(blob) if distutils.util.strtobool(get_env(
            'SIGNED_URL')) else get_auth_url(blob_path)

        message = Mail(
            from_email=get_env('FROM_EMAIL'),
            to_emails=get_env('TO_EMAILS'),
            subject=get_env('EMAIL_SUBJECT'),
            html_content="<p> Your BigQuery export from Google Cloud Platform \
                is linked <a href={}>here</a>.</p>".format(url),
        )

        try:
            sg_client = SendGridAPIClient(get_env('SENDGRID_API_KEY'))
            response = sg_client.send(message)
            print(f"SendGrid response code: {response.status_code}")
        except Exception as exc:
            logging.error(RuntimeError(f"ERROR: sending email failed: {exc}"))


def generate_signed_url(blob):
    """Generate signed URL for storage blob"""
    credentials = default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"])[0]
    signer = iam.Signer(
        request=requests.Request(),
        credentials=credentials,
        service_account_email=os.getenv("FUNCTION_IDENTITY"),
    )
    # Create token-based service account credentials for signing
    signing_credentials = service_account.IDTokenCredentials(
        signer=signer,
        token_uri="https://www.googleapis.com/oauth2/v4/token",
        target_audience="",
        service_account_email=os.getenv("FUNCTION_IDENTITY"),
    )
    # Cloud Functions service account must have Service Account Token Creator role
    try:
        url = blob.generate_signed_url(
            version="v4",
            expiration=datetime.timedelta(
                hours=int(get_env('SIGNED_URL_EXPIRATION'))),
            method="GET",
            credentials=signing_credentials)
    except exceptions.TransportError:
        logging.error(
            RuntimeError("Service account running the function must have IAM "
                         "roles/iam.serviceAccountTokenCreator."))
    else:
        print("Generated signed URL.")
        return url


def get_env(name):
    """Returns environment variable"""
    return os.environ[name]


def get_auth_url(blob_path):
    """Returns authenticated URL given GCS URI"""
    return blob_path.replace('gs://', 'https://storage.cloud.google.com/')
20 changes: 20 additions & 0 deletions examples/bq-email-exports/send_email_function/requirements.txt
@@ -0,0 +1,20 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# using SendGrid's Python Library
# https://github.com/sendgrid/sendgrid-python

google-auth==1.18.0
google-cloud-storage==1.29.0
sendgrid==6.4.3
52 changes: 52 additions & 0 deletions examples/bq-email-exports/terraform/README.md
@@ -0,0 +1,52 @@
## Providers

| Name | Version |
|------|---------|
| archive | n/a |
| google | ~> 3.48.0 |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:-----:|
| bq\_dataset\_expiration | The default lifetime of all tables in the dataset in milliseconds. The minimum value is 3600000 ms, or one hour. | `string` | `"3600000"` | no |
| bq\_dataset\_name | Name for BQ dataset where the scheduled query results will be saved. | `any` | n/a | yes |
| bq\_table\_name | Name for BQ table where the scheduled query results will be saved. | `any` | n/a | yes |
| bucket\_lifecycle | Number of days the exported query result files should stay in the storage bucket. | `number` | `1` | no |
| email\_results\_function\_name | Name for the Cloud Function that sends email. | `string` | `"bq-email-send-email"` | no |
| email\_subject | Subject of the email containing query results. | `any` | n/a | yes |
| enable\_signed\_url | Boolean indicating whether the link sent via email should be a signed URL or an unsigned URL requiring cookie-based authentication. | `string` | `"True"` | no |
| export\_compression | Compression type to use for exported files. | `string` | `"NONE"` | no |
| export\_destination\_format | Exported file format. | `string` | `"NEWLINE_DELIMITED_JSON"` | no |
| export\_field\_delimiter | Delimiter to use between fields in the exported data. | `string` | `","` | no |
| export\_logging\_sink\_name | Name for the logging sink that triggers the send-email Pub/Sub topic when the export to GCS is completed. | `string` | `"bq-email-export-completed"` | no |
| export\_object\_name | GCS object name, with a JSON, CSV, or Avro file extension, for the query results file. | `any` | n/a | yes |
| export\_results\_function\_name | Name for the Cloud Function that exports query results. | `string` | `"bq-email-export-gcs"` | no |
| export\_use\_avro\_logical\_types | For Avro exports, governs whether BigQuery types are converted to their corresponding Avro logical types. | `string` | `"False"` | no |
| function\_bucket\_1 | GCS bucket for the code of Cloud Function #1, which exports query results. | `any` | n/a | yes |
| function\_bucket\_2 | GCS bucket for the code of Cloud Function #2, which sends the email. | `any` | n/a | yes |
| location | Location for GCP resources | `string` | `"US"` | no |
| project\_id | Project ID for your GCP project | `any` | n/a | yes |
| pubsub\_export | Name for the Pub/Sub topic that is triggered on scheduled query completion. | `string` | `"bq-email-gcs-export"` | no |
| pubsub\_send\_email | Name for the Pub/Sub topic that is triggered on successful export to GCS. | `string` | `"bq-email-send-email"` | no |
| query | Query that will run in BQ. The results will be sent via email. | `any` | n/a | yes |
| recipient\_email\_address | Email address of recipient. | `any` | n/a | yes |
| region | Region for GCP resources | `string` | `"us-central1"` | no |
| schedule | Scheduled query schedule. Examples of valid format: 1st,3rd monday of month 15:30, every wed jan, every 15 minutes. | `any` | n/a | yes |
| scheduled\_query\_name | Display name for BQ scheduled query | `string` | `"bq-email-exports"` | no |
| sender\_email\_address | Email address of sender. | `any` | n/a | yes |
| sendgrid\_api\_key | API key for authenticating the sending of emails through SendGrid API | `any` | n/a | yes |
| service\_acct\_name | The service account used by the BQ email export Cloud Functions | `any` | n/a | yes |
| service\_acct\_roles | Roles for the Cloud Function service account | `list` | <pre>[<br> "roles/bigquery.admin",<br> "roles/storage.admin",<br> "roles/iam.serviceAccountTokenCreator"<br>]</pre> | no |
| signed\_url\_expiration\_hrs | Number of hours until the signed URL sent via email will expire. | `number` | `24` | no |
| storage\_bucket | Name of GCS bucket to store exported query results from BQ. | `any` | n/a | yes |

## How to Use

This Terraform code creates the pipeline to schedule BigQuery query results to be sent via email.
To deploy the pipeline, run:
```bash
cd terraform
terraform init
terraform apply
```
