[Storage] R2 Cloud Object Storage integration (#1736)
* first r2 integration commit

* integrating R2 as part of TestStorageCredentials

* R2 test conditional skip in test_smoke.py

* r2 multi-file support COPY

* add r2 storage mounting test
landscapepainter authored Mar 26, 2023
1 parent fae047f commit 01e98e8
Showing 11 changed files with 774 additions and 27 deletions.
121 changes: 121 additions & 0 deletions sky/adaptors/cloudflare.py
@@ -0,0 +1,121 @@
"""Cloudflare cloud adaptors"""

# pylint: disable=import-outside-toplevel

import functools
import os
import threading

boto3 = None
botocore = None
_session_creation_lock = threading.RLock()
ACCOUNT_ID_PATH = '~/.cloudflare/accountid'
R2_PROFILE_NAME = 'r2'


def import_package(func):

@functools.wraps(func)
def wrapper(*args, **kwargs):
global boto3, botocore
if boto3 is None or botocore is None:
try:
import boto3 as _boto3
import botocore as _botocore
boto3 = _boto3
botocore = _botocore
            except ImportError:
                raise ImportError(
                    'Failed to import dependencies for Cloudflare. '
                    'Try: pip install "skypilot[aws]"') from None
return func(*args, **kwargs)

return wrapper


# lru_cache() is thread-safe and it will return the same session object
# for different threads.
# Reference: https://docs.python.org/3/library/functools.html#functools.lru_cache # pylint: disable=line-too-long
@functools.lru_cache()
@import_package
def session():
"""Create an AWS session."""
# Creating the session object is not thread-safe for boto3,
# so we add a reentrant lock to synchronize the session creation.
# Reference: https://github.com/boto/boto3/issues/1592
# However, the session object itself is thread-safe, so we are
# able to use lru_cache() to cache the session object.
with _session_creation_lock:
return boto3.session.Session(profile_name=R2_PROFILE_NAME)


@functools.lru_cache()
@import_package
def resource(resource_name: str, **kwargs):
"""Create a Cloudflare resource.
Args:
resource_name: Cloudflare resource name (e.g., 's3').
kwargs: Other options.
"""
# Need to use the resource retrieved from the per-thread session
# to avoid thread-safety issues (Directly creating the client
# with boto3.resource() is not thread-safe).
# Reference: https://stackoverflow.com/a/59635814

session_ = session()
cloudflare_credentials = session_.get_credentials().get_frozen_credentials()
endpoint = create_endpoint()

return session_.resource(
resource_name,
endpoint_url=endpoint,
aws_access_key_id=cloudflare_credentials.access_key,
aws_secret_access_key=cloudflare_credentials.secret_key,
region_name='auto',
**kwargs)


@functools.lru_cache()
def client(service_name: str, region: str):
    """Create a Cloudflare client of a certain service.
    Args:
        service_name: Cloudflare service name (e.g., 's3').
        region: Region name; Cloudflare R2 uses 'auto'.
    """
# Need to use the client retrieved from the per-thread session
# to avoid thread-safety issues (Directly creating the client
# with boto3.client() is not thread-safe).
# Reference: https://stackoverflow.com/a/59635814

session_ = session()
cloudflare_credentials = session_.get_credentials().get_frozen_credentials()
endpoint = create_endpoint()

return session_.client(
service_name,
endpoint_url=endpoint,
aws_access_key_id=cloudflare_credentials.access_key,
aws_secret_access_key=cloudflare_credentials.secret_key,
region_name=region)


@import_package
def botocore_exceptions():
"""AWS botocore exception."""
from botocore import exceptions
return exceptions


def create_endpoint():
    """Reads the account ID needed to build the R2 endpoint URL."""
    accountid_path = os.path.expanduser(ACCOUNT_ID_PATH)
    with open(accountid_path, 'r') as f:
        accountid = f.readline().strip()
    return 'https://' + accountid + '.r2.cloudflarestorage.com'
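
A minimal usage sketch of this adaptor (illustrative only, not part of the commit; it assumes ~/.cloudflare/accountid is populated and an 'r2' profile exists in the AWS credentials file):

# Hypothetical driver code exercising the adaptor above.
from sky.adaptors import cloudflare

endpoint = cloudflare.create_endpoint()  # https://<accountid>.r2.cloudflarestorage.com
r2 = cloudflare.resource('s3')           # boto3 S3 resource pointed at the R2 endpoint
for bucket in r2.buckets.all():          # R2 exposes an S3-compatible API
    print(bucket.name)
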
62 changes: 61 additions & 1 deletion sky/cloud_stores.py
@@ -12,7 +12,7 @@

from sky.clouds import gcp
from sky.data import data_utils
-from sky.adaptors import aws
+from sky.adaptors import aws, cloudflare


class CloudStorage:
@@ -146,6 +146,65 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
return ' && '.join(all_commands)


class R2CloudStorage(CloudStorage):
"""Cloudflare Cloud Storage."""

# List of commands to install AWS CLI
_GET_AWSCLI = [
'aws --version >/dev/null 2>&1 || pip3 install awscli',
]

def is_directory(self, url: str) -> bool:
"""Returns whether R2 'url' is a directory.
In cloud object stores, a "directory" refers to a regular object whose
name is a prefix of other objects.
"""
r2 = cloudflare.resource('s3')
bucket_name, path = data_utils.split_r2_path(url)
bucket = r2.Bucket(bucket_name)

num_objects = 0
for obj in bucket.objects.filter(Prefix=path):
num_objects += 1
if obj.key == path:
return False
            # Short-circuit: multiple objects share the prefix, so it is
            # a directory; no need to scan further.
            if num_objects == 3:
                return True

# A directory with few or no items
return True

def make_sync_dir_command(self, source: str, destination: str) -> str:
"""Downloads using AWS CLI."""
# AWS Sync by default uses 10 threads to upload files to the bucket.
# To increase parallelism, modify max_concurrent_requests in your
# aws config file (Default path: ~/.aws/config).
endpoint_url = cloudflare.create_endpoint()
if 'r2://' in source:
source = source.replace('r2://', 's3://')
download_via_awscli = ('aws s3 sync --no-follow-symlinks '
f'{source} {destination} '
f'--endpoint {endpoint_url} '
f'--profile={cloudflare.R2_PROFILE_NAME}')

all_commands = list(self._GET_AWSCLI)
all_commands.append(download_via_awscli)
return ' && '.join(all_commands)

    def make_sync_file_command(self, source: str, destination: str) -> str:
        """Downloads a file using AWS CLI."""
        endpoint_url = cloudflare.create_endpoint()
        if 'r2://' in source:
            source = source.replace('r2://', 's3://')
        download_via_awscli = (f'aws s3 cp {source} {destination} '
                               f'--endpoint {endpoint_url} '
                               f'--profile={cloudflare.R2_PROFILE_NAME}')

all_commands = list(self._GET_AWSCLI)
all_commands.append(download_via_awscli)
return ' && '.join(all_commands)


def get_storage_from_path(url: str) -> CloudStorage:
"""Returns a CloudStorage by identifying the scheme:// in a URL."""
result = urllib.parse.urlsplit(url)
@@ -159,4 +218,5 @@ def get_storage_from_path(url: str) -> CloudStorage:
_REGISTRY = {
'gs': GcsCloudStorage(),
's3': S3CloudStorage(),
    'r2': R2CloudStorage(),
}
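
As an illustrative sketch (not part of the diff), an r2:// URL resolves through the registry to R2CloudStorage, whose generated command pins the AWS CLI to the R2 endpoint and profile; the bucket name here is hypothetical:

# Hypothetical dispatch example for the registry above.
from sky import cloud_stores

store = cloud_stores.get_storage_from_path('r2://imagenet/train/')
assert isinstance(store, cloud_stores.R2CloudStorage)
# Yields roughly:
#   aws --version >/dev/null 2>&1 || pip3 install awscli && \
#   aws s3 sync --no-follow-symlinks s3://imagenet/train/ /data \
#       --endpoint https://<accountid>.r2.cloudflarestorage.com --profile=r2
print(store.make_sync_dir_command('r2://imagenet/train/', '/data'))
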
58 changes: 58 additions & 0 deletions sky/data/data_transfer.py
@@ -13,6 +13,7 @@
TODO:
- All combinations of Azure Transfer
- All combinations of R2 Transfer
- GCS -> S3
"""
import json
@@ -116,6 +117,21 @@ def s3_to_gcs(s3_bucket_name: str, gs_bucket_name: str) -> None:
f'Transfer finished in {(time.time() - start) / 60:.2f} minutes.')


def s3_to_r2(s3_bucket_name: str, r2_bucket_name: str) -> None:
    """Creates a one-time transfer from Amazon S3 to Cloudflare R2.
    Args:
        s3_bucket_name: str; Name of the Amazon S3 Bucket
        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
    """
raise NotImplementedError('Moving data directly from clouds to R2 is '
'currently not supported. Please specify '
'a local source for the storage object.')


def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
"""Creates a one-time transfer from Google Cloud Storage to Amazon S3.
@@ -129,6 +145,48 @@ def gcs_to_s3(gs_bucket_name: str, s3_bucket_name: str) -> None:
subprocess.call(sync_command, shell=True)


def gcs_to_r2(gs_bucket_name: str, r2_bucket_name: str) -> None:
"""Creates a one-time transfer from Google Cloud Storage to Amazon S3.
Args:
gs_bucket_name: str; Name of the Google Cloud Storage Bucket
r2_bucket_name: str; Name of the Cloudflare R2 Bucket
"""
raise NotImplementedError('Moving data directly from clouds to R2 is '
'currently not supported. Please specify '
'a local source for the storage object.')


def r2_to_gcs(r2_bucket_name: str, gs_bucket_name: str) -> None:
"""Creates a one-time transfer from Cloudflare R2 to Google Cloud Storage.
Args:
r2_bucket_name: str; Name of the Cloudflare R2 Bucket
gs_bucket_name: str; Name of the Google Cloud Storage Bucket
"""
raise NotImplementedError('Moving data directly from R2 to clouds is '
'currently not supported. Please specify '
'a local source for the storage object.')


def r2_to_s3(r2_bucket_name: str, s3_bucket_name: str) -> None:
    """Creates a one-time transfer from Cloudflare R2 to Amazon S3.
    Args:
        r2_bucket_name: str; Name of the Cloudflare R2 Bucket
        s3_bucket_name: str; Name of the Amazon S3 Bucket
    """
raise NotImplementedError('Moving data directly from R2 to clouds is '
'currently not supported. Please specify '
'a local source for the storage object.')
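
Because cloud-to-R2 copies are unsupported, the error messages point at a local staging hop. A hedged sketch of that workaround (bucket names and staging path hypothetical), reusing the endpoint/profile flags the sync commands above rely on:

# Hypothetical two-hop transfer: S3 -> local stage -> R2.
import subprocess
from sky.adaptors import cloudflare

endpoint = cloudflare.create_endpoint()
subprocess.run('aws s3 sync s3://my-src-bucket /tmp/stage', shell=True, check=True)
subprocess.run(f'aws s3 sync /tmp/stage s3://my-dst-bucket '
               f'--endpoint {endpoint} --profile={cloudflare.R2_PROFILE_NAME}',
               shell=True, check=True)
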


def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:
storage_client = gcp.storage_client()
bucket = storage_client.bucket(bucket_name)
36 changes: 34 additions & 2 deletions sky/data/data_utils.py
@@ -8,7 +8,7 @@

from sky import exceptions
from sky import sky_logging
-from sky.adaptors import aws, gcp
+from sky.adaptors import aws, gcp, cloudflare
from sky.utils import ux_utils

Client = Any
@@ -40,6 +40,18 @@ def split_gcs_path(gcs_path: str) -> Tuple[str, str]:
return bucket, key


def split_r2_path(r2_path: str) -> Tuple[str, str]:
"""Splits R2 Path into Bucket name and Relative Path to Bucket
Args:
r2_path: str; R2 Path, e.g. r2://imagenet/train/
"""
path_parts = r2_path.replace('r2://', '').split('/')
bucket = path_parts.pop(0)
key = '/'.join(path_parts)
return bucket, key


def create_s3_client(region: str = 'us-east-2') -> Client:
"""Helper method that connects to Boto3 client for S3 Bucket
@@ -73,6 +85,26 @@ def verify_gcs_bucket(name: str) -> bool:
return False


def create_r2_client(region: str = 'auto') -> Client:
"""Helper method that connects to Boto3 client for R2 Bucket
Args:
        region: str; Region for Cloudflare R2; defaults to 'auto'
"""
return cloudflare.client('s3', region)


def verify_r2_bucket(name: str) -> bool:
"""Helper method that checks if the R2 bucket exists
Args:
name: str; Name of R2 Bucket (without r2:// prefix)
"""
r2 = cloudflare.resource('s3')
bucket = r2.Bucket(name)
return bucket in r2.buckets.all()
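
A short sketch tying these helpers together (bucket name hypothetical; assumes R2 credentials are configured as above):

# Hypothetical check-then-list flow using the helpers above.
from sky.data import data_utils

bucket, key = data_utils.split_r2_path('r2://imagenet/train/')  # ('imagenet', 'train/')
if data_utils.verify_r2_bucket(bucket):
    r2_client = data_utils.create_r2_client()  # region defaults to 'auto'
    resp = r2_client.list_objects_v2(Bucket=bucket, Prefix=key)
    print(resp.get('KeyCount', 0), 'objects under', key)
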


def is_cloud_store_url(url):
result = urllib.parse.urlsplit(url)
# '' means non-cloud URLs.
@@ -118,7 +150,7 @@ def parallel_upload(source_path_list: List[str],
max_concurrent_uploads: Optional[int] = None) -> None:
"""Helper function to run parallel uploads for a list of paths.
-    Used by S3Store and GCSStore to run rsync commands in parallel by
+    Used by S3Store, GCSStore, and R2Store to run rsync commands in parallel by
providing appropriate command generators.
Args:
