[Storage] Support multiple files in Storage (skypilot-org#1311)
* Set rename_dir_lim for gcsfuse

* Add support for list of sources for Storage

* fix demo yaml

* tests

* lint

* lint

* test

* add validation

* address zhwu comments

* add error on basename conflicts

* use gsutil cp -n instead of gsutil rsync

* lint

* fix name

* parallelize gsutil rsync

* parallelize aws s3 rsync

* lint

* address comments

* refactor

* lint

* address comments

* update schema
romilbhardwaj authored and Sumanth committed Jan 15, 2023
1 parent 4cfd2ac commit 5ad49cb
Showing 7 changed files with 469 additions and 156 deletions.
26 changes: 22 additions & 4 deletions docs/source/reference/storage.rst
@@ -155,6 +155,23 @@ and storage mounting:
    name: romil-output-bucket
    mode: MOUNT

  # *** Uploading multiple files to the same Storage object ***
  #
  # The source field in a storage object can also be a list of local paths.
  # This is useful when multiple files or directories need to be uploaded to
  # the same bucket.
  #
  # Note: The basename of each path in the source list is copied recursively
  # to the root of the bucket. Thus, if the source list contains a directory,
  # the entire directory is copied to the root of the bucket. For instance,
  # in this example, the contents of ~/datasets are copied to
  # s3://sky-multisource-storage2/datasets/, and ~/mydir/myfile.txt will appear
  # at s3://sky-multisource-storage2/myfile.txt.
  /datasets-multisource-storage:
    name: sky-multisource-storage2  # Make sure this name is unique or you own this bucket
    source: [~/mydir/myfile.txt, ~/datasets]

run: |
  pwd
  ls -la /
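The same multi-source upload can also be expressed through the Python API. The snippet below is an illustrative sketch, not part of this commit; it assumes the sky.Storage constructor and Task.set_storage_mounts accept the arguments shown, and the bucket and mount names are placeholders.

import sky

storage = sky.Storage(name='sky-multisource-storage2',
                      source=['~/mydir/myfile.txt', '~/datasets'])
task = sky.Task(run='ls -la /datasets-multisource-storage')
task.set_storage_mounts({'/datasets-multisource-storage': storage})
# sky.launch(task)  # Both sources are uploaded to the bucket before the task runs.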
@@ -255,10 +272,11 @@ Storage YAML reference

sky.Storage.source: str or List[str]
The source attribute specifies the local path that must be made available
in the storage object. It can either be a local path, in which case data
is uploaded to the cloud to an appropriate object store (s3 or gcs), or it
can be a remote path (s3://, gs://), in which case it is copied or mounted
directly (see mode flag below).
in the storage object. It can be a single local path, a list of local
paths, or a remote path (s3://, gs://).
If the source is local, data is uploaded to an appropriate cloud object
store (s3 or gcs). If the path is remote, the data is copied or mounted
directly (see mode flag below).

sky.Storage.store: str; either of 's3' or 'gcs'
If you wish to force sky.Storage to be backed by a specific cloud object
25 changes: 18 additions & 7 deletions examples/storage_demo.yaml
@@ -8,9 +8,6 @@

name: storage-demo

resources:
  cloud: aws

###############################################
# SkyPilot storage #
###############################################
@@ -40,7 +37,7 @@ resources:
# 3. You want to have a write-able path to directly write files to S3 buckets -
# specify a name (to create a bucket if it doesn't exist) and set the mode
# to MOUNT. This is useful for writing code outputs, such as checkpoints or
# logs directly to a S3 bucket.
# logs directly to an S3/GCS bucket.
#
# 4. You want to have a shared file-system across workers running on different
# nodes - specify a name (to create a bucket if it doesn't exist) and set
@@ -94,7 +91,7 @@ file_mounts:
  # When the VM is initialized, the contents of the bucket are copied to
  # /datasets-storage. If the bucket already exists, it is fetched and re-used.
  /datasets-storage:
    name: sky-dataset-romilzz # Make sure this name is unique or you own this bucket
    name: sky-dataset-mybucket # Make sure this name is unique or you own this bucket
    source: ~/datasets
    store: s3 # Could be either of [s3, gcs]; default: None
    persistent: True # Defaults to True, can be set to false.
@@ -110,7 +107,7 @@ file_mounts:
  # other storage mounts using the same bucket with MOUNT mode. Note that the
  # source is synced with the remote bucket every time this task is run.
  /dataset-storage-mount:
    name: sky-dataset-romilzz
    name: sky-dataset-mybucket
    source: ~/datasets
    mode: MOUNT

@@ -137,9 +134,23 @@ file_mounts:
  # this approach can also be used to create a shared filesystem across workers.
  # See examples/storage/pingpong.yaml for an example.
  /outputs-mount:
    name: romil-output-bucketzz
    name: sky-output-bucket # Make sure this name is unique or you own this bucket
    mode: MOUNT

  # *** Uploading multiple files to the same Storage object ***
  #
  # The source field in a storage object can also be a list of local paths.
  # This is useful when multiple files or directories need to be uploaded to
  # the same bucket.
  #
  # Note: If the source list contains a directory, the entire directory is
  # copied to the root of the bucket. For instance, in this example, the
  # contents of ~/datasets are copied to s3://sky-multisource-storage/datasets/
  # and ~/mydir/myfile.txt will appear at s3://sky-multisource-storage/myfile.txt.
  /datasets-multisource-storage:
    name: sky-multisource-storage # Make sure this name is unique or you own this bucket
    source: [~/mydir/myfile.txt, ~/datasets]

run: |
  pwd
  ls -la /
126 changes: 125 additions & 1 deletion sky/data/data_utils.py
@@ -1,12 +1,21 @@
"""Miscellaneous Utils for Sky Data
"""
from typing import Any, Tuple
from multiprocessing import pool
import os
import subprocess
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
import urllib.parse

from sky import exceptions
from sky import sky_logging
from sky.adaptors import aws, gcp
from sky.utils import ux_utils

Client = Any

logger = sky_logging.init_logger(__name__)


def split_s3_path(s3_path: str) -> Tuple[str, str]:
"""Splits S3 Path into Bucket name and Relative Path to Bucket
@@ -69,3 +78,118 @@ def is_cloud_store_url(url):
    result = urllib.parse.urlsplit(url)
    # '' means non-cloud URLs.
    return result.netloc
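

# Illustrative behaviour (not part of this commit): cloud URLs have a
# non-empty network location, while local paths do not.
#   is_cloud_store_url('s3://my-bucket/data')  -> 'my-bucket' (truthy)
#   is_cloud_store_url('/tmp/data')            -> '' (falsy)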


def _group_files_by_dir(
        source_list: List[str]) -> Tuple[Dict[str, List[str]], List[str]]:
    """Groups a list of paths based on their directory

    Given a list of paths, generates a dict of {dir_name: List[file_name]}
    which groups files with same dir, and a list of dirs in the source_list.

    This is used to optimize uploads by reducing the number of calls to rsync.
    E.g., ['a/b/c.txt', 'a/b/d.txt', 'a/e.txt'] will be grouped into
    {'a/b': ['c.txt', 'd.txt'], 'a': ['e.txt']}, and these three files can be
    uploaded in two rsync calls instead of three.

    Args:
        source_list: List[str]; List of paths to group
    """
    grouped_files = {}
    dirs = []
    for source in source_list:
        source = os.path.abspath(os.path.expanduser(source))
        if os.path.isdir(source):
            dirs.append(source)
        else:
            base_path = os.path.dirname(source)
            file_name = os.path.basename(source)
            if base_path not in grouped_files:
                grouped_files[base_path] = []
            grouped_files[base_path].append(file_name)
    return grouped_files, dirs
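

# Illustrative example (not part of this commit; paths are hypothetical):
# two files under the same directory are batched into one sync command,
# while a directory is returned separately.
#   grouped, dirs = _group_files_by_dir(
#       ['~/mydir/data/a.csv', '~/mydir/data/b.csv', '~/datasets'])
#   # grouped == {'/home/<user>/mydir/data': ['a.csv', 'b.csv']}
#   # dirs == ['/home/<user>/datasets']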


def parallel_upload(source_path_list: List[Path],
                    filesync_command_generator: Callable[[str, List[str]], str],
                    dirsync_command_generator: Callable[[str, str], str],
                    bucket_name: str,
                    access_denied_message: str,
                    create_dirs: bool = False,
                    max_concurrent_uploads: Optional[int] = None) -> None:
    """Helper function to run parallel uploads for a list of paths.

    Used by S3Store and GCSStore to run rsync commands in parallel by
    providing appropriate command generators.

    Args:
        source_path_list: List of paths to local files or directories
        filesync_command_generator: Callable that generates rsync command
            for a list of files belonging to the same dir.
        dirsync_command_generator: Callable that generates rsync command
            for a directory.
        bucket_name: Name of the bucket the files are uploaded to; used in
            error messages.
        access_denied_message: Message to intercept from the underlying
            upload utility when permissions are insufficient. Used in
            exception handling.
        create_dirs: If the local_path is a directory and this is set to
            False, the contents of the directory are directly uploaded to
            root of the bucket. If the local_path is a directory and this is
            set to True, the directory is created in the bucket root and
            contents are uploaded to it.
        max_concurrent_uploads: Maximum number of concurrent threads to use
            to upload files.
    """
    # Generate sync commands for files and dirs
    commands = []
    grouped_files, dirs = _group_files_by_dir(source_path_list)
    # Generate file upload commands
    for dir_path, file_names in grouped_files.items():
        sync_command = filesync_command_generator(dir_path, file_names)
        commands.append(sync_command)
    # Generate dir upload commands
    for dir_path in dirs:
        if create_dirs:
            dest_dir_name = os.path.basename(dir_path)
        else:
            dest_dir_name = ''
        sync_command = dirsync_command_generator(dir_path, dest_dir_name)
        commands.append(sync_command)

    # Run commands in parallel
    with pool.ThreadPool(processes=max_concurrent_uploads) as p:
        p.starmap(
            run_upload_cli,
            zip(commands, [access_denied_message] * len(commands),
                [bucket_name] * len(commands)))


def run_upload_cli(command: str, access_denied_message: str, bucket_name: str):
    """Runs a cloud CLI upload command, surfacing permission errors.

    Streams stderr from the spawned process; if access_denied_message is
    seen, the process is killed and a PermissionError is raised. Any other
    non-zero exit raises a StorageUploadError for the given bucket.
    """
    # TODO(zhwu): Use log_lib.run_with_log() and redirect the output
    # to a log file.
    with subprocess.Popen(command,
                          stderr=subprocess.PIPE,
                          stdout=subprocess.DEVNULL,
                          shell=True) as process:
        stderr = []
        while True:
            line = process.stderr.readline()
            if not line:
                break
            str_line = line.decode('utf-8')
            stderr.append(str_line)
            if access_denied_message in str_line:
                process.kill()
                with ux_utils.print_exception_no_traceback():
                    raise PermissionError(
                        'Failed to upload files to '
                        'the remote bucket. The bucket does not have '
                        'write permissions. It is possible that '
                        'the bucket is public.')
        returncode = process.wait()
        if returncode != 0:
            stderr = '\n'.join(stderr)
            with ux_utils.print_exception_no_traceback():
                logger.error(stderr)
                raise exceptions.StorageUploadError(
                    f'Upload to bucket failed for store {bucket_name}. '
                    'Please check the logs.')
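
For completeness, here is a minimal, illustrative sketch of how command generators might be wired into parallel_upload. It is not part of this commit: the bucket name and local paths are hypothetical, and the exact gsutil invocations ('cp -n' for grouped files, 'rsync -r' for directories) are assumptions based on the commit messages above, not the actual S3Store/GCSStore implementations.

import os
from typing import List

from sky.data.data_utils import parallel_upload

def gcs_filesync_command(base_dir_path: str, file_names: List[str]) -> str:
    # Upload a batch of files sharing one parent directory in a single call;
    # '-n' (no-clobber) skips objects that already exist in the bucket.
    sources = ' '.join(
        os.path.join(base_dir_path, name) for name in file_names)
    return f'gsutil -m cp -n {sources} gs://my-sky-bucket/'

def gcs_dirsync_command(src_dir_path: str, dest_dir_name: str) -> str:
    # Recursively sync a directory; dest_dir_name is '' when create_dirs is
    # False, so the directory contents land at the bucket root.
    return f'gsutil -m rsync -r {src_dir_path} gs://my-sky-bucket/{dest_dir_name}'

# Requires the bucket and local paths to exist; shown only to illustrate the
# generator pattern that parallel_upload expects.
parallel_upload(['~/mydir/myfile.txt', '~/datasets'],
                gcs_filesync_command,
                gcs_dirsync_command,
                bucket_name='my-sky-bucket',
                access_denied_message='AccessDeniedException',
                max_concurrent_uploads=4)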