From 39e6459dbfc75ee168f9f30f5034601fb28c2cb2 Mon Sep 17 00:00:00 2001
From: Zhanghao Wu
Date: Wed, 12 Oct 2022 13:40:47 -0700
Subject: [PATCH 1/4] Refactor storage and fix data transfer service

---
 sky/data/data_transfer.py | 46 +++++++++++++++++++++++----------------
 sky/data/storage.py       | 38 +++++++++++++++++++++-----------
 2 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/sky/data/data_transfer.py b/sky/data/data_transfer.py
index a40c78fab27..f78c5c94107 100644
--- a/sky/data/data_transfer.py
+++ b/sky/data/data_transfer.py
@@ -15,10 +15,9 @@
 - All combinations of Azure Transfer
 - GCS -> S3
 """
-from datetime import datetime
 import json
 import subprocess
-from typing import Any
+import time
 
 from sky import clouds
 from sky import sky_logging
@@ -26,13 +25,15 @@
 
 logger = sky_logging.init_logger(__name__)
 
-S3Store = Any
-GcsStore = Any
+MAX_POLLS = 120000
+POLL_INTERVAL = 1
 
 
 def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
     """Creates a one-time transfer from Amazon S3 to Google Cloud Storage.
 
+    Progress can be viewed at https://console.cloud.google.com/transfer/cloud
+    and this function blocks until the transfer is complete.
 
     Args:
       s3_bucket_name: str; Name of the Amazon S3 Bucket
@@ -56,24 +57,11 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
     _add_bucket_iam_member(gs_bucket_name, 'roles/storage.admin',
                            'serviceAccount:' + storage_account['accountEmail'])
 
-    starttime = datetime.utcnow()
     transfer_job = {
         'description': f'Transferring data from S3 Bucket \
        {s3_bucket_name} to GCS Bucket {gs_bucket_name}',
         'status': 'ENABLED',
         'projectId': project_id,
-        'schedule': {
-            'scheduleStartDate': {
-                'day': starttime.day,
-                'month': starttime.month,
-                'year': starttime.year,
-            },
-            'scheduleEndDate': {
-                'day': starttime.day,
-                'month': starttime.month,
-                'year': starttime.year,
-            },
-        },
         'transferSpec': {
             'awsS3DataSource': {
                 'bucketName': s3_bucket_name,
@@ -88,8 +76,28 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
         }
     }
 
-    result = storagetransfer.transferJobs().create(body=transfer_job).execute()
-    logger.info(f'AWS -> GCS Transfer Job: {json.dumps(result, indent=4)}')
+    response = storagetransfer.transferJobs().create(
+        body=transfer_job).execute()
+    operation = storagetransfer.transferJobs().run(jobName=response['name'],
+                                                   body={
+                                                       'projectId': project_id
+                                                   }).execute()
+
+    logger.info(f'AWS -> GCS Transfer Job: {json.dumps(operation, indent=4)}')
+    logger.info('Waiting for the transfer to finish')
+    start = time.time()
+    for _ in range(MAX_POLLS):
+        result = (storagetransfer.transferOperations().get(
+            name=operation['name']).execute())
+        if 'error' in result:
+            raise Exception(result['error'])
+
+        if 'done' in result and result['done']:
+            logger.info('Operation done.')
+            break
+        logger.info('Waiting for the data transfer to be finished...')
+        time.sleep(POLL_INTERVAL)
+    logger.info(f'Transfer finished in {time.time() - start}')
 
 
 def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
diff --git a/sky/data/storage.py b/sky/data/storage.py
index c7cf64aafc8..2f06890164d 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -40,10 +40,32 @@
 
 
 class StoreType(enum.Enum):
+    """Enum for the different types of stores."""
     S3 = 'S3'
     GCS = 'GCS'
     AZURE = 'AZURE'
 
+    @classmethod
+    def from_cloud(cls, cloud: clouds.Cloud) -> 'StoreType':
+        if isinstance(cloud, clouds.AWS):
+            return StoreType.S3
+        elif isinstance(cloud, clouds.GCP):
+            return StoreType.GCS
+        elif isinstance(cloud, clouds.Azure):
+            return StoreType.AZURE
+
+        raise ValueError(f'Unsupported cloud for StoreType: {cloud}')
+
+    @classmethod
+    def from_store(cls, store: 'Storage') -> 'StoreType':
+        if isinstance(store, S3Store):
+            return StoreType.S3
+        elif isinstance(store, GcsStore):
+            return StoreType.GCS
+        else:
+            with ux_utils.print_exception_no_traceback():
+                raise ValueError(f'Unknown store type: {store}')
+
 
 class StorageMode(enum.Enum):
     MOUNT = 'MOUNT'
@@ -76,16 +98,6 @@ def get_store_prefix(storetype: StoreType) -> str:
         raise ValueError(f'Unknown store type: {storetype}')
 
 
-def _get_storetype_from_store(store: 'Storage') -> StoreType:
-    if isinstance(store, S3Store):
-        return StoreType.S3
-    elif isinstance(store, GcsStore):
-        return StoreType.GCS
-    else:
-        with ux_utils.print_exception_no_traceback():
-            raise ValueError(f'Unknown store type: {store}')
-
-
 class AbstractStore:
     """AbstractStore abstracts away the different storage types exposed by
     different clouds.
@@ -312,11 +324,11 @@ def __repr__(self):
                 f'\n\tstores={self.sky_stores})')
 
     def add_store(self, store: AbstractStore) -> None:
-        storetype = _get_storetype_from_store(store)
+        storetype = StoreType.from_store(store)
         self.sky_stores[storetype] = store.get_metadata()
 
     def remove_store(self, store: AbstractStore) -> None:
-        storetype = _get_storetype_from_store(store)
+        storetype = StoreType.from_store(store)
         if storetype in self.sky_stores:
             del self.sky_stores[storetype]
 
@@ -607,7 +619,7 @@ def add_store(self, store_type: Union[str, StoreType]) -> AbstractStore:
 
     def _add_store(self, store: AbstractStore, is_reconstructed: bool = False):
         # Adds a store object to the storage
-        store_type = _get_storetype_from_store(store)
+        store_type = StoreType.from_store(store)
         self.stores[store_type] = store
         # If store initialized and is sky managed, add to state
        if store.is_sky_managed:
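Patch 1 above turns the S3 -> GCS copy into a one-time Storage Transfer Service job that is created, explicitly run, and then polled to completion, and it folds the old module-level _get_storetype_from_store() helper into StoreType as the from_cloud()/from_store() classmethods. A minimal usage sketch of the new classmethods follows; the call site is hypothetical and assumes an importable sky package, but the mappings (AWS -> S3, GCP -> GCS, Azure -> AZURE) are exactly those in the patch:

from sky import clouds
from sky.data import storage

# from_cloud() maps a cloud object to the store type hosted on that cloud.
assert storage.StoreType.from_cloud(clouds.GCP()) == storage.StoreType.GCS
assert storage.StoreType.from_cloud(clouds.AWS()) == storage.StoreType.S3
# Any other cloud raises ValueError('Unsupported cloud for StoreType: ...').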
From 7ac775f1cbc7e8832c8b0f984644dd0c88993e53 Mon Sep 17 00:00:00 2001
From: Zhanghao Wu
Date: Wed, 12 Oct 2022 14:01:50 -0700
Subject: [PATCH 2/4] Fix UX for the data transfer

---
 sky/data/data_transfer.py | 36 ++++++++++++++++++++++--------------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/sky/data/data_transfer.py b/sky/data/data_transfer.py
index f78c5c94107..bd8fcfab2eb 100644
--- a/sky/data/data_transfer.py
+++ b/sky/data/data_transfer.py
@@ -19,9 +19,13 @@
 import subprocess
 import time
 
+import colorama
+
 from sky import clouds
 from sky import sky_logging
 from sky.adaptors import aws, gcp
+from sky.backends import backend_utils
+from sky.utils import ux_utils
 
 logger = sky_logging.init_logger(__name__)
 
@@ -31,7 +35,7 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
     """Creates a one-time transfer from Amazon S3 to Google Cloud Storage.
-
+
     Progress can be viewed at https://console.cloud.google.com/transfer/cloud
     and this function blocks until the transfer is complete.
 
@@ -83,21 +87,25 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
                                                        'projectId': project_id
                                                    }).execute()
 
-    logger.info(f'AWS -> GCS Transfer Job: {json.dumps(operation, indent=4)}')
+    logger.info(f'{colorama.Fore.GREEN}Transfer job scheduled: '
+                f'{colorama.Style.RESET_ALL}'
+                f'AWS s3://{s3_bucket_name} -> GCS gs://{gs_bucket_name} ')
+    logger.debug(json.dumps(operation, indent=4))
     logger.info('Waiting for the transfer to finish')
     start = time.time()
-    for _ in range(MAX_POLLS):
-        result = (storagetransfer.transferOperations().get(
-            name=operation['name']).execute())
-        if 'error' in result:
-            raise Exception(result['error'])
-
-        if 'done' in result and result['done']:
-            logger.info('Operation done.')
-            break
-        logger.info('Waiting for the data transfer to be finished...')
-        time.sleep(POLL_INTERVAL)
-    logger.info(f'Transfer finished in {time.time() - start}')
+    with backend_utils.safe_console_status('Transferring'):
+        for _ in range(MAX_POLLS):
+            result = (storagetransfer.transferOperations().get(
+                name=operation['name']).execute())
+            if 'error' in result:
+                with ux_utils.print_exception_no_traceback():
+                    raise RuntimeError(result['error'])
+
+            if 'done' in result and result['done']:
+                logger.info('Operation done.')
+                break
+            time.sleep(POLL_INTERVAL)
+    logger.info(f'Transfer finished in {time.time() - start:.2f} seconds')
 
 
 def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
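Patch 2 wraps the poll loop in a console status spinner and surfaces transfer failures as a clean RuntimeError instead of a bare Exception. The underlying pattern is bounded polling of a long-running operation; a distilled, self-contained sketch of it follows, where get_operation is a stand-in callable for the storagetransfer.transferOperations().get(...).execute() call, not a real API:

import time

MAX_POLLS = 120000  # at a 1-second interval, up to ~33 hours of polling
POLL_INTERVAL = 1

def wait_for_operation(get_operation) -> dict:
    """Polls get_operation() until it reports done, raising on error."""
    for _ in range(MAX_POLLS):
        result = get_operation()
        if 'error' in result:
            raise RuntimeError(result['error'])
        if result.get('done'):
            return result
        time.sleep(POLL_INTERVAL)
    raise TimeoutError('Operation did not finish within MAX_POLLS polls.')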
From 63a0248c3eb455ba58a84055d4b6a6fad708ab8f Mon Sep 17 00:00:00 2001
From: Zhanghao Wu
Date: Wed, 12 Oct 2022 14:04:54 -0700
Subject: [PATCH 3/4] UX fixes

---
 sky/data/data_transfer.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sky/data/data_transfer.py b/sky/data/data_transfer.py
index bd8fcfab2eb..ff6ee7cb516 100644
--- a/sky/data/data_transfer.py
+++ b/sky/data/data_transfer.py
@@ -89,7 +89,7 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
 
     logger.info(f'{colorama.Fore.GREEN}Transfer job scheduled: '
                 f'{colorama.Style.RESET_ALL}'
-                f'AWS s3://{s3_bucket_name} -> GCS gs://{gs_bucket_name} ')
+                f's3://{s3_bucket_name} -> gs://{gs_bucket_name} ')
     logger.debug(json.dumps(operation, indent=4))
     logger.info('Waiting for the transfer to finish')
     start = time.time()
@@ -102,7 +102,6 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
                     raise RuntimeError(result['error'])
 
             if 'done' in result and result['done']:
-                logger.info('Operation done.')
                 break
             time.sleep(POLL_INTERVAL)
     logger.info(f'Transfer finished in {time.time() - start:.2f} seconds')
@@ -130,4 +129,4 @@ def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:
 
     bucket.set_iam_policy(policy)
 
-    logger.info(f'Added {member} with role {role} to {bucket_name}.')
+    logger.debug(f'Added {member} with role {role} to {bucket_name}.')
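Patch 3 trims the scheduled-job banner and demotes per-detail messages to debug level, so a normal run prints one green confirmation line rather than a JSON dump. For reference, a standalone sketch of the colorama idiom that banner relies on; the bucket names here are placeholders:

import colorama

# Fore.GREEN colors the prefix; Style.RESET_ALL returns to the default
# style, so the bucket names print uncolored.
print(f'{colorama.Fore.GREEN}Transfer job scheduled: '
      f'{colorama.Style.RESET_ALL}'
      f's3://my-src-bucket -> gs://my-dst-bucket')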
From cd114b2b9f426c8271aa7f2af5d4c400e7928e54 Mon Sep 17 00:00:00 2001
From: Zhanghao Wu
Date: Sun, 16 Oct 2022 14:55:53 -0700
Subject: [PATCH 4/4] Address comments

---
 sky/data/data_transfer.py | 11 ++++++++++-
 sky/data/storage.py       |  2 +-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/sky/data/data_transfer.py b/sky/data/data_transfer.py
index ff6ee7cb516..4c934b96f42 100644
--- a/sky/data/data_transfer.py
+++ b/sky/data/data_transfer.py
@@ -104,7 +104,16 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
             if 'done' in result and result['done']:
                 break
             time.sleep(POLL_INTERVAL)
-    logger.info(f'Transfer finished in {time.time() - start:.2f} seconds')
+        else:
+            # If we get here, we timed out.
+            logger.info(
+                f'Transfer timed out after {(time.time() - start) / 3600:.2f} '
+                'hours. Please check the status of the transfer job in the '
+                'GCP Storage Transfer Service console at '
+                'https://console.cloud.google.com/transfer/cloud')
+            return
+    logger.info(
+        f'Transfer finished in {(time.time() - start) / 60:.2f} minutes.')
 
 
 def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
diff --git a/sky/data/storage.py b/sky/data/storage.py
index 2f06890164d..9c59c8ff894 100644
--- a/sky/data/storage.py
+++ b/sky/data/storage.py
@@ -57,7 +57,7 @@ def from_cloud(cls, cloud: clouds.Cloud) -> 'StoreType':
         raise ValueError(f'Unsupported cloud for StoreType: {cloud}')
 
     @classmethod
-    def from_store(cls, store: 'Storage') -> 'StoreType':
+    def from_store(cls, store: 'AbstractStore') -> 'StoreType':
         if isinstance(store, S3Store):
             return StoreType.S3
         elif isinstance(store, GcsStore):
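Patch 4 adds timeout handling with Python's for/else: the else suite runs only when the loop exhausts all MAX_POLLS iterations without hitting break, which is exactly the timed-out case. A minimal self-contained illustration of that control flow, with a placeholder check_done callable standing in for the transfer-status check:

import time

def poll(check_done, max_polls: int = 5, interval: float = 0.1) -> bool:
    start = time.time()
    for _ in range(max_polls):
        if check_done():
            break  # success: skips the else suite below
        time.sleep(interval)
    else:
        # Reached only when the loop completed without a break.
        print(f'Timed out after {time.time() - start:.2f}s')
        return False
    print(f'Finished in {time.time() - start:.2f}s')
    return True

print(poll(lambda: True))   # finishes on the first poll -> True
print(poll(lambda: False))  # exhausts the loop -> False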