From 3be037bf8b2b1682cd9457c83318c42f53fc13f0 Mon Sep 17 00:00:00 2001 From: wangjian Date: Thu, 8 Aug 2024 20:45:41 +0800 Subject: [PATCH 01/17] add lazyllm before group --- README.CN.md | 4 ++-- README.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.CN.md b/README.CN.md index 8942563c..6aca225d 100644 --- a/README.CN.md +++ b/README.CN.md @@ -269,9 +269,9 @@ def test(input): def test_cmd(input): return f'echo input is {input}' -# >>> demo.test()(1) +# >>> lazyllm.demo.test()(1) # 'input is 1' -# >>> demo.test_cmd(launcher=launchers.slurm)(2) +# >>> lazyllm.demo.test_cmd(launcher=launchers.slurm)(2) # Command: srun -p pat_rd -N 1 --job-name=xf488db3 -n1 bash -c 'echo input is 2' ``` diff --git a/README.md b/README.md index 57dbd723..34533877 100644 --- a/README.md +++ b/README.md @@ -276,9 +276,9 @@ def test(input): def test_cmd(input): return f'echo input is {input}' -# >>> demo.test()(1) +# >>> lazyllm.demo.test()(1) # 'input is 1' -# >>> demo.test_cmd(launcher=launchers.slurm)(2) +# >>> lazyllm.demo.test_cmd(launcher=launchers.slurm)(2) # Command: srun -p pat_rd -N 1 --job-name=xf488db3 -n1 bash -c 'echo input is 2' ``` From da52fdcfdeeccc96235e1e046adedea8354d1db3 Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 20 Nov 2024 11:13:38 +0800 Subject: [PATCH 02/17] add kubernetes launcher --- lazyllm/launcher.py | 697 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 696 insertions(+), 1 deletion(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 28a82dc7..230b999a 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -18,6 +18,12 @@ import lazyllm from lazyllm import LazyLLMRegisterMetaClass, LazyLLMCMD, final, timeout, LOG +from kubernetes import client, config +from kubernetes.client.rest import ApiException +import requests +import yaml +from typing import Literal + class Status(Enum): TBSubmitted = 0, InQueue = 1 @@ -74,6 +80,8 @@ def clone(self): lazyllm.config.add('sco_keep_record', bool, False, 'SCO_KEEP_RECORD') lazyllm.config.add('sco_resource_type', str, 'N3lS.Ii.I60', 'SCO_RESOURCE_TYPE') lazyllm.config.add('cuda_visible', bool, False, 'CUDA_VISIBLE') +lazyllm.config.add('k8s_env_name', str, '', 'K8S_ENV_NAME') +lazyllm.config.add('k8s_config_path', str, '', 'K8S_CONFIG_PATH') # store cmd, return message and command output. 
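The two configuration entries just added (`k8s_env_name`, `k8s_config_path`) follow the same `lazyllm.config.add` pattern as the existing keys, so they can be driven through `LAZYLLM_`-prefixed environment variables, as the CI workflow introduced later in this series does. Below is a minimal sketch of how a caller might set them; the concrete values are placeholders, and the re-registration calls simply mirror the pattern used in `tests/basic_tests/test_launcher.py` rather than anything this patch requires:

```python
import os
import lazyllm

# Placeholder values -- adjust to the actual cluster/conda setup.
os.environ["LAZYLLM_K8S_ENV_NAME"] = "lazyllm"                       # conda env activated inside the pod by _wrap_cmd
os.environ["LAZYLLM_K8S_CONFIG_PATH"] = "/abs/path/k8s_config.yaml"  # must be absolute (see _read_config_file)

# Re-reading the keys after changing the environment mirrors the approach taken
# in the launcher tests; values exported before importing lazyllm are picked up
# by the config.add calls in this patch.
lazyllm.config.add('k8s_env_name', str, '', 'K8S_ENV_NAME')
lazyllm.config.add('k8s_config_path', str, '', 'K8S_CONFIG_PATH')

print(lazyllm.config['k8s_env_name'], lazyllm.config['k8s_config_path'])
```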
@@ -175,6 +183,693 @@ def _generate_name(self): def __deepcopy__(self, memo=None): raise RuntimeError('Cannot copy Job object') +@final +class K8sLauncher(LazyLLMLaunchersBase): + all_processes = defaultdict(list) + namespace = "default" + + class Job(Job): + def __init__(self, cmd, launcher, *, sync=True): + super().__init__(cmd, launcher, sync=sync) + self.cmd = cmd + self.launcher = launcher + self.sync = sync + self.deployment_name = f"deployment-{uuid.uuid4().hex[:8]}" + self.namespace = launcher.namespace + self.volume_config = launcher.volume_config + self.gateway_name = launcher.gateway_name + self.deployment_port = 8080 + self.host = launcher.http_host + self.path = launcher.http_path + self.svc_type = launcher.svc_type + self.gateway_retry = launcher.gateway_retry + self.image = launcher.image + self.resource_config = launcher.resource_config + + def _wrap_cmd(self, cmd): + pythonpath = os.getenv("PYTHONPATH", '') + precmd = (f'''export PYTHONPATH={os.getcwd()}:{pythonpath}:$PYTHONPATH ''' + f'''&& export PATH={os.path.join(os.path.expanduser('~'), '.local/bin')}:$PATH &&''') + if lazyllm.config['k8s_env_name']: + precmd = f"source activate {lazyllm.config['k8s_env_name']} && " + precmd + env_vars = os.environ + lazyllm_vars = {k: v for k, v in env_vars.items() if k.startswith("LAZYLLM")} + if lazyllm_vars: + precmd += " && ".join(f"export {k}={v}" for k, v in lazyllm_vars.items()) + " && " + precmd += '''ifconfig | grep "inet " | awk "{printf \\"LAZYLLMIP %s\\\\n\\", \$2}" &&''' # noqa W605 + LOG.info(f"precmd: {precmd}\n cmd: {cmd}") + port_match = re.search(r"--open_port=(\d+)", cmd) + if port_match: + port = port_match.group(1) + LOG.info(f"Port: {port}") + self.deployment_port = int(port) + else: + LOG.info("Port not found") + raise ValueError("Failed to obtain application port.") + return precmd + " " + cmd + + def _create_deployment_spec(self, cmd, volume_config=None): + container = client.V1Container( + name=self.deployment_name, + image=self.image, + command=["bash", "-c", cmd], + resources=client.V1ResourceRequirements( + requests=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}), + limits=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}) + ), + volume_mounts=[] if not volume_config else [ + client.V1VolumeMount( + mount_path=volume_config['mount_path'], + name=volume_config.get("name", "lazyllm-k8s") + ) + ] + ) + + volumes = [] + if volume_config: + volumes.append( + client.V1Volume( + name=volume_config.get("name", "lazyllm-k8s"), + host_path=client.V1HostPathVolumeSource( + path=volume_config["host_path"], + type="Directory" + ) + ) + ) + template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels={"app": self.deployment_name}), + spec=client.V1PodSpec(restart_policy="Always", containers=[container], volumes=volumes) + ) + deployment_spec = client.V1DeploymentSpec( + replicas=1, + template=template, + selector=client.V1LabelSelector(match_labels={"app": self.deployment_name}) + ) + return client.V1Deployment( + api_version="apps/v1", + kind="Deployment", + metadata=client.V1ObjectMeta(name=self.deployment_name), + spec=deployment_spec + ) + + def _create_deployment(self, *, fixed=False): + api_instance = client.AppsV1Api() + cmd = self.get_executable_cmd(fixed=fixed) + deployment = self._create_deployment_spec(cmd.cmd, self.volume_config) + try: + api_instance.create_namespaced_deployment( + body=deployment, + namespace=self.namespace + ) + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' created 
successfully.") + except ApiException as e: + LOG.error(f"Exception when creating Kubernetes Deployment: {e}") + raise + + def _delete_deployment(self): + config.load_kube_config(self.launcher.kube_config_path) + api_instance = client.AppsV1Api() + try: + api_instance.delete_namespaced_deployment( + name=self.deployment_name, + namespace=self.namespace, + body=client.V1DeleteOptions(propagation_policy="Foreground") + ) + LOG.info(f"Kubernetes Deployment {self.deployment_name} deleted.") + except ApiException as e: + LOG.error(f"Exception when deleting Kubernetes Deployment: {e}") + + def _expose_deployment(self): + api_instance = client.CoreV1Api() + service = client.V1Service( + api_version="v1", + kind="Service", + metadata=client.V1ObjectMeta(name=f"service-{self.deployment_name}"), + spec=client.V1ServiceSpec( + selector={"app": self.deployment_name}, + ports=[client.V1ServicePort(port=self.deployment_port, target_port=self.deployment_port)], + type="ClusterIP" + ) + ) + try: + api_instance.create_namespaced_service( + namespace=self.namespace, + body=service + ) + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' created and exposed successfully.") + except ApiException as e: + LOG.info(f"Exception when creating Service: {e}") + raise + + def _delete_service(self): + config.load_kube_config(self.launcher.kube_config_path) + svc_instance = client.CoreV1Api() + try: + svc_instance.delete_namespaced_service( + name=f"service-{self.deployment_name}", + namespace=self.namespace, + body=client.V1DeleteOptions(propagation_policy="Foreground") + ) + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' deleted.") + except ApiException as e: + LOG.error(f"Exception when deleting Kubernetes Service: {e}") + + def _create_or_update_gateway(self): + networking_api = client.CustomObjectsApi() + gateway_spec = { + "apiVersion": "gateway.networking.k8s.io/v1beta1", + "kind": "Gateway", + "metadata": { + "name": self.gateway_name, + "namespace": self.namespace, + "annotations": { + "networking.istio.io/service-type": self.svc_type + } + }, + "spec": { + "gatewayClassName": "istio", + "listeners": [ + { + "name": f"httproute-{self.deployment_name}", + "port": self.deployment_port, + "protocol": "HTTP", + } + ] + } + } + + try: + existing_gateway = networking_api.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + + existing_gateway['spec']["listeners"].extend(gateway_spec['spec']["listeners"]) + networking_api.replace_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name, + body=existing_gateway + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' updated successfully.") + except ApiException as e: + if e.status == 404: + try: + networking_api.create_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + body=gateway_spec + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' created successfully.") + except ApiException as e_create: + LOG.error(f"Exception when creating Gateway: {e_create}") + else: + LOG.error(f"Exception when updating Gateway: {e}") + + def _delete_or_update_gateway(self): + config.load_kube_config(self.launcher.kube_config_path) + gateway_instance = client.CustomObjectsApi() + try: + gateway = gateway_instance.get_namespaced_custom_object( + 
group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + + listeners = gateway['spec']['listeners'] + gateway['spec']['listeners'] = [ + listener for listener in listeners if listener['name'] != f"httproute-{self.deployment_name}" + ] + + if gateway['spec']['listeners']: + gateway_instance.replace_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name, + body=gateway + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted updated.") + else: + gateway_instance.delete_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted.") + except ApiException as e: + LOG.error(f"Exception when deleting or updating Gateway: {e}") + + def _create_httproute(self): + custom_api = client.CustomObjectsApi() + + httproute_spec = { + "apiVersion": "gateway.networking.k8s.io/v1beta1", + "kind": "HTTPRoute", + "metadata": { + "name": f"httproute-{self.deployment_name}", + "namespace": self.namespace + }, + "spec": { + "parentRefs": [{ + "name": self.gateway_name, + "port": self.deployment_port + }], + "rules": [{ + "matches": [{ + "path": { + "type": "PathPrefix", + "value": self.path + } + }], + "backendRefs": [{ + "name": f"service-{self.deployment_name}", + "port": self.deployment_port + }] + }] + } + } + + if self.host: + httproute_spec["spec"]["hostnames"] = [self.host] + + try: + custom_api.create_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + body=httproute_spec + ) + LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' created successfully.") + except ApiException as e: + LOG.error(f"Exception when creating HTTPRoute: {e}") + + def _delete_httproute(self): + config.load_kube_config(self.launcher.kube_config_path) + httproute_instance = client.CustomObjectsApi() + try: + httproute_instance.delete_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + name=f"httproute-{self.deployment_name}" + ) + LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' delete.") + except ApiException as e: + LOG.error(f"Exception when deleting HTTPRoute: {e}") + + def _start(self, *, fixed=False): + self._create_deployment(fixed=fixed) + self._expose_deployment() + self._create_or_update_gateway() + self._create_httproute() + self.jobid = self._get_jobid() + self.launcher.all_processes[self.launcher._id].append((self.jobid, self)) + # if self.sync: + ret = self.wait() + LOG.info(ret) + + def start(self, *, restart=3, fixed=False): + self._start(fixed=fixed) + if not (lazyllm.config['mode'] == lazyllm.Mode.Display or self._fixed_cmd.checkf(self)): + if restart > 0: + for _ in range(restart): + self.restart(fixed=fixed) + if self._fixed_cmd.checkf(self): break + else: + raise RuntimeError(f'Job failed after retrying {restart} times') + else: + raise RuntimeError('Job failed without retries') + self._set_return_value() + + def stop(self): + self._delete_or_update_gateway() + self._delete_httproute() + self._delete_service() + self._delete_deployment() + + def _get_jobid(self): + return f"service-{self.deployment_name}" + + def _get_gateway_service_name(self): + core_api = 
client.CoreV1Api() + try: + services = core_api.list_namespaced_service(namespace=self.namespace) + + for service in services.items: + labels = service.metadata.labels + if labels and ("gateway" in labels.get("app", "") or self.gateway_name in service.metadata.name): + LOG.info(f"Kubernetes Gateway service name: {service.metadata.name}") + return service.metadata.name + + LOG.warning("No Service was found corresponding to the specified Gateway.") + return None + except ApiException as e: + LOG.error(f"Exception when retrieving Gateway Service: {e}") + return None + + def _get_gateway_deployment_name(self): + core_api = client.CoreV1Api() + apps_v1 = client.AppsV1Api() + + gateway_service_name = self._get_gateway_service_name() + try: + service = core_api.read_namespaced_service(gateway_service_name, self.namespace) + selector = service.spec.selector + if selector: + label_selector = ",".join(f"{k}={v}" for k, v in selector.items()) + deployments = apps_v1.list_namespaced_deployment(self.namespace, label_selector=label_selector) + if deployments.items: + deployments = [dep.metadata.name for dep in deployments.items] + LOG.info(f"Kubernetes Gateway deployment name: {deployments}") + return deployments + else: + LOG.warning(f"No Deployment found for Gateway '{self.gateway_name}' in namespace " + f"'{self.namespace}'.") + return None + else: + LOG.warning(f"Kubernetes Service '{gateway_service_name}' does not have a selector.") + return None + except ApiException as e: + LOG.error(f"Error fetching Service '{gateway_service_name}': {e}") + return None + + def _get_gateway_ip(self): + core_api = client.CoreV1Api() + gateway_service_name = self._get_gateway_service_name() + if gateway_service_name is None: + raise ValueError("Kubernetes Gateway service name not found.") + try: + service = core_api.read_namespaced_service( + name=gateway_service_name, + namespace=self.namespace + ) + + if service.spec.type == "LoadBalancer": + if service.status.load_balancer.ingress: + ip = service.status.load_balancer.ingress[0].ip + return ip + else: + LOG.warning("The LoadBalancer IP has not been assigned yet.") + return None + elif service.spec.type == "NodePort": + nodes = core_api.list_node() + node_ip = nodes.items[0].status.addresses[0].address + return node_ip + elif service.spec.type == "ClusterIP": + return service.spec.cluster_ip + else: + LOG.warning("Unsupported Service type.") + return None + except ApiException as e: + LOG.error(f"Exception when retrieving gateway IP: {e}") + return None + + def _get_httproute_host(self): + custom_api = client.CustomObjectsApi() + try: + httproute = custom_api.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + name=f"httproute-{self.deployment_name}" + ) + + hostnames = httproute.get("spec", {}).get("hostnames", []) + if hostnames: + return hostnames[0] + else: + LOG.warning("Kubernetes HTTPRoute has no configured hostnames.") + return None + except ApiException as e: + LOG.error(f"Exception when retrieving HTTPRoute host: {e}") + return None + + def get_jobip(self): + host = self._get_httproute_host() + ip = self._get_gateway_ip() + LOG.info(f"gateway ip: {ip}, hostname: {host}") + return host if host else ip + + def wait_for_deployment_ready(self, timeout=300): + api_instance = client.AppsV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + deployment_status = api_instance.read_namespaced_deployment_status( + name=self.deployment_name, + 
namespace=self.namespace + ).status + if deployment_status.available_replicas and deployment_status.available_replicas > 0: + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' is running.") + return True + time.sleep(2) + except ApiException as e: + LOG.error(f"Exception when reading Deployment status: {e}") + raise + LOG.warning(f"Timed out waiting for Deployment '{self.deployment_name}' to be ready.") + return False + + def wait_for_service_ready(self, timeout=300): + svc_instance = client.CoreV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + service = svc_instance.read_namespaced_service( + name=f"service-{self.deployment_name}", + namespace=self.namespace + ) + if service.spec.type == "LoadBalancer" and service.status.load_balancer.ingress: + ip = service.status.load_balancer.ingress[0].ip + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready with IP: {ip}") + return ip + elif service.spec.cluster_ip: + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready with ClusterIP: " + f"{service.spec.cluster_ip}") + return service.spec.cluster_ip + elif service.spec.type == "NodePort": + node_ports = [p.node_port for p in service.spec.ports] + if node_ports: + nodes = svc_instance.list_node() + for node in nodes.items: + for address in node.status.addresses: + if address.type == "InternalIP": + node_ip = address.address + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready on " + f"NodePort(s): {node_ports} at Node IP: {node_ip}") + return {"ip": node_ip, "ports": node_ports} + elif address.type == "ExternalIP": + node_ip = address.address + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready on " + f"NodePort(s): {node_ports} at External Node IP: {node_ip}") + return {"ip": node_ip, "ports": node_ports} + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is not ready yet. 
Retrying...") + time.sleep(2) + except ApiException as e: + LOG.error(f"Exception when reading Service status: {e}") + raise + LOG.warning(f"Timed out waiting for Service 'service-{self.deployment_name}' to be ready.") + return None + + def _is_gateway_ready(self, timeout): + url = f"http://{self.get_jobip()}:{self.deployment_port}{self.path}" + for i in range(self.gateway_retry): + try: + response = requests.get(url, timeout=timeout) + if response.status_code != 503: + LOG.info(f"Kubernetes Service is ready at '{url}'") + self.queue.put(f"Uvicorn running on {url}") + return True + else: + LOG.info(f"Kubernetes Service at '{url}' returned status code {response.status_code}") + except requests.RequestException as e: + LOG.error(f"Failed to access service at '{url}': {e}") + time.sleep(timeout) + + self.queue.put(f"ERROR: Kubernetes Service failed to start on '{url}'.") + return False + + def wait_for_gateway(self, timeout=300, interval=5): # noqa: C901 + core_v1 = client.CoreV1Api() + apps_v1 = client.AppsV1Api() + gateway_service_name = self._get_gateway_service_name() + gateway_deployment_names = self._get_gateway_deployment_name() + service_ready = False + deployment_ready = False + + start_time = time.time() + while time.time() - start_time < timeout: + if not service_ready: + try: + service = core_v1.read_namespaced_service(gateway_service_name, self.namespace) + if service.spec.type in ["NodePort", "LoadBalancer"]: + if service.spec.type == "LoadBalancer": + if service.status.load_balancer.ingress: + LOG.info(f"Kubernetes Service '{gateway_service_name}' is ready with " + "LoadBalancer IP.") + service_ready = True + else: + LOG.info(f"Kubernetes Service '{gateway_service_name}' LoadBalancer IP " + "not available yet.") + service_ready = False + else: + if any(port.node_port for port in service.spec.ports): + LOG.info(f"Kubernetes Service '{gateway_service_name}' is ready with " + "NodePort configuration.") + service_ready = True + else: + LOG.info(f"Kubernetes Service '{gateway_service_name}' NodePort not assigned yet.") + service_ready = False + else: + LOG.error(f"Unexpected Kubernetes Service type: {service.spec.type}.") + service_ready = False + except ApiException as e: + LOG.error(f"Kubernetes Service '{gateway_service_name}' not found yet: {e}") + service_ready = False + if not deployment_ready: + for deployment_name in gateway_deployment_names: + try: + deployment = apps_v1.read_namespaced_deployment(deployment_name, self.namespace) + if deployment.status.available_replicas and deployment.status.available_replicas > 0: + LOG.info(f"Kubernetes Deployment '{deployment_name}' is ready with " + f"{deployment.status.available_replicas} replicas.") + deployment_ready = True + break + else: + LOG.info(f"Kubernetes Deployment '{deployment_name}' is not fully ready yet.") + deployment_ready = False + except ApiException as e: + LOG.warning(f"Kubernetes Deployment '{deployment_name}' not found yet: {e}") + deployment_ready = False + + if service_ready and deployment_ready and self._is_gateway_ready(timeout=interval): + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' is fully ready.") + return True + + time.sleep(interval) + + LOG.error(f"Kubernetes Gateway '{self.gateway_name}' failed to become ready with {timeout} seconds.") + return False + + def wait_for_httproute(self, timeout=300): + custom_api = client.CustomObjectsApi() + start_time = time.time() + while time.time() - start_time < timeout: + try: + httproutes = custom_api.list_namespaced_custom_object( + 
group="gateway.networking.k8s.io", + version="v1alpha2", + namespace=self.namespace, + plural="httproutes" + ).get('items', []) + + for httproute in httproutes: + if httproute['metadata']['name'] == f"httproute-{self.deployment_name}": + LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' is ready.") + return True + LOG.info(f"Waiting for HTTPRoute 'httproute-{self.deployment_name}' to be ready...") + except ApiException as e: + LOG.error(f"Exception when checking HTTPRoute status: {e}") + + time.sleep(2) + LOG.warning(f"Timeout waiting for HTTPRoute 'httproute-{self.deployment_name}' to be ready.") + return False + + def wait(self): + deployment_ready = self.wait_for_deployment_ready() + if not deployment_ready: + raise TimeoutError("Kubernetes Deployment did not become ready in time.") + + service_ip = self.wait_for_service_ready() + if not service_ip: + raise TimeoutError("Kubernetes Service did not become ready in time.") + + httproute_ready = self.wait_for_httproute() + if not httproute_ready: + raise TimeoutError("Kubernetes Httproute did not become ready in time.") + + gateway_ready = self.wait_for_gateway() + if not gateway_ready: + raise TimeoutError("Kubernetes Gateway did not become ready in time.") + + return {"deployment": Status.Running, "service_ip": service_ip, + "gateway": Status.Running, "httproute": Status.Running} + + @property + def status(self): + api_instance = client.AppsV1Api() + try: + deployment_status = api_instance.read_namespaced_deployment_status( + name=self.deployment_name, + namespace=self.namespace + ).status + if deployment_status.available_replicas and deployment_status.available_replicas > 0: + return Status.Running + else: + return Status.Pending + except ApiException as e: + LOG.error(f"Exception when reading Deployment status: {e}") + return Status.Failed + + def __init__(self, kube_config_path=None, volume_config=None, image=None, resource_config=None, + namespace="default", gateway_name="lazyllm-gateway", host=None, path='/generate', + svc_type: Literal["LoadBalancer", "NodePort", "ClusterIP"] = "LoadBalancer", retry=3, + sync=True, ngpus=None, **kwargs): + super().__init__() + self.gateway_retry = retry + self.sync = sync + self.ngpus = ngpus + config_data = self._read_config_file(lazyllm.config['k8s_config_path']) + self.volume_config = volume_config if volume_config else config_data.get('volume', {}) + self.image = image if image else config_data.get('container_image', "lazyllm/lazyllm:k8s_launcher") + self.resource_config = resource_config if resource_config else config_data.get('resource', {}) + self.kube_config_path = kube_config_path if kube_config_path \ + else config_data.get('kube_config_path', "~/.kube/config") + self.svc_type = config_data.get("svc_type") if config_data.get("svc_type", None) else svc_type + self.namespace = config_data.get("namespace") if config_data.get("namespace", None) else namespace + self.gateway_name = config_data.get("gateway_name") if config_data.get("gateway_name", None) else gateway_name + self.http_host = config_data.get("host") if config_data.get("host", None) else host + self.http_path = config_data.get("path") if config_data.get("path", None) else path + + def _read_config_file(self, file_path): + assert os.path.isabs(file_path), "Resource config file path must be an absolute path." 
+ with open(file_path, 'r') as fp: + try: + data = yaml.safe_load(fp) + return data + except yaml.YAMLError as e: + LOG.error(f"Exception when reading resource configuration file: {e}") + raise ValueError("Kubernetes resource configuration file format error.") + + def makejob(self, cmd): + config.load_kube_config(self.kube_config_path) + return K8sLauncher.Job(cmd, launcher=self, sync=self.sync) + + def launch(self, f, *args, **kw): + if isinstance(f, K8sLauncher.Job): + f.start() + LOG.info("launch come here.") + self.job = f + return f.return_value + elif callable(f): + LOG.info("Async execution in Kubernetes is not supported currently.") + raise RuntimeError("Kubernetes launcher requires a Deployment object.") + @final class EmptyLauncher(LazyLLMLaunchersBase): all_processes = defaultdict(list) @@ -645,7 +1340,7 @@ def __new__(cls, *args, sync=False, ngpus=1, **kwargs): def cleanup(): # empty - for m in (EmptyLauncher, SlurmLauncher, ScoLauncher): + for m in (EmptyLauncher, SlurmLauncher, ScoLauncher, K8sLauncher): while m.all_processes: _, vs = m.all_processes.popitem() for k, v in vs: From 3d6fd0f6fcf4dc093ed2e94d74b83679430ea273 Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 20 Nov 2024 11:24:49 +0800 Subject: [PATCH 03/17] Delete comments --- lazyllm/launcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 230b999a..4a4a57e5 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -499,7 +499,6 @@ def _start(self, *, fixed=False): self._create_httproute() self.jobid = self._get_jobid() self.launcher.all_processes[self.launcher._id].append((self.jobid, self)) - # if self.sync: ret = self.wait() LOG.info(ret) From e779b63b3a7ded97374568d68938c854ebfe5624 Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 20 Nov 2024 12:01:51 +0800 Subject: [PATCH 04/17] =?UTF-8?q?1=E3=80=81modify=20launcher=20config=20fi?= =?UTF-8?q?le;=202=E3=80=81add=20test=20units?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lazyllm/launcher.py | 17 +++++++++-------- tests/basic_tests/test_launcher.py | 15 +++++++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 4a4a57e5..f0b9d8a9 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -826,24 +826,25 @@ def status(self): return Status.Failed def __init__(self, kube_config_path=None, volume_config=None, image=None, resource_config=None, - namespace="default", gateway_name="lazyllm-gateway", host=None, path='/generate', - svc_type: Literal["LoadBalancer", "NodePort", "ClusterIP"] = "LoadBalancer", retry=3, + namespace=None, gateway_name=None, host=None, path=None, + svc_type: Literal["LoadBalancer", "NodePort", "ClusterIP"] = None, retry=3, sync=True, ngpus=None, **kwargs): super().__init__() self.gateway_retry = retry self.sync = sync self.ngpus = ngpus - config_data = self._read_config_file(lazyllm.config['k8s_config_path']) + config_data = self._read_config_file(lazyllm.config['k8s_config_path']) if lazyllm.config['k8s_config_path'] \ + else {} self.volume_config = volume_config if volume_config else config_data.get('volume', {}) self.image = image if image else config_data.get('container_image', "lazyllm/lazyllm:k8s_launcher") self.resource_config = resource_config if resource_config else config_data.get('resource', {}) self.kube_config_path = kube_config_path if kube_config_path \ else config_data.get('kube_config_path', "~/.kube/config") - self.svc_type = 
config_data.get("svc_type") if config_data.get("svc_type", None) else svc_type - self.namespace = config_data.get("namespace") if config_data.get("namespace", None) else namespace - self.gateway_name = config_data.get("gateway_name") if config_data.get("gateway_name", None) else gateway_name - self.http_host = config_data.get("host") if config_data.get("host", None) else host - self.http_path = config_data.get("path") if config_data.get("path", None) else path + self.svc_type = svc_type if svc_type else config_data.get("svc_type", "LoadBalancer") + self.namespace = namespace if namespace else config_data.get("namespace", "default") + self.gateway_name = gateway_name if gateway_name else config_data.get("gateway_name", "lazyllm-gateway") + self.http_host = host if host else config_data.get("host", None) + self.http_path = path if path else config_data.get("path", '/generate') def _read_config_file(self, file_path): assert os.path.isabs(file_path), "Resource config file path must be an absolute path." diff --git a/tests/basic_tests/test_launcher.py b/tests/basic_tests/test_launcher.py index 53223aed..a449ed0f 100644 --- a/tests/basic_tests/test_launcher.py +++ b/tests/basic_tests/test_launcher.py @@ -30,6 +30,14 @@ def test_sco(self): ) assert launcher.partition == 'pat_rd' + def test_k8s(self): + launcher = launchers.k8s( + kube_config_path="~/.kube/config", + namespace="lazyllm", + host="myapp.lazyllm.com" + ) + assert launcher.namespace == "lazyllm" + def test_remote(self): # empty launcher origin_launcher = lazyllm.config.impl['launcher'] @@ -54,6 +62,13 @@ def test_remote(self): ) assert type(launcher) is launchers.sco assert not launcher.sync + os.environ["LAZYLLM_DEFAULT_LAUNCHER"] = 'k8s' + lazyllm.config.add('launcher', str, 'empty', 'DEFAULT_LAUNCHER') + launcher = launchers.remote( + sync=True + ) + assert type(launcher) is launchers.k8s + assert launcher.sync os.environ["LAZYLLM_DEFAULT_LAUNCHER"] = origin_launcher lazyllm.config.add('launcher', str, 'empty', 'DEFAULT_LAUNCHER') From a3a84bb89dd4dcaf3b88be14432d5f5f6172cce0 Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 27 Nov 2024 16:59:23 +0800 Subject: [PATCH 05/17] k8s launcher supports nfs mounting --- lazyllm/launcher.py | 52 +++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index a684975a..5e84916f 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -201,7 +201,7 @@ def __init__(self, cmd, launcher, *, sync=True): self.sync = sync self.deployment_name = f"deployment-{uuid.uuid4().hex[:8]}" self.namespace = launcher.namespace - self.volume_config = launcher.volume_config + self.volume_configs = launcher.volume_configs self.gateway_name = launcher.gateway_name self.deployment_port = 8080 self.host = launcher.http_host @@ -233,7 +233,7 @@ def _wrap_cmd(self, cmd): raise ValueError("Failed to obtain application port.") return precmd + " " + cmd - def _create_deployment_spec(self, cmd, volume_config=None): + def _create_deployment_spec(self, cmd, volume_configs=None): container = client.V1Container( name=self.deployment_name, image=self.image, @@ -242,25 +242,41 @@ def _create_deployment_spec(self, cmd, volume_config=None): requests=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}), limits=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}) ), - volume_mounts=[] if not volume_config else [ + volume_mounts=[] if not volume_configs else [ client.V1VolumeMount( - 
mount_path=volume_config['mount_path'], - name=volume_config.get("name", "lazyllm-k8s") - ) + mount_path=vol_config["mount_path"], + name=vol_config["name"] + ) for vol_config in volume_configs ] ) volumes = [] - if volume_config: - volumes.append( - client.V1Volume( - name=volume_config.get("name", "lazyllm-k8s"), - host_path=client.V1HostPathVolumeSource( - path=volume_config["host_path"], - type="Directory" + if volume_configs: + for vol_config in volume_configs: + if "nfs_server" in vol_config and "nfs_path" in vol_config: + volumes.append( + client.V1Volume( + name=vol_config["name"], + nfs=client.V1NFSVolumeSource( + server=vol_config["nfs_server"], + path=vol_config["nfs_path"], + read_only=vol_config.get("read_only", False) + ) + ) ) - ) - ) + elif "host_path" in vol_config: + volumes.append( + client.V1Volume( + name=vol_config["name"], + host_path=client.V1HostPathVolumeSource( + path=vol_config["host_path"], + type="Directory" + ) + ) + ) + else: + LOG.error(f"{vol_config} configuration error.") + template = client.V1PodTemplateSpec( metadata=client.V1ObjectMeta(labels={"app": self.deployment_name}), spec=client.V1PodSpec(restart_policy="Always", containers=[container], volumes=volumes) @@ -280,7 +296,7 @@ def _create_deployment_spec(self, cmd, volume_config=None): def _create_deployment(self, *, fixed=False): api_instance = client.AppsV1Api() cmd = self.get_executable_cmd(fixed=fixed) - deployment = self._create_deployment_spec(cmd.cmd, self.volume_config) + deployment = self._create_deployment_spec(cmd.cmd, self.volume_configs) try: api_instance.create_namespaced_deployment( body=deployment, @@ -830,7 +846,7 @@ def status(self): LOG.error(f"Exception when reading Deployment status: {e}") return Status.Failed - def __init__(self, kube_config_path=None, volume_config=None, image=None, resource_config=None, + def __init__(self, kube_config_path=None, volume_configs=None, image=None, resource_config=None, namespace=None, gateway_name=None, host=None, path=None, svc_type: Literal["LoadBalancer", "NodePort", "ClusterIP"] = None, retry=3, sync=True, ngpus=None, **kwargs): @@ -840,7 +856,7 @@ def __init__(self, kube_config_path=None, volume_config=None, image=None, resour self.ngpus = ngpus config_data = self._read_config_file(lazyllm.config['k8s_config_path']) if lazyllm.config['k8s_config_path'] \ else {} - self.volume_config = volume_config if volume_config else config_data.get('volume', {}) + self.volume_configs = volume_configs if volume_configs else config_data.get('volume', []) self.image = image if image else config_data.get('container_image', "lazyllm/lazyllm:k8s_launcher") self.resource_config = resource_config if resource_config else config_data.get('resource', {}) self.kube_config_path = kube_config_path if kube_config_path \ From 98b0ee8f086e8c2befee02ae81868379249ddd0f Mon Sep 17 00:00:00 2001 From: wangjian Date: Mon, 9 Dec 2024 15:07:18 +0800 Subject: [PATCH 06/17] modify kubernetes package import path --- lazyllm/launcher.py | 136 +++++++++++++++++---------------- lazyllm/thirdparty/__init__.py | 2 +- 2 files changed, 70 insertions(+), 68 deletions(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 5e84916f..a0a0aee3 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -18,8 +18,7 @@ import lazyllm from lazyllm import LazyLLMRegisterMetaClass, LazyLLMCMD, final, LOG -from kubernetes import client, config -from kubernetes.client.rest import ApiException +from lazyllm.thirdparty import kubernetes as k8s import requests import yaml from typing 
import Literal @@ -203,6 +202,7 @@ def __init__(self, cmd, launcher, *, sync=True): self.namespace = launcher.namespace self.volume_configs = launcher.volume_configs self.gateway_name = launcher.gateway_name + self.gateway_class_name = launcher.gateway_class_name self.deployment_port = 8080 self.host = launcher.http_host self.path = launcher.http_path @@ -234,16 +234,16 @@ def _wrap_cmd(self, cmd): return precmd + " " + cmd def _create_deployment_spec(self, cmd, volume_configs=None): - container = client.V1Container( + container = k8s.client.V1Container( name=self.deployment_name, image=self.image, command=["bash", "-c", cmd], - resources=client.V1ResourceRequirements( + resources=k8s.client.V1ResourceRequirements( requests=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}), limits=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}) ), volume_mounts=[] if not volume_configs else [ - client.V1VolumeMount( + k8s.client.V1VolumeMount( mount_path=vol_config["mount_path"], name=vol_config["name"] ) for vol_config in volume_configs @@ -255,9 +255,9 @@ def _create_deployment_spec(self, cmd, volume_configs=None): for vol_config in volume_configs: if "nfs_server" in vol_config and "nfs_path" in vol_config: volumes.append( - client.V1Volume( + k8s.client.V1Volume( name=vol_config["name"], - nfs=client.V1NFSVolumeSource( + nfs=k8s.client.V1NFSVolumeSource( server=vol_config["nfs_server"], path=vol_config["nfs_path"], read_only=vol_config.get("read_only", False) @@ -266,9 +266,9 @@ def _create_deployment_spec(self, cmd, volume_configs=None): ) elif "host_path" in vol_config: volumes.append( - client.V1Volume( + k8s.client.V1Volume( name=vol_config["name"], - host_path=client.V1HostPathVolumeSource( + host_path=k8s.client.V1HostPathVolumeSource( path=vol_config["host_path"], type="Directory" ) @@ -277,24 +277,24 @@ def _create_deployment_spec(self, cmd, volume_configs=None): else: LOG.error(f"{vol_config} configuration error.") - template = client.V1PodTemplateSpec( - metadata=client.V1ObjectMeta(labels={"app": self.deployment_name}), - spec=client.V1PodSpec(restart_policy="Always", containers=[container], volumes=volumes) + template = k8s.client.V1PodTemplateSpec( + metadata=k8s.client.V1ObjectMeta(labels={"app": self.deployment_name}), + spec=k8s.client.V1PodSpec(restart_policy="Always", containers=[container], volumes=volumes) ) - deployment_spec = client.V1DeploymentSpec( + deployment_spec = k8s.client.V1DeploymentSpec( replicas=1, template=template, - selector=client.V1LabelSelector(match_labels={"app": self.deployment_name}) + selector=k8s.client.V1LabelSelector(match_labels={"app": self.deployment_name}) ) - return client.V1Deployment( + return k8s.client.V1Deployment( api_version="apps/v1", kind="Deployment", - metadata=client.V1ObjectMeta(name=self.deployment_name), + metadata=k8s.client.V1ObjectMeta(name=self.deployment_name), spec=deployment_spec ) def _create_deployment(self, *, fixed=False): - api_instance = client.AppsV1Api() + api_instance = k8s.client.AppsV1Api() cmd = self.get_executable_cmd(fixed=fixed) deployment = self._create_deployment_spec(cmd.cmd, self.volume_configs) try: @@ -303,32 +303,32 @@ def _create_deployment(self, *, fixed=False): namespace=self.namespace ) LOG.info(f"Kubernetes Deployment '{self.deployment_name}' created successfully.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when creating Kubernetes Deployment: {e}") raise def _delete_deployment(self): - 
config.load_kube_config(self.launcher.kube_config_path) - api_instance = client.AppsV1Api() + k8s.config.load_kube_config(self.launcher.kube_config_path) + api_instance = k8s.client.AppsV1Api() try: api_instance.delete_namespaced_deployment( name=self.deployment_name, namespace=self.namespace, - body=client.V1DeleteOptions(propagation_policy="Foreground") + body=k8s.client.V1DeleteOptions(propagation_policy="Foreground") ) LOG.info(f"Kubernetes Deployment {self.deployment_name} deleted.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when deleting Kubernetes Deployment: {e}") def _expose_deployment(self): - api_instance = client.CoreV1Api() - service = client.V1Service( + api_instance = k8s.client.CoreV1Api() + service = k8s.client.V1Service( api_version="v1", kind="Service", - metadata=client.V1ObjectMeta(name=f"service-{self.deployment_name}"), - spec=client.V1ServiceSpec( + metadata=k8s.client.V1ObjectMeta(name=f"service-{self.deployment_name}"), + spec=k8s.client.V1ServiceSpec( selector={"app": self.deployment_name}, - ports=[client.V1ServicePort(port=self.deployment_port, target_port=self.deployment_port)], + ports=[k8s.client.V1ServicePort(port=self.deployment_port, target_port=self.deployment_port)], type="ClusterIP" ) ) @@ -338,25 +338,25 @@ def _expose_deployment(self): body=service ) LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' created and exposed successfully.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.info(f"Exception when creating Service: {e}") raise def _delete_service(self): - config.load_kube_config(self.launcher.kube_config_path) - svc_instance = client.CoreV1Api() + k8s.config.load_kube_config(self.launcher.kube_config_path) + svc_instance = k8s.client.CoreV1Api() try: svc_instance.delete_namespaced_service( name=f"service-{self.deployment_name}", namespace=self.namespace, - body=client.V1DeleteOptions(propagation_policy="Foreground") + body=k8s.client.V1DeleteOptions(propagation_policy="Foreground") ) LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' deleted.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when deleting Kubernetes Service: {e}") def _create_or_update_gateway(self): - networking_api = client.CustomObjectsApi() + networking_api = k8s.client.CustomObjectsApi() gateway_spec = { "apiVersion": "gateway.networking.k8s.io/v1beta1", "kind": "Gateway", @@ -368,7 +368,7 @@ def _create_or_update_gateway(self): } }, "spec": { - "gatewayClassName": "istio", + "gatewayClassName": self.gateway_class_name, "listeners": [ { "name": f"httproute-{self.deployment_name}", @@ -398,7 +398,7 @@ def _create_or_update_gateway(self): body=existing_gateway ) LOG.info(f"Kubernetes Gateway '{self.gateway_name}' updated successfully.") - except ApiException as e: + except k8s.client.rest.ApiException as e: if e.status == 404: try: networking_api.create_namespaced_custom_object( @@ -409,14 +409,14 @@ def _create_or_update_gateway(self): body=gateway_spec ) LOG.info(f"Kubernetes Gateway '{self.gateway_name}' created successfully.") - except ApiException as e_create: + except k8s.client.rest.ApiException as e_create: LOG.error(f"Exception when creating Gateway: {e_create}") else: LOG.error(f"Exception when updating Gateway: {e}") def _delete_or_update_gateway(self): - config.load_kube_config(self.launcher.kube_config_path) - gateway_instance = client.CustomObjectsApi() + 
k8s.config.load_kube_config(self.launcher.kube_config_path) + gateway_instance = k8s.client.CustomObjectsApi() try: gateway = gateway_instance.get_namespaced_custom_object( group="gateway.networking.k8s.io", @@ -450,11 +450,11 @@ def _delete_or_update_gateway(self): name=self.gateway_name ) LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when deleting or updating Gateway: {e}") def _create_httproute(self): - custom_api = client.CustomObjectsApi() + custom_api = k8s.client.CustomObjectsApi() httproute_spec = { "apiVersion": "gateway.networking.k8s.io/v1beta1", @@ -495,12 +495,12 @@ def _create_httproute(self): body=httproute_spec ) LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' created successfully.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when creating HTTPRoute: {e}") def _delete_httproute(self): - config.load_kube_config(self.launcher.kube_config_path) - httproute_instance = client.CustomObjectsApi() + k8s.config.load_kube_config(self.launcher.kube_config_path) + httproute_instance = k8s.client.CustomObjectsApi() try: httproute_instance.delete_namespaced_custom_object( group="gateway.networking.k8s.io", @@ -510,7 +510,7 @@ def _delete_httproute(self): name=f"httproute-{self.deployment_name}" ) LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' delete.") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when deleting HTTPRoute: {e}") def _start(self, *, fixed=False): @@ -546,7 +546,7 @@ def _get_jobid(self): return f"service-{self.deployment_name}" def _get_gateway_service_name(self): - core_api = client.CoreV1Api() + core_api = k8s.client.CoreV1Api() try: services = core_api.list_namespaced_service(namespace=self.namespace) @@ -558,13 +558,13 @@ def _get_gateway_service_name(self): LOG.warning("No Service was found corresponding to the specified Gateway.") return None - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when retrieving Gateway Service: {e}") return None def _get_gateway_deployment_name(self): - core_api = client.CoreV1Api() - apps_v1 = client.AppsV1Api() + core_api = k8s.client.CoreV1Api() + apps_v1 = k8s.client.AppsV1Api() gateway_service_name = self._get_gateway_service_name() try: @@ -584,12 +584,12 @@ def _get_gateway_deployment_name(self): else: LOG.warning(f"Kubernetes Service '{gateway_service_name}' does not have a selector.") return None - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Error fetching Service '{gateway_service_name}': {e}") return None def _get_gateway_ip(self): - core_api = client.CoreV1Api() + core_api = k8s.client.CoreV1Api() gateway_service_name = self._get_gateway_service_name() if gateway_service_name is None: raise ValueError("Kubernetes Gateway service name not found.") @@ -615,12 +615,12 @@ def _get_gateway_ip(self): else: LOG.warning("Unsupported Service type.") return None - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when retrieving gateway IP: {e}") return None def _get_httproute_host(self): - custom_api = client.CustomObjectsApi() + custom_api = k8s.client.CustomObjectsApi() try: httproute = custom_api.get_namespaced_custom_object( group="gateway.networking.k8s.io", @@ -636,7 +636,7 @@ def _get_httproute_host(self): else: LOG.warning("Kubernetes HTTPRoute has no 
configured hostnames.") return None - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when retrieving HTTPRoute host: {e}") return None @@ -647,7 +647,7 @@ def get_jobip(self): return host if host else ip def wait_for_deployment_ready(self, timeout=300): - api_instance = client.AppsV1Api() + api_instance = k8s.client.AppsV1Api() start_time = time.time() while time.time() - start_time < timeout: try: @@ -659,14 +659,14 @@ def wait_for_deployment_ready(self, timeout=300): LOG.info(f"Kubernetes Deployment '{self.deployment_name}' is running.") return True time.sleep(2) - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when reading Deployment status: {e}") raise LOG.warning(f"Timed out waiting for Deployment '{self.deployment_name}' to be ready.") return False def wait_for_service_ready(self, timeout=300): - svc_instance = client.CoreV1Api() + svc_instance = k8s.client.CoreV1Api() start_time = time.time() while time.time() - start_time < timeout: try: @@ -700,7 +700,7 @@ def wait_for_service_ready(self, timeout=300): return {"ip": node_ip, "ports": node_ports} LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is not ready yet. Retrying...") time.sleep(2) - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when reading Service status: {e}") raise LOG.warning(f"Timed out waiting for Service 'service-{self.deployment_name}' to be ready.") @@ -725,8 +725,8 @@ def _is_gateway_ready(self, timeout): return False def wait_for_gateway(self, timeout=300, interval=5): # noqa: C901 - core_v1 = client.CoreV1Api() - apps_v1 = client.AppsV1Api() + core_v1 = k8s.client.CoreV1Api() + apps_v1 = k8s.client.AppsV1Api() gateway_service_name = self._get_gateway_service_name() gateway_deployment_names = self._get_gateway_deployment_name() service_ready = False @@ -758,7 +758,7 @@ def wait_for_gateway(self, timeout=300, interval=5): # noqa: C901 else: LOG.error(f"Unexpected Kubernetes Service type: {service.spec.type}.") service_ready = False - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Kubernetes Service '{gateway_service_name}' not found yet: {e}") service_ready = False if not deployment_ready: @@ -773,7 +773,7 @@ def wait_for_gateway(self, timeout=300, interval=5): # noqa: C901 else: LOG.info(f"Kubernetes Deployment '{deployment_name}' is not fully ready yet.") deployment_ready = False - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.warning(f"Kubernetes Deployment '{deployment_name}' not found yet: {e}") deployment_ready = False @@ -787,7 +787,7 @@ def wait_for_gateway(self, timeout=300, interval=5): # noqa: C901 return False def wait_for_httproute(self, timeout=300): - custom_api = client.CustomObjectsApi() + custom_api = k8s.client.CustomObjectsApi() start_time = time.time() while time.time() - start_time < timeout: try: @@ -803,7 +803,7 @@ def wait_for_httproute(self, timeout=300): LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' is ready.") return True LOG.info(f"Waiting for HTTPRoute 'httproute-{self.deployment_name}' to be ready...") - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when checking HTTPRoute status: {e}") time.sleep(2) @@ -832,7 +832,7 @@ def wait(self): @property def status(self): - api_instance = client.AppsV1Api() + api_instance = k8s.client.AppsV1Api() try: deployment_status = 
api_instance.read_namespaced_deployment_status( name=self.deployment_name, @@ -842,12 +842,12 @@ def status(self): return Status.Running else: return Status.Pending - except ApiException as e: + except k8s.client.rest.ApiException as e: LOG.error(f"Exception when reading Deployment status: {e}") return Status.Failed def __init__(self, kube_config_path=None, volume_configs=None, image=None, resource_config=None, - namespace=None, gateway_name=None, host=None, path=None, + namespace=None, gateway_name=None, gateway_class_name=None, host=None, path=None, svc_type: Literal["LoadBalancer", "NodePort", "ClusterIP"] = None, retry=3, sync=True, ngpus=None, **kwargs): super().__init__() @@ -864,6 +864,8 @@ def __init__(self, kube_config_path=None, volume_configs=None, image=None, resou self.svc_type = svc_type if svc_type else config_data.get("svc_type", "LoadBalancer") self.namespace = namespace if namespace else config_data.get("namespace", "default") self.gateway_name = gateway_name if gateway_name else config_data.get("gateway_name", "lazyllm-gateway") + self.gateway_class_name = gateway_class_name if gateway_class_name \ + else config_data.get("gateway_class_name", "istio") self.http_host = host if host else config_data.get("host", None) self.http_path = path if path else config_data.get("path", '/generate') @@ -878,7 +880,7 @@ def _read_config_file(self, file_path): raise ValueError("Kubernetes resource configuration file format error.") def makejob(self, cmd): - config.load_kube_config(self.kube_config_path) + k8s.config.load_kube_config(self.kube_config_path) return K8sLauncher.Job(cmd, launcher=self, sync=self.sync) def launch(self, f, *args, **kw): diff --git a/lazyllm/thirdparty/__init__.py b/lazyllm/thirdparty/__init__.py index dac0f62e..3c75e943 100644 --- a/lazyllm/thirdparty/__init__.py +++ b/lazyllm/thirdparty/__init__.py @@ -80,6 +80,6 @@ def __getattribute__(self, __name): modules = ['redis', 'huggingface_hub', 'jieba', 'modelscope', 'pandas', 'jwt', 'rank_bm25', 'redisvl', 'datasets', 'deepspeed', 'fire', 'numpy', 'peft', 'torch', 'transformers', 'collie', 'faiss', 'flash_attn', 'google', 'lightllm', 'vllm', 'ChatTTS', 'wandb', 'funasr', 'sklearn', 'torchvision', 'scipy', 'pymilvus', - 'sentence_transformers', 'gradio', 'chromadb', 'nltk', 'PIL', 'httpx', 'bm25s'] + 'sentence_transformers', 'gradio', 'chromadb', 'nltk', 'PIL', 'httpx', 'bm25s', 'kubernetes'] for m in modules: vars()[m] = PackageWrapper(m) From fc75ba4255dea91886ac7a55dfd1739b170e55ae Mon Sep 17 00:00:00 2001 From: wangjian Date: Fri, 13 Dec 2024 20:01:38 +0800 Subject: [PATCH 07/17] modify k8s launcher for ecs --- lazyllm/launcher.py | 62 +++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index a0a0aee3..a4f72093 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -195,9 +195,6 @@ class K8sLauncher(LazyLLMLaunchersBase): class Job(Job): def __init__(self, cmd, launcher, *, sync=True): super().__init__(cmd, launcher, sync=sync) - self.cmd = cmd - self.launcher = launcher - self.sync = sync self.deployment_name = f"deployment-{uuid.uuid4().hex[:8]}" self.namespace = launcher.namespace self.volume_configs = launcher.volume_configs @@ -276,6 +273,7 @@ def _create_deployment_spec(self, cmd, volume_configs=None): ) else: LOG.error(f"{vol_config} configuration error.") + raise template = k8s.client.V1PodTemplateSpec( metadata=k8s.client.V1ObjectMeta(labels={"app": self.deployment_name}), @@ -339,7 +337,7 @@ def 
_expose_deployment(self): ) LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' created and exposed successfully.") except k8s.client.rest.ApiException as e: - LOG.info(f"Exception when creating Service: {e}") + LOG.error(f"Exception when creating Service: {e}") raise def _delete_service(self): @@ -411,8 +409,10 @@ def _create_or_update_gateway(self): LOG.info(f"Kubernetes Gateway '{self.gateway_name}' created successfully.") except k8s.client.rest.ApiException as e_create: LOG.error(f"Exception when creating Gateway: {e_create}") + raise else: LOG.error(f"Exception when updating Gateway: {e}") + raise def _delete_or_update_gateway(self): k8s.config.load_kube_config(self.launcher.kube_config_path) @@ -452,6 +452,7 @@ def _delete_or_update_gateway(self): LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted.") except k8s.client.rest.ApiException as e: LOG.error(f"Exception when deleting or updating Gateway: {e}") + raise def _create_httproute(self): custom_api = k8s.client.CustomObjectsApi() @@ -497,6 +498,7 @@ def _create_httproute(self): LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' created successfully.") except k8s.client.rest.ApiException as e: LOG.error(f"Exception when creating HTTPRoute: {e}") + raise def _delete_httproute(self): k8s.config.load_kube_config(self.launcher.kube_config_path) @@ -523,19 +525,6 @@ def _start(self, *, fixed=False): ret = self.wait() LOG.info(ret) - def start(self, *, restart=3, fixed=False): - self._start(fixed=fixed) - if not (lazyllm.config['mode'] == lazyllm.Mode.Display or self._fixed_cmd.checkf(self)): - if restart > 0: - for _ in range(restart): - self.restart(fixed=fixed) - if self._fixed_cmd.checkf(self): break - else: - raise RuntimeError(f'Job failed after retrying {restart} times') - else: - raise RuntimeError('Job failed without retries') - self._set_return_value() - def stop(self): self._delete_or_update_gateway() self._delete_httproute() @@ -562,7 +551,7 @@ def _get_gateway_service_name(self): LOG.error(f"Exception when retrieving Gateway Service: {e}") return None - def _get_gateway_deployment_name(self): + def _get_gateway_deployment_name(self): # noqa: C901 core_api = k8s.client.CoreV1Api() apps_v1 = k8s.client.AppsV1Api() @@ -572,11 +561,34 @@ def _get_gateway_deployment_name(self): selector = service.spec.selector if selector: label_selector = ",".join(f"{k}={v}" for k, v in selector.items()) - deployments = apps_v1.list_namespaced_deployment(self.namespace, label_selector=label_selector) - if deployments.items: - deployments = [dep.metadata.name for dep in deployments.items] - LOG.info(f"Kubernetes Gateway deployment name: {deployments}") - return deployments + pods = core_api.list_namespaced_pod(self.namespace, label_selector=label_selector).items + if not pods: + LOG.warning(f"No Pods found for Service '{gateway_service_name}' in namespace " + f"'{self.namespace}'.") + return None + + deployments = set() + for pod in pods: + for owner in pod.metadata.owner_references: + if owner.kind == "ReplicaSet": + rs = apps_v1.read_namespaced_replica_set(owner.name, self.namespace) + for rs_owner in rs.metadata.owner_references: + if rs_owner.kind == "Deployment": + deployments.add(rs_owner.name) + + if deployments: + for deployment_name in deployments: + isRestart = False + deployment = apps_v1.read_namespaced_deployment(deployment_name, self.namespace) + for container in deployment.spec.template.spec.containers: + if container.name == "istio-proxy" and container.image_pull_policy == "Always": + 
container.image_pull_policy = "IfNotPresent" + isRestart = True + if isRestart: + apps_v1.replace_namespaced_deployment(name=deployment_name, namespace=self.namespace, + body=deployment) + LOG.info(f"Updated {deployment_name} with imagePullPolicy 'IfNotPresent'") + return list(deployments) else: LOG.warning(f"No Deployment found for Gateway '{self.gateway_name}' in namespace " f"'{self.namespace}'.") @@ -719,6 +731,7 @@ def _is_gateway_ready(self, timeout): LOG.info(f"Kubernetes Service at '{url}' returned status code {response.status_code}") except requests.RequestException as e: LOG.error(f"Failed to access service at '{url}': {e}") + raise time.sleep(timeout) self.queue.put(f"ERROR: Kubernetes Service failed to start on '{url}'.") @@ -793,7 +806,7 @@ def wait_for_httproute(self, timeout=300): try: httproutes = custom_api.list_namespaced_custom_object( group="gateway.networking.k8s.io", - version="v1alpha2", + version="v1beta1", namespace=self.namespace, plural="httproutes" ).get('items', []) @@ -805,6 +818,7 @@ def wait_for_httproute(self, timeout=300): LOG.info(f"Waiting for HTTPRoute 'httproute-{self.deployment_name}' to be ready...") except k8s.client.rest.ApiException as e: LOG.error(f"Exception when checking HTTPRoute status: {e}") + raise time.sleep(2) LOG.warning(f"Timeout waiting for HTTPRoute 'httproute-{self.deployment_name}' to be ready.") From 5b0cd7766453192c952543ebc51fb3cee7f79af4 Mon Sep 17 00:00:00 2001 From: wangjian Date: Mon, 16 Dec 2024 19:57:13 +0800 Subject: [PATCH 08/17] add k8s test units --- .github/workflows/k8s_test.yml | 61 ++++++++++++ lazyllm/launcher.py | 158 +++++++++++++++++++++++++++--- tests/k8s_tests/test_example.py | 166 ++++++++++++++++++++++++++++++++ 3 files changed, 372 insertions(+), 13 deletions(-) create mode 100644 .github/workflows/k8s_test.yml create mode 100644 tests/k8s_tests/test_example.py diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml new file mode 100644 index 00000000..13c44c80 --- /dev/null +++ b/.github/workflows/k8s_test.yml @@ -0,0 +1,61 @@ +name: Test on k8s + +on: + workflow_dispatch: + push: + branches: + - main + pull_request: + branches: + - main + paths-ignore: + - "**.md" + +env: + CI_PATH: '/mnt/nfs_share/GitHub/${{ github.repository }}/${GITHUB_RUN_NUMBER}' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + k8s_tests: + runs-on: tps_k8s + + container: + image: registry.cn-sh-01.sensecore.cn/ai-expert-service/lazyllm_ci:latest + options: --mount type=bind,source=/mnt/nfs_share,target=/mnt/nfs_share --mount type=bind,source=/mnt/lustre/share_data,target=/mnt/lustre/share_data + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Create custom directory + run: | + set -ex + echo ${{ env.CI_PATH }} + mkdir -p ${{ env.CI_PATH }} + + - name: Clean custom directory + run: | + set -ex + if [ -d "${{ env.CI_PATH }}" ]; then + rm -rf ${{ env.CI_PATH }}/* + fi + + - name: Move code to custom directory + run: | + set -ex + mv $GITHUB_WORKSPACE/* ${{ env.CI_PATH }}/ + + - name: test + run: | + cd ${{ env.CI_PATH }} + export PYTHONPATH=$PWD:$PYTHONPATH + export LAZYLLM_DEFAULT_LAUNCHER="k8s" + export LAZYLLM_DATA_PATH="/mnt/lustre/share_data/lazyllm/data" + export LAZYLLM_K8S_ENV_NAME="lazyllm" + export LAZYLLM_K8S_CONFIG_PATH="/mnt/nfs_share/k8s_config.yaml" + mkdir -p $LAZYLLM_HOME + source ~/ENV/env.sh + python -m pytest --lf --last-failed-no-failures=all --durations=0 --reruns=2 -v tests/k8s_tests diff --git 
a/lazyllm/launcher.py b/lazyllm/launcher.py index a4f72093..461ba877 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -234,6 +234,7 @@ def _create_deployment_spec(self, cmd, volume_configs=None): container = k8s.client.V1Container( name=self.deployment_name, image=self.image, + image_pull_policy="IfNotPresent", command=["bash", "-c", cmd], resources=k8s.client.V1ResourceRequirements( requests=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}), @@ -305,7 +306,7 @@ def _create_deployment(self, *, fixed=False): LOG.error(f"Exception when creating Kubernetes Deployment: {e}") raise - def _delete_deployment(self): + def _delete_deployment(self, wait_for_completion=True, timeout=60, check_interval=5): k8s.config.load_kube_config(self.launcher.kube_config_path) api_instance = k8s.client.AppsV1Api() try: @@ -315,8 +316,32 @@ def _delete_deployment(self): body=k8s.client.V1DeleteOptions(propagation_policy="Foreground") ) LOG.info(f"Kubernetes Deployment {self.deployment_name} deleted.") + + if wait_for_completion: + self._wait_for_deployment_deletion(timeout=timeout, check_interval=check_interval) except k8s.client.rest.ApiException as e: - LOG.error(f"Exception when deleting Kubernetes Deployment: {e}") + if e.status == 404: + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' already deleted.") + else: + LOG.error(f"Exception when deleting Kubernetes Deployment: {e}") + raise + + def _wait_for_deployment_deletion(self, timeout, check_interval): + api_instance = k8s.client.AppsV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + api_instance.read_namespaced_deployment(name=self.deployment_name, namespace=self.namespace) + LOG.info(f"Waiting for Kubernetes Deployment '{self.deployment_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking Deployment deletion status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for Kubernetes Deployment '{self.deployment_name}' to be deleted.") def _expose_deployment(self): api_instance = k8s.client.CoreV1Api() @@ -340,18 +365,45 @@ def _expose_deployment(self): LOG.error(f"Exception when creating Service: {e}") raise - def _delete_service(self): + def _delete_service(self, wait_for_completion=True, timeout=60, check_interval=5): k8s.config.load_kube_config(self.launcher.kube_config_path) svc_instance = k8s.client.CoreV1Api() + service_name = f"service-{self.deployment_name}" try: svc_instance.delete_namespaced_service( - name=f"service-{self.deployment_name}", + name=service_name, namespace=self.namespace, body=k8s.client.V1DeleteOptions(propagation_policy="Foreground") ) - LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' deleted.") + LOG.info(f"Kubernetes Service '{service_name}' deleted.") + + if wait_for_completion: + self._wait_for_service_deletion(service_name=service_name, + timeout=timeout, + check_interval=check_interval) except k8s.client.rest.ApiException as e: - LOG.error(f"Exception when deleting Kubernetes Service: {e}") + if e.status == 404: + LOG.info(f"Kubernetes Service '{service_name}' already deleted.") + else: + LOG.error(f"Exception when deleting Kubernetes Service: {e}") + raise + + def _wait_for_service_deletion(self, service_name, timeout, check_interval): + svc_instance = k8s.client.CoreV1Api() + start_time = time.time() + while time.time() - 
start_time < timeout: + try: + svc_instance.read_namespaced_service(name=service_name, namespace=self.namespace) + LOG.info(f"Waiting for kubernetes Service '{service_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Kubernetes Service '{service_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking Service deletion status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for kubernetes Service '{service_name}' to be deleted.") def _create_or_update_gateway(self): networking_api = k8s.client.CustomObjectsApi() @@ -414,7 +466,7 @@ def _create_or_update_gateway(self): LOG.error(f"Exception when updating Gateway: {e}") raise - def _delete_or_update_gateway(self): + def _delete_or_update_gateway(self, wait_for_completion=True, timeout=60, check_interval=5): k8s.config.load_kube_config(self.launcher.kube_config_path) gateway_instance = k8s.client.CustomObjectsApi() try: @@ -441,6 +493,9 @@ def _delete_or_update_gateway(self): body=gateway ) LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted updated.") + + if wait_for_completion: + self._wait_for_gateway_update(timeout=timeout, check_interval=check_interval) else: gateway_instance.delete_namespaced_custom_object( group="gateway.networking.k8s.io", @@ -450,9 +505,58 @@ def _delete_or_update_gateway(self): name=self.gateway_name ) LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted.") + + if wait_for_completion: + self._wait_for_gateway_deletion(timeout=timeout, check_interval=check_interval) except k8s.client.rest.ApiException as e: - LOG.error(f"Exception when deleting or updating Gateway: {e}") - raise + if e.status == 404: + LOG.info(f"Gateway '{self.gateway_name}' already deleted.") + else: + LOG.error(f"Exception when deleting or updating Gateway: {e}") + raise + + def _wait_for_gateway_deletion(self, timeout, check_interval): + gateway_instance = k8s.client.CustomObjectsApi() + start_time = time.time() + while time.time() - start_time < timeout: + try: + gateway_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + LOG.info(f"Waiting for Gateway '{self.gateway_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Gateway '{self.gateway_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking Gateway deletion status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for Gateway '{self.gateway_name}' to be deleted.") + + def _wait_for_gateway_update(self, timeout, check_interval): + gateway_instance = k8s.client.CustomObjectsApi() + start_time = time.time() + while time.time() - start_time < timeout: + try: + gateway_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + LOG.info(f"Gateway '{self.gateway_name}' status check passed.") + return + except k8s.client.rest.ApiException as e: + LOG.error(f"Error while checking Gateway update status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for Gateway '{self.gateway_name}' update.") def _create_httproute(self): custom_api = k8s.client.CustomObjectsApi() @@ -500,20 +604,48 @@ def _create_httproute(self): LOG.error(f"Exception when creating HTTPRoute: {e}") raise - def 
_delete_httproute(self): + def _delete_httproute(self, wait_for_deletion=True, timeout=60, check_interval=5): k8s.config.load_kube_config(self.launcher.kube_config_path) httproute_instance = k8s.client.CustomObjectsApi() + httproute_name = f"httproute-{self.deployment_name}" try: httproute_instance.delete_namespaced_custom_object( group="gateway.networking.k8s.io", version="v1beta1", namespace=self.namespace, plural="httproutes", - name=f"httproute-{self.deployment_name}" + name=httproute_name ) - LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' delete.") + LOG.info(f"Kubernetes HTTPRoute '{httproute_name}' delete initiated.") except k8s.client.rest.ApiException as e: - LOG.error(f"Exception when deleting HTTPRoute: {e}") + if e.status == 404: + LOG.info(f"HTTPRoute '{httproute_name}' already deleted.") + return + else: + LOG.error(f"Exception when deleting HTTPRoute: {e}") + raise + + if wait_for_deletion: + start_time = time.time() + while time.time() - start_time < timeout: + try: + httproute_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + name=httproute_name + ) + LOG.info(f"Waiting for HTTPRoute '{httproute_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"HTTPRoute '{httproute_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking HTTPRoute status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for HTTPRoute '{httproute_name}' to be deleted.") def _start(self, *, fixed=False): self._create_deployment(fixed=fixed) diff --git a/tests/k8s_tests/test_example.py b/tests/k8s_tests/test_example.py new file mode 100644 index 00000000..2a1a3087 --- /dev/null +++ b/tests/k8s_tests/test_example.py @@ -0,0 +1,166 @@ +import lazyllm +from lazyllm import OnlineEmbeddingModule, launchers +from lazyllm.engine import LightEngine +import time +from gradio_client import Client + + +class TestExample(object): + def setup_method(self): + self.launcher = launchers.k8s() + + def teardown_method(self): + lazyllm.launcher.cleanup() + + def test_fc(self): + class Graph(lazyllm.ModuleBase): + def __init__(self): + super().__init__() + self._engine_conf = { + 'nodes': [ + { + 'id': ( + 'a0550cf15ea0f29f4d172d867d0df559c3c75a5b3fefb8ff419c61dd87e6d8f9' + '_1729740030006' + ), + 'kind': 'FunctionCall', + 'name': '1729740030006', + 'args': { + 'llm': '4ec882c0_d7a6_4796_b498_ca1ee6b2bd27', + 'tools': ['782b588f_7e87_415f_9e3b_3bfc9de2de7f'], + 'algorithm': 'ReWOO' + } + } + ], + 'edges': [ + { + 'iid': '__start__', + 'oid': ( + 'a0550cf15ea0f29f4d172d867d0df559c3c75a5b3fefb8ff419c61dd87e6d8f9' + '_1729740030006' + ) + }, + { + 'iid': ( + 'a0550cf15ea0f29f4d172d867d0df559c3c75a5b3fefb8ff419c61dd87e6d8f9' + '_1729740030006' + ), + 'oid': '__end__' + } + ], + 'resources': [ + { + 'id': '782b588f_7e87_415f_9e3b_3bfc9de2de7f', + 'kind': 'HttpTool', + 'name': 'Func20', + 'extras': {'provider_name': 'Func20'}, + 'args': { + 'timeout': 1, + 'doc': ( + '\n奇数偶数判断\n\nArgs:\n number (int): 输入数值\n\nReturns:\n output (str): 输出' + ), + 'code_str': ( + 'def is_even_or_odd(number):\r\n' + ' """\r\n' + ' 定义一个函数,用于判断一个数字是奇数还是偶数\r\n' + '\r\n' + ' args:\r\n' + ' int: number\r\n' + '\r\n' + ' returns:\r\n' + ' str: result\r\n' + ' """\r\n' + ' if number % 2 == 0:\r\n' + ' return f"这是工具代码的输出:{number}偶数"\r\n' + ' else:\r\n' + ' return f"这是工具代码的输出:{number}是奇数"' + ) + } + }, + { + 'id': 
'4ec882c0_d7a6_4796_b498_ca1ee6b2bd27', + 'kind': 'OnlineLLM', + 'name': '4ec882c0_d7a6_4796_b498_ca1ee6b2bd27', + 'args': { + 'source': 'glm', + 'prompt': None, + 'stream': False + } + } + ] + } + self.start_engine() + + def start_engine(self): + self._engine = LightEngine() + self._eid = self._engine.start(self._engine_conf.get("nodes", []), + self._engine_conf.get("edges", []), + self._engine_conf.get("resources", [])) + + def forward(self, query, **kw): + res = self._engine.run(self._eid, query) + return res + g = Graph() + web_module = lazyllm.ServerModule(g, launcher=self.launcher) + web_module.start() + r = web_module("3是奇数还是偶数") + assert '3是奇数' in r + + def test_rag(self): + def demo(query): + prompt = ("作为国学大师,你将扮演一个人工智能国学问答助手的角色,完成一项对话任务。在这个任务中,你需要根据给定的已知国学篇章以及" + "问题,给出你的结论。请注意,你的回答应基于给定的国学篇章,而非你的先验知识,且注意你回答的前后逻辑不要出现" + "重复,且不需要提到具体文件名称。\n任务示例如下:\n示例国学篇章:《礼记 大学》大学之道,在明明德,在亲民,在止于至善" + "。\n问题:什么是大学?\n回答:“大学”在《礼记》中代表的是一种理想的教育和社会实践过程,旨在通过个人的" + "道德修养和社会实践达到最高的善治状态。\n注意以上仅为示例,禁止在下面任务中提取或使用上述示例已知国学篇章。" + "\n现在,请对比以下给定的国学篇章和给出的问题。如果已知国学篇章中有该问题相关的原文,请提取相关原文出来。\n" + "已知国学篇章:{context_str}\n") + resources = [ + dict(id='00', kind='OnlineEmbedding', name='e1', args=dict(source='glm')), + dict(id='0', kind='Document', name='d1', args=dict(dataset_path='rag_master', embed='00', node_group=[ + dict(name='sentence', transform='SentenceSplitter', chunk_size=100, chunk_overlap=10)]))] + nodes = [dict(id='1', kind='Retriever', name='ret1', + args=dict(doc='0', group_name='CoarseChunk', similarity='bm25_chinese', topk=3)), + dict(id='2', kind='Retriever', name='ret2', + args=dict(doc='0', group_name='sentence', similarity='cosine', topk=3)), + dict(id='3', kind='JoinFormatter', name='c', args=dict(type='sum')), + dict(id='4', kind='Reranker', name='rek1', + args=dict(type='ModuleReranker', output_format='content', join=True, + arguments=dict(model=OnlineEmbeddingModule(type="rerank"), topk=1))), + dict(id='5', kind='Code', name='c1', + args='def test(nodes, query): return dict(context_str=nodes, query=query)'), + dict(id='6', kind='OnlineLLM', name='m1', + args=dict(source='glm', prompt=dict(system=prompt, user='问题: {query}')))] + edges = [dict(iid='__start__', oid='1'), dict(iid='__start__', oid='2'), dict(iid='1', oid='3'), + dict(iid='2', oid='3'), dict(iid='3', oid='4'), dict(iid='__start__', oid='4'), + dict(iid='4', oid='5'), dict(iid='__start__', oid='5'), dict(iid='5', oid='6'), + dict(iid='6', oid='__end__')] + engine = LightEngine() + gid = engine.start(nodes, edges, resources) + r = engine.run(gid, query) + return r + web_module = lazyllm.ServerModule(demo, launcher=self.launcher) + web_module.start() + r = web_module("何为天道?") + assert '观天之道,执天之行' in r or '天命之谓性,率性之谓道' in r + + def test_engine_server(self): + nodes = [dict(id='1', kind='Code', name='m1', args='def test(x: int):\n return 2 * x\n')] + edges = [dict(iid='__start__', oid='1'), dict(iid='1', oid='__end__')] + resources = [dict(id='2', kind='server', name='s1', args=dict(port=None, launcher=self.launcher)), + dict(id='3', kind='web', name='w1', args=dict(port=None, title='网页', history=[], audio=False)) + ] + engine = LightEngine() + gid = engine.start(nodes, edges, resources, gid='graph-1') + assert engine.status(gid) == {'1': 'running', '2': lazyllm.launcher.Status.Running, '3': 'running'} + assert engine.run(gid, 1) == 2 + time.sleep(3) + web = engine.build_node('graph-1').func._web + assert engine.build_node('graph-1').func.api_url is not None + assert engine.build_node('graph-1').func.web_url == web.url + client = 
Client(web.url, download_files=web.cach_path) + chat_history = [['123', None]] + ans = client.predict(False, chat_history, False, False, api_name="/_respond_stream") + assert ans[0][-1][-1] == '123123' + client.close() + web.stop() From 46f8026dbebbc630322b66f72a79416cc9be9565 Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 10:18:07 +0800 Subject: [PATCH 09/17] modify k8s test yaml --- .github/workflows/k8s_test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index 13c44c80..c22187e6 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -56,6 +56,7 @@ jobs: export LAZYLLM_DATA_PATH="/mnt/lustre/share_data/lazyllm/data" export LAZYLLM_K8S_ENV_NAME="lazyllm" export LAZYLLM_K8S_CONFIG_PATH="/mnt/nfs_share/k8s_config.yaml" + export LAZYLLM_HOME="${{ env.CI_PATH }}/${{ github.run_id }}-${{ github.job }}" mkdir -p $LAZYLLM_HOME - source ~/ENV/env.sh + source /mnt/nfs_share/env.sh python -m pytest --lf --last-failed-no-failures=all --durations=0 --reruns=2 -v tests/k8s_tests From e31aea4cea33d8123f44e72c53e1daae1f5bb5a4 Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 10:29:29 +0800 Subject: [PATCH 10/17] add shell config for k8s test --- .github/workflows/k8s_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index c22187e6..256a06b2 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -49,6 +49,7 @@ jobs: mv $GITHUB_WORKSPACE/* ${{ env.CI_PATH }}/ - name: test + shell: bash run: | cd ${{ env.CI_PATH }} export PYTHONPATH=$PWD:$PYTHONPATH From 91368bdb129c13aa3e2bdc77984124fb0291fb6b Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 10:40:38 +0800 Subject: [PATCH 11/17] add install requirments --- .github/workflows/k8s_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index 256a06b2..7a494ba6 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -52,6 +52,7 @@ jobs: shell: bash run: | cd ${{ env.CI_PATH }} + pip install -r tests/requirements.txt export PYTHONPATH=$PWD:$PYTHONPATH export LAZYLLM_DEFAULT_LAUNCHER="k8s" export LAZYLLM_DATA_PATH="/mnt/lustre/share_data/lazyllm/data" From 0190105e7c18045e3604d1831e083b5904f3a247 Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 10:57:11 +0800 Subject: [PATCH 12/17] modify k8s_test.yaml --- .github/workflows/k8s_test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index 7a494ba6..320367ff 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -48,10 +48,11 @@ jobs: set -ex mv $GITHUB_WORKSPACE/* ${{ env.CI_PATH }}/ - - name: test + - name: k8s_test shell: bash run: | cd ${{ env.CI_PATH }} + source activate lazyllm pip install -r tests/requirements.txt export PYTHONPATH=$PWD:$PYTHONPATH export LAZYLLM_DEFAULT_LAUNCHER="k8s" From d83b638e81118534840f5833e9a54d1c10c066a0 Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 14:40:34 +0800 Subject: [PATCH 13/17] add debug code for k8s test --- .github/workflows/k8s_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index 320367ff..42d2e698 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -62,4 +62,5 @@ 
jobs: export LAZYLLM_HOME="${{ env.CI_PATH }}/${{ github.run_id }}-${{ github.job }}" mkdir -p $LAZYLLM_HOME source /mnt/nfs_share/env.sh + sleep inf python -m pytest --lf --last-failed-no-failures=all --durations=0 --reruns=2 -v tests/k8s_tests From 30505112ffecf3d656afb8b2ad2065240100e44d Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 15:47:55 +0800 Subject: [PATCH 14/17] config launcher volume path --- lazyllm/launcher.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 461ba877..88f19af0 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -242,7 +242,8 @@ def _create_deployment_spec(self, cmd, volume_configs=None): ), volume_mounts=[] if not volume_configs else [ k8s.client.V1VolumeMount( - mount_path=vol_config["mount_path"], + mount_path=vol_config["mount_path"] if "__CURRENT_DIR__" not in vol_config['mount_path'] + else vol_config['mount_path'].replace("__CURRENT_DIR__", os.getcwd()), name=vol_config["name"] ) for vol_config in volume_configs ] @@ -257,7 +258,8 @@ def _create_deployment_spec(self, cmd, volume_configs=None): name=vol_config["name"], nfs=k8s.client.V1NFSVolumeSource( server=vol_config["nfs_server"], - path=vol_config["nfs_path"], + path=vol_config["nfs_path"] if "__CURRENT_DIR__" not in vol_config['nfs_path'] + else vol_config['nfs_path'].replace("__CURRENT_DIR__", os.getcwd()), read_only=vol_config.get("read_only", False) ) ) @@ -267,7 +269,8 @@ def _create_deployment_spec(self, cmd, volume_configs=None): k8s.client.V1Volume( name=vol_config["name"], host_path=k8s.client.V1HostPathVolumeSource( - path=vol_config["host_path"], + path=vol_config["host_path"] if "__CURRENT_DIR__" not in vol_config['host_path'] + else vol_config['host_path'].replace("__CURRENT_DIR__", os.getcwd()), type="Directory" ) ) From dde1576bfc4d04a1ded00e7179b7b323cb7fdeb3 Mon Sep 17 00:00:00 2001 From: wangjian Date: Tue, 17 Dec 2024 20:10:21 +0800 Subject: [PATCH 15/17] del sleep for debug --- .github/workflows/k8s_test.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index 42d2e698..ba6aff41 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -39,7 +39,7 @@ jobs: - name: Clean custom directory run: | set -ex - if [ -d "${{ env.CI_PATH }}" ]; then +systemctl status nfs-server if [ -d "${{ env.CI_PATH }}" ]; then rm -rf ${{ env.CI_PATH }}/* fi @@ -62,5 +62,4 @@ jobs: export LAZYLLM_HOME="${{ env.CI_PATH }}/${{ github.run_id }}-${{ github.job }}" mkdir -p $LAZYLLM_HOME source /mnt/nfs_share/env.sh - sleep inf python -m pytest --lf --last-failed-no-failures=all --durations=0 --reruns=2 -v tests/k8s_tests From cc0a08e86aacbdfe70398990ee60e347e9f19906 Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 18 Dec 2024 10:00:51 +0800 Subject: [PATCH 16/17] modify k8s test yaml --- .github/workflows/k8s_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml index ba6aff41..320367ff 100644 --- a/.github/workflows/k8s_test.yml +++ b/.github/workflows/k8s_test.yml @@ -39,7 +39,7 @@ jobs: - name: Clean custom directory run: | set -ex -systemctl status nfs-server if [ -d "${{ env.CI_PATH }}" ]; then + if [ -d "${{ env.CI_PATH }}" ]; then rm -rf ${{ env.CI_PATH }}/* fi From eae27091b9b3edb30cc8c6ed91c8a47fe3d37d34 Mon Sep 17 00:00:00 2001 From: wangjian Date: Wed, 18 Dec 2024 10:36:25 +0800 Subject: [PATCH 17/17] 
delete the command to print cmd --- lazyllm/launcher.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 88f19af0..296d7d3e 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -219,7 +219,6 @@ def _wrap_cmd(self, cmd): if lazyllm_vars: precmd += " && ".join(f"export {k}={v}" for k, v in lazyllm_vars.items()) + " && " precmd += '''ifconfig | grep "inet " | awk "{printf \\"LAZYLLMIP %s\\\\n\\", \$2}" &&''' # noqa W605 - LOG.info(f"precmd: {precmd}\n cmd: {cmd}") port_match = re.search(r"--open_port=(\d+)", cmd) if port_match: port = port_match.group(1) @@ -1035,7 +1034,7 @@ def makejob(self, cmd): def launch(self, f, *args, **kw): if isinstance(f, K8sLauncher.Job): f.start() - LOG.info("launch come here.") + LOG.info("Launcher started successfully.") self.job = f return f.return_value elif callable(f):
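
Taken together, the series above makes the Kubernetes launcher usable in two ways: implicitly, by exporting LAZYLLM_DEFAULT_LAUNCHER="k8s" together with LAZYLLM_K8S_CONFIG_PATH and LAZYLLM_K8S_ENV_NAME (as the k8s_test.yml workflow does), or explicitly via launchers.k8s(), as tests/k8s_tests/test_example.py does. The sketch below is illustrative only: it assumes a reachable cluster with the Gateway API and an istio gateway class available, and every concrete value (kube-config path, image name, NFS server, mount path) is a placeholder rather than something defined by these patches.

    import lazyllm
    from lazyllm import launchers

    # Implicit configuration: LAZYLLM_K8S_CONFIG_PATH / LAZYLLM_K8S_ENV_NAME are read
    # from the environment, exactly as the CI workflow exports them.
    def echo(query):
        return f"input is {query}"

    server = lazyllm.ServerModule(echo, launcher=launchers.k8s())
    server.start()          # creates the Deployment, Service, Gateway listener and HTTPRoute
    print(server("hello"))

    # Explicit configuration: the keyword arguments mirror K8sLauncher.__init__ as
    # extended in this series; all concrete values shown here are placeholders.
    launcher = launchers.k8s(
        kube_config_path="/mnt/nfs_share/k8s_config.yaml",
        namespace="default",
        gateway_name="lazyllm-gateway",
        gateway_class_name="istio",
        svc_type="ClusterIP",
        image="registry.example.com/lazyllm:latest",     # placeholder image
        resource_config={"requests": {"cpu": "2", "memory": "16Gi"}},
        volume_configs=[{
            "name": "lazyllm-k8s",
            "nfs_server": "10.0.0.1",        # placeholder NFS server
            "nfs_path": "__CURRENT_DIR__",   # expanded to os.getcwd() by the launcher
            "mount_path": "__CURRENT_DIR__",
        }],
    )
    server_explicit = lazyllm.ServerModule(echo, launcher=launcher)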