diff --git a/.github/workflows/k8s_test.yml b/.github/workflows/k8s_test.yml new file mode 100644 index 00000000..320367ff --- /dev/null +++ b/.github/workflows/k8s_test.yml @@ -0,0 +1,65 @@ +name: Test on k8s + +on: + workflow_dispatch: + push: + branches: + - main + pull_request: + branches: + - main + paths-ignore: + - "**.md" + +env: + CI_PATH: '/mnt/nfs_share/GitHub/${{ github.repository }}/${GITHUB_RUN_NUMBER}' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + k8s_tests: + runs-on: tps_k8s + + container: + image: registry.cn-sh-01.sensecore.cn/ai-expert-service/lazyllm_ci:latest + options: --mount type=bind,source=/mnt/nfs_share,target=/mnt/nfs_share --mount type=bind,source=/mnt/lustre/share_data,target=/mnt/lustre/share_data + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Create custom directory + run: | + set -ex + echo ${{ env.CI_PATH }} + mkdir -p ${{ env.CI_PATH }} + + - name: Clean custom directory + run: | + set -ex + if [ -d "${{ env.CI_PATH }}" ]; then + rm -rf ${{ env.CI_PATH }}/* + fi + + - name: Move code to custom directory + run: | + set -ex + mv $GITHUB_WORKSPACE/* ${{ env.CI_PATH }}/ + + - name: k8s_test + shell: bash + run: | + cd ${{ env.CI_PATH }} + source activate lazyllm + pip install -r tests/requirements.txt + export PYTHONPATH=$PWD:$PYTHONPATH + export LAZYLLM_DEFAULT_LAUNCHER="k8s" + export LAZYLLM_DATA_PATH="/mnt/lustre/share_data/lazyllm/data" + export LAZYLLM_K8S_ENV_NAME="lazyllm" + export LAZYLLM_K8S_CONFIG_PATH="/mnt/nfs_share/k8s_config.yaml" + export LAZYLLM_HOME="${{ env.CI_PATH }}/${{ github.run_id }}-${{ github.job }}" + mkdir -p $LAZYLLM_HOME + source /mnt/nfs_share/env.sh + python -m pytest --lf --last-failed-no-failures=all --durations=0 --reruns=2 -v tests/k8s_tests diff --git a/lazyllm/launcher.py b/lazyllm/launcher.py index 9305e2cf..c7387166 100644 --- a/lazyllm/launcher.py +++ b/lazyllm/launcher.py @@ -18,6 +18,11 @@ import lazyllm 
from lazyllm import LazyLLMRegisterMetaClass, LazyLLMCMD, final, LOG +from lazyllm.thirdparty import kubernetes as k8s +import requests +import yaml +from typing import Literal + class Status(Enum): TBSubmitted = 0, InQueue = 1 @@ -74,6 +79,8 @@ def clone(self): lazyllm.config.add('sco_keep_record', bool, False, 'SCO_KEEP_RECORD') lazyllm.config.add('sco_resource_type', str, 'N3lS.Ii.I60', 'SCO_RESOURCE_TYPE') lazyllm.config.add('cuda_visible', bool, False, 'CUDA_VISIBLE') +lazyllm.config.add('k8s_env_name', str, '', 'K8S_ENV_NAME') +lazyllm.config.add('k8s_config_path', str, '', 'K8S_CONFIG_PATH') # store cmd, return message and command output. @@ -180,6 +187,860 @@ def _generate_name(self): def __deepcopy__(self, memo=None): raise RuntimeError('Cannot copy Job object') +@final +class K8sLauncher(LazyLLMLaunchersBase): + all_processes = defaultdict(list) + namespace = "default" + + class Job(Job): + def __init__(self, cmd, launcher, *, sync=True): + super().__init__(cmd, launcher, sync=sync) + self.deployment_name = f"deployment-{uuid.uuid4().hex[:8]}" + self.namespace = launcher.namespace + self.volume_configs = launcher.volume_configs + self.gateway_name = launcher.gateway_name + self.gateway_class_name = launcher.gateway_class_name + self.deployment_port = 8080 + self.host = launcher.http_host + self.path = launcher.http_path + self.svc_type = launcher.svc_type + self.gateway_retry = launcher.gateway_retry + self.image = launcher.image + self.resource_config = launcher.resource_config + + def _wrap_cmd(self, cmd): + pythonpath = os.getenv("PYTHONPATH", '') + precmd = (f'''export PYTHONPATH={os.getcwd()}:{pythonpath}:$PYTHONPATH ''' + f'''&& export PATH={os.path.join(os.path.expanduser('~'), '.local/bin')}:$PATH &&''') + if lazyllm.config['k8s_env_name']: + precmd = f"source activate {lazyllm.config['k8s_env_name']} && " + precmd + env_vars = os.environ + lazyllm_vars = {k: v for k, v in env_vars.items() if k.startswith("LAZYLLM")} + if lazyllm_vars: + precmd += 
" && ".join(f"export {k}={v}" for k, v in lazyllm_vars.items()) + " && " + precmd += '''ifconfig | grep "inet " | awk "{printf \\"LAZYLLMIP %s\\\\n\\", \$2}" &&''' # noqa W605 + port_match = re.search(r"--open_port=(\d+)", cmd) + if port_match: + port = port_match.group(1) + LOG.info(f"Port: {port}") + self.deployment_port = int(port) + else: + LOG.info("Port not found") + raise ValueError("Failed to obtain application port.") + return precmd + " " + cmd + + def _create_deployment_spec(self, cmd, volume_configs=None): + container = k8s.client.V1Container( + name=self.deployment_name, + image=self.image, + image_pull_policy="IfNotPresent", + command=["bash", "-c", cmd], + resources=k8s.client.V1ResourceRequirements( + requests=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}), + limits=self.resource_config.get("requests", {"cpu": "2", "memory": "16Gi"}) + ), + volume_mounts=[] if not volume_configs else [ + k8s.client.V1VolumeMount( + mount_path=vol_config["mount_path"] if "__CURRENT_DIR__" not in vol_config['mount_path'] + else vol_config['mount_path'].replace("__CURRENT_DIR__", os.getcwd()), + name=vol_config["name"] + ) for vol_config in volume_configs + ] + ) + + volumes = [] + if volume_configs: + for vol_config in volume_configs: + if "nfs_server" in vol_config and "nfs_path" in vol_config: + volumes.append( + k8s.client.V1Volume( + name=vol_config["name"], + nfs=k8s.client.V1NFSVolumeSource( + server=vol_config["nfs_server"], + path=vol_config["nfs_path"] if "__CURRENT_DIR__" not in vol_config['nfs_path'] + else vol_config['nfs_path'].replace("__CURRENT_DIR__", os.getcwd()), + read_only=vol_config.get("read_only", False) + ) + ) + ) + elif "host_path" in vol_config: + volumes.append( + k8s.client.V1Volume( + name=vol_config["name"], + host_path=k8s.client.V1HostPathVolumeSource( + path=vol_config["host_path"] if "__CURRENT_DIR__" not in vol_config['host_path'] + else vol_config['host_path'].replace("__CURRENT_DIR__", os.getcwd()), + 
type="Directory" + ) + ) + ) + else: + LOG.error(f"{vol_config} configuration error.") + raise + + template = k8s.client.V1PodTemplateSpec( + metadata=k8s.client.V1ObjectMeta(labels={"app": self.deployment_name}), + spec=k8s.client.V1PodSpec(restart_policy="Always", containers=[container], volumes=volumes) + ) + deployment_spec = k8s.client.V1DeploymentSpec( + replicas=1, + template=template, + selector=k8s.client.V1LabelSelector(match_labels={"app": self.deployment_name}) + ) + return k8s.client.V1Deployment( + api_version="apps/v1", + kind="Deployment", + metadata=k8s.client.V1ObjectMeta(name=self.deployment_name), + spec=deployment_spec + ) + + def _create_deployment(self, *, fixed=False): + api_instance = k8s.client.AppsV1Api() + cmd = self.get_executable_cmd(fixed=fixed) + deployment = self._create_deployment_spec(cmd.cmd, self.volume_configs) + try: + api_instance.create_namespaced_deployment( + body=deployment, + namespace=self.namespace + ) + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' created successfully.") + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when creating Kubernetes Deployment: {e}") + raise + + def _delete_deployment(self, wait_for_completion=True, timeout=60, check_interval=5): + k8s.config.load_kube_config(self.launcher.kube_config_path) + api_instance = k8s.client.AppsV1Api() + try: + api_instance.delete_namespaced_deployment( + name=self.deployment_name, + namespace=self.namespace, + body=k8s.client.V1DeleteOptions(propagation_policy="Foreground") + ) + LOG.info(f"Kubernetes Deployment {self.deployment_name} deleted.") + + if wait_for_completion: + self._wait_for_deployment_deletion(timeout=timeout, check_interval=check_interval) + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' already deleted.") + else: + LOG.error(f"Exception when deleting Kubernetes Deployment: {e}") + raise + + def _wait_for_deployment_deletion(self, 
timeout, check_interval): + api_instance = k8s.client.AppsV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + api_instance.read_namespaced_deployment(name=self.deployment_name, namespace=self.namespace) + LOG.info(f"Waiting for Kubernetes Deployment '{self.deployment_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking Deployment deletion status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for Kubernetes Deployment '{self.deployment_name}' to be deleted.") + + def _expose_deployment(self): + api_instance = k8s.client.CoreV1Api() + service = k8s.client.V1Service( + api_version="v1", + kind="Service", + metadata=k8s.client.V1ObjectMeta(name=f"service-{self.deployment_name}"), + spec=k8s.client.V1ServiceSpec( + selector={"app": self.deployment_name}, + ports=[k8s.client.V1ServicePort(port=self.deployment_port, target_port=self.deployment_port)], + type="ClusterIP" + ) + ) + try: + api_instance.create_namespaced_service( + namespace=self.namespace, + body=service + ) + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' created and exposed successfully.") + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when creating Service: {e}") + raise + + def _delete_service(self, wait_for_completion=True, timeout=60, check_interval=5): + k8s.config.load_kube_config(self.launcher.kube_config_path) + svc_instance = k8s.client.CoreV1Api() + service_name = f"service-{self.deployment_name}" + try: + svc_instance.delete_namespaced_service( + name=service_name, + namespace=self.namespace, + body=k8s.client.V1DeleteOptions(propagation_policy="Foreground") + ) + LOG.info(f"Kubernetes Service '{service_name}' deleted.") + + if wait_for_completion: + self._wait_for_service_deletion(service_name=service_name, + 
timeout=timeout, + check_interval=check_interval) + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Kubernetes Service '{service_name}' already deleted.") + else: + LOG.error(f"Exception when deleting Kubernetes Service: {e}") + raise + + def _wait_for_service_deletion(self, service_name, timeout, check_interval): + svc_instance = k8s.client.CoreV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + svc_instance.read_namespaced_service(name=service_name, namespace=self.namespace) + LOG.info(f"Waiting for kubernetes Service '{service_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Kubernetes Service '{service_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking Service deletion status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for kubernetes Service '{service_name}' to be deleted.") + + def _create_or_update_gateway(self): + networking_api = k8s.client.CustomObjectsApi() + gateway_spec = { + "apiVersion": "gateway.networking.k8s.io/v1beta1", + "kind": "Gateway", + "metadata": { + "name": self.gateway_name, + "namespace": self.namespace, + "annotations": { + "networking.istio.io/service-type": self.svc_type + } + }, + "spec": { + "gatewayClassName": self.gateway_class_name, + "listeners": [ + { + "name": f"httproute-{self.deployment_name}", + "port": self.deployment_port, + "protocol": "HTTP", + } + ] + } + } + + try: + existing_gateway = networking_api.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + + existing_gateway['spec']["listeners"].extend(gateway_spec['spec']["listeners"]) + networking_api.replace_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name, + 
body=existing_gateway + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' updated successfully.") + except k8s.client.rest.ApiException as e: + if e.status == 404: + try: + networking_api.create_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + body=gateway_spec + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' created successfully.") + except k8s.client.rest.ApiException as e_create: + LOG.error(f"Exception when creating Gateway: {e_create}") + raise + else: + LOG.error(f"Exception when updating Gateway: {e}") + raise + + def _delete_or_update_gateway(self, wait_for_completion=True, timeout=60, check_interval=5): + k8s.config.load_kube_config(self.launcher.kube_config_path) + gateway_instance = k8s.client.CustomObjectsApi() + try: + gateway = gateway_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + + listeners = gateway['spec']['listeners'] + gateway['spec']['listeners'] = [ + listener for listener in listeners if listener['name'] != f"httproute-{self.deployment_name}" + ] + + if gateway['spec']['listeners']: + gateway_instance.replace_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name, + body=gateway + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted updated.") + + if wait_for_completion: + self._wait_for_gateway_update(timeout=timeout, check_interval=check_interval) + else: + gateway_instance.delete_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' deleted.") + + if wait_for_completion: + self._wait_for_gateway_deletion(timeout=timeout, 
check_interval=check_interval) + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Gateway '{self.gateway_name}' already deleted.") + else: + LOG.error(f"Exception when deleting or updating Gateway: {e}") + raise + + def _wait_for_gateway_deletion(self, timeout, check_interval): + gateway_instance = k8s.client.CustomObjectsApi() + start_time = time.time() + while time.time() - start_time < timeout: + try: + gateway_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + LOG.info(f"Waiting for Gateway '{self.gateway_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"Gateway '{self.gateway_name}' successfully deleted.") + return + else: + LOG.error(f"Error while checking Gateway deletion status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for Gateway '{self.gateway_name}' to be deleted.") + + def _wait_for_gateway_update(self, timeout, check_interval): + gateway_instance = k8s.client.CustomObjectsApi() + start_time = time.time() + while time.time() - start_time < timeout: + try: + gateway_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="gateways", + name=self.gateway_name + ) + LOG.info(f"Gateway '{self.gateway_name}' status check passed.") + return + except k8s.client.rest.ApiException as e: + LOG.error(f"Error while checking Gateway update status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for Gateway '{self.gateway_name}' update.") + + def _create_httproute(self): + custom_api = k8s.client.CustomObjectsApi() + + httproute_spec = { + "apiVersion": "gateway.networking.k8s.io/v1beta1", + "kind": "HTTPRoute", + "metadata": { + "name": f"httproute-{self.deployment_name}", + "namespace": self.namespace + }, + 
"spec": { + "parentRefs": [{ + "name": self.gateway_name, + "port": self.deployment_port + }], + "rules": [{ + "matches": [{ + "path": { + "type": "PathPrefix", + "value": self.path + } + }], + "backendRefs": [{ + "name": f"service-{self.deployment_name}", + "port": self.deployment_port + }] + }] + } + } + + if self.host: + httproute_spec["spec"]["hostnames"] = [self.host] + + try: + custom_api.create_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + body=httproute_spec + ) + LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' created successfully.") + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when creating HTTPRoute: {e}") + raise + + def _delete_httproute(self, wait_for_deletion=True, timeout=60, check_interval=5): + k8s.config.load_kube_config(self.launcher.kube_config_path) + httproute_instance = k8s.client.CustomObjectsApi() + httproute_name = f"httproute-{self.deployment_name}" + try: + httproute_instance.delete_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + name=httproute_name + ) + LOG.info(f"Kubernetes HTTPRoute '{httproute_name}' delete initiated.") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"HTTPRoute '{httproute_name}' already deleted.") + return + else: + LOG.error(f"Exception when deleting HTTPRoute: {e}") + raise + + if wait_for_deletion: + start_time = time.time() + while time.time() - start_time < timeout: + try: + httproute_instance.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + name=httproute_name + ) + LOG.info(f"Waiting for HTTPRoute '{httproute_name}' to be deleted...") + except k8s.client.rest.ApiException as e: + if e.status == 404: + LOG.info(f"HTTPRoute '{httproute_name}' successfully deleted.") + 
return + else: + LOG.error(f"Error while checking HTTPRoute status: {e}") + raise + time.sleep(check_interval) + LOG.warning(f"Timeout while waiting for HTTPRoute '{httproute_name}' to be deleted.") + + def _start(self, *, fixed=False): + self._create_deployment(fixed=fixed) + self._expose_deployment() + self._create_or_update_gateway() + self._create_httproute() + self.jobid = self._get_jobid() + self.launcher.all_processes[self.launcher._id].append((self.jobid, self)) + ret = self.wait() + LOG.info(ret) + + def stop(self): + self._delete_or_update_gateway() + self._delete_httproute() + self._delete_service() + self._delete_deployment() + + def _get_jobid(self): + return f"service-{self.deployment_name}" + + def _get_gateway_service_name(self): + core_api = k8s.client.CoreV1Api() + try: + services = core_api.list_namespaced_service(namespace=self.namespace) + + for service in services.items: + labels = service.metadata.labels + if labels and ("gateway" in labels.get("app", "") or self.gateway_name in service.metadata.name): + LOG.info(f"Kubernetes Gateway service name: {service.metadata.name}") + return service.metadata.name + + LOG.warning("No Service was found corresponding to the specified Gateway.") + return None + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when retrieving Gateway Service: {e}") + return None + + def _get_gateway_deployment_name(self): # noqa: C901 + core_api = k8s.client.CoreV1Api() + apps_v1 = k8s.client.AppsV1Api() + + gateway_service_name = self._get_gateway_service_name() + try: + service = core_api.read_namespaced_service(gateway_service_name, self.namespace) + selector = service.spec.selector + if selector: + label_selector = ",".join(f"{k}={v}" for k, v in selector.items()) + pods = core_api.list_namespaced_pod(self.namespace, label_selector=label_selector).items + if not pods: + LOG.warning(f"No Pods found for Service '{gateway_service_name}' in namespace " + f"'{self.namespace}'.") + return None + + deployments 
= set() + for pod in pods: + for owner in pod.metadata.owner_references: + if owner.kind == "ReplicaSet": + rs = apps_v1.read_namespaced_replica_set(owner.name, self.namespace) + for rs_owner in rs.metadata.owner_references: + if rs_owner.kind == "Deployment": + deployments.add(rs_owner.name) + + if deployments: + for deployment_name in deployments: + isRestart = False + deployment = apps_v1.read_namespaced_deployment(deployment_name, self.namespace) + for container in deployment.spec.template.spec.containers: + if container.name == "istio-proxy" and container.image_pull_policy == "Always": + container.image_pull_policy = "IfNotPresent" + isRestart = True + if isRestart: + apps_v1.replace_namespaced_deployment(name=deployment_name, namespace=self.namespace, + body=deployment) + LOG.info(f"Updated {deployment_name} with imagePullPolicy 'IfNotPresent'") + return list(deployments) + else: + LOG.warning(f"No Deployment found for Gateway '{self.gateway_name}' in namespace " + f"'{self.namespace}'.") + return None + else: + LOG.warning(f"Kubernetes Service '{gateway_service_name}' does not have a selector.") + return None + except k8s.client.rest.ApiException as e: + LOG.error(f"Error fetching Service '{gateway_service_name}': {e}") + return None + + def _get_gateway_ip(self): + core_api = k8s.client.CoreV1Api() + gateway_service_name = self._get_gateway_service_name() + if gateway_service_name is None: + raise ValueError("Kubernetes Gateway service name not found.") + try: + service = core_api.read_namespaced_service( + name=gateway_service_name, + namespace=self.namespace + ) + + if service.spec.type == "LoadBalancer": + if service.status.load_balancer.ingress: + ip = service.status.load_balancer.ingress[0].ip + return ip + else: + LOG.warning("The LoadBalancer IP has not been assigned yet.") + return None + elif service.spec.type == "NodePort": + nodes = core_api.list_node() + node_ip = nodes.items[0].status.addresses[0].address + return node_ip + elif 
service.spec.type == "ClusterIP": + return service.spec.cluster_ip + else: + LOG.warning("Unsupported Service type.") + return None + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when retrieving gateway IP: {e}") + return None + + def _get_httproute_host(self): + custom_api = k8s.client.CustomObjectsApi() + try: + httproute = custom_api.get_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes", + name=f"httproute-{self.deployment_name}" + ) + + hostnames = httproute.get("spec", {}).get("hostnames", []) + if hostnames: + return hostnames[0] + else: + LOG.warning("Kubernetes HTTPRoute has no configured hostnames.") + return None + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when retrieving HTTPRoute host: {e}") + return None + + def get_jobip(self): + host = self._get_httproute_host() + ip = self._get_gateway_ip() + LOG.info(f"gateway ip: {ip}, hostname: {host}") + return host if host else ip + + def wait_for_deployment_ready(self, timeout=300): + api_instance = k8s.client.AppsV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + deployment_status = api_instance.read_namespaced_deployment_status( + name=self.deployment_name, + namespace=self.namespace + ).status + if deployment_status.available_replicas and deployment_status.available_replicas > 0: + LOG.info(f"Kubernetes Deployment '{self.deployment_name}' is running.") + return True + time.sleep(2) + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when reading Deployment status: {e}") + raise + LOG.warning(f"Timed out waiting for Deployment '{self.deployment_name}' to be ready.") + return False + + def wait_for_service_ready(self, timeout=300): + svc_instance = k8s.client.CoreV1Api() + start_time = time.time() + while time.time() - start_time < timeout: + try: + service = svc_instance.read_namespaced_service( + 
name=f"service-{self.deployment_name}", + namespace=self.namespace + ) + if service.spec.type == "LoadBalancer" and service.status.load_balancer.ingress: + ip = service.status.load_balancer.ingress[0].ip + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready with IP: {ip}") + return ip + elif service.spec.cluster_ip: + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready with ClusterIP: " + f"{service.spec.cluster_ip}") + return service.spec.cluster_ip + elif service.spec.type == "NodePort": + node_ports = [p.node_port for p in service.spec.ports] + if node_ports: + nodes = svc_instance.list_node() + for node in nodes.items: + for address in node.status.addresses: + if address.type == "InternalIP": + node_ip = address.address + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready on " + f"NodePort(s): {node_ports} at Node IP: {node_ip}") + return {"ip": node_ip, "ports": node_ports} + elif address.type == "ExternalIP": + node_ip = address.address + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is ready on " + f"NodePort(s): {node_ports} at External Node IP: {node_ip}") + return {"ip": node_ip, "ports": node_ports} + LOG.info(f"Kubernetes Service 'service-{self.deployment_name}' is not ready yet. 
Retrying...") + time.sleep(2) + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when reading Service status: {e}") + raise + LOG.warning(f"Timed out waiting for Service 'service-{self.deployment_name}' to be ready.") + return None + + def _is_gateway_ready(self, timeout): + url = f"http://{self.get_jobip()}:{self.deployment_port}{self.path}" + for i in range(self.gateway_retry): + try: + response = requests.get(url, timeout=timeout) + if response.status_code != 503: + LOG.info(f"Kubernetes Service is ready at '{url}'") + self.queue.put(f"Uvicorn running on {url}") + return True + else: + LOG.info(f"Kubernetes Service at '{url}' returned status code {response.status_code}") + except requests.RequestException as e: + LOG.error(f"Failed to access service at '{url}': {e}") + raise + time.sleep(timeout) + + self.queue.put(f"ERROR: Kubernetes Service failed to start on '{url}'.") + return False + + def wait_for_gateway(self, timeout=300, interval=5): # noqa: C901 + core_v1 = k8s.client.CoreV1Api() + apps_v1 = k8s.client.AppsV1Api() + gateway_service_name = self._get_gateway_service_name() + gateway_deployment_names = self._get_gateway_deployment_name() + service_ready = False + deployment_ready = False + + start_time = time.time() + while time.time() - start_time < timeout: + if not service_ready: + try: + service = core_v1.read_namespaced_service(gateway_service_name, self.namespace) + if service.spec.type in ["NodePort", "LoadBalancer"]: + if service.spec.type == "LoadBalancer": + if service.status.load_balancer.ingress: + LOG.info(f"Kubernetes Service '{gateway_service_name}' is ready with " + "LoadBalancer IP.") + service_ready = True + else: + LOG.info(f"Kubernetes Service '{gateway_service_name}' LoadBalancer IP " + "not available yet.") + service_ready = False + else: + if any(port.node_port for port in service.spec.ports): + LOG.info(f"Kubernetes Service '{gateway_service_name}' is ready with " + "NodePort configuration.") + service_ready = True 
+ else: + LOG.info(f"Kubernetes Service '{gateway_service_name}' NodePort not assigned yet.") + service_ready = False + else: + LOG.error(f"Unexpected Kubernetes Service type: {service.spec.type}.") + service_ready = False + except k8s.client.rest.ApiException as e: + LOG.error(f"Kubernetes Service '{gateway_service_name}' not found yet: {e}") + service_ready = False + if not deployment_ready: + for deployment_name in gateway_deployment_names: + try: + deployment = apps_v1.read_namespaced_deployment(deployment_name, self.namespace) + if deployment.status.available_replicas and deployment.status.available_replicas > 0: + LOG.info(f"Kubernetes Deployment '{deployment_name}' is ready with " + f"{deployment.status.available_replicas} replicas.") + deployment_ready = True + break + else: + LOG.info(f"Kubernetes Deployment '{deployment_name}' is not fully ready yet.") + deployment_ready = False + except k8s.client.rest.ApiException as e: + LOG.warning(f"Kubernetes Deployment '{deployment_name}' not found yet: {e}") + deployment_ready = False + + if service_ready and deployment_ready and self._is_gateway_ready(timeout=interval): + LOG.info(f"Kubernetes Gateway '{self.gateway_name}' is fully ready.") + return True + + time.sleep(interval) + + LOG.error(f"Kubernetes Gateway '{self.gateway_name}' failed to become ready with {timeout} seconds.") + return False + + def wait_for_httproute(self, timeout=300): + custom_api = k8s.client.CustomObjectsApi() + start_time = time.time() + while time.time() - start_time < timeout: + try: + httproutes = custom_api.list_namespaced_custom_object( + group="gateway.networking.k8s.io", + version="v1beta1", + namespace=self.namespace, + plural="httproutes" + ).get('items', []) + + for httproute in httproutes: + if httproute['metadata']['name'] == f"httproute-{self.deployment_name}": + LOG.info(f"Kubernetes HTTPRoute 'httproute-{self.deployment_name}' is ready.") + return True + LOG.info(f"Waiting for HTTPRoute 
'httproute-{self.deployment_name}' to be ready...") + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when checking HTTPRoute status: {e}") + raise + + time.sleep(2) + LOG.warning(f"Timeout waiting for HTTPRoute 'httproute-{self.deployment_name}' to be ready.") + return False + + def wait(self): + deployment_ready = self.wait_for_deployment_ready() + if not deployment_ready: + raise TimeoutError("Kubernetes Deployment did not become ready in time.") + + service_ip = self.wait_for_service_ready() + if not service_ip: + raise TimeoutError("Kubernetes Service did not become ready in time.") + + httproute_ready = self.wait_for_httproute() + if not httproute_ready: + raise TimeoutError("Kubernetes Httproute did not become ready in time.") + + gateway_ready = self.wait_for_gateway() + if not gateway_ready: + raise TimeoutError("Kubernetes Gateway did not become ready in time.") + + return {"deployment": Status.Running, "service_ip": service_ip, + "gateway": Status.Running, "httproute": Status.Running} + + @property + def status(self): + api_instance = k8s.client.AppsV1Api() + try: + deployment_status = api_instance.read_namespaced_deployment_status( + name=self.deployment_name, + namespace=self.namespace + ).status + if deployment_status.available_replicas and deployment_status.available_replicas > 0: + return Status.Running + else: + return Status.Pending + except k8s.client.rest.ApiException as e: + LOG.error(f"Exception when reading Deployment status: {e}") + return Status.Failed + + def __init__(self, kube_config_path=None, volume_configs=None, image=None, resource_config=None, + namespace=None, gateway_name=None, gateway_class_name=None, host=None, path=None, + svc_type: Literal["LoadBalancer", "NodePort", "ClusterIP"] = None, retry=3, + sync=True, ngpus=None, **kwargs): + super().__init__() + self.gateway_retry = retry + self.sync = sync + self.ngpus = ngpus + config_data = self._read_config_file(lazyllm.config['k8s_config_path']) if 
lazyllm.config['k8s_config_path'] \ + else {} + self.volume_configs = volume_configs if volume_configs else config_data.get('volume', []) + self.image = image if image else config_data.get('container_image', "lazyllm/lazyllm:k8s_launcher") + self.resource_config = resource_config if resource_config else config_data.get('resource', {}) + self.kube_config_path = kube_config_path if kube_config_path \ + else config_data.get('kube_config_path', "~/.kube/config") + self.svc_type = svc_type if svc_type else config_data.get("svc_type", "LoadBalancer") + self.namespace = namespace if namespace else config_data.get("namespace", "default") + self.gateway_name = gateway_name if gateway_name else config_data.get("gateway_name", "lazyllm-gateway") + self.gateway_class_name = gateway_class_name if gateway_class_name \ + else config_data.get("gateway_class_name", "istio") + self.http_host = host if host else config_data.get("host", None) + self.http_path = path if path else config_data.get("path", '/generate') + + def _read_config_file(self, file_path): + assert os.path.isabs(file_path), "Resource config file path must be an absolute path." 
# NOTE(review): reconstructed from a line-collapsed git-diff chunk; these are
# K8sLauncher methods, flattened here because the class header is outside
# this view.  (The `cleanup()` and thirdparty-module hunks that follow in the
# patch are cut mid-definition and are not reproduced.)

def _read_config_file(self, file_path):
    """Load the launcher's YAML resource-configuration file.

    Args:
        file_path: absolute path to the YAML file.

    Returns:
        The parsed YAML document (typically a dict).

    Raises:
        ValueError: when the file is not valid YAML.
    """
    # NOTE(review): `assert` disappears under `python -O`; kept for
    # backward compatibility with callers expecting AssertionError.
    assert os.path.isabs(file_path), "Resource config file path must be an absolute path."
    with open(file_path, 'r') as fp:
        try:
            return yaml.safe_load(fp)
        except yaml.YAMLError as e:
            LOG.error(f"Exception when reading resource configuration file: {e}")
            raise ValueError("Kubernetes resource configuration file format error.")

def makejob(self, cmd):
    """Create a Job bound to this launcher after loading the kubeconfig."""
    k8s.config.load_kube_config(self.kube_config_path)
    return K8sLauncher.Job(cmd, launcher=self, sync=self.sync)

def launch(self, f, *args, **kw):
    """Start a prepared ``K8sLauncher.Job`` and return its value.

    Args:
        f: the Job to start.  Plain callables (and anything else) are
           rejected — asynchronous execution on Kubernetes is unsupported.

    Raises:
        RuntimeError: when ``f`` is not a ``K8sLauncher.Job``.
    """
    if isinstance(f, K8sLauncher.Job):
        f.start()
        LOG.info("Launcher started successfully.")
        self.job = f
        return f.return_value
    if callable(f):
        # FIX: this is an error path — previously logged at INFO.
        LOG.error("Async execution in Kubernetes is not supported currently.")
    # FIX: previously a non-Job, non-callable argument fell through and the
    # method silently returned None; now every non-Job input raises the same
    # RuntimeError callers already had to handle for callables.
    raise RuntimeError("Kubernetes launcher requires a Deployment object.")
# NOTE(review): reconstructed from a line-collapsed git diff of
# tests/k8s_tests/test_example.py.  Whitespace inside multi-line string
# literals (prompt text, embedded `code_str` snippets) was collapsed by the
# paste and may differ from the true file — verify against the repository
# before relying on exact string contents.
class TestExample(object):
    """End-to-end examples exercising the k8s launcher via LightEngine."""

    def setup_method(self):
        # Fresh k8s launcher per test; configuration comes from the
        # LAZYLLM_K8S_* environment set up by the CI workflow.
        self.launcher = launchers.k8s()

    def teardown_method(self):
        # Tear down every launcher-spawned process between tests.
        lazyllm.launcher.cleanup()

    def test_fc(self):
        # Function-calling graph: one FunctionCall node wired to an HttpTool
        # resource (parity checker) and an online LLM, served via ServerModule.
        class Graph(lazyllm.ModuleBase):
            def __init__(self):
                super().__init__()
                # Engine description: single FunctionCall node between
                # __start__ and __end__, backed by the two resources below.
                self._engine_conf = {
                    'nodes': [
                        {
                            'id': (
                                'a0550cf15ea0f29f4d172d867d0df559c3c75a5b3fefb8ff419c61dd87e6d8f9'
                                '_1729740030006'
                            ),
                            'kind': 'FunctionCall',
                            'name': '1729740030006',
                            'args': {
                                'llm': '4ec882c0_d7a6_4796_b498_ca1ee6b2bd27',
                                'tools': ['782b588f_7e87_415f_9e3b_3bfc9de2de7f'],
                                'algorithm': 'ReWOO'
                            }
                        }
                    ],
                    'edges': [
                        {
                            'iid': '__start__',
                            'oid': (
                                'a0550cf15ea0f29f4d172d867d0df559c3c75a5b3fefb8ff419c61dd87e6d8f9'
                                '_1729740030006'
                            )
                        },
                        {
                            'iid': (
                                'a0550cf15ea0f29f4d172d867d0df559c3c75a5b3fefb8ff419c61dd87e6d8f9'
                                '_1729740030006'
                            ),
                            'oid': '__end__'
                        }
                    ],
                    'resources': [
                        {
                            # Tool resource: odd/even checker exposed as HttpTool.
                            'id': '782b588f_7e87_415f_9e3b_3bfc9de2de7f',
                            'kind': 'HttpTool',
                            'name': 'Func20',
                            'extras': {'provider_name': 'Func20'},
                            'args': {
                                'timeout': 1,
                                'doc': (
                                    '\n奇数偶数判断\n\nArgs:\n number (int): 输入数值\n\nReturns:\n output (str): 输出'
                                ),
                                'code_str': (
                                    'def is_even_or_odd(number):\r\n'
                                    ' """\r\n'
                                    ' 定义一个函数,用于判断一个数字是奇数还是偶数\r\n'
                                    '\r\n'
                                    ' args:\r\n'
                                    ' int: number\r\n'
                                    '\r\n'
                                    ' returns:\r\n'
                                    ' str: result\r\n'
                                    ' """\r\n'
                                    ' if number % 2 == 0:\r\n'
                                    ' return f"这是工具代码的输出:{number}偶数"\r\n'
                                    ' else:\r\n'
                                    ' return f"这是工具代码的输出:{number}是奇数"'
                                )
                            }
                        },
                        {
                            # LLM resource driving the FunctionCall node.
                            'id': '4ec882c0_d7a6_4796_b498_ca1ee6b2bd27',
                            'kind': 'OnlineLLM',
                            'name': '4ec882c0_d7a6_4796_b498_ca1ee6b2bd27',
                            'args': {
                                'source': 'glm',
                                'prompt': None,
                                'stream': False
                            }
                        }
                    ]
                }
                self.start_engine()

            def start_engine(self):
                # Boot a LightEngine from the stored config and keep its id.
                self._engine = LightEngine()
                self._eid = self._engine.start(self._engine_conf.get("nodes", []),
                                               self._engine_conf.get("edges", []),
                                               self._engine_conf.get("resources", []))

            def forward(self, query, **kw):
                # Route every module call through the running engine graph.
                res = self._engine.run(self._eid, query)
                return res
        g = Graph()
        # Deploy the graph behind a server launched on k8s, then query it.
        web_module = lazyllm.ServerModule(g, launcher=self.launcher)
        web_module.start()
        r = web_module("3是奇数还是偶数")
        assert '3是奇数' in r

    def test_rag(self):
        # RAG pipeline: two retrievers merged, reranked, reformatted, then
        # answered by an online LLM with a guided-QA system prompt.
        def demo(query):
            prompt = ("作为国学大师,你将扮演一个人工智能国学问答助手的角色,完成一项对话任务。在这个任务中,你需要根据给定的已知国学篇章以及"
                      "问题,给出你的结论。请注意,你的回答应基于给定的国学篇章,而非你的先验知识,且注意你回答的前后逻辑不要出现"
                      "重复,且不需要提到具体文件名称。\n任务示例如下:\n示例国学篇章:《礼记 大学》大学之道,在明明德,在亲民,在止于至善"
                      "。\n问题:什么是大学?\n回答:“大学”在《礼记》中代表的是一种理想的教育和社会实践过程,旨在通过个人的"
                      "道德修养和社会实践达到最高的善治状态。\n注意以上仅为示例,禁止在下面任务中提取或使用上述示例已知国学篇章。"
                      "\n现在,请对比以下给定的国学篇章和给出的问题。如果已知国学篇章中有该问题相关的原文,请提取相关原文出来。\n"
                      "已知国学篇章:{context_str}\n")
            # Document resource with a sentence-level node group for the
            # cosine retriever; the coarse retriever uses the default group.
            resources = [
                dict(id='00', kind='OnlineEmbedding', name='e1', args=dict(source='glm')),
                dict(id='0', kind='Document', name='d1', args=dict(dataset_path='rag_master', embed='00', node_group=[
                    dict(name='sentence', transform='SentenceSplitter', chunk_size=100, chunk_overlap=10)]))]
            nodes = [dict(id='1', kind='Retriever', name='ret1',
                          args=dict(doc='0', group_name='CoarseChunk', similarity='bm25_chinese', topk=3)),
                     dict(id='2', kind='Retriever', name='ret2',
                          args=dict(doc='0', group_name='sentence', similarity='cosine', topk=3)),
                     dict(id='3', kind='JoinFormatter', name='c', args=dict(type='sum')),
                     dict(id='4', kind='Reranker', name='rek1',
                          args=dict(type='ModuleReranker', output_format='content', join=True,
                                    arguments=dict(model=OnlineEmbeddingModule(type="rerank"), topk=1))),
                     dict(id='5', kind='Code', name='c1',
                          args='def test(nodes, query): return dict(context_str=nodes, query=query)'),
                     dict(id='6', kind='OnlineLLM', name='m1',
                          args=dict(source='glm', prompt=dict(system=prompt, user='问题: {query}')))]
            edges = [dict(iid='__start__', oid='1'), dict(iid='__start__', oid='2'), dict(iid='1', oid='3'),
                     dict(iid='2', oid='3'), dict(iid='3', oid='4'), dict(iid='__start__', oid='4'),
                     dict(iid='4', oid='5'), dict(iid='__start__', oid='5'), dict(iid='5', oid='6'),
                     dict(iid='6', oid='__end__')]
            engine = LightEngine()
            gid = engine.start(nodes, edges, resources)
            r = engine.run(gid, query)
            return r
        web_module = lazyllm.ServerModule(demo, launcher=self.launcher)
        web_module.start()
        r = web_module("何为天道?")
        # Either canonical quotation counts as a correct retrieval.
        assert '观天之道,执天之行' in r or '天命之谓性,率性之谓道' in r

    def test_engine_server(self):
        # Minimal graph (doubler) plus explicit server and web resources,
        # exercised through both the engine API and the gradio client.
        nodes = [dict(id='1', kind='Code', name='m1', args='def test(x: int):\n return 2 * x\n')]
        edges = [dict(iid='__start__', oid='1'), dict(iid='1', oid='__end__')]
        resources = [dict(id='2', kind='server', name='s1', args=dict(port=None, launcher=self.launcher)),
                     dict(id='3', kind='web', name='w1', args=dict(port=None, title='网页', history=[], audio=False))
                     ]
        engine = LightEngine()
        gid = engine.start(nodes, edges, resources, gid='graph-1')
        assert engine.status(gid) == {'1': 'running', '2': lazyllm.launcher.Status.Running, '3': 'running'}
        assert engine.run(gid, 1) == 2
        time.sleep(3)  # give the web front-end time to come up before probing it
        web = engine.build_node('graph-1').func._web
        assert engine.build_node('graph-1').func.api_url is not None
        assert engine.build_node('graph-1').func.web_url == web.url
        # NOTE(review): 'cach_path' looks like a typo for 'cache_path', but it
        # must match the attribute actually defined on the web module upstream
        # — confirm against lazyllm's WebModule before renaming.
        client = Client(web.url, download_files=web.cach_path)
        chat_history = [['123', None]]
        ans = client.predict(False, chat_history, False, False, api_name="/_respond_stream")
        # The echo graph doubles the input, so '123' comes back as '123123'.
        assert ans[0][-1][-1] == '123123'
        client.close()
        web.stop()