From a853182b48c0a9ada8fed8ae1e6f4d6f6af24076 Mon Sep 17 00:00:00 2001
From: ChengjieLi
Date: Wed, 27 Mar 2024 14:10:43 +0800
Subject: [PATCH] dev

---
 xinference/core/worker.py | 47 +++++++++++++++++++++++++++++++++++++----------
 1 file changed, 37 insertions(+), 10 deletions(-)

diff --git a/xinference/core/worker.py b/xinference/core/worker.py
index 1486c3dc13..afa6805a60 100644
--- a/xinference/core/worker.py
+++ b/xinference/core/worker.py
@@ -74,7 +74,10 @@ def __init__(
         self._model_uid_to_model_spec: Dict[str, ModelDescription] = {}
         self._gpu_to_model_uid: Dict[int, str] = {}
         self._gpu_to_embedding_model_uids: Dict[int, Set[str]] = defaultdict(set)
-        self._user_specified_gpu_to_model_uids: Dict[int, Set[str]] = defaultdict(set)
+        # Dict structure: gpu_index: {(replica_model_uid, model_type)}
+        self._user_specified_gpu_to_model_uids: Dict[
+            int, Set[Tuple[str, str]]
+        ] = defaultdict(set)
         self._model_uid_to_addr: Dict[str, str] = {}
         self._model_uid_to_recover_count: Dict[str, int] = {}
         self._model_uid_to_launch_args: Dict[str, Dict] = {}
@@ -285,7 +288,7 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> int:
                     not has_vllm_model
                     and _dev in self._user_specified_gpu_to_model_uids
                 ):
-                    for rep_uid in self._user_specified_gpu_to_model_uids[_dev]:
+                    for rep_uid, _ in self._user_specified_gpu_to_model_uids[_dev]:
                         has_vllm_model = await self.is_model_vllm_backend(rep_uid)
                         if has_vllm_model:
                             break
@@ -313,9 +316,20 @@ async def allocate_devices_for_embedding(self, model_uid: str) -> int:
         return device
 
     def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]:
-        allocated_devices = set(
-            list(self._user_specified_gpu_to_model_uids.keys())
-            + list(self._gpu_to_model_uid.keys())
+        user_specified_allocated_devices: Set[int] = set()
+        for dev, model_infos in self._user_specified_gpu_to_model_uids.items():
+            allocated_non_embedding_rerank_models = False
+            for _, model_type in model_infos:
+                allocated_non_embedding_rerank_models = model_type not in [
+                    "embedding",
+                    "rerank",
+                ]
+                if allocated_non_embedding_rerank_models:
+                    break
+            if not allocated_non_embedding_rerank_models:
+                user_specified_allocated_devices.add(dev)
+        allocated_devices = set(self._gpu_to_model_uid.keys()).union(
+            user_specified_allocated_devices
         )
         if n_gpu > len(self._total_gpu_devices) - len(allocated_devices):
             raise RuntimeError("No available slot found for the model")
@@ -324,7 +338,7 @@ def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]:
             dev
             for dev in self._total_gpu_devices
             if dev not in self._gpu_to_model_uid
-            and dev not in self._user_specified_gpu_to_model_uids
+            and dev not in user_specified_allocated_devices
         ][:n_gpu]
         for dev in devices:
             self._gpu_to_model_uid[int(dev)] = model_uid
@@ -332,7 +346,7 @@ def allocate_devices(self, model_uid: str, n_gpu: int) -> List[int]:
         return sorted(devices)
 
     async def allocate_devices_with_gpu_idx(
-        self, model_uid: str, gpu_idx: List[int]
+        self, model_uid: str, model_type: str, gpu_idx: List[int]
     ) -> List[int]:
         """
         When user specifies the gpu_idx, allocate models on user-specified GPUs whenever possible
@@ -360,7 +374,7 @@ async def allocate_devices_with_gpu_idx(
             # If user has run the vLLM model on the GPU that was forced to be specified,
             # it is not possible to force this GPU to be allocated again
             if idx in self._user_specified_gpu_to_model_uids:
-                for rep_uid in self._user_specified_gpu_to_model_uids[idx]:
+                for rep_uid, _ in self._user_specified_gpu_to_model_uids[idx]:
                     is_vllm_model = await self.is_model_vllm_backend(rep_uid)
                     if is_vllm_model:
                         raise RuntimeError(
@@ -375,7 +389,7 @@ async def allocate_devices_with_gpu_idx(
             )
 
         for idx in gpu_idx:
-            self._user_specified_gpu_to_model_uids[idx].add(model_uid)
+            self._user_specified_gpu_to_model_uids[idx].add((model_uid, model_type))
         return sorted(gpu_idx)
 
     def release_devices(self, model_uid: str):
@@ -392,6 +406,17 @@ def release_devices(self, model_uid: str):
             if model_uid in self._gpu_to_embedding_model_uids[dev]:
                 self._gpu_to_embedding_model_uids[dev].remove(model_uid)
 
+        # check user-specified slots
+        for dev in self._user_specified_gpu_to_model_uids:
+            model_infos = list(
+                filter(
+                    lambda x: x[0] == model_uid,
+                    self._user_specified_gpu_to_model_uids[dev],
+                )
+            )
+            for model_info in model_infos:
+                self._user_specified_gpu_to_model_uids[dev].remove(model_info)
+
     async def _create_subpool(
         self,
         model_uid: str,
@@ -417,7 +442,9 @@ async def _create_subpool(
             logger.debug(f"GPU disabled for model {model_uid}")
         else:
             assert isinstance(gpu_idx, list)
-            devices = await self.allocate_devices_with_gpu_idx(model_uid, gpu_idx)
+            devices = await self.allocate_devices_with_gpu_idx(
+                model_uid, model_type, gpu_idx  # type: ignore
+            )
             env["CUDA_VISIBLE_DEVICES"] = ",".join([str(dev) for dev in devices])
 
         if os.name != "nt" and platform.system() != "Darwin":
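
For reference, a minimal standalone sketch of the bookkeeping this patch introduces: the new per-GPU set of (replica_model_uid, model_type) tuples lets embedding/rerank models on user-specified GPUs avoid blocking later automatic allocation, and lets release drop entries by uid. The names GpuBookkeeping, blocked_gpus, and release below are illustrative stand-ins, not part of the patch or of xinference; the real logic lives in WorkerActor in xinference/core/worker.py.

from collections import defaultdict
from typing import Dict, Set, Tuple


class GpuBookkeeping:
    # Illustrative stand-in for the worker's _user_specified_gpu_to_model_uids bookkeeping.
    def __init__(self) -> None:
        # gpu_index -> {(replica_model_uid, model_type)}
        self.user_specified: Dict[int, Set[Tuple[str, str]]] = defaultdict(set)

    def add(self, gpu_idx: int, model_uid: str, model_type: str) -> None:
        self.user_specified[gpu_idx].add((model_uid, model_type))

    def blocked_gpus(self) -> Set[int]:
        # A user-specified GPU only blocks automatic allocation if it hosts
        # something other than embedding/rerank models (mirrors allocate_devices).
        blocked: Set[int] = set()
        for dev, infos in self.user_specified.items():
            if any(t not in ("embedding", "rerank") for _, t in infos):
                blocked.add(dev)
        return blocked

    def release(self, model_uid: str) -> None:
        # Mirrors the new cleanup in release_devices: drop every entry whose
        # replica uid matches, on every GPU.
        for dev in self.user_specified:
            for info in [i for i in self.user_specified[dev] if i[0] == model_uid]:
                self.user_specified[dev].remove(info)


bk = GpuBookkeeping()
bk.add(0, "bge-base-1", "embedding")
bk.add(1, "qwen-chat-1", "LLM")
assert bk.blocked_gpus() == {1}  # GPU 0 stays available for auto-allocation
bk.release("qwen-chat-1")
assert bk.blocked_gpus() == set()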