Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ray): use env for resource and deprecate deploy/undeploy #124

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion instill/helpers/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ RUN for package in ${PACKAGES}; do \
pip install --default-timeout=1000 --no-cache-dir $package; \
done;

WORKDIR /home/ray
WORKDIR /home/ray/model
COPY . .
20 changes: 9 additions & 11 deletions instill/helpers/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class VisualQuestionAnsweringInput:
extra_params: Dict[str, str] = {}


DEFAULT_RAY_ACTOR_OPRTIONS = {
DEFAULT_RAY_ACTOR_OPTIONS = {
"num_cpus": 2,
}
DEFAULT_AUTOSCALING_CONFIG = {
Expand Down Expand Up @@ -109,14 +109,12 @@ class VisualQuestionAnsweringInput:
VRAM_MINIMUM_RESERVE = 2 # GB
VRAM_UPSCALE_FACTOR = 1.25

MODEL_VRAM_OVERRIDE_LIST = {
"stable-diffusion-xl": 0.375,
"controlnet-canny": 0.375,
"llava-1-6-7b": 0.2,
"llava-1-6-13b": 0.7,
"llama2-7b-chat": 0.3,
"llama2-7b": 0.4,
"zephyr-7b": 0.4,
}

DEFAULT_DEPENDENCIES = ["protobuf==4.25.3", "grpcio-tools==1.62.0"]

ENV_MEMORY = "RAY_MEMORY"
ENV_TOTAL_VRAM = "RAY_TOTAL_VRAM"
ENV_RAY_ACCELERATOR_TYPE = "RAY_ACCELERATOR_TYPE"
ENV_NUM_OF_GPUS = "RAY_NUM_OF_GPUS"
ENV_NUM_OF_CPUS = "RAY_NUM_OF_CPUS"
ENV_NUM_OF_MIN_REPLICAS = "RAY_NUM_OF_MIN_REPLICAS"
ENV_NUM_OF_MAX_REPLICAS = "RAY_NUM_OF_MAX_REPLICAS"
167 changes: 64 additions & 103 deletions instill/helpers/ray_config.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
# pylint: disable=unused-argument
import os
from typing import Callable, Optional
from warnings import warn

import ray
from ray import serve
from ray.serve import Deployment
from ray.serve import deployment as ray_deployment

from instill.helpers.const import (
DEFAULT_AUTOSCALING_CONFIG,
DEFAULT_MAX_CONCURRENT_QUERIES,
DEFAULT_RAY_ACTOR_OPRTIONS,
DEFAULT_RUNTIME_ENV,
MODEL_VRAM_OVERRIDE_LIST,
DEFAULT_RAY_ACTOR_OPTIONS,
ENV_MEMORY,
ENV_NUM_OF_CPUS,
ENV_NUM_OF_GPUS,
ENV_NUM_OF_MAX_REPLICAS,
ENV_NUM_OF_MIN_REPLICAS,
ENV_RAY_ACCELERATOR_TYPE,
ENV_TOTAL_VRAM,
RAM_MINIMUM_RESERVE,
RAM_UPSCALE_FACTOR,
VRAM_MINIMUM_RESERVE,
Expand All @@ -24,58 +26,36 @@


class InstillDeployable:
def __init__(
self,
deployable: Deployment,
model_weight_or_folder_name: str, # kept for backward compatibility
use_gpu: bool,
) -> None:
def __init__(self, deployable: Deployment) -> None:
self._deployment: Deployment = deployable
self.use_gpu = use_gpu
# params
if use_gpu:
self.update_num_cpus(0.25)
self.update_num_gpus(0.2)
else:
self.update_num_cpus(0.25)

accelerator_type = os.getenv("RAY_ACCELERATOR_TYPE")

accelerator_type = os.getenv(ENV_RAY_ACCELERATOR_TYPE)
if accelerator_type is not None and accelerator_type != "":
self.update_accelerator_type(accelerator_type)

def update_num_cpus(self, num_cpus: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update({"num_cpus": num_cpus})
num_of_gpus = os.getenv(ENV_NUM_OF_GPUS)
if num_of_gpus is not None and num_of_gpus != "":
self.update_num_gpus(float(num_of_gpus))

return self
num_of_cpus = os.getenv(ENV_NUM_OF_CPUS)
if num_of_cpus is not None and num_of_cpus != "":
self.update_num_cpus(float(num_of_cpus))

def update_memory(self, memory: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update({"memory": memory})
memory = os.getenv(ENV_MEMORY)
if memory is not None and memory != "":
self.update_memory(float(memory))

return self
num_of_min_replicas = os.getenv(ENV_NUM_OF_MIN_REPLICAS)
if num_of_min_replicas is not None and num_of_min_replicas != "":
self.update_min_replicas(int(num_of_min_replicas))

def update_num_gpus(self, num_gpus: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update({"num_gpus": num_gpus})
num_of_max_replicas = os.getenv(ENV_NUM_OF_MAX_REPLICAS)
if num_of_max_replicas is not None and num_of_max_replicas != "":
self.update_max_replicas(int(num_of_max_replicas))

return self

def update_accelerator_type(self, accelerator_type: str):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update(
{"accelerator_type": accelerator_type}
)

return self

def update_num_custom_resource(self, resource_name: str, num: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update(
{"resources": {resource_name: num}}
)

return self
vram = os.getenv(ENV_TOTAL_VRAM)
if vram is not None and vram != "":
self.update_num_gpus(self._determine_vram_usage(os.getcwd(), vram))

def _determine_vram_usage(self, model_path: str, total_vram: str):
warn(
Expand Down Expand Up @@ -123,6 +103,40 @@ def _determine_ram_usage(self, model_path: str):
)
raise ModelPathException

def update_num_cpus(self, num_cpus: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update({"num_cpus": num_cpus})

return self

def update_memory(self, memory: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update({"memory": memory})

return self

def update_num_gpus(self, num_gpus: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update({"num_gpus": num_gpus})

return self

def update_accelerator_type(self, accelerator_type: str):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update(
{"accelerator_type": accelerator_type}
)

return self

def update_num_custom_resource(self, resource_name: str, num: float):
if self._deployment.ray_actor_options is not None:
self._deployment.ray_actor_options.update(
{"resources": {resource_name: num}}
)

return self

def update_min_replicas(self, num_replicas: int):
new_autoscaling_config = DEFAULT_AUTOSCALING_CONFIG
new_autoscaling_config["min_replicas"] = num_replicas
Expand All @@ -144,66 +158,13 @@ def update_max_replicas(self, num_replicas: int):
def get_deployment_handle(self):
return self._deployment.bind()

def deploy(self, model_folder_path: str, ray_addr: str, total_vram: str):
warn(
"Deploy/Undeploy will soon be remove from the scope of SDK",
PendingDeprecationWarning,
)
if not ray.is_initialized():
ray_addr = "ray://" + ray_addr.replace("9000", "10001")
ray.init(address=ray_addr, runtime_env=DEFAULT_RUNTIME_ENV)

# /model-repository/{owner_type}/{owner_uid}/{model_id}
model_path_string_parts = model_folder_path.split("/")
application_name = "_".join(model_path_string_parts[3:])
model_name = application_name.split("_")[1]

if self.use_gpu:
if model_name in MODEL_VRAM_OVERRIDE_LIST:
self.update_num_gpus(MODEL_VRAM_OVERRIDE_LIST[model_name])
else:
self.update_num_gpus(
self._determine_vram_usage(model_folder_path, total_vram)
)
else:
self.update_memory(self._determine_ram_usage(model_folder_path))

if model_name in MODEL_VRAM_OVERRIDE_LIST:
self.update_min_replicas(1)
self.update_max_replicas(1)

serve.run(
# kept model_folder_path for backward compatibility
self._deployment.options(name=model_name).bind(model_folder_path),
name=application_name,
route_prefix=f"/{application_name}",
)

def undeploy(self, model_folder_path: str, ray_addr: str):
warn(
"Deploy/Undeploy will soon be remove from the scope of SDK",
PendingDeprecationWarning,
)
if not ray.is_initialized():
ray_addr = "ray://" + ray_addr.replace("9000", "10001")
ray.init(address=ray_addr, runtime_env=DEFAULT_RUNTIME_ENV)
# /model-repository/{owner_type}/{owner_uid}/{model_id}
model_path_string_parts = model_folder_path.split("/")
application_name = "_".join(model_path_string_parts[3:])
serve.delete(application_name)

def __call__(self):
raise RuntimeError(
"Deployments cannot be constructed directly. Use `deploy()` instead."
)


def instill_deployment(
_func_or_class: Optional[Callable] = None,
) -> Callable[[Callable], InstillDeployable]:
return ray_deployment(
_func_or_class=_func_or_class,
ray_actor_options=DEFAULT_RAY_ACTOR_OPRTIONS,
ray_actor_options=DEFAULT_RAY_ACTOR_OPTIONS,
autoscaling_config=DEFAULT_AUTOSCALING_CONFIG,
max_concurrent_queries=DEFAULT_MAX_CONCURRENT_QUERIES,
)
Loading