diff --git a/.gitignore b/.gitignore index 8b1ee6b888..bf10edbed0 100644 --- a/.gitignore +++ b/.gitignore @@ -110,6 +110,7 @@ venv.bak/ # scratch scratch* old!_* +test.ipynb # vscode .vscode/ diff --git a/parsons/google/google_cloud_storage.py b/parsons/google/google_cloud_storage.py index b1ec2fd2e8..d5d0ad9b86 100644 --- a/parsons/google/google_cloud_storage.py +++ b/parsons/google/google_cloud_storage.py @@ -7,7 +7,6 @@ import logging import time import uuid -from grpc import StatusCode import gzip import shutil from typing import Optional @@ -368,13 +367,13 @@ def copy_bucket_to_gcs( source_bucket (str): Source bucket name source_path (str): - Path in the source system pointing to the relevant keys / files to sync. Must end in a '/' + Path in the source system pointing to the relevant keys + / files to sync. Must end in a '/' aws_access_key_id (str): Access key to authenticate storage transfer aws_secret_access_key (str): Secret key to authenticate storage transfer """ - if source not in ["gcs", "s3"]: raise ValueError( f"Blob transfer only supports gcs and s3 sources [source={source}]" @@ -390,9 +389,11 @@ def copy_bucket_to_gcs( one_time_schedule = {"day": now.day, "month": now.month, "year": now.year} if source == "gcs": - description = f"One time GCS to GCS Transfer [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}" + description = f"""One time GCS to GCS Transfer + [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}""" elif source == "s3": - description = f"One time S3 to GCS Transfer [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}" + description = f"""One time S3 to GCS Transfer + [{source_bucket} -> {gcs_sink_bucket}] - {uuid.uuid4()}""" transfer_job_config = { "project_id": self.project, @@ -439,12 +440,10 @@ def copy_bucket_to_gcs( # Create the transfer job create_result = client.create_transfer_job(create_transfer_job_request) - logger.info(f"Created TransferJob: {create_result.name}") polling = True wait_time = 0 wait_between_attempts_in_sec = 10 - max_wait_in_sec = 60 * 10 # Ten Minutes # NOTE: This value defaults to an empty string until GCP # triggers the job internally ... we'll use this value to @@ -453,18 +452,23 @@ def copy_bucket_to_gcs( while polling: if latest_operation_name: + operation = client.get_operation({"name": latest_operation_name}) if not operation.done: logger.debug("Operation still running...") else: - if int(operation.error.code) not in StatusCode["OK"].value: + operation_metadata = storage_transfer.TransferOperation.deserialize( + operation.metadata.value + ) + error_output = operation_metadata.error_breakdowns + if len(error_output) != 0: raise Exception( - f"""{blob_storage} to GCS Transfer Job {create_result.name} failed with error: {operation.error.message} - """ + f"""{blob_storage} to GCS Transfer Job + {create_result.name} failed with error: {error_output}""" ) - if operation.response: + else: logger.info(f"TransferJob: {create_result.name} succeeded.") return