Pass credentials explicitly to Google clients
Avoids issue documented in move-coop#1039 where credentials for all GCP clients
are stored in the same environment variable, leading to overwrites if
multiple clients are initialized in the same environment.
austinweisgrau committed Jun 21, 2024
1 parent 83345b4 commit c8e4250
Showing 4 changed files with 88 additions and 48 deletions.
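Note that the diff leans on two helpers in parsons/google/utilities.py that are not among the four files changed here: setup_google_application_credentials() gains a target_env_var_name argument, and load_google_application_credentials() builds a Credentials object from the keyfile path stored under that variable. The following is a minimal sketch of what those helpers might look like; only the two public signatures are inferred from the calls in this diff, and the internals are assumptions, not the actual implementation.

import json
import os
import tempfile

from google.oauth2.service_account import Credentials


def setup_google_application_credentials(
    app_creds, env_var_name="GOOGLE_APPLICATION_CREDENTIALS", target_env_var_name=None
):
    # Resolve app_creds (dict, JSON string, or file path) to a keyfile on disk
    # and record its path under target_env_var_name rather than one shared
    # variable, so clients initialized together cannot overwrite each other.
    target = target_env_var_name or env_var_name
    creds = app_creds or os.environ.get(env_var_name)
    if creds is None:
        raise KeyError(f"No credentials passed and {env_var_name} is not set")
    if isinstance(creds, dict):
        creds = json.dumps(creds)
    if isinstance(creds, str) and not os.path.isfile(creds):
        # Assume the value is raw JSON and persist it to a temporary keyfile
        with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
            f.write(creds)
        creds = f.name
    os.environ[target] = creds


def load_google_application_credentials(env_var_name, scopes=None, subject=None):
    # Build service-account credentials from the keyfile path stored above
    return Credentials.from_service_account_file(
        os.environ[env_var_name], scopes=scopes, subject=subject
    )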
31 changes: 17 additions & 14 deletions parsons/google/google_admin.py
@@ -1,9 +1,13 @@
from oauth2client.service_account import ServiceAccountCredentials
from parsons.etl.table import Table
from parsons.google.utilities import setup_google_application_credentials
import httplib2
import json
import os
import uuid

from google.auth.transport.requests import AuthorizedSession

from parsons.etl.table import Table
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)


class GoogleAdmin(object):
@@ -23,17 +27,16 @@ class GoogleAdmin(object):
"""

def __init__(self, app_creds=None, sub=None):
setup_google_application_credentials(app_creds)

self.client = (
ServiceAccountCredentials.from_json_keyfile_name(
os.environ["GOOGLE_APPLICATION_CREDENTIALS"],
["https://www.googleapis.com/auth/admin.directory.group"],
)
.create_delegated(sub)
.authorize(httplib2.Http())
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(app_creds, target_env_var_name=env_credentials_path)
credentials = load_google_application_credentials(
env_credentials_path,
scopes=["https://www.googleapis.com/auth/admin.directory.group"],
subject=sub,
)

self.client = AuthorizedSession(credentials)

def _paginate_request(self, endpoint, collection, params=None):
# Build query params
param_arr = []
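With this change GoogleAdmin no longer wraps an oauth2client/httplib2 client; self.client is a google-auth AuthorizedSession scoped to admin.directory.group. A hedged usage sketch follows; the keyfile path, delegated subject, endpoint URL, and domain are illustrative and not taken from this commit.

from parsons.google.google_admin import GoogleAdmin

# Credentials are delegated to a Workspace admin via `sub`
admin = GoogleAdmin(app_creds="service_account.json", sub="admin@example.org")

# AuthorizedSession behaves like a requests.Session with auto-refreshing auth
resp = admin.client.get(
    "https://admin.googleapis.com/admin/directory/v1/groups",
    params={"domain": "example.org"},
)
resp.raise_for_status()
print([g["email"] for g in resp.json().get("groups", [])])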
14 changes: 9 additions & 5 deletions parsons/google/google_bigquery.py
@@ -17,7 +17,10 @@
from parsons.databases.table import BaseTable
from parsons.etl import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage
from parsons.google.utitities import setup_google_application_credentials
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import check_env
from parsons.utilities.files import create_temp_file

@@ -160,8 +163,9 @@ def __init__(
if isinstance(app_creds, Credentials):
self.credentials = app_creds
else:
self.credentials = None
setup_google_application_credentials(app_creds)
env_credential_path = str(uuid.uuid4())
setup_google_application_credentials(app_creds, target_env_var_name=env_credential_path)
self.credentials = load_google_application_credentials(env_credential_path)

self.project = project
self.location = location
@@ -695,7 +699,7 @@ def copy_s3(

# copy from S3 to GCS
tmp_gcs_bucket = check_env.check("GCS_TEMP_BUCKET", tmp_gcs_bucket)
gcs_client = gcs_client or GoogleCloudStorage()
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_uri = gcs_client.copy_s3_to_gcs(
aws_source_bucket=bucket,
aws_access_key_id=aws_access_key_id,
@@ -808,7 +812,7 @@ def copy(
schema.append(schema_row)
job_config.schema = schema

gcs_client = gcs_client or GoogleCloudStorage()
gcs_client = gcs_client or GoogleCloudStorage(app_creds=self.app_creds)
temp_blob_name = f"{uuid.uuid4()}.{data_type}"
temp_blob_uri = gcs_client.upload_table(tbl, tmp_gcs_bucket, temp_blob_name)

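Because the BigQuery connector now passes app_creds through to the GoogleCloudStorage client it creates for copy() and copy_s3(), a process can hold BigQuery and storage clients for different service accounts without the env-var overwrites described in move-coop#1039. A rough usage sketch; the keyfile, project, dataset, and bucket names are placeholders, and the copy() argument names are assumed from the portions of the signature visible in this diff.

from parsons import Table
from parsons.google.google_bigquery import GoogleBigQuery

bq = GoogleBigQuery(app_creds="bq_service_account.json", project="my-project")

tbl = Table([{"id": 1, "name": "Jane"}, {"id": 2, "name": "Ali"}])

# tmp_gcs_bucket can also be supplied via the GCS_TEMP_BUCKET environment
# variable (see the check_env call above); the temporary GCS client now reuses
# the same app_creds rather than the shared environment variable.
bq.copy(tbl, "my_dataset.people", tmp_gcs_bucket="my-temp-bucket")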
52 changes: 37 additions & 15 deletions parsons/google/google_cloud_storage.py
@@ -1,8 +1,3 @@
import google
from google.cloud import storage
from google.cloud import storage_transfer
from parsons.google.utilities import setup_google_application_credentials
from parsons.utilities import files
import datetime
import gzip
import petl
@@ -12,6 +7,15 @@
import zipfile
from typing import Optional

import google
from google.cloud import storage, storage_transfer

from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
)
from parsons.utilities import files

logger = logging.getLogger(__name__)


@@ -42,17 +46,21 @@ class GoogleCloudStorage(object):
"""

def __init__(self, app_creds=None, project=None):
setup_google_application_credentials(app_creds)
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
app_creds, target_env_var_name=env_credentials_path
)
credentials = load_google_application_credentials(env_credentials_path)
self.project = project

# Throws an error if you pass project=None, so adding if/else statement.
if not self.project:
self.client = storage.Client()
self.client = storage.Client(credentials=credentials)
"""
Access all methods of `google.cloud` package
"""
else:
self.client = storage.Client(project=self.project)
self.client = storage.Client(credentials=credentials, project=self.project)

def list_buckets(self):
"""
@@ -289,7 +297,9 @@ def delete_blob(self, bucket_name, blob_name):
blob.delete()
logger.info(f"{blob_name} blob in {bucket_name} bucket deleted.")

def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_acl=None):
def upload_table(
self, table, bucket_name, blob_name, data_type="csv", default_acl=None
):
"""
Load the data from a Parsons table into a blob.
@@ -325,7 +335,9 @@ def upload_table(self, table, bucket_name, blob_name, data_type="csv", default_a
local_file = table.to_json()
content_type = "application/json"
else:
raise ValueError(f"Unknown data_type value ({data_type}): must be one of: csv or json")
raise ValueError(
f"Unknown data_type value ({data_type}): must be one of: csv or json"
)

try:
blob.upload_from_filename(
@@ -395,7 +407,9 @@ def copy_bucket_to_gcs(
Secret key to authenticate storage transfer
"""
if source not in ["gcs", "s3"]:
raise ValueError(f"Blob transfer only supports gcs and s3 sources [source={source}]")
raise ValueError(
f"Blob transfer only supports gcs and s3 sources [source={source}]"
)
if source_path and source_path[-1] != "/":
raise ValueError("Source path much end in a '/'")

@@ -582,9 +596,13 @@ def unzip_blob(
}

file_extension = compression_params[compression_type]["file_extension"]
compression_function = compression_params[compression_type]["compression_function"]
compression_function = compression_params[compression_type][
"compression_function"
]

compressed_filepath = self.download_blob(bucket_name=bucket_name, blob_name=blob_name)
compressed_filepath = self.download_blob(
bucket_name=bucket_name, blob_name=blob_name
)

decompressed_filepath = compressed_filepath.replace(file_extension, "")
decompressed_blob_name = (
@@ -616,7 +634,9 @@ def __gzip_decompress_and_write_to_gcs(self, **kwargs):
bucket_name = kwargs.pop("bucket_name")

with gzip.open(compressed_filepath, "rb") as f_in:
logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}")
logger.debug(
f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
)
bucket = self.get_bucket(bucket_name=bucket_name)
blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
@@ -636,7 +656,9 @@ def __zip_decompress_and_write_to_gcs(self, **kwargs):
with zipfile.ZipFile(compressed_filepath) as path_:
# Open the underlying file
with path_.open(decompressed_blob_in_archive) as f_in:
logger.debug(f"Uploading uncompressed file to GCS: {decompressed_blob_name}")
logger.debug(
f"Uploading uncompressed file to GCS: {decompressed_blob_name}"
)
bucket = self.get_bucket(bucket_name=bucket_name)
blob = storage.Blob(name=decompressed_blob_name, bucket=bucket)
blob.upload_from_file(file_obj=f_in, rewind=True, timeout=3600)
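A short usage sketch of the updated storage connector: each GoogleCloudStorage instance now carries its own Credentials object, so two instances backed by different service accounts can coexist in one process. The keyfile, project, bucket, and blob names below are placeholders.

from parsons import Table
from parsons.google.google_cloud_storage import GoogleCloudStorage

gcs = GoogleCloudStorage(app_creds="storage_service_account.json", project="my-project")

tbl = Table([{"state": "CA", "count": 10}, {"state": "NY", "count": 7}])

# upload_table writes the Parsons table to a CSV blob and returns its URI
blob_uri = gcs.upload_table(tbl, "my-bucket", "exports/states.csv", data_type="csv")
print(blob_uri)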
39 changes: 25 additions & 14 deletions parsons/google/google_sheets.py
@@ -1,12 +1,14 @@
import os
import json
import logging

from parsons.etl.table import Table
from parsons.google.utitities import setup_google_application_credentials, hexavigesimal
import uuid

import gspread
from google.oauth2.service_account import Credentials

from parsons.etl.table import Table
from parsons.google.utilities import (
load_google_application_credentials,
setup_google_application_credentials,
hexavigesimal,
)

logger = logging.getLogger(__name__)

@@ -32,12 +34,15 @@ def __init__(self, google_keyfile_dict=None, subject=None):
"https://www.googleapis.com/auth/drive",
]

setup_google_application_credentials(google_keyfile_dict, "GOOGLE_DRIVE_CREDENTIALS")
google_credential_file = open(os.environ["GOOGLE_DRIVE_CREDENTIALS"])
credentials_dict = json.load(google_credential_file)
env_credentials_path = str(uuid.uuid4())
setup_google_application_credentials(
google_keyfile_dict,
"GOOGLE_DRIVE_CREDENTIAL",
target_env_var_name=env_credentials_path,
)

credentials = Credentials.from_service_account_info(
credentials_dict, scopes=scope, subject=subject
credentials = load_google_application_credentials(
env_credentials_path, scopes=scope, subject=subject
)

self.gspread_client = gspread.authorize(credentials)
@@ -47,12 +52,16 @@ def _get_worksheet(self, spreadsheet_id, worksheet=0):

# Check if the worksheet is an integer, if so find the sheet by index
if isinstance(worksheet, int):
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(worksheet)
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(
worksheet
)

elif isinstance(worksheet, str):
idx = self.list_worksheets(spreadsheet_id).index(worksheet)
try:
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(idx)
return self.gspread_client.open_by_key(spreadsheet_id).get_worksheet(
idx
)
except: # noqa: E722
raise ValueError(f"Couldn't find worksheet {worksheet}")

@@ -276,7 +285,9 @@ def append_to_sheet(

# If the existing sheet is blank, then just overwrite the table.
if existing_table.num_rows == 0:
return self.overwrite_sheet(spreadsheet_id, table, worksheet, user_entered_value)
return self.overwrite_sheet(
spreadsheet_id, table, worksheet, user_entered_value
)

cells = []
for row_num, row in enumerate(table.data):
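Finally, a hedged sketch of the reworked Sheets connector: credentials come either from google_keyfile_dict or from the Google Drive credentials environment variable, and are loaded through the same per-instance path as the other clients. The spreadsheet ID and row data are placeholders.

from parsons import Table
from parsons.google.google_sheets import GoogleSheets

# With google_keyfile_dict=None, credentials fall back to the environment
sheets = GoogleSheets(google_keyfile_dict=None)

tbl = Table([{"name": "Jane", "phone": "555-0100"}])

# Appends below any existing rows, or overwrites the sheet if it is empty
sheets.append_to_sheet("1AbCdEfGhIjKlMnOpQrStUvWxYz", tbl, worksheet=0)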
