Neural Solution Resource Management #1060

Merged
merged 11 commits into from
Jul 7, 2023
1 change: 1 addition & 0 deletions .azure-pipelines/scripts/codeScan/pyspelling/inc_dict.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2677,3 +2677,4 @@ jJA
wWLes
xHKe
PR
hostname
49 changes: 46 additions & 3 deletions neural_solution/backend/cluster.py
Expand Up @@ -84,12 +84,54 @@ def free_resource(self, reserved_resource_lst):
"""
            self.cursor.execute(sql, (free_resources[node_id], free_resources[node_id], node_id))
            self.conn.commit()
        # delete nodes with status of remove, some version without RETURNING syntax
        self.cursor.execute("SELECT id FROM cluster WHERE status='remove' AND busy_sockets=0")
        deleted_ids = self.cursor.fetchall()
        deleted_ids = [str(id_tuple[0]) for id_tuple in deleted_ids]
        self.cursor.execute("DELETE FROM cluster WHERE status='remove' AND busy_sockets=0")
        self.conn.commit()

        # remove deleted nodes from socket queue
        socket_queue_delete_ids = [socket for socket in self.socket_queue if socket.split()[0] in deleted_ids]
        if len(socket_queue_delete_ids) > 0:
            logger.info(f"[Cluster] remove node-list {socket_queue_delete_ids} from socket_queue: {self.socket_queue}")
            self.socket_queue = [socket for socket in self.socket_queue if socket.split()[0] not in deleted_ids]
        logger.info(f"[Cluster] free resource {reserved_resource_lst}, now have free resource {self.socket_queue}")
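The select-then-delete step above can be tried in isolation. The following is a minimal sketch, not the PR's code: it assumes a reduced `cluster` table with only the columns this step reads, an in-memory SQLite database, and made-up host names. Reading the ids first stands in for `DELETE ... RETURNING id`, which SQLite supports only from version 3.35.

```python
import sqlite3

# Reduced, hypothetical version of the cluster table (illustration only).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE cluster(id INTEGER PRIMARY KEY, name TEXT, status TEXT, busy_sockets INT)")
cur.executemany(
    "INSERT INTO cluster(id, name, status, busy_sockets) VALUES (?, ?, ?, ?)",
    [(1, "host1", "alive", 0), (2, "host2", "remove", 0), (3, "host3", "remove", 1)],
)
socket_queue = ["1 host1", "2 host2", "2 host2", "3 host3"]

# Read the ids first, then delete with the same WHERE clause.
condition = "status = 'remove' AND busy_sockets = 0"
cur.execute(f"SELECT id FROM cluster WHERE {condition}")
deleted_ids = {str(row[0]) for row in cur.fetchall()}
cur.execute(f"DELETE FROM cluster WHERE {condition}")
conn.commit()

# Queue entries look like "<node-id> <node-name>"; drop those of deleted nodes.
socket_queue = [entry for entry in socket_queue if entry.split()[0] not in deleted_ids]

remaining = [row[0] for row in cur.execute("SELECT name FROM cluster ORDER BY id")]
print(deleted_ids)   # {'2'}
print(socket_queue)  # ['1 host1', '3 host3']
print(remaining)     # ['host1', 'host3']
```

In this sketch `host3` is marked `remove` but still has a busy socket, so its row and its queue entry survive until that socket is freed.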

    @synchronized
    def get_free_socket(self, num_sockets: int) -> List[str]:
        """Get the free sockets list."""
        booked_socket_lst = []

        # detect and append new resource
        self.cursor.execute(f"SELECT id, name, total_sockets FROM cluster where status = 'join'")
        new_node_lst = self.cursor.fetchall()
        for index, name, total_sockets in new_node_lst:
            sql = """
                UPDATE cluster
                SET status = ?
                WHERE id = ?
            """
            self.cursor.execute(sql, ('alive', index))
            self.conn.commit()
            self.socket_queue += [str(index) + " " + name] * total_sockets
            logger.info(f"[Cluster] add new node-id {index} to socket_queue: {self.socket_queue}")

        # do not assign nodes with status of remove
        # remove to-delete nodes from socket queue
        self.cursor.execute("SELECT id FROM cluster WHERE status='remove'")
        deleted_ids = self.cursor.fetchall()
        deleted_ids = [str(id_tuple[0]) for id_tuple in deleted_ids]

        socket_queue_delete_ids = [socket for socket in self.socket_queue if socket.split()[0] in deleted_ids]
        if len(socket_queue_delete_ids) > 0:
            logger.info(f"[Cluster] remove node-list {socket_queue_delete_ids} from socket_queue: {self.socket_queue}")
            self.socket_queue = [socket for socket in self.socket_queue if socket.split()[0] not in deleted_ids]

        # delete nodes with status of remove
        self.cursor.execute("DELETE FROM cluster WHERE status='remove' AND busy_sockets=0")
        self.conn.commit()

        if len(self.socket_queue) < num_sockets:
            logger.info(f"Can not allocate {num_sockets} sockets, due to only {len(self.socket_queue)} left.")
            return 0
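The promotion of joining nodes at the top of `get_free_socket` can be sketched on its own. This is an illustration under assumptions, not the PR's implementation: the table is reduced to the columns this step reads, the database is in-memory, and the host names and socket counts are invented.

```python
import sqlite3

# Reduced, hypothetical version of the cluster table (illustration only).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE cluster(id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, status TEXT, total_sockets INT)"
)
cur.executemany(
    "INSERT INTO cluster(name, status, total_sockets) VALUES (?, ?, ?)",
    [("host1", "alive", 2), ("host2", "join", 3)],
)
socket_queue = ["1 host1", "1 host1"]  # one entry per free socket

# Promote every joining node to 'alive' and enqueue one entry per socket.
cur.execute("SELECT id, name, total_sockets FROM cluster WHERE status = 'join'")
for node_id, name, total_sockets in cur.fetchall():
    cur.execute("UPDATE cluster SET status = ? WHERE id = ?", ("alive", node_id))
    socket_queue += [f"{node_id} {name}"] * total_sockets
conn.commit()

# A request succeeds only if the queue holds enough entries.
num_sockets = 4
can_allocate = len(socket_queue) >= num_sockets
statuses = cur.execute("SELECT status FROM cluster ORDER BY id").fetchall()
print(socket_queue)
print(can_allocate, statuses)
```

Before the promotion the queue holds two entries, so a request for four sockets would be refused; after it the queue holds five.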
Expand All @@ -111,6 +153,7 @@ def initial_cluster_from_node_lst(self, node_lst):
        self.cursor.execute("drop table if exists cluster ")
        self.cursor.execute(
            r"create table cluster(id INTEGER PRIMARY KEY AUTOINCREMENT,"
            + "name varchar(100),"
            + "node_info varchar(500),"
            + "status varchar(100),"
            + "free_sockets int,"
Expand All @@ -121,9 +164,9 @@ def initial_cluster_from_node_lst(self, node_lst):
         for index, node in enumerate(self.node_lst):
             self.socket_queue += [str(index + 1) + " " + node.name] * node.num_sockets
             self.cursor.execute(
-                r"insert into cluster(node_info, status, free_sockets, busy_sockets, total_sockets)"
-                + "values ('{}', '{}', {}, {}, {})".format(
-                    repr(node).replace("Node", f"Node{index+1}"), "alive", node.num_sockets, 0, node.num_sockets
+                r"insert into cluster(name, node_info, status, free_sockets, busy_sockets, total_sockets)"
+                + "values ('{}', '{}', '{}', {}, {}, {})".format(
+                    node.name, repr(node).replace("Node", f"Node{index+1}"), "alive", node.num_sockets, 0, node.num_sockets
                 )
             )
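The insert above builds its SQL with `str.format`. As a possible alternative, and not something this PR does, the same row can be written with `?` placeholders so that `sqlite3` handles quoting. In the sketch below the `Node` dataclass is a hypothetical stand-in for the project's node class, and the table columns after `free_sockets` are assumed from the column list of the insert statement.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class Node:  # hypothetical stand-in for the project's node class
    name: str
    num_sockets: int


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE cluster(id INTEGER PRIMARY KEY AUTOINCREMENT, name varchar(100),"
    " node_info varchar(500), status varchar(100), free_sockets int,"
    " busy_sockets int, total_sockets int)"
)

# The second name contains a single quote on purpose.
node_lst = [Node("host1", 2), Node("it's-host2", 4)]
socket_queue = []
for index, node in enumerate(node_lst):
    socket_queue += [f"{index + 1} {node.name}"] * node.num_sockets
    # With "?" placeholders a value containing a quote cannot break the statement.
    cur.execute(
        "INSERT INTO cluster(name, node_info, status, free_sockets, busy_sockets, total_sockets)"
        " VALUES (?, ?, ?, ?, ?, ?)",
        (node.name, repr(node).replace("Node", f"Node{index + 1}"), "alive", node.num_sockets, 0, node.num_sockets),
    )
conn.commit()

rows = cur.execute("SELECT id, name, free_sockets FROM cluster ORDER BY id").fetchall()
print(rows)
```

With the `str.format` version, the quote in the second name would end the SQL string literal early and the statement would fail to parse.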

42 changes: 42 additions & 0 deletions neural_solution/docs/source/README.md
Expand Up @@ -10,6 +10,11 @@
- [Query task status](#query-task-status)
- [Stop service](#stop-service)
- [Inspect logs](#inspect-logs)
- [Manage resource](#manage-resource)
  - [Node States](#node-states)
  - [Query cluster](#query-cluster)
  - [Add node](#add-node)
  - [Remove node](#remove-node)

## Install Neural Solution
### Prerequisites
Expand Down Expand Up @@ -126,3 +131,40 @@ There are several logs under workspace:

```

## Manage resource
Neural Solution supports cluster management for service maintainers, providing several command-line tools for efficient resource management.

### Node States

Each node in the cluster is in one of three states:

- Alive: Represents a node that is functioning properly and available to handle requests.
- Join: Indicates that a node is in the process of being added to the cluster but has not fully joined yet.
- Remove: Indicates that a node is scheduled to be removed from the cluster. It is no longer assigned new tasks and is deleted once none of its sockets are busy.
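Read together with the `cluster.py` changes in this PR, the three states behave roughly like the transition function below. This is a sketch for orientation only: `next_status` is a hypothetical name, and the real logic lives in SQL statements rather than in a function like this.

```python
from typing import Optional


def next_status(status: str, busy_sockets: int) -> Optional[str]:
    """Return a node's status after one scheduling pass; None means the node is deleted."""
    if status == "join":
        # Joining nodes are picked up the next time free sockets are requested.
        return "alive"
    if status == "remove":
        # Kept while tasks still occupy its sockets, deleted afterwards.
        return "remove" if busy_sockets > 0 else None
    # 'alive' nodes are left unchanged.
    return status


print(next_status("join", 0))    # alive
print(next_status("remove", 2))  # remove
print(next_status("remove", 0))  # None
```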

Below are some commonly used commands and their usage:

### Query cluster
This command queries the current status of the cluster. No additional parameters are required; simply enter the following command:
```shell
neural_solution cluster --query
```
### Add node
This command is used to add nodes to the cluster. You can either specify a host file or provide a list of nodes separated by ";". The node format consists of three parts: hostname, number_of_sockets, and cores_per_socket. Here's a breakdown of each part:

- hostname: This refers to the name or IP address of the node that you want to add to the cluster. It identifies the specific machine or server that will be part of the cluster.

- number_of_sockets: This indicates the number of physical CPU sockets available on the node. A socket is a physical component that houses one or more CPU cores. It represents a physical processor unit.

- cores_per_socket: This specifies the number of CPU cores present in each socket. A core is an individual processing unit within a CPU.

For example:
```shell
neural_solution cluster --join "host1 2 20; host2 4 20"
```
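To make the node format concrete, the sketch below parses a `--join` style string into its three fields. It is an illustration only: `NodeSpec` and `parse_join_arg` are hypothetical names, and this is not how `neural_solution` itself parses the argument.

```python
from typing import List, NamedTuple


class NodeSpec(NamedTuple):
    hostname: str
    number_of_sockets: int
    cores_per_socket: int


def parse_join_arg(arg: str) -> List[NodeSpec]:
    """Parse "host1 2 20; host2 4 20" into a list of NodeSpec entries."""
    specs = []
    for chunk in arg.split(";"):
        fields = chunk.split()
        if not fields:
            continue  # tolerate a trailing ";"
        if len(fields) != 3:
            raise ValueError(f"expected 'hostname sockets cores', got {chunk!r}")
        host, sockets, cores = fields
        specs.append(NodeSpec(host, int(sockets), int(cores)))
    return specs


specs = parse_join_arg("host1 2 20; host2 4 20")
total_cores = sum(s.number_of_sockets * s.cores_per_socket for s in specs)
print(specs)
print(total_cores)  # 120
```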
### Remove node
This command is used to remove nodes from the cluster based on the IDs obtained from the query. The IDs can be passed as a parameter to the command. For example:
```shell
neural_solution cluster --remove <query_id>
```
Please note that the above commands are just examples and may require additional parameters or configurations based on your specific setup.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ In this example, we show how to quantize a [custom model](https://github.com/int
- Demonstrate how to start the Neural Solution Service.
- Demonstrate how to prepare an optimization task request and submit it to Neural Solution Service.
- Demonstrate how to query the status of the task and fetch the optimization result.
- Demonstrate how to query and manage cluster resources.

### Requirements
Customizing the model requires preparing the following folders and files.
Expand Down Expand Up @@ -48,12 +49,12 @@ neural_solution -h

usage: neural_solution {start,stop} [-h] [--hostfile HOSTFILE] [--restful_api_port RESTFUL_API_PORT] [--grpc_api_port GRPC_API_PORT]
[--result_monitor_port RESULT_MONITOR_PORT] [--task_monitor_port TASK_MONITOR_PORT] [--api_type API_TYPE]
-[--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH]
+[--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH] [--query] [--join JOIN] [--remove REMOVE]

Neural Solution

positional arguments:
-{start,stop} start/stop service
+{start,stop,cluster} start/stop/management service

optional arguments:
-h, --help show this help message and exit
Expand All @@ -73,6 +74,9 @@ optional arguments:
specify the running environment for the task
--upload_path UPLOAD_PATH
specify the file path for the tasks
--query [cluster parameter] query cluster information
--join JOIN [cluster parameter] add new node into cluster
--remove REMOVE [cluster parameter] remove <node-id> from cluster
```


Expand Down Expand Up @@ -155,6 +159,19 @@ When using distributed quantization, the `workers` needs to be set to greater th
# download quantized_model.zip
```

### Manage resource
```shell
# query cluster information
neural_solution cluster --query

# add new node into cluster
# parameter: "<node1> <number_of_sockets> <cores_per_socket>;<node2> <number_of_sockets> <cores_per_socket>"
neural_solution cluster --join "host1 2 20; host2 5 20"

# remove node from cluster according to id
neural_solution cluster --remove <node-id>
```

### Stop the service
```shell
neural_solution stop
23 changes: 20 additions & 3 deletions neural_solution/examples/hf_models/README.md
Expand Up @@ -6,6 +6,7 @@ In this example, we show how to quantize a Hugging Face model with Neural Soluti
- Demonstrate how to start the Neural Solution Service.
- Demonstrate how to prepare an optimization task request and submit it to Neural Solution Service.
- Demonstrate how to query the status of the task and fetch the optimization result.
- Demonstrate how to query and manage cluster resources.


### Start the Neural Solution Service
Expand All @@ -27,14 +28,14 @@ neural_solution stop
neural_solution -h
# Help output

-usage: neural_solution {start,stop} [-h] [--hostfile HOSTFILE] [--restful_api_port RESTFUL_API_PORT] [--grpc_api_port GRPC_API_PORT]
+usage: neural_solution {start,stop,cluster} [-h] [--hostfile HOSTFILE] [--restful_api_port RESTFUL_API_PORT] [--grpc_api_port GRPC_API_PORT]
[--result_monitor_port RESULT_MONITOR_PORT] [--task_monitor_port TASK_MONITOR_PORT] [--api_type API_TYPE]
-[--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH]
+[--workspace WORKSPACE] [--conda_env CONDA_ENV] [--upload_path UPLOAD_PATH] [--query] [--join JOIN] [--remove REMOVE]

Neural Solution

positional arguments:
-{start,stop} start/stop service
+{start,stop,cluster} start/stop/management service

optional arguments:
-h, --help show this help message and exit
Expand All @@ -54,6 +55,9 @@ optional arguments:
specify the running environment for the task
--upload_path UPLOAD_PATH
specify the file path for the tasks
--query [cluster parameter] query cluster information
--join JOIN [cluster parameter] add new node into cluster
--remove REMOVE [cluster parameter] remove <node-id> from cluster
```


Expand Down Expand Up @@ -118,6 +122,19 @@ optional arguments:
``` shell
[user@server tf_example1]$ curl -X GET http://localhost:8000/download/{task_id} --output quantized_model.zip
# download quantized_model.zip
```
### Manage resource
```shell
# query cluster information
neural_solution cluster --query

# add new node into cluster
# parameter: "<node1> <number_of_sockets> <cores_per_socket>;<node2> <number_of_sockets> <cores_per_socket>"
neural_solution cluster --join "host1 2 20; host2 5 20"

# remove node from cluster according to id
neural_solution cluster --remove <node-id>

```
### Stop the service
```shell