Skip to content

Commit

Permalink
[OCI] Support OCI Object Storage (skypilot-org#4501)
Browse files Browse the repository at this point in the history
* OCI Object Storage Support

* example yaml update

* example update

* add more example yaml

* Support RClone-RPM pkg

* Add smoke test

* ver

* smoke test

* Resolve dependancy conflict between oci-cli and runpod

* Use latest RClone version (v1.68.2)

* minor optimize

* Address review comments

* typo

* test

* sync code with repo

* Address review comments & more testing.

* address one more comment
  • Loading branch information
HysunHe authored Dec 29, 2024
1 parent 1386798 commit 7ae2d25
Show file tree
Hide file tree
Showing 11 changed files with 773 additions and 5 deletions.
35 changes: 35 additions & 0 deletions examples/oci/dataset-mount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: cpu-task1

resources:
cloud: oci
region: us-sanjose-1
cpus: 2
disk_size: 256
disk_tier: medium
use_spot: False

file_mounts:
# Mount an existing oci bucket
/datasets-storage:
source: oci://skybucket
mode: MOUNT # Either MOUNT or COPY. Optional.

# Working directory (optional) containing the project codebase.
# Its contents are synced to ~/sky_workdir/ on the cluster.
workdir: .

num_nodes: 1

# Typical use: pip install -r requirements.txt
# Invoked under the workdir (i.e., can use its files).
setup: |
echo "*** Running setup for the task. ***"
# Typical use: make use of resources, such as running training.
# Invoked under the workdir (i.e., can use its files).
run: |
echo "*** Running the task on OCI ***"
timestamp=$(date +%s)
ls -lthr /datasets-storage
echo "hi" >> /datasets-storage/foo.txt
ls -lthr /datasets-storage
47 changes: 47 additions & 0 deletions examples/oci/dataset-upload-and-mount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: cpu-task1

resources:
cloud: oci
region: us-sanjose-1
cpus: 2
disk_size: 256
disk_tier: medium
use_spot: False

file_mounts:
/datasets-storage:
name: skybucket # Name of storage, optional when source is bucket URI
source: ['./examples/oci'] # Source path, can be local or bucket URL. Optional, do not specify to create an empty bucket.
store: oci # E.g 'oci', 's3', 'gcs'...; default: None. Optional.
persistent: True # Defaults to True; can be set to false. Optional.
mode: MOUNT # Either MOUNT or COPY. Optional.

/datasets-storage2:
name: skybucket2 # Name of storage, optional when source is bucket URI
source: './examples/oci' # Source path, can be local or bucket URL. Optional, do not specify to create an empty bucket.
store: oci # E.g 'oci', 's3', 'gcs'...; default: None. Optional.
persistent: True # Defaults to True; can be set to false. Optional.
mode: MOUNT # Either MOUNT or COPY. Optional.

# Working directory (optional) containing the project codebase.
# Its contents are synced to ~/sky_workdir/ on the cluster.
workdir: .

num_nodes: 1

# Typical use: pip install -r requirements.txt
# Invoked under the workdir (i.e., can use its files).
setup: |
echo "*** Running setup for the task. ***"
# Typical use: make use of resources, such as running training.
# Invoked under the workdir (i.e., can use its files).
run: |
echo "*** Running the task on OCI ***"
ls -lthr /datasets-storage
echo "hi" >> /datasets-storage/foo.txt
ls -lthr /datasets-storage
ls -lthr /datasets-storage2
echo "hi" >> /datasets-storage2/foo2.txt
ls -lthr /datasets-storage2
26 changes: 26 additions & 0 deletions examples/oci/oci-mounts.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
resources:
cloud: oci

file_mounts:
~/tmpfile: ~/tmpfile
~/a/b/c/tmpfile: ~/tmpfile
/tmp/workdir: ~/tmp-workdir

/mydir:
name: skybucket
source: ['~/tmp-workdir']
store: oci
mode: MOUNT

setup: |
echo "*** Setup ***"
run: |
echo "*** Run ***"
ls -lthr ~/tmpfile
ls -lthr ~/a/b/c
echo hi >> /tmp/workdir/new_file
ls -lthr /tmp/workdir
ls -lthr /mydir
33 changes: 32 additions & 1 deletion sky/adaptors/oci.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
"""Oracle OCI cloud adaptor"""

import functools
import logging
import os

from sky.adaptors import common
from sky.clouds.utils import oci_utils

# Suppress OCI circuit breaker logging before lazy import, because
# oci modules prints additional message during imports, i.e., the
Expand All @@ -30,10 +32,16 @@ def get_config_file() -> str:

def get_oci_config(region=None, profile='DEFAULT'):
conf_file_path = get_config_file()
if not profile or profile == 'DEFAULT':
config_profile = oci_utils.oci_config.get_profile()
else:
config_profile = profile

oci_config = oci.config.from_file(file_location=conf_file_path,
profile_name=profile)
profile_name=config_profile)
if region is not None:
oci_config['region'] = region

