From 5038dc6ec14ac4df9d96d5dd26df960128e9ce35 Mon Sep 17 00:00:00 2001 From: Cathy Zhang Date: Fri, 27 Dec 2024 13:47:43 +0000 Subject: [PATCH 1/7] Provide unified scalable deployment and benchmarking support for examples Signed-off-by: Cathy Zhang --- ChatQnA/chatqna.yaml | 61 ++++ deploy.py | 658 ++++++++++++++++++++++++++++++++++++++++ deploy_and_benchmark.py | 288 ++++++++++++++++++ 3 files changed, 1007 insertions(+) create mode 100644 ChatQnA/chatqna.yaml create mode 100644 deploy.py create mode 100644 deploy_and_benchmark.py diff --git a/ChatQnA/chatqna.yaml b/ChatQnA/chatqna.yaml new file mode 100644 index 000000000..5d6e3d21b --- /dev/null +++ b/ChatQnA/chatqna.yaml @@ -0,0 +1,61 @@ +deploy: + device: gaudi + version: 1.1.0 + modelUseHostPath: /mnt/models + HUGGINGFACEHUB_API_TOKEN: "" + node: [1, 2, 4, 8] + namespace: "" + + services: + backend: + instance_num: [2, 2, 4, 8] + cores_per_instance: "" + memory_capacity: "" + + teirerank: + enabled: True + model_id: "" + replicaCount: [1, 1, 1, 1] + cards_per_instance: 1 + + tei: + model_id: "" + replicaCount: [1, 2, 4, 8] + cores_per_instance: "" + memory_capacity: "" + + llm: + engine: tgi + model_id: "" + replicaCount: [7, 15, 31, 63] + max_batch_size: [1, 2, 4, 8] + max_input_length: "" + max_total_tokens: "" + max_batch_total_tokens: "" + max_batch_prefill_tokens: "" + cards_per_instance: 1 + + data-prep: + replicaCount: [1, 1, 1, 1] + cores_per_instance: "" + memory_capacity: "" + + retriever-usvc: + replicaCount: [2, 2, 4, 8] + cores_per_instance: "" + memory_capacity: "" + + redis-vector-db: + replicaCount: [1, 1, 1, 1] + cores_per_instance: "" + memory_capacity: "" + + chatqna-ui: + replicaCount: [1, 1, 1, 1] + + nginx: + replicaCount: [1, 1, 1, 1] + +benchmark: + llm: + max_token_size: 1024 # specify the output token size diff --git a/deploy.py b/deploy.py new file mode 100644 index 000000000..6d7506c38 --- /dev/null +++ b/deploy.py @@ -0,0 +1,658 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import glob +import json +import os +import subprocess +import sys +import time + +import yaml + +from enum import Enum, auto + +################################################################################ +# # +# HELM VALUES GENERATION SECTION # +# # +################################################################################ + +def configure_node_selectors(values, node_selector, deploy_config): + """Configure node selectors for all services.""" + for service_name, config in deploy_config["services"].items(): + if service_name == "backend": + values["nodeSelector"] = {key: value for key, value in node_selector.items()} + elif service_name == "llm": + engine = config.get("engine", "tgi") + values[engine] = {"nodeSelector": {key: value for key, value in node_selector.items()}} + else: + values[service_name] = {"nodeSelector": {key: value for key, value in node_selector.items()}} + return values + +def configure_replica(values, deploy_config): + """Get replica configuration based on example type and node count.""" + for service_name, config in deploy_config["services"].items(): + if not config.get("replicaCount"): + continue + + if service_name == "llm": + engine = config.get("engine", "tgi") + values[engine]["replicaCount"] = config["replicaCount"] + elif service_name == "backend": + values["replicaCount"] = config["replicaCount"] + else: + values[service_name]["replicaCount"] = config["replicaCount"] + + return values + +def get_output_filename(num_nodes, with_rerank, example_type, device, action_type): + """Generate output filename based on configuration.""" + rerank_suffix = "with-rerank-" if with_rerank else "" + action_suffix = "deploy-" if action_type == 0 else "update-" if action_type == 1 else "" + + return f"{example_type}-{num_nodes}-{device}-{action_suffix}{rerank_suffix}values.yaml" + +def configure_resources(values, deploy_config): + """Configure resources when tuning is enabled.""" + resource_configs = [] + + for service_name, config in deploy_config["services"].items(): + resources = {} + if deploy_config["device"] == "gaudi" and config.get("cards_per_instance", 0) > 1: + resources = { + "limits": {"habana.ai/gaudi": config["cards_per_instance"]}, + "requests": {"habana.ai/gaudi": config["cards_per_instance"]}, + } + else: + limits = {} + requests = {} + + # Only add CPU if cores_per_instance has a value + if config.get("cores_per_instance"): + limits["cpu"] = config["cores_per_instance"] + requests["cpu"] = config["cores_per_instance"] + + # Only add memory if memory_capacity has a value + if config.get("memory_capacity"): + limits["memory"] = config["memory_capacity"] + requests["memory"] = config["memory_capacity"] + + # Only create resources if we have any limits/requests + if limits and requests: + resources["limits"] = limits + resources["requests"] = requests + + if resources: + if service_name == "llm": + engine = config.get("engine", "tgi") + resource_configs.append({ + "name": engine, + "resources": resources, + }) + else: + resource_configs.append({ + "name": service_name, + "resources": resources, + }) + + for config in [r for r in resource_configs if r]: + service_name = config["name"] + if service_name == "backend": + values["resources"] = config["resources"] + elif service_name in values: + values[service_name]["resources"] = config["resources"] + + return values + +def configure_extra_cmd_args(values, deploy_config): + """Configure extra command line arguments for services.""" + for service_name, config in deploy_config["services"].items(): + extra_cmd_args = [] + + for param in ["max_batch_size", "max_input_length", "max_total_tokens", + "max_batch_total_tokens", "max_batch_prefill_tokens"]: + if config.get(param): + extra_cmd_args.extend([f"--{param.replace('_', '-')}", str(config[param])]) + + if extra_cmd_args: + if service_name == "llm": + engine = config.get("engine", "tgi") + if engine not in values: + values[engine] = {} + values[engine]["extraCmdArgs"] = extra_cmd_args + else: + if service_name not in values: + values[service_name] = {} + values[service_name]["extraCmdArgs"] = extra_cmd_args + + return values + +def configure_models(values, deploy_config): + """Configure model settings for services.""" + for service_name, config in deploy_config["services"].items(): + # Skip if no model_id defined or service is disabled + if not config.get("model_id") or config.get("enabled") is False: + continue + + if service_name == "llm": + # For LLM service, use its engine as the key + engine = config.get("engine", "tgi") + values[engine]["LLM_MODEL_ID"] = config.get("model_id") + elif service_name == "tei": + values[service_name]["EMBEDDING_MODEL_ID"] = config.get("model_id") + elif service_name == "teirerank": + values[service_name]["RERANK_MODEL_ID"] = config.get("model_id") + + return values + +def configure_rerank(values, with_rerank, deploy_config, example_type, node_selector): + """Configure rerank service""" + if with_rerank: + if "teirerank" not in values: + values["teirerank"] = {"nodeSelector": {key: value for key, value in node_selector.items()}} + elif "nodeSelector" not in values["teirerank"]: + values["teirerank"]["nodeSelector"] = {key: value for key, value in node_selector.items()} + else: + if example_type == "chatqna": + values["image"] = {"repository": "opea/chatqna-without-rerank"} + if "teirerank" not in values: + values["teirerank"] = {"enabled": False} + elif "enabled" not in values["teirerank"]: + values["teirerank"]["enabled"] = False + return values + +def generate_helm_values(example_type, deploy_config, chart_dir, action_type, node_selector=None): + """Create a values.yaml file based on the provided configuration.""" + if deploy_config is None: + raise ValueError("deploy_config is required") + + # Ensure the chart_dir exists + if not os.path.exists(chart_dir): + return { + "status": "false", + "message": f"Chart directory {chart_dir} does not exist" + } + + num_nodes = deploy_config.get("node", 1) + with_rerank = deploy_config["services"].get("teirerank", {}).get("enabled", False) + + print(f"Generating values for {example_type} example") + print(f"with_rerank: {with_rerank}") + print(f"num_nodes: {num_nodes}") + print(f"node_selector: {node_selector}") + + # Initialize base values + values = { + "global": { + "HUGGINGFACEHUB_API_TOKEN": deploy_config.get("HUGGINGFACEHUB_API_TOKEN", ""), + "modelUseHostPath": deploy_config.get("modelUseHostPath", ""), + } + } + + # Configure components + values = configure_node_selectors(values, node_selector or {}, deploy_config) + values = configure_rerank(values, with_rerank, deploy_config, example_type, node_selector or {}) + values = configure_replica(values, deploy_config) + values = configure_resources(values, deploy_config) + values = configure_extra_cmd_args(values, deploy_config) + values = configure_models(values, deploy_config) + + device = deploy_config.get("device", "unknown") + + # Generate and write YAML file + filename = get_output_filename(num_nodes, with_rerank, example_type, device, action_type) + yaml_string = yaml.dump(values, default_flow_style=False) + + filepath = os.path.join(chart_dir, filename) + + # Write the YAML data to the file + with open(filepath, "w") as file: + file.write(yaml_string) + + print(f"YAML file {filepath} has been generated.") + return {"status": "success", "filepath": filepath} + +################################################################################ +# # +# DEPLOYMENT SECTION # +# # +################################################################################ + +def run_kubectl_command(command): + """Run a kubectl command and return the output.""" + try: + result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + return result.stdout + except subprocess.CalledProcessError as e: + print(f"Error running command: {command}\n{e.stderr}") + exit(1) + + +def get_all_nodes(): + """Get the list of all nodes in the Kubernetes cluster.""" + command = ["kubectl", "get", "nodes", "-o", "json"] + output = run_kubectl_command(command) + nodes = json.loads(output) + return [node["metadata"]["name"] for node in nodes["items"]] + + +def add_label_to_node(node_name, label): + """Add a label to the specified node.""" + command = ["kubectl", "label", "node", node_name, label, "--overwrite"] + print(f"Labeling node {node_name} with {label}...") + run_kubectl_command(command) + print(f"Label {label} added to node {node_name} successfully.") + + +def add_labels_to_nodes(node_count=None, label=None, node_names=None): + """Add a label to the specified number of nodes or to specified nodes.""" + + if node_names: + # Add label to the specified nodes + for node_name in node_names: + add_label_to_node(node_name, label) + else: + # Fetch the node list and label the specified number of nodes + all_nodes = get_all_nodes() + if node_count is None or node_count > len(all_nodes): + print(f"Error: Node count exceeds the number of available nodes ({len(all_nodes)} available).") + sys.exit(1) + + selected_nodes = all_nodes[:node_count] + for node_name in selected_nodes: + add_label_to_node(node_name, label) + + +def clear_labels_from_nodes(label, node_names=None): + """Clear the specified label from specific nodes if provided, otherwise from all nodes.""" + label_key = label.split("=")[0] # Extract key from 'key=value' format + + # If specific nodes are provided, use them; otherwise, get all nodes + nodes_to_clear = node_names if node_names else get_all_nodes() + + for node_name in nodes_to_clear: + # Check if the node has the label by inspecting its metadata + command = ["kubectl", "get", "node", node_name, "-o", "json"] + node_info = run_kubectl_command(command) + node_metadata = json.loads(node_info) + + # Check if the label exists on this node + labels = node_metadata["metadata"].get("labels", {}) + if label_key in labels: + # Remove the label from the node + command = ["kubectl", "label", "node", node_name, f"{label_key}-"] + print(f"Removing label {label_key} from node {node_name}...") + run_kubectl_command(command) + print(f"Label {label_key} removed from node {node_name} successfully.") + else: + print(f"Label {label_key} not found on node {node_name}, skipping.") + + +def get_hw_values_file(deploy_config, chart_dir): + """Get the hardware-specific values file based on the deploy configuration.""" + device_type = deploy_config.get("device", "cpu") + print(f"Device type is {device_type}. Using existing Helm chart values files...") + if device_type == "cpu": + print(f"Device type is {device_type}. Using existing Helm chart values files.") + return None + + llm_engine = deploy_config.get("services", {}).get("llm", {}).get("engine", "tgi") + version = deploy_config.get("version", "1.1.0") + + if os.path.isdir(chart_dir): + # Determine which values file to use based on version + if version in ["1.0.0", "1.1.0"]: + hw_values_file = os.path.join(chart_dir, f"{device_type}-values.yaml") + else: + hw_values_file = os.path.join(chart_dir, f"{device_type}-{llm_engine}-values.yaml") + + if not os.path.exists(hw_values_file): + print(f"Warning: {hw_values_file} not found") + hw_values_file = None + else: + print(f"Device-specific values file found: {hw_values_file}") + else: + print(f"Error: Could not find directory for {chart_dir}") + hw_values_file = None + + return hw_values_file + + +def install_helm_release(release_name, chart_name, namespace, hw_values_file, deploy_values_file): + """Deploy a Helm release with a specified name and chart. + + Parameters: + - release_name: The name of the Helm release. + - chart_name: The Helm chart name or path. + - namespace: The Kubernetes namespace for deployment. + - hw_values_file: The values file for hw specific + - deploy_values_file: The values file for deployment. + """ + + # Check if the namespace exists; if not, create it + try: + command = ["kubectl", "get", "namespace", namespace] + subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except subprocess.CalledProcessError: + print(f"Namespace '{namespace}' does not exist. Creating it...") + command = ["kubectl", "create", "namespace", namespace] + subprocess.run(command, check=True) + print(f"Namespace '{namespace}' created successfully.") + + try: + # Prepare the Helm install command + command = ["helm", "install", release_name, chart_name, "--namespace", namespace] + + # Append values files in order + if hw_values_file: + command.extend(["-f", hw_values_file]) + if deploy_values_file: + command.extend(["-f", deploy_values_file]) + + # Execute the Helm install command + print(f"Running command: {' '.join(command)}") + subprocess.run(command, check=True) + print("Deployment initiated successfully.") + + except subprocess.CalledProcessError as e: + print(f"Error occurred while deploying Helm release: {e}") + + +def uninstall_helm_release(release_name, namespace=None): + """Uninstall a Helm release and clean up resources, optionally delete the namespace if not 'default'.""" + # Default to 'default' namespace if none is specified + if not namespace: + namespace = "default" + + try: + # Uninstall the Helm release + command = ["helm", "uninstall", release_name, "--namespace", namespace] + print(f"Uninstalling Helm release {release_name} in namespace {namespace}...") + run_kubectl_command(command) + print(f"Helm release {release_name} uninstalled successfully.") + + # If the namespace is specified and not 'default', delete it + if namespace != "default": + print(f"Deleting namespace {namespace}...") + delete_namespace_command = ["kubectl", "delete", "namespace", namespace] + run_kubectl_command(delete_namespace_command) + print(f"Namespace {namespace} deleted successfully.") + else: + print("Namespace is 'default', skipping deletion.") + + except subprocess.CalledProcessError as e: + print(f"Error occurred while uninstalling Helm release or deleting namespace: {e}") + + +def update_service(release_name, chart_name, namespace, hw_values_file, deploy_values_file, update_values_file): + """Update the deployment using helm upgrade with new values. + + Args: + release_name: The helm release name + namespace: The kubernetes namespace + deploy_config: The deployment configuration + chart_name: The chart name for the deployment + """ + + # Construct helm upgrade command + command = [ + "helm", + "upgrade", + release_name, + chart_name, + "--namespace", + namespace, + "-f", + hw_values_file, + "-f", + deploy_values_file, + "-f", + update_values_file + ] + # Execute helm upgrade + print(f"Running command: {' '.join(command)}") + run_kubectl_command(command) + print(f"Deployment updated successfully") + + +def read_deploy_config(config_path): + """Read and parse the deploy config file. + + Args: + config_path: Path to the deploy config file + + Returns: + The parsed deploy config dictionary or None if failed + """ + try: + with open(config_path, 'r') as f: + return yaml.safe_load(f) + except Exception as e: + print(f"Failed to load deploy config: {str(e)}") + return None + + +def check_deployment_ready(release_name, namespace, timeout=300, interval=5, logfile="deployment.log"): + """Wait until all pods in the deployment are running and ready. + + Args: + namespace: The Kubernetes namespace + timeout: The maximum time to wait in seconds (default 120 seconds) + interval: The interval between checks in seconds (default 5 seconds) + logfile: The file to log output to (default 'deployment.log') + + Returns: + 0 if success, 1 if failure (timeout reached) + """ + try: + # Get the list of deployments in the namespace + cmd = ["kubectl", "-n", namespace, "get", "deployments", "-o", "jsonpath='{.items[*].metadata.name}'"] + deployments_output = subprocess.check_output(cmd, text=True) + deployments = deployments_output.strip().split() + + # Strip the first and last elements of single quotes if present + deployments[0] = deployments[0].strip("'") + deployments[-1] = deployments[-1].strip("'") + + with open(logfile, "a") as log: + log.write(f"Found deployments: {', '.join(deployments)}\n") + + timer = 0 + + # Loop through each deployment to check its readiness + for deployment_name in deployments: + + if '-' not in deployment_name or 'ui' in deployment_name or 'nginx' in deployment_name: + continue + + instance_name = deployment_name.split('-', 1)[0] + app_name = deployment_name.split('-', 1)[1] + + if instance_name != release_name: + continue + + cmd = [ + "kubectl", "-n", namespace, "get", "deployment", deployment_name, + "-o", "jsonpath={.spec.replicas}" + ] + desired_replicas = int(subprocess.check_output(cmd, text=True).strip()) + + with open(logfile, "a") as log: + log.write(f"Checking deployment '{deployment_name}' with desired replicas: {desired_replicas}\n") + + while True: + cmd = [ + "kubectl", "-n", namespace, "get", "pods", + "-l", f"app.kubernetes.io/instance={instance_name}", + "-l", f"app.kubernetes.io/name={app_name}", + "--field-selector=status.phase=Running", "-o", "json" + ] + + pods_output = subprocess.check_output(cmd, text=True) + pods = json.loads(pods_output) + + ready_pods = sum( + 1 for pod in pods["items"] if + all(container.get('ready') for container in pod.get('status', {}).get('containerStatuses', [])) + ) + + terminating_pods = sum(1 for pod in pods["items"] if pod.get("metadata", {}).get("deletionTimestamp") is not None) + + with open(logfile, "a") as log: + log.write(f"Ready pods: {ready_pods}, Desired replicas: {desired_replicas}, Terminating pods: {terminating_pods}\n") + + if ready_pods == desired_replicas and terminating_pods == 0: + with open(logfile, "a") as log: + log.write(f"All pods for deployment '{deployment_name}' are running and ready.\n") + break + + if timer >= timeout: + with open(logfile, "a") as log: + log.write(f"Timeout reached for deployment '{deployment_name}'. Not all pods are running and ready.\n") + return 1 # Failure + + time.sleep(interval) + timer += interval + + return 0 # Success for all deployments + + except subprocess.CalledProcessError as e: + with open(logfile, "a") as log: + log.write(f"Error executing kubectl command: {e}\n") + return 1 # Failure + except json.JSONDecodeError as e: + with open(logfile, "a") as log: + log.write(f"Error parsing kubectl output: {e}\n") + return 1 # Failure + except Exception as e: + with open(logfile, "a") as log: + log.write(f"Unexpected error: {e}\n") + return 1 # Failure + + +def main(): + parser = argparse.ArgumentParser(description="Manage Helm Deployment.") + parser.add_argument( + "--chart-name", + type=str, + default="chatqna", + help="The chart name to deploy (default: chatqna).", + ) + parser.add_argument("--namespace", default="default", help="Kubernetes namespace (default: default).") + parser.add_argument("--user-values", help="Path to a user-specified values.yaml file.") + parser.add_argument("--deploy-config", help="Path to a deploy config yaml file.") + parser.add_argument( + "--create-values-only", action="store_true", help="Only create the values.yaml file without deploying." + ) + parser.add_argument("--uninstall", action="store_true", help="Uninstall the Helm release.") + parser.add_argument("--num-nodes", type=int, default=1, help="Number of nodes to use (default: 1).") + parser.add_argument("--node-names", nargs="*", help="Optional specific node names to label.") + parser.add_argument("--add-label", action="store_true", help="Add label to specified nodes if this flag is set.") + parser.add_argument( + "--delete-label", action="store_true", help="Delete label from specified nodes if this flag is set." + ) + parser.add_argument( + "--label", default="node-type=opea-benchmark", help="Label to add/delete (default: node-type=opea-benchmark)." + ) + parser.add_argument("--update-service", action="store_true", help="Update the deployment with new configuration.") + parser.add_argument("--check-ready", action="store_true", help="Check if all services in the deployment are ready.") + parser.add_argument("--chart-dir", default=".", help="Path to the untarred Helm chart directory.") + + args = parser.parse_args() + + # Node labeling management + if args.add_label: + add_labels_to_nodes(args.num_nodes, args.label, args.node_names) + return + elif args.delete_label: + clear_labels_from_nodes(args.label, args.node_names) + return + elif args.check_ready: + is_ready = check_deployment_ready(args.chart_name, args.namespace) + return is_ready + elif args.uninstall: + uninstall_helm_release(args.chart_name, args.namespace) + return + + # Load deploy_config if provided + deploy_config = None + if args.deploy_config: + deploy_config = read_deploy_config(args.deploy_config) + if deploy_config is None: + parser.error("Failed to load deploy config") + return + + hw_values_file = get_hw_values_file(deploy_config, args.chart_dir) + + action_type = 0 + if args.update_service: + action_type = 1 + + # The user file is provided for deploy when --update-service is not specified + if args.user_values and not args.update_service: + values_file_path = args.user_values + else: + if not args.deploy_config: + parser.error("--deploy-config is required") + + node_selector = {args.label.split("=")[0]: args.label.split("=")[1]} + + print("go to generate deploy values" if action_type == 0 else "go to generate update values") + + # Generate values file for deploy or update service + result = generate_helm_values( + example_type=args.chart_name, + deploy_config=deploy_config, + chart_dir=args.chart_dir, + action_type=action_type, # 0 - deploy, 1 - update + node_selector=node_selector, + ) + + # Check result status + if result["status"] == "success": + values_file_path = result["filepath"] + else: + parser.error(f"Failed to generate values.yaml: {result['message']}") + return + + print("start to read the generated values file") + # Read back the generated YAML file for verification + with open(values_file_path, "r") as file: + print("Generated YAML contents:") + print(file.read()) + + # Handle service update if specified + if args.update_service: + if not args.user_values: + parser.error("--user-values is required for update reference") + + try: + update_service( + args.chart_name, + args.chart_name, + args.namespace, + hw_values_file, + args.user_values, + values_file_path + ) + return + except Exception as e: + parser.error(f"Failed to update deployment: {str(e)}") + return + + # Deploy unless --create-values-only is specified + if not args.create_values_only: + install_helm_release( + args.chart_name, + args.chart_name, + args.namespace, + hw_values_file, + values_file_path + ) + print(f"values_file_path: {values_file_path}") + + +if __name__ == "__main__": + main() diff --git a/deploy_and_benchmark.py b/deploy_and_benchmark.py new file mode 100644 index 000000000..2d9814681 --- /dev/null +++ b/deploy_and_benchmark.py @@ -0,0 +1,288 @@ +import yaml +import subprocess +import sys +import os +import copy +import argparse +import shutil +import re + +def read_yaml(file_path): + try: + with open(file_path, 'r') as file: + return yaml.safe_load(file) + except Exception as e: + print(f"Error reading YAML file: {e}") + return None + +def construct_deploy_config(deploy_config, target_node, max_batch_size=None): + """ + Construct a new deploy config based on the target node number and optional max_batch_size. + + Args: + deploy_config: Original deploy config dictionary + target_node: Target node number to match in the node array + max_batch_size: Optional specific max_batch_size value to use + + Returns: + A new deploy config with single values for node and instance_num + """ + # Deep copy the original config to avoid modifying it + new_config = copy.deepcopy(deploy_config) + + # Get the node array and validate + nodes = deploy_config.get('node') + if not isinstance(nodes, list): + raise ValueError("deploy_config['node'] must be an array") + + # Find the index of the target node + try: + node_index = nodes.index(target_node) + except ValueError: + raise ValueError(f"Target node {target_node} not found in node array {nodes}") + + # Set the single node value + new_config['node'] = target_node + + # Update instance_num for each service based on the same index + for service_name, service_config in new_config.get('services', {}).items(): + if 'replicaCount' in service_config: + instance_nums = service_config['replicaCount'] + if isinstance(instance_nums, list): + if len(instance_nums) != len(nodes): + raise ValueError( + f"instance_num array length ({len(instance_nums)}) for service {service_name} " + f"doesn't match node array length ({len(nodes)})" + ) + service_config['replicaCount'] = instance_nums[node_index] + + # Update max_batch_size if specified + if max_batch_size is not None and 'llm' in new_config['services']: + new_config['services']['llm']['max_batch_size'] = max_batch_size + + return new_config + +def pull_helm_chart(chart_pull_url, version, chart_name): + # Pull and untar the chart + subprocess.run(["helm", "pull", chart_pull_url, "--version", version, "--untar"], check=True) + + current_dir = os.getcwd() + untar_dir = os.path.join(current_dir, chart_name) + + if not os.path.isdir(untar_dir): + print(f"Error: Could not find untarred directory for {chart_name}") + return None + + return untar_dir + +def main(yaml_file, target_node=None): + """ + Main function to process deployment configuration. + + Args: + yaml_file: Path to the YAML configuration file + target_node: Optional target number of nodes to deploy. If not specified, will process all nodes. + """ + config = read_yaml(yaml_file) + if config is None: + print("Failed to read YAML file.") + return None + + deploy_config = config['deploy'] + benchmark_config = config['benchmark'] + + # Extract chart name from the YAML file name + chart_name = os.path.splitext(os.path.basename(yaml_file))[0] + python_cmd = sys.executable + + # Process nodes + nodes = deploy_config.get('node', []) + if not isinstance(nodes, list): + print("Error: deploy_config['node'] must be an array") + return None + + nodes_to_process = [target_node] if target_node is not None else nodes + node_names = deploy_config.get('node_name', []) + namespace = deploy_config.get('namespace', "default") + + # Pull the Helm chart + chart_pull_url = f"oci://ghcr.io/opea-project/charts/{chart_name}" + version = deploy_config.get("version", "1.1.0") + chart_dir = pull_helm_chart(chart_pull_url, version, chart_name) + if not chart_dir: + return + + for node in nodes_to_process: + try: + print(f"\nProcessing configuration for {node} nodes...") + + # Get corresponding node names for this node count + current_node_names = node_names[:node] if node_names else [] + + # Add labels for current node configuration + print(f"Adding labels for {node} nodes...") + cmd = [ + python_cmd, + 'deploy.py', + '--chart-name', + chart_name, + '--num-nodes', + str(node), + '--add-label' + ] + if current_node_names: + cmd.extend(['--node-names'] + current_node_names) + + result = subprocess.run(cmd, check=True) + if result.returncode != 0: + print(f"Failed to add labels for {node} nodes") + continue + + try: + # Process max_batch_sizes + max_batch_sizes = deploy_config.get('services', {}).get('llm', {}).get('max_batch_size', []) + if not isinstance(max_batch_sizes, list): + max_batch_sizes = [max_batch_sizes] + + values_file_path= None + for i, max_batch_size in enumerate(max_batch_sizes): + print(f"\nProcessing max_batch_size: {max_batch_size}") + + # Construct new deploy config + new_deploy_config = construct_deploy_config(deploy_config, node, max_batch_size) + + # Write the new deploy config to a temporary file + temp_config_file = f"temp_deploy_config_{node}_{max_batch_size}.yaml" + try: + with open(temp_config_file, 'w') as f: + yaml.dump(new_deploy_config, f) + + if i == 0: + # First iteration: full deployment + cmd = [ + python_cmd, + 'deploy.py', + '--deploy-config', + temp_config_file, + '--chart-name', + chart_name, + '--namespace', + namespace, + '--chart-dir', + chart_dir + ] + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + + match = re.search(r"values_file_path: (\S+)", result.stdout) + if match: + values_file_path = match.group(1) + print(f"Captured values_file_path: {values_file_path}") + else: + print("values_file_path not found in the output") + + else: + # Subsequent iterations: update services with config change + cmd = [ + python_cmd, + 'deploy.py', + '--deploy-config', + temp_config_file, + '--chart-name', + chart_name, + '--namespace', + namespace, + '--chart-dir', + chart_dir, + '--user-values', + values_file_path, + '--update-service' + ] + result = subprocess.run(cmd, check=True) + if result.returncode != 0: + print(f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}") + break # Skip remaining max_batch_sizes for this node + + # Wait for deployment to be ready + print("\nWaiting for deployment to be ready...") + cmd = [ + python_cmd, + 'deploy.py', + '--chart-name', + chart_name, + '--namespace', + namespace, + '--check-ready' + ] + try: + result = subprocess.run(cmd, check=True) + print(f"Deployments are ready!") + except subprocess.CalledProcessError as e: + print(f"Depoyments status failed with returncode: {e.returncode}") + + # TODO: Here is to call benchmark + + except Exception as e: + print(f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with max_batch_size {max_batch_size}: {str(e)}") + break # Skip remaining max_batch_sizes for this node + finally: + # Clean up the temporary file + if os.path.exists(temp_config_file): + os.remove(temp_config_file) + + finally: + # Uninstall the deployment + print(f"\nUninstalling deployment for {node} nodes...") + cmd = [ + python_cmd, + 'deploy.py', + '--chart-name', + chart_name, + '--namespace', + namespace, + '--uninstall', + ] + try: + result = subprocess.run(cmd, check=True) + if result.returncode != 0: + print(f"Failed to uninstall deployment for {node} nodes") + except Exception as e: + print(f"Error while uninstalling deployment for {node} nodes: {str(e)}") + + # Delete labels for current node configuration + print(f"Deleting labels for {node} nodes...") + cmd = [ + python_cmd, + 'deploy.py', + '--chart-name', + chart_name, + '--num-nodes', + str(node), + '--delete-label' + ] + if current_node_names: + cmd.extend(['--node-names'] + current_node_names) + + try: + result = subprocess.run(cmd, check=True) + if result.returncode != 0: + print(f"Failed to delete labels for {node} nodes") + except Exception as e: + print(f"Error while deleting labels for {node} nodes: {str(e)}") + + except Exception as e: + print(f"Error processing configuration for {node} nodes: {str(e)}") + continue + + # Cleanup: Remove the untarred directory + if chart_dir and os.path.isdir(chart_dir): + print(f"Removing temporary directory: {chart_dir}") + shutil.rmtree(chart_dir) + print("Temporary directory removed successfully.") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Deploy and benchmark with specific node configuration.") + parser.add_argument("yaml_file", help="Path to the YAML configuration file") + parser.add_argument("--target-node", type=int, help="Optional: Target number of nodes to deploy.", default=None) + + args = parser.parse_args() + main(args.yaml_file, args.target_node) From 3f4847d28355a5700ebcd31453dbf187b2ed622a Mon Sep 17 00:00:00 2001 From: letonghan Date: Fri, 24 Jan 2025 16:15:54 +0800 Subject: [PATCH 2/7] Support benchmark for GenAIExamples. Add benchmark.py and requirements.txt to run scripts. Usage of benchmark.py: 1. Used alone: python benchmark.py 2. Used as a python function: from benchmark import run_benchmark run_benchmark(benchmark_config, chart_name, namespace) Signed-off-by: letonghan --- ChatQnA/chatqna.yaml | 21 ++- benchmark.py | 355 +++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 9 ++ 3 files changed, 384 insertions(+), 1 deletion(-) create mode 100644 benchmark.py create mode 100644 requirements.txt diff --git a/ChatQnA/chatqna.yaml b/ChatQnA/chatqna.yaml index 5d6e3d21b..aea15917c 100644 --- a/ChatQnA/chatqna.yaml +++ b/ChatQnA/chatqna.yaml @@ -57,5 +57,24 @@ deploy: replicaCount: [1, 1, 1, 1] benchmark: + # http request behavior related fields + concurrency: [1, 2, 4] + totoal_query_num: [2048, 4096] + duration: [5, 10] # unit minutes + query_num_per_concurrency: [4, 8, 16] + possion: True + possion_arrival_rate: 1.0 + warmup_iterations: 10 + seed: 1024 + + # workload, all of the test cases will run for benchmark + test_cases: + - chatqnafixed + - chatqna_qlist_pubmed: + dataset: pub_med10 # pub_med10, pub_med100, pub_med1000 + user_queries: [1, 2, 4] + query_token_size: 128 # if specified, means fixed query token size will be sent out + llm: - max_token_size: 1024 # specify the output token size + # specify the llm output token size + max_token_size: [128, 256] \ No newline at end of file diff --git a/benchmark.py b/benchmark.py new file mode 100644 index 000000000..5f41a5f15 --- /dev/null +++ b/benchmark.py @@ -0,0 +1,355 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import sys +import yaml +from datetime import datetime +from kubernetes import client, config + +from evals.benchmark.stresscli.commands.load_test import locust_runtests + + +# only support chatqna for now +service_endpoints = { + "chatqna": "/v1/chatqna", +} + + +def load_yaml(file_path): + with open(file_path, "r") as f: + data = yaml.safe_load(f) + return data + + +def construct_benchmark_config(test_suite_config): + """Extract relevant data from the YAML based on the specified test cases.""" + + return { + "concurrency": test_suite_config.get("concurrency", []), + "totoal_query_num": test_suite_config.get("user_queries", []), + "duration:": test_suite_config.get("duration:", []), + "query_num_per_concurrency": test_suite_config.get("query_num_per_concurrency", []), + "possion": test_suite_config.get("possion", False), + "possion_arrival_rate": test_suite_config.get("possion_arrival_rate", 1.0), + "warmup_iterations": test_suite_config.get("warmup_iterations", 10), + "seed": test_suite_config.get("seed", None), + "test_cases": test_suite_config.get("test_cases", ["chatqnafixed"]), + "user_queries": test_suite_config.get("user_queries", [1]), + "query_token_size": test_suite_config.get("query_token_size", 128), + "llm_max_token_size": test_suite_config.get("llm", {}).get("max_token_size", [128]), + } + + +def _get_cluster_ip(service_name, namespace="default"): + """Get the Cluster IP of a service in a Kubernetes cluster.""" + # Load the Kubernetes configuration + config.load_kube_config() # or use config.load_incluster_config() if running inside a Kubernetes pod + + # Create an API client for the core API (which handles services) + v1 = client.CoreV1Api() + + try: + # Get the service object + service = v1.read_namespaced_service(name=service_name, namespace=namespace) + + # Extract the Cluster IP + cluster_ip = service.spec.cluster_ip + + # Extract the port number (assuming the first port, modify if necessary) + if service.spec.ports: + port_number = service.spec.ports[0].port # Get the first port number + else: + port_number = None + + return cluster_ip, port_number + except client.exceptions.ApiException as e: + print(f"Error fetching service: {e}") + return None + + +def _get_service_ip(service_name, deployment_type="k8s", service_ip=None, service_port=None, namespace="default"): + """Get the service IP and port based on the deployment type. + + Args: + service_name (str): The name of the service. + deployment_type (str): The type of deployment ("k8s" or "docker"). + service_ip (str): The IP address of the service (required for Docker deployment). + service_port (int): The port of the service (required for Docker deployment). + namespace (str): The namespace of the service (default is "default"). + + Returns: + (str, int): The service IP and port. + """ + if deployment_type == "k8s": + # Kubernetes IP and port retrieval logic + svc_ip, port = _get_cluster_ip(service_name, namespace) + elif deployment_type == "docker": + # For Docker deployment, service_ip and service_port must be specified + if not service_ip or not service_port: + raise ValueError( + "For Docker deployment, service_ip and service_port must be provided in the configuration." + ) + svc_ip = service_ip + port = service_port + else: + raise ValueError("Unsupported deployment type. Use 'k8s' or 'docker'.") + + return svc_ip, port + + +def _create_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params): + """Create content for the run.yaml file.""" + + # If a load shape includes the parameter concurrent_level, + # the parameter will be passed to Locust to launch fixed + # number of simulated users. + concurrency = 1 + if num_queries >= 0: + concurrency = max(1, num_queries // test_params["concurrent_level"]) + else: + concurrency = test_params["concurrent_level"] + + import importlib.util + package_name = "opea-eval" + spec = importlib.util.find_spec(package_name) + print(spec) + + # get folder path of opea-eval + eval_path = None + import pkg_resources + for dist in pkg_resources.working_set: + if 'opea-eval' in dist.project_name: + eval_path = dist.location + if not eval_path: + print(f"Fail to load opea-eval package. Please install it first.") + exit(1) + + yaml_content = { + "profile": { + "storage": {"hostpath": test_params["test_output_dir"]}, + "global-settings": { + "tool": "locust", + "locustfile": os.path.join(eval_path, "evals/benchmark/stresscli/locust/aistress.py"), + "host": base_url, + "stop-timeout": test_params["query_timeout"], + "processes": 2, + "namespace": test_params["namespace"], + "bench-target": bench_target, + "service-metric-collect": test_params["collect_service_metric"], + "service-list": service.get("service_list", []), + "dataset": service.get("dataset", "default"), + "prompts": service.get("prompts", None), + "max-output": service.get("max_output", 128), + "seed": test_params.get("seed", None), + "llm-model": test_params["llm_model"], + "deployment-type": test_params["deployment_type"], + "load-shape": test_params["load_shape"], + }, + "runs": [{"name": test_phase, "users": concurrency, "max-request": num_queries}], + } + } + + # For the following scenarios, test will stop after the specified run-time + if test_params["run_time"] is not None and test_phase != "warmup": + yaml_content["profile"]["global-settings"]["run-time"] = test_params["run_time"] + + return yaml_content + + +def _create_stresscli_confs( + case_params, test_params, test_phase, num_queries, base_url, ts +) -> str: + """Create a stresscli configuration file and persist it on disk.""" + stresscli_confs = [] + # Get the workload + test_cases = test_params["test_cases"] + for test_case in test_cases: + stresscli_conf = {} + print(test_case) + if isinstance(test_case, str): + bench_target = test_case + elif isinstance(test_case, dict): + bench_target = list(test_case.keys())[0] + dataset_conf = test_case[bench_target] + if bench_target == "chatqna_qlist_pubmed": + max_lines = dataset_conf['dataset'].split("pub_med")[-1] + stresscli_conf['envs'] = {'DATASET': f"pubmed_{max_lines}.txt", + 'MAX_LINES': max_lines} + # Generate the content of stresscli configuration file + stresscli_yaml = _create_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params) + + # Dump the stresscli configuration file + service_name = case_params.get("service_name") + run_yaml_path = os.path.join( + test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}_{bench_target}.yaml" + ) + with open(run_yaml_path, "w") as yaml_file: + yaml.dump(stresscli_yaml, yaml_file) + stresscli_conf['run_yaml_path'] = run_yaml_path + stresscli_confs.append(stresscli_conf) + return stresscli_confs + + +def create_stresscli_confs(service, base_url, test_suite_config, index): + """Create and save the run.yaml file for the service being tested.""" + os.makedirs(test_suite_config["test_output_dir"], exist_ok=True) + + stresscli_confs = [] + + # Add YAML configuration of stresscli for warm-ups + warm_ups = test_suite_config["warm_ups"] + if warm_ups is not None and warm_ups > 0: + stresscli_confs.extend( + _create_stresscli_confs( + service, test_suite_config, "warmup", warm_ups, base_url, index + ) + ) + + # Add YAML configuration of stresscli for benchmark + user_queries_lst = test_suite_config["user_queries"] + if user_queries_lst is None or len(user_queries_lst) == 0: + # Test stop is controlled by run time + stresscli_confs.extend( + _create_stresscli_confs( + service, test_suite_config, "benchmark", -1, base_url, index + ) + ) + else: + # Test stop is controlled by request count + for user_queries in user_queries_lst: + stresscli_confs.extend( + _create_stresscli_confs( + service, test_suite_config, "benchmark", user_queries, base_url, index + ) + ) + + return stresscli_confs + + +def _run_service_test(example, service, test_suite_config): + """Run the test for a specific service and example.""" + print(f"[OPEA BENCHMARK] 🚀 Example: [ {example} ] Service: [ {service.get('service_name')} ], Running test...") + + # Get the service name + service_name = service.get("service_name") + + # Get the deployment type from the test suite configuration + deployment_type = test_suite_config.get("deployment_type", "k8s") + + # Get the service IP and port based on deployment type + svc_ip, port = _get_service_ip( + service_name, + deployment_type, + test_suite_config.get("service_ip"), + test_suite_config.get("service_port"), + test_suite_config.get("namespace") + ) + + base_url = f"http://{svc_ip}:{port}" + endpoint = service_endpoints[example] + url = f"{base_url}{endpoint}" + print(f"[OPEA BENCHMARK] 🚀 Running test for {service_name} at {url}") + + # Generate a unique index based on the current time + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # Create the run.yaml for the service + stresscli_confs = create_stresscli_confs( + service, base_url, test_suite_config, timestamp + ) + + # Do benchmark in for-loop for different user queries + output_folders = [] + for index, stresscli_conf in enumerate(stresscli_confs, start=1): + run_yaml_path = stresscli_conf['run_yaml_path'] + print(f"[OPEA BENCHMARK] 🚀 The {index} time test is running, run yaml: {run_yaml_path}...") + os.environ['MAX_TOKENS'] = str(service.get("max_output")) + if stresscli_conf.get('envs') is not None: + for key, value in stresscli_conf.get('envs').items(): + os.environ[key] = value + + output_folders.append(locust_runtests(None, run_yaml_path)) + + print(f"[OPEA BENCHMARK] 🚀 Test completed for {service_name} at {url}") + return output_folders + + +def run_benchmark(benchmark_config, chart_name, namespace, llm_model="Qwen/Qwen2.5-Coder-7B-Instruct", report=False): + # Extract data + parsed_data = construct_benchmark_config(benchmark_config) + test_suite_config = { + "user_queries": parsed_data['user_queries'], # num of user queries + "random_prompt": False, # whether to use random prompt, set to False by default + "run_time": "60m", # The max total run time for the test suite, set to 60m by default + "collect_service_metric": False, # whether to collect service metrics, set to False by default + "llm_model": llm_model, # The LLM model used for the test + "deployment_type": "k8s", # Default is "k8s", can also be "docker" + "service_ip": None, # Leave as None for k8s, specify for Docker + "service_port": None, # Leave as None for k8s, specify for Docker + "test_output_dir": os.getcwd() + "/benchmark_output", # The directory to store the test output + "load_shape": {"name": "constant", + "params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}}}, + "concurrent_level": 4, + "arrival_rate": 1.0, + "query_timeout": 120, + "warm_ups": parsed_data['warmup_iterations'], + "seed": parsed_data['seed'], + "namespace": namespace, + "test_cases": parsed_data["test_cases"], + "llm_max_token_size": parsed_data['llm_max_token_size'] + } + + dataset = None + query_data = None + + # Do benchmark in for-loop for different llm_max_token_size + for llm_max_token in parsed_data['llm_max_token_size']: + print(f"[OPEA BENCHMARK] 🚀 Run benchmark on {dataset} with llm max-output-token {llm_max_token}.") + case_data = {} + # Support chatqna only for now + if chart_name == "chatqna": + case_data = { + "run_test": True, + "service_name": "chatqna", + "service_list": [ + "chatqna", + "chatqna-chatqna-ui", + "chatqna-data-prep", + "chatqna-nginx", + "chatqna-redis-vector-db", + "chatqna-retriever-usvc", + "chatqna-tei", + "chatqna-teirerank", + "chatqna-tgi" + ], + "test_cases": parsed_data["test_cases"], + # Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt + "prompts": query_data, + "max_output": llm_max_token, # max number of output tokens + "k": 1 # number of retrieved documents + } + output_folder = _run_service_test(chart_name, case_data, test_suite_config) + + print(f"[OPEA BENCHMARK] 🚀 Test Finished. Output saved in {output_folder}.") + + if report: + print(output_folder) + all_results = dict() + for folder in output_folder: + from evals.benchmark.stresscli.commands.report import get_report_results + + results = get_report_results(folder) + all_results[folder] = results + print(f"results = {results}\n") + + return all_results + + +if __name__ == "__main__": + benchmark_config = load_yaml("./benchmark.yaml") + run_benchmark( + benchmark_config=benchmark_config, + chart_name='chatqna', + namespace='deploy-benchmark' + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 000000000..ec9b823f5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +kubernetes +locust +numpy +opea-eval +pytest +pyyaml +requests +sseclient-py +transformers From 3229e2a570ba2b9dce7b31bb6a04c8908cbdb1d3 Mon Sep 17 00:00:00 2001 From: Cathy Zhang Date: Fri, 24 Jan 2025 12:01:58 +0000 Subject: [PATCH 3/7] Rename chatqna.yaml as benchmark_chatqna.yaml Signed-off-by: Cathy Zhang --- ChatQnA/{chatqna.yaml => benchmark_chatqna.yaml} | 2 +- deploy_and_benchmark.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) rename ChatQnA/{chatqna.yaml => benchmark_chatqna.yaml} (97%) diff --git a/ChatQnA/chatqna.yaml b/ChatQnA/benchmark_chatqna.yaml similarity index 97% rename from ChatQnA/chatqna.yaml rename to ChatQnA/benchmark_chatqna.yaml index aea15917c..5d4ab6758 100644 --- a/ChatQnA/chatqna.yaml +++ b/ChatQnA/benchmark_chatqna.yaml @@ -77,4 +77,4 @@ benchmark: llm: # specify the llm output token size - max_token_size: [128, 256] \ No newline at end of file + max_token_size: [128, 256] diff --git a/deploy_and_benchmark.py b/deploy_and_benchmark.py index 2d9814681..58871669b 100644 --- a/deploy_and_benchmark.py +++ b/deploy_and_benchmark.py @@ -92,7 +92,8 @@ def main(yaml_file, target_node=None): benchmark_config = config['benchmark'] # Extract chart name from the YAML file name - chart_name = os.path.splitext(os.path.basename(yaml_file))[0] + chart_name = os.path.splitext(os.path.basename(yaml_file))[0].split('_')[-1] + print(f"chart_name: {chart_name}") python_cmd = sys.executable # Process nodes From 61c0c31c862bbcb8ba09dbbcf5ff2c083a68fa60 Mon Sep 17 00:00:00 2001 From: Cathy Zhang Date: Fri, 24 Jan 2025 12:05:23 +0000 Subject: [PATCH 4/7] Some fixes to benchmark.py Signed-off-by: Cathy Zhang --- benchmark.py | 6 +++++- requirements.txt | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/benchmark.py b/benchmark.py index 5f41a5f15..0a8360db6 100644 --- a/benchmark.py +++ b/benchmark.py @@ -275,7 +275,11 @@ def _run_service_test(example, service, test_suite_config): return output_folders -def run_benchmark(benchmark_config, chart_name, namespace, llm_model="Qwen/Qwen2.5-Coder-7B-Instruct", report=False): +def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, report=False): + # If llm_model is None or an empty string, set to default value + if not llm_model: + llm_model = "Qwen/Qwen2.5-Coder-7B-Instruct" + # Extract data parsed_data = construct_benchmark_config(benchmark_config) test_suite_config = { diff --git a/requirements.txt b/requirements.txt index ec9b823f5..44f6445aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ kubernetes locust numpy -opea-eval +opea-eval>=1.2 pytest pyyaml requests From 5a5271ef9b4827aab5e47a7f5f692bbe9d737218 Mon Sep 17 00:00:00 2001 From: Cathy Zhang Date: Fri, 24 Jan 2025 11:01:48 +0000 Subject: [PATCH 5/7] Invoke benchmarking in the deploy_and_benchmark process Signed-off-by: Cathy Zhang --- deploy_and_benchmark.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/deploy_and_benchmark.py b/deploy_and_benchmark.py index 58871669b..a26ed0b68 100644 --- a/deploy_and_benchmark.py +++ b/deploy_and_benchmark.py @@ -7,6 +7,8 @@ import shutil import re +from benchmark import run_benchmark + def read_yaml(file_path): try: with open(file_path, 'r') as file: @@ -220,7 +222,14 @@ def main(yaml_file, target_node=None): except subprocess.CalledProcessError as e: print(f"Depoyments status failed with returncode: {e.returncode}") - # TODO: Here is to call benchmark + # Run benchmark + run_benchmark( + benchmark_config=benchmark_config, + chart_name=chart_name, + namespace=namespace, + llm_model=deploy_config.get('services', {}).get('llm', {}).get('model_id', "") + ) + except Exception as e: print(f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with max_batch_size {max_batch_size}: {str(e)}") From 133205dc54dac8c462bc5681202366a408d100f7 Mon Sep 17 00:00:00 2001 From: Cathy Zhang Date: Fri, 24 Jan 2025 05:27:57 -0800 Subject: [PATCH 6/7] Add REAMD for deploy_and_benchmark process Signed-off-by: Cathy Zhang --- README-deploy-benchmark.md | 66 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 README-deploy-benchmark.md diff --git a/README-deploy-benchmark.md b/README-deploy-benchmark.md new file mode 100644 index 000000000..a3d8d57d5 --- /dev/null +++ b/README-deploy-benchmark.md @@ -0,0 +1,66 @@ +# ChatQnA Benchmarking + +## Purpose + +We aim to run these benchmarks and share them with the OPEA community for three primary reasons: + +- To offer insights on inference throughput in real-world scenarios, helping you choose the best service or deployment for your needs. +- To establish a baseline for validating optimization solutions across different implementations, providing clear guidance on which methods are most effective for your use case. +- To inspire the community to build upon our benchmarks, allowing us to better quantify new solutions in conjunction with current leading LLMs, serving frameworks etc. + +## Table of Contents + +- [Prerequisites](#prerequisites) +- [Overview](#overview) + - [Using deploy_and_benchmark.py](#using-deploy_and_benchmark.py-recommended) +- [Data Preparation](#data-preparation) +- [Configuration](#configuration) + +## Prerequisites + +Before running the benchmarks, ensure you have: + +1. **Kubernetes Environment** + - Kubernetes installation: Use [kubespray](https://github.com/opea-project/docs/blob/main/guide/installation/k8s_install/k8s_install_kubespray.md) or other official Kubernetes installation guides + - (Optional) [Kubernetes set up guide on Intel Gaudi product](https://github.com/opea-project/GenAIInfra/blob/main/README.md#setup-kubernetes-cluster) + +2. **Configuration YAML** + The configuration file (e.g., `./ChatQnA/benchmark_chatqna.yaml`) consists of two main sections: deployment and benchmarking. Required fields must be filled with valid values (like the Hugging Face token). For all other fields, you can either customize them according to your needs or leave them empty ("") to use the default values from the [helm charts](https://github.com/opea-project/GenAIInfra/tree/main/helm-charts). + +## Data Preparation + +Before running benchmarks, you need to: + +1. **Prepare Test Data** + - Download the retrieval file: + ```bash + wget https://github.com/opea-project/GenAIEval/tree/main/evals/benchmark/data/upload_file.txt + ``` + - For the `chatqna_qlist_pubmed` test case, prepare `pubmed_${max_lines}.txt` by following this [README](https://github.com/opea-project/GenAIEval/blob/main/evals/benchmark/stresscli/README_Pubmed_qlist.md) + +2. **Prepare Model Files (Recommended)** + ```bash + pip install -U "huggingface_hub[cli]" + sudo mkdir -p /mnt/models + sudo chmod 777 /mnt/models + huggingface-cli download --cache-dir /mnt/models Intel/neural-chat-7b-v3-3 + ``` + +## Overview + +The benchmarking process consists of two main components: deployment and benchmarking. We provide `deploy_and_benchmark.py` as a unified entry point that combines both steps. + +### Using deploy_and_benchmark.py (Recommended) + +The script `deploy_and_benchmark.py` serves as the main entry point. Here's an example using ChatQnA configuration (you can replace it with any other example's configuration YAML file): + +1. For a specific number of nodes: + ```bash + python deploy_and_benchmark.py ./ChatQnA/benchmark_chatqna.yaml --target-node 1 + ``` + +2. For all node configurations: + ```bash + python deploy_and_benchmark.py ./ChatQnA/benchmark_chatqna.yaml + ``` + This will iterate through the node list in your configuration YAML file, performing deployment and benchmarking for each node count. From e91fb710685333369982be753fa378d42ad26a98 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 24 Jan 2025 13:51:58 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- ChatQnA/benchmark_chatqna.yaml | 9 +- README-deploy-benchmark.md | 3 + benchmark.py | 78 +++++++--------- deploy.py | 122 ++++++++++++++----------- deploy_and_benchmark.py | 160 ++++++++++++++++----------------- 5 files changed, 186 insertions(+), 186 deletions(-) diff --git a/ChatQnA/benchmark_chatqna.yaml b/ChatQnA/benchmark_chatqna.yaml index 5d4ab6758..c608b8afb 100644 --- a/ChatQnA/benchmark_chatqna.yaml +++ b/ChatQnA/benchmark_chatqna.yaml @@ -1,3 +1,6 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + deploy: device: gaudi version: 1.1.0 @@ -68,9 +71,9 @@ benchmark: seed: 1024 # workload, all of the test cases will run for benchmark - test_cases: - - chatqnafixed - - chatqna_qlist_pubmed: + test_cases: + - chatqnafixed + - chatqna_qlist_pubmed: dataset: pub_med10 # pub_med10, pub_med100, pub_med1000 user_queries: [1, 2, 4] query_token_size: 128 # if specified, means fixed query token size will be sent out diff --git a/README-deploy-benchmark.md b/README-deploy-benchmark.md index a3d8d57d5..4b813cccc 100644 --- a/README-deploy-benchmark.md +++ b/README-deploy-benchmark.md @@ -21,6 +21,7 @@ We aim to run these benchmarks and share them with the OPEA community for three Before running the benchmarks, ensure you have: 1. **Kubernetes Environment** + - Kubernetes installation: Use [kubespray](https://github.com/opea-project/docs/blob/main/guide/installation/k8s_install/k8s_install_kubespray.md) or other official Kubernetes installation guides - (Optional) [Kubernetes set up guide on Intel Gaudi product](https://github.com/opea-project/GenAIInfra/blob/main/README.md#setup-kubernetes-cluster) @@ -32,6 +33,7 @@ Before running the benchmarks, ensure you have: Before running benchmarks, you need to: 1. **Prepare Test Data** + - Download the retrieval file: ```bash wget https://github.com/opea-project/GenAIEval/tree/main/evals/benchmark/data/upload_file.txt @@ -55,6 +57,7 @@ The benchmarking process consists of two main components: deployment and benchma The script `deploy_and_benchmark.py` serves as the main entry point. Here's an example using ChatQnA configuration (you can replace it with any other example's configuration YAML file): 1. For a specific number of nodes: + ```bash python deploy_and_benchmark.py ./ChatQnA/benchmark_chatqna.yaml --target-node 1 ``` diff --git a/benchmark.py b/benchmark.py index 0a8360db6..fb20367c0 100644 --- a/benchmark.py +++ b/benchmark.py @@ -3,12 +3,11 @@ import os import sys -import yaml from datetime import datetime -from kubernetes import client, config +import yaml from evals.benchmark.stresscli.commands.load_test import locust_runtests - +from kubernetes import client, config # only support chatqna for now service_endpoints = { @@ -111,6 +110,7 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie concurrency = test_params["concurrent_level"] import importlib.util + package_name = "opea-eval" spec = importlib.util.find_spec(package_name) print(spec) @@ -118,11 +118,12 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie # get folder path of opea-eval eval_path = None import pkg_resources + for dist in pkg_resources.working_set: - if 'opea-eval' in dist.project_name: + if "opea-eval" in dist.project_name: eval_path = dist.location if not eval_path: - print(f"Fail to load opea-eval package. Please install it first.") + print("Fail to load opea-eval package. Please install it first.") exit(1) yaml_content = { @@ -157,9 +158,7 @@ def _create_yaml_content(service, base_url, bench_target, test_phase, num_querie return yaml_content -def _create_stresscli_confs( - case_params, test_params, test_phase, num_queries, base_url, ts -) -> str: +def _create_stresscli_confs(case_params, test_params, test_phase, num_queries, base_url, ts) -> str: """Create a stresscli configuration file and persist it on disk.""" stresscli_confs = [] # Get the workload @@ -173,9 +172,8 @@ def _create_stresscli_confs( bench_target = list(test_case.keys())[0] dataset_conf = test_case[bench_target] if bench_target == "chatqna_qlist_pubmed": - max_lines = dataset_conf['dataset'].split("pub_med")[-1] - stresscli_conf['envs'] = {'DATASET': f"pubmed_{max_lines}.txt", - 'MAX_LINES': max_lines} + max_lines = dataset_conf["dataset"].split("pub_med")[-1] + stresscli_conf["envs"] = {"DATASET": f"pubmed_{max_lines}.txt", "MAX_LINES": max_lines} # Generate the content of stresscli configuration file stresscli_yaml = _create_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params) @@ -186,7 +184,7 @@ def _create_stresscli_confs( ) with open(run_yaml_path, "w") as yaml_file: yaml.dump(stresscli_yaml, yaml_file) - stresscli_conf['run_yaml_path'] = run_yaml_path + stresscli_conf["run_yaml_path"] = run_yaml_path stresscli_confs.append(stresscli_conf) return stresscli_confs @@ -200,28 +198,18 @@ def create_stresscli_confs(service, base_url, test_suite_config, index): # Add YAML configuration of stresscli for warm-ups warm_ups = test_suite_config["warm_ups"] if warm_ups is not None and warm_ups > 0: - stresscli_confs.extend( - _create_stresscli_confs( - service, test_suite_config, "warmup", warm_ups, base_url, index - ) - ) + stresscli_confs.extend(_create_stresscli_confs(service, test_suite_config, "warmup", warm_ups, base_url, index)) # Add YAML configuration of stresscli for benchmark user_queries_lst = test_suite_config["user_queries"] if user_queries_lst is None or len(user_queries_lst) == 0: # Test stop is controlled by run time - stresscli_confs.extend( - _create_stresscli_confs( - service, test_suite_config, "benchmark", -1, base_url, index - ) - ) + stresscli_confs.extend(_create_stresscli_confs(service, test_suite_config, "benchmark", -1, base_url, index)) else: # Test stop is controlled by request count for user_queries in user_queries_lst: stresscli_confs.extend( - _create_stresscli_confs( - service, test_suite_config, "benchmark", user_queries, base_url, index - ) + _create_stresscli_confs(service, test_suite_config, "benchmark", user_queries, base_url, index) ) return stresscli_confs @@ -243,7 +231,7 @@ def _run_service_test(example, service, test_suite_config): deployment_type, test_suite_config.get("service_ip"), test_suite_config.get("service_port"), - test_suite_config.get("namespace") + test_suite_config.get("namespace"), ) base_url = f"http://{svc_ip}:{port}" @@ -255,18 +243,16 @@ def _run_service_test(example, service, test_suite_config): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") # Create the run.yaml for the service - stresscli_confs = create_stresscli_confs( - service, base_url, test_suite_config, timestamp - ) + stresscli_confs = create_stresscli_confs(service, base_url, test_suite_config, timestamp) # Do benchmark in for-loop for different user queries output_folders = [] for index, stresscli_conf in enumerate(stresscli_confs, start=1): - run_yaml_path = stresscli_conf['run_yaml_path'] + run_yaml_path = stresscli_conf["run_yaml_path"] print(f"[OPEA BENCHMARK] 🚀 The {index} time test is running, run yaml: {run_yaml_path}...") - os.environ['MAX_TOKENS'] = str(service.get("max_output")) - if stresscli_conf.get('envs') is not None: - for key, value in stresscli_conf.get('envs').items(): + os.environ["MAX_TOKENS"] = str(service.get("max_output")) + if stresscli_conf.get("envs") is not None: + for key, value in stresscli_conf.get("envs").items(): os.environ[key] = value output_folders.append(locust_runtests(None, run_yaml_path)) @@ -283,7 +269,7 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor # Extract data parsed_data = construct_benchmark_config(benchmark_config) test_suite_config = { - "user_queries": parsed_data['user_queries'], # num of user queries + "user_queries": parsed_data["user_queries"], # num of user queries "random_prompt": False, # whether to use random prompt, set to False by default "run_time": "60m", # The max total run time for the test suite, set to 60m by default "collect_service_metric": False, # whether to collect service metrics, set to False by default @@ -292,23 +278,25 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor "service_ip": None, # Leave as None for k8s, specify for Docker "service_port": None, # Leave as None for k8s, specify for Docker "test_output_dir": os.getcwd() + "/benchmark_output", # The directory to store the test output - "load_shape": {"name": "constant", - "params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}}}, + "load_shape": { + "name": "constant", + "params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}}, + }, "concurrent_level": 4, "arrival_rate": 1.0, "query_timeout": 120, - "warm_ups": parsed_data['warmup_iterations'], - "seed": parsed_data['seed'], + "warm_ups": parsed_data["warmup_iterations"], + "seed": parsed_data["seed"], "namespace": namespace, "test_cases": parsed_data["test_cases"], - "llm_max_token_size": parsed_data['llm_max_token_size'] + "llm_max_token_size": parsed_data["llm_max_token_size"], } dataset = None query_data = None # Do benchmark in for-loop for different llm_max_token_size - for llm_max_token in parsed_data['llm_max_token_size']: + for llm_max_token in parsed_data["llm_max_token_size"]: print(f"[OPEA BENCHMARK] 🚀 Run benchmark on {dataset} with llm max-output-token {llm_max_token}.") case_data = {} # Support chatqna only for now @@ -325,13 +313,13 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor "chatqna-retriever-usvc", "chatqna-tei", "chatqna-teirerank", - "chatqna-tgi" + "chatqna-tgi", ], "test_cases": parsed_data["test_cases"], # Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt "prompts": query_data, "max_output": llm_max_token, # max number of output tokens - "k": 1 # number of retrieved documents + "k": 1, # number of retrieved documents } output_folder = _run_service_test(chart_name, case_data, test_suite_config) @@ -352,8 +340,4 @@ def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, repor if __name__ == "__main__": benchmark_config = load_yaml("./benchmark.yaml") - run_benchmark( - benchmark_config=benchmark_config, - chart_name='chatqna', - namespace='deploy-benchmark' - ) + run_benchmark(benchmark_config=benchmark_config, chart_name="chatqna", namespace="deploy-benchmark") diff --git a/deploy.py b/deploy.py index 6d7506c38..21dd278cc 100644 --- a/deploy.py +++ b/deploy.py @@ -8,17 +8,17 @@ import subprocess import sys import time +from enum import Enum, auto import yaml -from enum import Enum, auto - ################################################################################ # # # HELM VALUES GENERATION SECTION # # # ################################################################################ + def configure_node_selectors(values, node_selector, deploy_config): """Configure node selectors for all services.""" for service_name, config in deploy_config["services"].items(): @@ -31,6 +31,7 @@ def configure_node_selectors(values, node_selector, deploy_config): values[service_name] = {"nodeSelector": {key: value for key, value in node_selector.items()}} return values + def configure_replica(values, deploy_config): """Get replica configuration based on example type and node count.""" for service_name, config in deploy_config["services"].items(): @@ -47,6 +48,7 @@ def configure_replica(values, deploy_config): return values + def get_output_filename(num_nodes, with_rerank, example_type, device, action_type): """Generate output filename based on configuration.""" rerank_suffix = "with-rerank-" if with_rerank else "" @@ -54,6 +56,7 @@ def get_output_filename(num_nodes, with_rerank, example_type, device, action_typ return f"{example_type}-{num_nodes}-{device}-{action_suffix}{rerank_suffix}values.yaml" + def configure_resources(values, deploy_config): """Configure resources when tuning is enabled.""" resource_configs = [] @@ -87,15 +90,19 @@ def configure_resources(values, deploy_config): if resources: if service_name == "llm": engine = config.get("engine", "tgi") - resource_configs.append({ - "name": engine, - "resources": resources, - }) + resource_configs.append( + { + "name": engine, + "resources": resources, + } + ) else: - resource_configs.append({ - "name": service_name, - "resources": resources, - }) + resource_configs.append( + { + "name": service_name, + "resources": resources, + } + ) for config in [r for r in resource_configs if r]: service_name = config["name"] @@ -106,13 +113,19 @@ def configure_resources(values, deploy_config): return values + def configure_extra_cmd_args(values, deploy_config): """Configure extra command line arguments for services.""" for service_name, config in deploy_config["services"].items(): extra_cmd_args = [] - for param in ["max_batch_size", "max_input_length", "max_total_tokens", - "max_batch_total_tokens", "max_batch_prefill_tokens"]: + for param in [ + "max_batch_size", + "max_input_length", + "max_total_tokens", + "max_batch_total_tokens", + "max_batch_prefill_tokens", + ]: if config.get(param): extra_cmd_args.extend([f"--{param.replace('_', '-')}", str(config[param])]) @@ -129,6 +142,7 @@ def configure_extra_cmd_args(values, deploy_config): return values + def configure_models(values, deploy_config): """Configure model settings for services.""" for service_name, config in deploy_config["services"].items(): @@ -147,8 +161,9 @@ def configure_models(values, deploy_config): return values + def configure_rerank(values, with_rerank, deploy_config, example_type, node_selector): - """Configure rerank service""" + """Configure rerank service.""" if with_rerank: if "teirerank" not in values: values["teirerank"] = {"nodeSelector": {key: value for key, value in node_selector.items()}} @@ -163,6 +178,7 @@ def configure_rerank(values, with_rerank, deploy_config, example_type, node_sele values["teirerank"]["enabled"] = False return values + def generate_helm_values(example_type, deploy_config, chart_dir, action_type, node_selector=None): """Create a values.yaml file based on the provided configuration.""" if deploy_config is None: @@ -170,10 +186,7 @@ def generate_helm_values(example_type, deploy_config, chart_dir, action_type, no # Ensure the chart_dir exists if not os.path.exists(chart_dir): - return { - "status": "false", - "message": f"Chart directory {chart_dir} does not exist" - } + return {"status": "false", "message": f"Chart directory {chart_dir} does not exist"} num_nodes = deploy_config.get("node", 1) with_rerank = deploy_config["services"].get("teirerank", {}).get("enabled", False) @@ -214,12 +227,14 @@ def generate_helm_values(example_type, deploy_config, chart_dir, action_type, no print(f"YAML file {filepath} has been generated.") return {"status": "success", "filepath": filepath} + ################################################################################ # # # DEPLOYMENT SECTION # # # ################################################################################ + def run_kubectl_command(command): """Run a kubectl command and return the output.""" try: @@ -388,14 +403,14 @@ def uninstall_helm_release(release_name, namespace=None): def update_service(release_name, chart_name, namespace, hw_values_file, deploy_values_file, update_values_file): """Update the deployment using helm upgrade with new values. - + Args: release_name: The helm release name namespace: The kubernetes namespace deploy_config: The deployment configuration chart_name: The chart name for the deployment """ - + # Construct helm upgrade command command = [ "helm", @@ -409,13 +424,13 @@ def update_service(release_name, chart_name, namespace, hw_values_file, deploy_v "-f", deploy_values_file, "-f", - update_values_file + update_values_file, ] # Execute helm upgrade print(f"Running command: {' '.join(command)}") run_kubectl_command(command) - print(f"Deployment updated successfully") - + print("Deployment updated successfully") + def read_deploy_config(config_path): """Read and parse the deploy config file. @@ -427,7 +442,7 @@ def read_deploy_config(config_path): The parsed deploy config dictionary or None if failed """ try: - with open(config_path, 'r') as f: + with open(config_path, "r") as f: return yaml.safe_load(f) except Exception as e: print(f"Failed to load deploy config: {str(e)}") @@ -464,19 +479,16 @@ def check_deployment_ready(release_name, namespace, timeout=300, interval=5, log # Loop through each deployment to check its readiness for deployment_name in deployments: - if '-' not in deployment_name or 'ui' in deployment_name or 'nginx' in deployment_name: + if "-" not in deployment_name or "ui" in deployment_name or "nginx" in deployment_name: continue - instance_name = deployment_name.split('-', 1)[0] - app_name = deployment_name.split('-', 1)[1] + instance_name = deployment_name.split("-", 1)[0] + app_name = deployment_name.split("-", 1)[1] if instance_name != release_name: continue - cmd = [ - "kubectl", "-n", namespace, "get", "deployment", deployment_name, - "-o", "jsonpath={.spec.replicas}" - ] + cmd = ["kubectl", "-n", namespace, "get", "deployment", deployment_name, "-o", "jsonpath={.spec.replicas}"] desired_replicas = int(subprocess.check_output(cmd, text=True).strip()) with open(logfile, "a") as log: @@ -484,24 +496,37 @@ def check_deployment_ready(release_name, namespace, timeout=300, interval=5, log while True: cmd = [ - "kubectl", "-n", namespace, "get", "pods", - "-l", f"app.kubernetes.io/instance={instance_name}", - "-l", f"app.kubernetes.io/name={app_name}", - "--field-selector=status.phase=Running", "-o", "json" + "kubectl", + "-n", + namespace, + "get", + "pods", + "-l", + f"app.kubernetes.io/instance={instance_name}", + "-l", + f"app.kubernetes.io/name={app_name}", + "--field-selector=status.phase=Running", + "-o", + "json", ] pods_output = subprocess.check_output(cmd, text=True) pods = json.loads(pods_output) ready_pods = sum( - 1 for pod in pods["items"] if - all(container.get('ready') for container in pod.get('status', {}).get('containerStatuses', [])) + 1 + for pod in pods["items"] + if all(container.get("ready") for container in pod.get("status", {}).get("containerStatuses", [])) ) - terminating_pods = sum(1 for pod in pods["items"] if pod.get("metadata", {}).get("deletionTimestamp") is not None) + terminating_pods = sum( + 1 for pod in pods["items"] if pod.get("metadata", {}).get("deletionTimestamp") is not None + ) with open(logfile, "a") as log: - log.write(f"Ready pods: {ready_pods}, Desired replicas: {desired_replicas}, Terminating pods: {terminating_pods}\n") + log.write( + f"Ready pods: {ready_pods}, Desired replicas: {desired_replicas}, Terminating pods: {terminating_pods}\n" + ) if ready_pods == desired_replicas and terminating_pods == 0: with open(logfile, "a") as log: @@ -510,7 +535,9 @@ def check_deployment_ready(release_name, namespace, timeout=300, interval=5, log if timer >= timeout: with open(logfile, "a") as log: - log.write(f"Timeout reached for deployment '{deployment_name}'. Not all pods are running and ready.\n") + log.write( + f"Timeout reached for deployment '{deployment_name}'. Not all pods are running and ready.\n" + ) return 1 # Failure time.sleep(interval) @@ -606,7 +633,7 @@ def main(): example_type=args.chart_name, deploy_config=deploy_config, chart_dir=args.chart_dir, - action_type=action_type, # 0 - deploy, 1 - update + action_type=action_type, # 0 - deploy, 1 - update node_selector=node_selector, ) @@ -630,12 +657,7 @@ def main(): try: update_service( - args.chart_name, - args.chart_name, - args.namespace, - hw_values_file, - args.user_values, - values_file_path + args.chart_name, args.chart_name, args.namespace, hw_values_file, args.user_values, values_file_path ) return except Exception as e: @@ -644,13 +666,7 @@ def main(): # Deploy unless --create-values-only is specified if not args.create_values_only: - install_helm_release( - args.chart_name, - args.chart_name, - args.namespace, - hw_values_file, - values_file_path - ) + install_helm_release(args.chart_name, args.chart_name, args.namespace, hw_values_file, values_file_path) print(f"values_file_path: {values_file_path}") diff --git a/deploy_and_benchmark.py b/deploy_and_benchmark.py index a26ed0b68..1dc4c4308 100644 --- a/deploy_and_benchmark.py +++ b/deploy_and_benchmark.py @@ -1,69 +1,75 @@ -import yaml -import subprocess -import sys -import os -import copy +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + import argparse -import shutil +import copy +import os import re +import shutil +import subprocess +import sys + +import yaml from benchmark import run_benchmark + def read_yaml(file_path): try: - with open(file_path, 'r') as file: + with open(file_path, "r") as file: return yaml.safe_load(file) except Exception as e: print(f"Error reading YAML file: {e}") return None + def construct_deploy_config(deploy_config, target_node, max_batch_size=None): - """ - Construct a new deploy config based on the target node number and optional max_batch_size. - + """Construct a new deploy config based on the target node number and optional max_batch_size. + Args: deploy_config: Original deploy config dictionary target_node: Target node number to match in the node array max_batch_size: Optional specific max_batch_size value to use - + Returns: A new deploy config with single values for node and instance_num """ # Deep copy the original config to avoid modifying it new_config = copy.deepcopy(deploy_config) - + # Get the node array and validate - nodes = deploy_config.get('node') + nodes = deploy_config.get("node") if not isinstance(nodes, list): raise ValueError("deploy_config['node'] must be an array") - + # Find the index of the target node try: node_index = nodes.index(target_node) except ValueError: raise ValueError(f"Target node {target_node} not found in node array {nodes}") - + # Set the single node value - new_config['node'] = target_node - + new_config["node"] = target_node + # Update instance_num for each service based on the same index - for service_name, service_config in new_config.get('services', {}).items(): - if 'replicaCount' in service_config: - instance_nums = service_config['replicaCount'] + for service_name, service_config in new_config.get("services", {}).items(): + if "replicaCount" in service_config: + instance_nums = service_config["replicaCount"] if isinstance(instance_nums, list): if len(instance_nums) != len(nodes): raise ValueError( f"instance_num array length ({len(instance_nums)}) for service {service_name} " f"doesn't match node array length ({len(nodes)})" ) - service_config['replicaCount'] = instance_nums[node_index] - + service_config["replicaCount"] = instance_nums[node_index] + # Update max_batch_size if specified - if max_batch_size is not None and 'llm' in new_config['services']: - new_config['services']['llm']['max_batch_size'] = max_batch_size - + if max_batch_size is not None and "llm" in new_config["services"]: + new_config["services"]["llm"]["max_batch_size"] = max_batch_size + return new_config + def pull_helm_chart(chart_pull_url, version, chart_name): # Pull and untar the chart subprocess.run(["helm", "pull", chart_pull_url, "--version", version, "--untar"], check=True) @@ -77,9 +83,9 @@ def pull_helm_chart(chart_pull_url, version, chart_name): return untar_dir + def main(yaml_file, target_node=None): - """ - Main function to process deployment configuration. + """Main function to process deployment configuration. Args: yaml_file: Path to the YAML configuration file @@ -90,23 +96,23 @@ def main(yaml_file, target_node=None): print("Failed to read YAML file.") return None - deploy_config = config['deploy'] - benchmark_config = config['benchmark'] + deploy_config = config["deploy"] + benchmark_config = config["benchmark"] # Extract chart name from the YAML file name - chart_name = os.path.splitext(os.path.basename(yaml_file))[0].split('_')[-1] + chart_name = os.path.splitext(os.path.basename(yaml_file))[0].split("_")[-1] print(f"chart_name: {chart_name}") python_cmd = sys.executable # Process nodes - nodes = deploy_config.get('node', []) + nodes = deploy_config.get("node", []) if not isinstance(nodes, list): print("Error: deploy_config['node'] must be an array") return None nodes_to_process = [target_node] if target_node is not None else nodes - node_names = deploy_config.get('node_name', []) - namespace = deploy_config.get('namespace', "default") + node_names = deploy_config.get("node_name", []) + namespace = deploy_config.get("namespace", "default") # Pull the Helm chart chart_pull_url = f"oci://ghcr.io/opea-project/charts/{chart_name}" @@ -124,17 +130,9 @@ def main(yaml_file, target_node=None): # Add labels for current node configuration print(f"Adding labels for {node} nodes...") - cmd = [ - python_cmd, - 'deploy.py', - '--chart-name', - chart_name, - '--num-nodes', - str(node), - '--add-label' - ] + cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--add-label"] if current_node_names: - cmd.extend(['--node-names'] + current_node_names) + cmd.extend(["--node-names"] + current_node_names) result = subprocess.run(cmd, check=True) if result.returncode != 0: @@ -143,11 +141,11 @@ def main(yaml_file, target_node=None): try: # Process max_batch_sizes - max_batch_sizes = deploy_config.get('services', {}).get('llm', {}).get('max_batch_size', []) + max_batch_sizes = deploy_config.get("services", {}).get("llm", {}).get("max_batch_size", []) if not isinstance(max_batch_sizes, list): max_batch_sizes = [max_batch_sizes] - values_file_path= None + values_file_path = None for i, max_batch_size in enumerate(max_batch_sizes): print(f"\nProcessing max_batch_size: {max_batch_size}") @@ -157,22 +155,22 @@ def main(yaml_file, target_node=None): # Write the new deploy config to a temporary file temp_config_file = f"temp_deploy_config_{node}_{max_batch_size}.yaml" try: - with open(temp_config_file, 'w') as f: + with open(temp_config_file, "w") as f: yaml.dump(new_deploy_config, f) if i == 0: # First iteration: full deployment cmd = [ python_cmd, - 'deploy.py', - '--deploy-config', + "deploy.py", + "--deploy-config", temp_config_file, - '--chart-name', + "--chart-name", chart_name, - '--namespace', + "--namespace", namespace, - '--chart-dir', - chart_dir + "--chart-dir", + chart_dir, ] result = subprocess.run(cmd, check=True, capture_output=True, text=True) @@ -187,52 +185,55 @@ def main(yaml_file, target_node=None): # Subsequent iterations: update services with config change cmd = [ python_cmd, - 'deploy.py', - '--deploy-config', + "deploy.py", + "--deploy-config", temp_config_file, - '--chart-name', + "--chart-name", chart_name, - '--namespace', + "--namespace", namespace, - '--chart-dir', + "--chart-dir", chart_dir, - '--user-values', + "--user-values", values_file_path, - '--update-service' + "--update-service", ] result = subprocess.run(cmd, check=True) if result.returncode != 0: - print(f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}") + print( + f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}" + ) break # Skip remaining max_batch_sizes for this node # Wait for deployment to be ready print("\nWaiting for deployment to be ready...") cmd = [ python_cmd, - 'deploy.py', - '--chart-name', + "deploy.py", + "--chart-name", chart_name, - '--namespace', + "--namespace", namespace, - '--check-ready' + "--check-ready", ] try: result = subprocess.run(cmd, check=True) - print(f"Deployments are ready!") + print("Deployments are ready!") except subprocess.CalledProcessError as e: - print(f"Depoyments status failed with returncode: {e.returncode}") + print(f"Deployments status failed with returncode: {e.returncode}") # Run benchmark run_benchmark( benchmark_config=benchmark_config, chart_name=chart_name, namespace=namespace, - llm_model=deploy_config.get('services', {}).get('llm', {}).get('model_id', "") + llm_model=deploy_config.get("services", {}).get("llm", {}).get("model_id", ""), ) - except Exception as e: - print(f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with max_batch_size {max_batch_size}: {str(e)}") + print( + f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with max_batch_size {max_batch_size}: {str(e)}" + ) break # Skip remaining max_batch_sizes for this node finally: # Clean up the temporary file @@ -244,12 +245,12 @@ def main(yaml_file, target_node=None): print(f"\nUninstalling deployment for {node} nodes...") cmd = [ python_cmd, - 'deploy.py', - '--chart-name', + "deploy.py", + "--chart-name", chart_name, - '--namespace', + "--namespace", namespace, - '--uninstall', + "--uninstall", ] try: result = subprocess.run(cmd, check=True) @@ -260,17 +261,9 @@ def main(yaml_file, target_node=None): # Delete labels for current node configuration print(f"Deleting labels for {node} nodes...") - cmd = [ - python_cmd, - 'deploy.py', - '--chart-name', - chart_name, - '--num-nodes', - str(node), - '--delete-label' - ] + cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--delete-label"] if current_node_names: - cmd.extend(['--node-names'] + current_node_names) + cmd.extend(["--node-names"] + current_node_names) try: result = subprocess.run(cmd, check=True) @@ -289,6 +282,7 @@ def main(yaml_file, target_node=None): shutil.rmtree(chart_dir) print("Temporary directory removed successfully.") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Deploy and benchmark with specific node configuration.") parser.add_argument("yaml_file", help="Path to the YAML configuration file")