Merge pull request #125 from beckernick/feature/cleanup-cluster-startup
[REVIEW] Streamline cluster startup
beckernick authored Nov 2, 2020
2 parents 7068944 + 590428f commit 474b00f
Showing 6 changed files with 105 additions and 104 deletions.
40 changes: 16 additions & 24 deletions README.md
@@ -58,21 +58,27 @@ Before running the script, you'll make changes specific to your environment.
In `cluster_configuration/cluster-startup.sh`:
- Update `TPCX_BB_HOME=...` to location on disk of this repo
- Update `INTERFACE=...` to refer to the relevant network interface present on your cluster.
- Update `CONDA_ENV_PATH=...` to refer to your conda environment path.
- Update `CONDA_ENV_NAME=...` to refer to the name of the conda environment you created, perhaps using the `yml` files provided in this repository.
- Update `SCHEDULER=...` to refer to the host name of the node you intend to use as the scheduler.
- Update `SCHEDULER_FILE=...` to refer to the location of your scheduler file
- Update `INTERFACE=...` to refer to the relevant network interface present on your cluster.
- Update `CLUSTER_MODE="NVLINK"` to refer to your communication method, either "TCP" or "NVLINK".
- You may also need to change the `LOCAL_DIRECTORY` and `WORKER_DIR` depending on your filesystem. Make sure that these point to a location to which you have write access and that `LOCAL_DIRECTORY` is accessible from all nodes.
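Taken together, a filled-in configuration block in `cluster-startup.sh` might look like the following; the paths, environment name, and interface shown are illustrative examples, not requirements:
```bash
# Example values only -- substitute paths, conda environment, and interface for your system
TPCX_BB_HOME=$HOME/tpcx-bb
CONDA_ENV_NAME="rapids-tpcx-bb"
CONDA_ENV_PATH="/home/$USER/conda/etc/profile.d/conda.sh"
INTERFACE="ib0"
CLUSTER_MODE="NVLINK"                        # or "TCP"
LOCAL_DIRECTORY=$HOME/dask-local-directory   # must be accessible from all nodes
WORKER_DIR=/tmp/$USER/tpcx-bb-dask-workers/
```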
To start up the cluster on your scheduler node, please run the following from `tpcx_bb/cluster_configuration/`. This will spin up a scheduler and one Dask worker per GPU.
In `cluster_configuration/example-cluster-scheduler.json`:
Update the scheduler address to be the address for the network interface you chose for `INTERFACE=...` above. If you are not using UCX, you'll need to adjust the address to be `tcp://...` rather than `ucx://...`. Note that `example-cluster-scheduler.json` is just an example scheduler configuration. See [the Dask docs](https://docs.dask.org/en/latest/setup/hpc.html#using-a-shared-network-file-system-and-a-job-scheduler) for more details on how you can generate your own and make it available to all cluster nodes.
```bash
bash cluster-startup.sh SCHEDULER
```
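If the scheduler does not appear to come up, tailing its log is a quick way to check; the path below assumes the default `LOCAL_DIRECTORY` in `cluster-startup.sh`:
```bash
tail -n 20 $HOME/dask-local-directory/logs/scheduler.log
```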
To start up the cluster, please run the following on every node from `tpcx_bb/cluster_configuration/`.
Then run the following on every other node from `tpcx_bb/cluster_configuration/`.
```bash
bash cluster-startup.sh NVLINK
bash cluster-startup.sh
```
This will spin up one Dask worker per GPU. If you are running on a single node, you will only need to run `bash cluster-startup.sh SCHEDULER`.
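Once all nodes are up, a quick way to confirm that every GPU registered a worker is to attach a client via the scheduler file. This is a minimal sketch; the path assumes the defaults in `cluster-startup.sh`:
```python
import os
from dask.distributed import Client

# Path assumes LOCAL_DIRECTORY=$HOME/dask-local-directory in cluster-startup.sh
client = Client(scheduler_file=os.path.expanduser("~/dask-local-directory/scheduler.json"))
print(client)  # should report one worker per GPU across the cluster
```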
## Running the Queries
@@ -82,7 +88,7 @@ To run a query, starting from the repository root, go to the query-specific subdirectory.
cd tpcx_bb/queries/q07/
```
The queries assume that they can attach to a running Dask cluster. Cluster address and other benchmark configuration lives in a yaml file.
The queries assume that they can attach to a running Dask cluster. Cluster address and other benchmark configuration lives in a yaml file. You will need to fill this out as appropriate.
```bash
conda activate rapids-tpcx-bb
@@ -107,27 +113,13 @@ To run all queries, cd to `tpcx_bb/` and:
python benchmark_runner.py --config_file benchmark_runner/benchmark_config.yaml
```
By default, this will run each query once. You can control the number of repeats by changing the `N_REPEATS` variable in the script.
By default, this will run each Dask query once, and, if BlazingSQL queries are enabled in `benchmark_config.yaml`, each BlazingSQL query once. You can control the number of repeats by changing the `N_REPEATS` variable in the script.
## BlazingSQL
We include BlazingSQL implementations of several queries. As we continue scale testing BlazingSQL implementations, we'll add them to the `queries` folder in the appropriate locations.


### Cluster Configuration for TCP

BlazingSQL currently supports clusters using TCP. Please follow the instructions above, making sure to use the InfiniBand interface as the `INTERFACE` variable. Then, start the cluster with:

```bash
bash cluster_configuration/bsql-cluster-startup.sh TCP
```

### Additional useful parameters

BlazingSQL supports several useful parameters that you can set manually, which can improve performance in some cases. These parameters are defined by default in the `tpcx_bb/xbb_tools/cluster_startup.py` file.
BlazingSQL implementations of all queries are included. BlazingSQL currently supports communication via TCP. To run BlazingSQL queries, please follow the instructions above to create a cluster using `CLUSTER_MODE=TCP`.
For more context, see the [config options](https://docs.blazingdb.com/docs/config_options) documentation.
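Concretely, running the BlazingSQL queries amounts to switching the mode variable and restarting the cluster, as described above; a sketch of those steps:
```bash
# In cluster_configuration/cluster-startup.sh
CLUSTER_MODE="TCP"

# Then, from tpcx_bb/cluster_configuration/ on the scheduler node:
bash cluster-startup.sh SCHEDULER
# ...and on every other node:
bash cluster-startup.sh
```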
## Data Generation
42 changes: 20 additions & 22 deletions tpcx_bb/benchmark_runner.py
@@ -12,18 +12,18 @@ def get_qnum_from_filename(name):
return m


dask_qnums = [str(i).zfill(2) for i in range(1, 31)]
# Not all queries are implemented with BSQL
bsql_query_files = sorted(glob.glob("./queries/q*/t*_sql.py"))
bsql_qnums = [get_qnum_from_filename(x.split("/")[-1]) for x in bsql_query_files]

def load_query(qnum, fn):
import importlib, types
loader = importlib.machinery.SourceFileLoader(qnum, fn)
mod = types.ModuleType(loader.name)
loader.exec_module(mod)
return mod.main


dask_qnums = [str(i).zfill(2) for i in range(1, 31)]
bsql_qnums = [str(i).zfill(2) for i in range(1, 31)]
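As context for `load_query` above, here is a minimal, self-contained sketch of the same dynamic-loading technique; the throwaway file and function name are illustrative and not part of the repository:
```python
import importlib.machinery
import os
import tempfile
import types

# Write a throwaway "query module" to disk, then load it and call its main().
src = "def main():\n    return 'hello from a dynamically loaded module'\n"
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(src)
    path = f.name

loader = importlib.machinery.SourceFileLoader("q_demo", path)
mod = types.ModuleType(loader.name)
loader.exec_module(mod)  # executes the file's code in the new module object
print(mod.main())
os.remove(path)
```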


if __name__ == "__main__":
from xbb_tools.cluster_startup import attach_to_cluster, import_query_libs
from xbb_tools.utils import run_query, tpcxbb_argparser
@@ -39,20 +39,18 @@ def load_query(qnum, fn):
for qnum in bsql_qnums
}


config = tpcxbb_argparser()
include_blazing = config.get("benchmark_runner_include_bsql")
client, bc = attach_to_cluster(config, create_blazing_context=include_blazing)

# Preload required libraries for queries on all workers
client.run(import_query_libs)

base_path = os.getcwd()

# Run Pure Dask Queries
if len(dask_qnums) > 0:
print("Pure Dask Queries")
for qnum, q_func in dask_queries.items():
# Run BSQL Queries
if include_blazing and len(bsql_qnums) > 0:
print("Blazing Queries")
for qnum, q_func in bsql_queries.items():
print(qnum)

qpath = f"{base_path}/queries/q{qnum}/"
@@ -63,16 +61,21 @@ def load_query(qnum, fn):
fp.write(qnum)

for r in range(N_REPEATS):
run_query(config=config, client=client, query_func=q_func)
run_query(
config=config,
client=client,
query_func=q_func,
blazing_context=bc,
)
client.run(gc.collect)
client.run_on_scheduler(gc.collect)
gc.collect()
time.sleep(3)

# Run BSQL Queries
if include_blazing and len(bsql_qnums) > 0:
print("Blazing Queries")
for qnum, q_func in bsql_queries.items():
# Run Pure Dask Queries
if len(dask_qnums) > 0:
print("Pure Dask Queries")
for qnum, q_func in dask_queries.items():
print(qnum)

qpath = f"{base_path}/queries/q{qnum}/"
@@ -83,12 +86,7 @@ def load_query(qnum, fn):
fp.write(qnum)

for r in range(N_REPEATS):
run_query(
config=config,
client=client,
query_func=q_func,
blazing_context=bc,
)
run_query(config=config, client=client, query_func=q_func)
client.run(gc.collect)
client.run_on_scheduler(gc.collect)
gc.collect()
9 changes: 4 additions & 5 deletions tpcx_bb/benchmark_runner/benchmark_config.yaml
@@ -1,22 +1,21 @@
# benchmark config yaml
### Please fill these accordingly
data_dir:
data_dir:
output_dir: ./
file_format: parquet
output_filetype: parquet
split_row_groups: False
repartition_small_table: True
benchmark_runner_include_bsql: False

cluster_host:
cluster_port: 8786
scheduler_file_path:
dask_profile: False
dask_dir: ./
32GB_workers: 16
num_workers: 16

verify_results: False
verify_dir:

sheet: TPCx-BB
tab: SF1000 Benchmarking Matrix
get_read_time: False
get_read_time: False
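For reference, a filled-in copy of this file might look like the following; the data directory, host name, and scheduler-file path are hypothetical placeholders:
```yaml
# Example values only -- adjust paths, host, and worker count for your cluster
data_dir: /raid/tpcx-bb/sf1000/                 # hypothetical data location
output_dir: ./
file_format: parquet
output_filetype: parquet
split_row_groups: False
repartition_small_table: True
benchmark_runner_include_bsql: False

cluster_host: node-0                            # hypothetical scheduler host name
cluster_port: 8786
scheduler_file_path: /home/user/dask-local-directory/scheduler.json
dask_profile: False
dask_dir: ./
num_workers: 16

verify_results: False
verify_dir:

sheet: TPCx-BB
tab: SF1000 Benchmarking Matrix
get_read_time: False
```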
48 changes: 25 additions & 23 deletions tpcx_bb/cluster_configuration/cluster-startup.sh
@@ -1,61 +1,63 @@
#IB, NVLINK, or TCP
CLUSTER_MODE=$1
#NVLINK or TCP
ROLE=$1
CLUSTER_MODE="NVLINK"
USERNAME=$(whoami)

MAX_SYSTEM_MEMORY=$(free -m | awk '/^Mem:/{print $2}')M
DEVICE_MEMORY_LIMIT="25GB"
POOL_SIZE="30GB"

# Fill in your environment name and conda path on each node
TPCX_BB_HOME="/home/$USERNAME/shared/tpcx-bb"
TPCX_BB_HOME=$HOME/tpcx-bb
CONDA_ENV_NAME="rapids-tpcx-bb"
CONDA_ENV_PATH="/home/$USERNAME/conda/etc/profile.d/conda.sh"

# TODO: Unify interface/IP setting/getting for cluster startup
# and scheduler file
INTERFACE="ib0"

# TODO: Remove hard-coding of scheduler
SCHEDULER=$(hostname)
SCHEDULER_FILE=$TPCX_BB_HOME/tpcx_bb/cluster_configuration/example-cluster-scheduler.json
LOGDIR="/tmp/tpcx-bb-dask-logs/"
WORKER_DIR="/tmp/tpcx-bb-dask-workers/"
# Used for writing scheduler file and logs to shared storage
LOCAL_DIRECTORY=$HOME/dask-local-directory
SCHEDULER_FILE=$LOCAL_DIRECTORY/scheduler.json
LOGDIR="$LOCAL_DIRECTORY/logs"
WORKER_DIR=/tmp/$USERNAME/tpcx-bb-dask-workers/

# Purge Dask worker and log directories
rm -rf $LOGDIR/*
mkdir -p $LOGDIR
rm -rf $WORKER_DIR/*
mkdir -p $WORKER_DIR
if [ "$ROLE" = "SCHEDULER" ]; then
rm -rf $LOGDIR/*
mkdir -p $LOGDIR
rm -rf $WORKER_DIR/*
mkdir -p $WORKER_DIR
fi

# Purge Dask config directories
rm -rf ~/.config/dask

# Activate conda environment
# Activate conda environment and install the local library
source $CONDA_ENV_PATH
conda activate $CONDA_ENV_NAME

cd $TPCX_BB_HOME/tpcx_bb
python -m pip install .

# Dask/distributed configuration
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="100s"
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP="600s"
export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MIN="1s"
export DASK_DISTRIBUTED__COMM__RETRY__DELAY__MAX="60s"

# Select an interface appropriate for your cluster or machine.
INTERFACE="ib0"

# Setup scheduler
if [ "$HOSTNAME" = $SCHEDULER ]; then
if [ "$ROLE" = "SCHEDULER" ]; then
if [ "$CLUSTER_MODE" = "NVLINK" ]; then
CUDA_VISIBLE_DEVICES='0' DASK_UCX__CUDA_COPY=True DASK_UCX__TCP=True DASK_UCX__NVLINK=True DASK_UCX__INFINIBAND=False DASK_UCX__RDMACM=False nohup dask-scheduler --dashboard-address 8787 --interface $INTERFACE --protocol ucx > $LOGDIR/scheduler.log 2>&1 &
CUDA_VISIBLE_DEVICES='0' DASK_UCX__CUDA_COPY=True DASK_UCX__TCP=True DASK_UCX__NVLINK=True DASK_UCX__INFINIBAND=False DASK_UCX__RDMACM=False nohup dask-scheduler --dashboard-address 8787 --interface $INTERFACE --protocol ucx --scheduler-file $SCHEDULER_FILE > $LOGDIR/scheduler.log 2>&1 &
fi

if [ "$CLUSTER_MODE" = "TCP" ]; then
CUDA_VISIBLE_DEVICES='0' nohup dask-scheduler --dashboard-address 8787 --interface $INTERFACE --protocol tcp > $LOGDIR/scheduler.log 2>&1 &
CUDA_VISIBLE_DEVICES='0' nohup dask-scheduler --dashboard-address 8787 --interface $INTERFACE --protocol tcp --scheduler-file $SCHEDULER_FILE > $LOGDIR/scheduler.log 2>&1 &
fi
fi


# Setup workers
if [ "$CLUSTER_MODE" = "NVLINK" ]; then
dask-cuda-worker --device-memory-limit $DEVICE_MEMORY_LIMIT --local-directory $WORKER_DIR --rmm-pool-size=$POOL_SIZE --memory-limit=$MAX_SYSTEM_MEMORY --enable-tcp-over-ucx --enable-nvlink --disable-infiniband --scheduler-file $SCHEDULER_FILE >> $LOGDIR/worker.log 2>&1 &
dask-cuda-worker --device-memory-limit $DEVICE_MEMORY_LIMIT --local-directory $WORKER_DIR --rmm-pool-size=$POOL_SIZE --memory-limit=$MAX_SYSTEM_MEMORY --enable-tcp-over-ucx --enable-nvlink --disable-infiniband --scheduler-file $SCHEDULER_FILE >> $LOGDIR/worker.log 2>&1 &
fi

if [ "$CLUSTER_MODE" = "TCP" ]; then
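The remainder of the script, including the TCP worker branch, is not shown in this view. A minimal sketch of what that branch could look like, using only the flags already seen above (not necessarily the repository's exact command):
```bash
# Sketch only -- the actual TCP worker command is in the portion of the diff not shown here
if [ "$CLUSTER_MODE" = "TCP" ]; then
    dask-cuda-worker --device-memory-limit $DEVICE_MEMORY_LIMIT \
        --local-directory $WORKER_DIR --rmm-pool-size=$POOL_SIZE \
        --memory-limit=$MAX_SYSTEM_MEMORY \
        --scheduler-file $SCHEDULER_FILE >> $LOGDIR/worker.log 2>&1 &
fi
```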
8 changes: 0 additions & 8 deletions tpcx_bb/cluster_configuration/example-cluster-scheduler.json

This file was deleted.