return oci_config


Expand All @@ -54,6 +62,29 @@ def get_identity_client(region=None, profile='DEFAULT'):
return oci.identity.IdentityClient(get_oci_config(region, profile))


def get_object_storage_client(region=None, profile='DEFAULT'):
return oci.object_storage.ObjectStorageClient(
get_oci_config(region, profile))


def service_exception():
"""OCI service exception."""
return oci.exceptions.ServiceError


def with_oci_env(f):

@functools.wraps(f)
def wrapper(*args, **kwargs):
# pylint: disable=line-too-long
enter_env_cmds = [
'conda info --envs | grep "sky-oci-cli-env" || conda create -n sky-oci-cli-env python=3.10 -y',
'. $(conda info --base 2> /dev/null)/etc/profile.d/conda.sh > /dev/null 2>&1 || true',
'conda activate sky-oci-cli-env', 'pip install oci-cli',
'export OCI_CLI_SUPPRESS_FILE_PERMISSIONS_WARNING=True'
]
operation_cmd = [f(*args, **kwargs)]
leave_env_cmds = ['conda deactivate']
return ' && '.join(enter_env_cmds + operation_cmd + leave_env_cmds)

return wrapper
61 changes: 61 additions & 0 deletions sky/cloud_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Better interface.
* Better implementation (e.g., fsspec, smart_open, using each cloud's SDK).
"""
import os
import shlex
import subprocess
import time
Expand All @@ -18,6 +19,7 @@
from sky.adaptors import azure
from sky.adaptors import cloudflare
from sky.adaptors import ibm
from sky.adaptors import oci
from sky.clouds import gcp
from sky.data import data_utils
from sky.data.data_utils import Rclone
Expand Down Expand Up @@ -470,6 +472,64 @@ def make_sync_file_command(self, source: str, destination: str) -> str:
return self.make_sync_dir_command(source, destination)


class OciCloudStorage(CloudStorage):
"""OCI Cloud Storage."""

def is_directory(self, url: str) -> bool:
"""Returns whether OCI 'url' is a directory.
In cloud object stores, a "directory" refers to a regular object whose
name is a prefix of other objects.
"""
bucket_name, path = data_utils.split_oci_path(url)

client = oci.get_object_storage_client()
namespace = client.get_namespace(
compartment_id=oci.get_oci_config()['tenancy']).data

objects = client.list_objects(namespace_name=namespace,
bucket_name=bucket_name,
prefix=path).data.objects

if len(objects) == 0:
# A directory with few or no items
return True

if len(objects) > 1:
# A directory with more than 1 items
return True

object_name = objects[0].name
if path.endswith(object_name):
# An object path
return False

# A directory with only 1 item
return True

@oci.with_oci_env
def make_sync_dir_command(self, source: str, destination: str) -> str:
"""Downloads using OCI CLI."""
bucket_name, path = data_utils.split_oci_path(source)

download_via_ocicli = (f'oci os object sync --no-follow-symlinks '
f'--bucket-name {bucket_name} '
f'--prefix "{path}" --dest-dir "{destination}"')

return download_via_ocicli

@oci.with_oci_env
def make_sync_file_command(self, source: str, destination: str) -> str:
"""Downloads a file using OCI CLI."""
bucket_name, path = data_utils.split_oci_path(source)
filename = os.path.basename(path)
destination = os.path.join(destination, filename)

download_via_ocicli = (f'oci os object get --bucket-name {bucket_name} '
f'--name "{path}" --file "{destination}"')

return download_via_ocicli


def get_storage_from_path(url: str) -> CloudStorage:
"""Returns a CloudStorage by identifying the scheme:// in a URL."""
result = urllib.parse.urlsplit(url)
Expand All @@ -485,6 +545,7 @@ def get_storage_from_path(url: str) -> CloudStorage:
's3': S3CloudStorage(),
'r2': R2CloudStorage(),
'cos': IBMCosCloudStorage(),
'oci': OciCloudStorage(),
# TODO: This is a hack, as Azure URL starts with https://, we should
# refactor the registry to be able to take regex, so that Azure blob can
# be identified with `https://(.*?)\.blob\.core\.windows\.net`
Expand Down
37 changes: 37 additions & 0 deletions sky/data/data_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,40 @@ def _add_bucket_iam_member(bucket_name: str, role: str, member: str) -> None:
bucket.set_iam_policy(policy)

logger.debug(f'Added {member} with role {role} to {bucket_name}.')


def s3_to_oci(s3_bucket_name: str, oci_bucket_name: str) -> None:
"""Creates a one-time transfer from Amazon S3 to OCI Object Storage.
Args:
s3_bucket_name: str; Name of the Amazon S3 Bucket
oci_bucket_name: str; Name of the OCI Bucket
"""
# TODO(HysunHe): Implement sync with other clouds (s3, gs)
raise NotImplementedError('Moving data directly from S3 to OCI bucket '
'is currently not supported. Please specify '
'a local source for the storage object.')


def gcs_to_oci(gs_bucket_name: str, oci_bucket_name: str) -> None:
"""Creates a one-time transfer from Google Cloud Storage to
OCI Object Storage.
Args:
gs_bucket_name: str; Name of the Google Cloud Storage Bucket
oci_bucket_name: str; Name of the OCI Bucket
"""
# TODO(HysunHe): Implement sync with other clouds (s3, gs)
raise NotImplementedError('Moving data directly from GCS to OCI bucket '
'is currently not supported. Please specify '
'a local source for the storage object.')


def r2_to_oci(r2_bucket_name: str, oci_bucket_name: str) -> None:
"""Creates a one-time transfer from Cloudflare R2 to OCI Bucket.
Args:
r2_bucket_name: str; Name of the Cloudflare R2 Bucket
oci_bucket_name: str; Name of the OCI Bucket
"""
raise NotImplementedError(
'Moving data directly from Cloudflare R2 to OCI '
'bucket is currently not supported. Please specify '
'a local source for the storage object.')
11 changes: 11 additions & 0 deletions sky/data/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,14 @@ def _remove_bucket_profile_rclone(bucket_name: str,
lines_to_keep.append(line)

return lines_to_keep


def split_oci_path(oci_path: str) -> Tuple[str, str]:
"""Splits OCI Path into Bucket name and Relative Path to Bucket
Args:
oci_path: str; OCI Path, e.g. oci://imagenet/train/
"""
path_parts = oci_path.replace('oci://', '').split('/')
bucket = path_parts.pop(0)
key = '/'.join(path_parts)
return bucket, key
43 changes: 43 additions & 0 deletions sky/data/mounting_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
_BLOBFUSE_CACHE_ROOT_DIR = '~/.sky/blobfuse2_cache'
_BLOBFUSE_CACHE_DIR = ('~/.sky/blobfuse2_cache/'
'{storage_account_name}_{container_name}')
RCLONE_VERSION = 'v1.68.2'


def get_s3_mount_install_cmd() -> str:
Expand Down Expand Up @@ -158,6 +159,48 @@ def get_cos_mount_cmd(rclone_config_data: str, rclone_config_path: str,
return mount_cmd


def get_rclone_install_cmd() -> str:
""" RClone installation for both apt-get and rpm.
This would be common command.
"""
# pylint: disable=line-too-long
install_cmd = (
f'(which dpkg > /dev/null 2>&1 && (which rclone > /dev/null || (cd ~ > /dev/null'
f' && curl -O https://downloads.rclone.org/{RCLONE_VERSION}/rclone-{RCLONE_VERSION}-linux-amd64.deb'
f' && sudo dpkg -i rclone-{RCLONE_VERSION}-linux-amd64.deb'
f' && rm -f rclone-{RCLONE_VERSION}-linux-amd64.deb)))'
f' || (which rclone > /dev/null || (cd ~ > /dev/null'
f' && curl -O https://downloads.rclone.org/{RCLONE_VERSION}/rclone-{RCLONE_VERSION}-linux-amd64.rpm'
f' && sudo yum --nogpgcheck install rclone-{RCLONE_VERSION}-linux-amd64.rpm -y'
f' && rm -f rclone-{RCLONE_VERSION}-linux-amd64.rpm))')
return install_cmd


def get_oci_mount_cmd(mount_path: str, store_name: str, region: str,
namespace: str, compartment: str, config_file: str,
config_profile: str) -> str:
""" OCI specific RClone mount command for oci object storage. """
# pylint: disable=line-too-long
mount_cmd = (
f'sudo chown -R `whoami` {mount_path}'
f' && rclone config create oos_{store_name} oracleobjectstorage'
f' provider user_principal_auth namespace {namespace}'
f' compartment {compartment} region {region}'
f' oci-config-file {config_file}'
f' oci-config-profile {config_profile}'
f' && sed -i "s/oci-config-file/config_file/g;'
f' s/oci-config-profile/config_profile/g" ~/.config/rclone/rclone.conf'
f' && ([ ! -f /bin/fusermount3 ] && sudo ln -s /bin/fusermount /bin/fusermount3 || true)'
f' && (grep -q {mount_path} /proc/mounts || rclone mount oos_{store_name}:{store_name} {mount_path} --daemon --allow-non-empty)'
)
return mount_cmd


def get_rclone_version_check_cmd() -> str:
""" RClone version check. This would be common command. """
return f'rclone --version | grep -q {RCLONE_VERSION}'


def _get_mount_binary(mount_cmd: str) -> str:
"""Returns mounting binary in string given as the mount command.
Expand Down
Loading

0 comments on commit 7ae2d25

Please sign in to comment.