Skip to content

Commit

Permalink
[spot] Suppress unnecessary spot launch filemount translation loggi…
Browse files Browse the repository at this point in the history
…ng. (skypilot-org#1227)

* Suppress unnecessary `spot launch` filemount translation logging.

* Fix logging.

* Update
  • Loading branch information
concretevitamin authored and ewzeng committed Oct 24, 2022
1 parent 451c41d commit 8caa3e5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 21 deletions.
16 changes: 8 additions & 8 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,18 +582,18 @@ def add_store(self, store_type: Union[str, StoreType]) -> AbstractStore:
except exceptions.StorageBucketCreateError:
# Creation failed, so this must be sky managed store. Add failure
# to state.
logger.error(f'Sky could not create {store_type} store '
logger.error(f'Could not create {store_type} store '
f'with name {self.name}.')
global_user_state.set_storage_status(self.name,
StorageStatus.INIT_FAILED)
raise
except exceptions.StorageBucketGetError:
# Bucket get failed, so this is not sky managed. Do not update state
logger.error(f'Sky could not get {store_type} store '
logger.error(f'Could not get {store_type} store '
f'with name {self.name}.')
raise
except exceptions.StorageInitError:
logger.error(f'Sky could not initialize {store_type} store with '
logger.error(f'Could not initialize {store_type} store with '
f'name {self.name}. General initialization error.')
raise

Expand Down Expand Up @@ -669,7 +669,7 @@ def _sync_store(self, store: AbstractStore):
try:
store.upload()
except exceptions.StorageUploadError:
logger.error(f'Sky could not upload {self.source} to store '
logger.error(f'Could not upload {self.source} to store '
f'name {store.name}.')
if store.is_sky_managed:
global_user_state.set_storage_status(
Expand Down Expand Up @@ -829,7 +829,7 @@ def sync_local_dir(self) -> None:
f'{source} s3://{self.name}/')
with backend_utils.safe_console_status(
f'[bold cyan]Syncing '
f'[green]{self.source} to s3://{self.name}/'):
f'[green]{self.source}[/] to [green]s3://{self.name}/[/]'):
# TODO(zhwu): Use log_lib.run_with_log() and redirect the output
# to a log file.
with subprocess.Popen(sync_command.split(' '),
Expand All @@ -846,7 +846,7 @@ def sync_local_dir(self) -> None:
process.kill()
with ux_utils.print_exception_no_traceback():
raise PermissionError(
'Sky Storage failed to upload files to '
'Failed to upload files to '
'the S3 bucket. The bucket does not have '
'write permissions. It is possible that '
'the bucket is public.')
Expand Down Expand Up @@ -1079,7 +1079,7 @@ def sync_local_dir(self) -> None:
sync_command = f'gsutil -m rsync -d -r {source} gs://{self.name}/'
with backend_utils.safe_console_status(
f'[bold cyan]Syncing '
f'[green]{self.source} to gs://{self.name}/'):
f'[green]{self.source}[/] to [green]gs://{self.name}/[/]'):
with subprocess.Popen(sync_command.split(' '),
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL) as process:
Expand All @@ -1094,7 +1094,7 @@ def sync_local_dir(self) -> None:
process.kill()
with ux_utils.print_exception_no_traceback():
raise PermissionError(
'Sky Storage failed to upload files to '
'Failed to upload files to '
'GCS. The bucket does not have '
'write permissions. It is possible that '
'the bucket is public.')
Expand Down
40 changes: 27 additions & 13 deletions sky/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def spot_launch(
f'{colorama.Style.RESET_ALL}')
return

task = _translate_local_file_mounts(task)
task = _maybe_translate_local_file_mounts_and_sync_up(task)

with tempfile.NamedTemporaryFile(prefix=f'spot-task-{name}-',
mode='w') as f:
Expand Down Expand Up @@ -425,7 +425,17 @@ def spot_launch(
)


def _translate_local_file_mounts(task: task_lib.Task) -> task_lib.Task:
def _maybe_translate_local_file_mounts_and_sync_up(
task: task_lib.Task) -> task_lib.Task:
"""Translates local->VM mounts into Storage->VM, then syncs up any Storage.
Eagerly syncing up local->Storage ensures Storage->VM would work at task
launch time.
If there are no local source paths to be translated, this function would
still sync up any storage mounts with local source paths (which do not
undergo translation).
"""
# ================================================================
# Translate the workdir and local file mounts to cloud file mounts.
# ================================================================
Expand All @@ -434,10 +444,17 @@ def _translate_local_file_mounts(task: task_lib.Task) -> task_lib.Task:
original_file_mounts = task.file_mounts if task.file_mounts else {}
original_storage_mounts = task.storage_mounts if task.storage_mounts else {}

copy_mounts = task.get_local_to_remote_file_mounts()
if copy_mounts is None:
copy_mounts = {}

has_local_source_paths = (task.workdir is not None) or copy_mounts
if has_local_source_paths:
logger.info(
f'{colorama.Fore.YELLOW}Translating file_mounts with local '
f'source paths to SkyPilot Storage...{colorama.Style.RESET_ALL}')

# Step 1: Translate the workdir to SkyPilot storage.
logger.info(
f'{colorama.Fore.YELLOW}Translating file_mounts with local '
f'source paths to SkyPilot Storage...{colorama.Style.RESET_ALL}')
new_storage_mounts = dict()
if task.workdir is not None:
bucket_name = spot.constants.SPOT_WORKDIR_BUCKET_NAME.format(
Expand Down Expand Up @@ -467,9 +484,6 @@ def _translate_local_file_mounts(task: task_lib.Task) -> task_lib.Task:
# TODO(zhwu): Optimize this by:
# 1. Use the same bucket for all the mounts.
# 2. When the src is the same, use the same bucket.
copy_mounts = task.get_local_to_remote_file_mounts()
if copy_mounts is None:
copy_mounts = {}
copy_mounts_with_file_in_src = dict()
for i, (dst, src) in enumerate(copy_mounts.items()):
task.file_mounts.pop(dst)
Expand All @@ -492,9 +506,6 @@ def _translate_local_file_mounts(task: task_lib.Task) -> task_lib.Task:

# Step 3: Translate local file mounts with file in src to SkyPilot storage.
# Hard link the files in src to a temporary directory, and upload folder.
original_storage_mounts = {}
if task.storage_mounts is not None:
original_storage_mounts = task.storage_mounts
local_fm_path = os.path.join(
tempfile.gettempdir(),
spot.constants.SPOT_FM_LOCAL_TMP_DIR.format(id=run_id))
Expand Down Expand Up @@ -533,8 +544,11 @@ def _translate_local_file_mounts(task: task_lib.Task) -> task_lib.Task:
# Upload the local source to a bucket. The task will not be executed
# locally, so we need to upload the files/folders to the bucket manually
# here before sending the task to the remote spot controller.
logger.info(f'{colorama.Fore.YELLOW}Uploading sources to cloud storage.'
f'{colorama.Style.RESET_ALL} See sky storage ls')
if task.storage_mounts:
# There may be existing (non-translated) storage mounts, so log this
# whenever task.storage_mounts is non-empty.
logger.info(f'{colorama.Fore.YELLOW}Uploading sources to cloud storage.'
f'{colorama.Style.RESET_ALL} See: sky storage ls')
task.sync_storage_mounts()

# Step 5: Add the file download into the file mounts, such as
Expand Down

0 comments on commit 8caa3e5

Please sign in to comment.