From 8485ede8e47b5b253764263b48d348db8e271c1d Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Wed, 26 Apr 2023 15:38:13 -0700 Subject: [PATCH] Add demo for vertical federated learning --- demo/nvflare/README.md | 61 ++---------- demo/nvflare/config/config_fed_client.json | 23 ----- demo/nvflare/config/config_fed_server.json | 22 ----- demo/nvflare/horizontal/README.md | 63 ++++++++++++ .../{ => horizontal}/custom/controller.py | 0 .../{ => horizontal}/custom/trainer.py | 0 demo/nvflare/{ => horizontal}/prepare_data.sh | 4 +- demo/nvflare/vertical/README.md | 59 +++++++++++ demo/nvflare/vertical/custom/controller.py | 68 +++++++++++++ demo/nvflare/vertical/custom/trainer.py | 97 +++++++++++++++++++ demo/nvflare/vertical/prepare_data.sh | 65 +++++++++++++ 11 files changed, 360 insertions(+), 102 deletions(-) delete mode 100755 demo/nvflare/config/config_fed_client.json delete mode 100755 demo/nvflare/config/config_fed_server.json create mode 100644 demo/nvflare/horizontal/README.md rename demo/nvflare/{ => horizontal}/custom/controller.py (100%) rename demo/nvflare/{ => horizontal}/custom/trainer.py (100%) rename demo/nvflare/{ => horizontal}/prepare_data.sh (88%) create mode 100644 demo/nvflare/vertical/README.md create mode 100644 demo/nvflare/vertical/custom/controller.py create mode 100644 demo/nvflare/vertical/custom/trainer.py create mode 100755 demo/nvflare/vertical/prepare_data.sh diff --git a/demo/nvflare/README.md b/demo/nvflare/README.md index 328dd7212a98..93f38820828d 100644 --- a/demo/nvflare/README.md +++ b/demo/nvflare/README.md @@ -3,61 +3,12 @@ This directory contains a demo of Federated Learning using [NVFlare](https://nvidia.github.io/NVFlare/). -## Training with CPU only +## Horizontal Federated XGBoost -To run the demo, first build XGBoost with the federated learning plugin enabled (see the -[README](../../plugin/federated/README.md)). 
+For horizontal federated learning using XGBoost (data is split row-wise), check out the `horizontal` directory +(see the [README](horizontal/README.md)). -Install NVFlare (note that currently NVFlare only supports Python 3.8): -```shell -pip install nvflare -``` +## Vertical Federated XGBoost -Prepare the data: -```shell -./prepare_data.sh -``` - -Start the NVFlare federated server: -```shell -/tmp/nvflare/poc/server/startup/start.sh -``` - -In another terminal, start the first worker: -```shell -/tmp/nvflare/poc/site-1/startup/start.sh -``` - -And the second worker: -```shell -/tmp/nvflare/poc/site-2/startup/start.sh -``` - -Then start the admin CLI: -```shell -/tmp/nvflare/poc/admin/startup/fl_admin.sh -``` - -In the admin CLI, run the following command: -```shell -submit_job hello-xgboost -``` - -Once the training finishes, the model file should be written into -`/tmp/nvlfare/poc/site-1/run_1/test.model.json` and `/tmp/nvflare/poc/site-2/run_1/test.model.json` -respectively. - -Finally, shutdown everything from the admin CLI, using `admin` as password: -```shell -shutdown client -shutdown server -``` - -## Training with GPUs - -To demo with Federated Learning using GPUs, make sure your machine has at least 2 GPUs. -Build XGBoost with the federated learning plugin enabled along with CUDA, but with NCCL -turned off (see the [README](../../plugin/federated/README.md)). - -Modify `config/config_fed_client.json` and set `use_gpus` to `true`, then repeat the steps -above. +For vertical federated learning using XGBoost (data is split column-wise), check out the `vertical` directory +(see the [README](vertical/README.md)). 
diff --git a/demo/nvflare/config/config_fed_client.json b/demo/nvflare/config/config_fed_client.json deleted file mode 100755 index c15a1997c1c8..000000000000 --- a/demo/nvflare/config/config_fed_client.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "format_version": 2, - "executors": [ - { - "tasks": [ - "train" - ], - "executor": { - "path": "trainer.XGBoostTrainer", - "args": { - "server_address": "localhost:9091", - "world_size": 2, - "server_cert_path": "server-cert.pem", - "client_key_path": "client-key.pem", - "client_cert_path": "client-cert.pem", - "use_gpus": "false" - } - } - } - ], - "task_result_filters": [], - "task_data_filters": [] -} diff --git a/demo/nvflare/config/config_fed_server.json b/demo/nvflare/config/config_fed_server.json deleted file mode 100755 index 32993b65215f..000000000000 --- a/demo/nvflare/config/config_fed_server.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "format_version": 2, - "server": { - "heart_beat_timeout": 600 - }, - "task_data_filters": [], - "task_result_filters": [], - "workflows": [ - { - "id": "server_workflow", - "path": "controller.XGBoostController", - "args": { - "port": 9091, - "world_size": 2, - "server_key_path": "server-key.pem", - "server_cert_path": "server-cert.pem", - "client_cert_path": "client-cert.pem" - } - } - ], - "components": [] -} diff --git a/demo/nvflare/horizontal/README.md b/demo/nvflare/horizontal/README.md new file mode 100644 index 000000000000..93ea3794c349 --- /dev/null +++ b/demo/nvflare/horizontal/README.md @@ -0,0 +1,63 @@ +# Experimental Support of Horizontal Federated XGBoost using NVFlare + +This directory contains a demo of Horizontal Federated Learning using +[NVFlare](https://nvidia.github.io/NVFlare/). + +## Training with CPU only + +To run the demo, first build XGBoost with the federated learning plugin enabled (see the +[README](../../plugin/federated/README.md)). 
+ +Install NVFlare (note that currently NVFlare only supports Python 3.8): +```shell +pip install nvflare +``` + +Prepare the data: +```shell +./prepare_data.sh +``` + +Start the NVFlare federated server: +```shell +/tmp/nvflare/poc/server/startup/start.sh +``` + +In another terminal, start the first worker: +```shell +/tmp/nvflare/poc/site-1/startup/start.sh +``` + +And the second worker: +```shell +/tmp/nvflare/poc/site-2/startup/start.sh +``` + +Then start the admin CLI: +```shell +/tmp/nvflare/poc/admin/startup/fl_admin.sh +``` + +In the admin CLI, run the following command: +```shell +submit_job horizontal-xgboost +``` + +Once the training finishes, the model file should be written into +`/tmp/nvflare/poc/site-1/run_1/test.model.json` and `/tmp/nvflare/poc/site-2/run_1/test.model.json` +respectively. + +Finally, shutdown everything from the admin CLI, using `admin` as password: +```shell +shutdown client +shutdown server +``` + +## Training with GPUs + +To demo with Federated Learning using GPUs, make sure your machine has at least 2 GPUs. +Build XGBoost with the federated learning plugin enabled along with CUDA, but with NCCL +turned off (see the [README](../../plugin/federated/README.md)). + +Modify `config/config_fed_client.json` and set `use_gpus` to `true`, then repeat the steps +above.
diff --git a/demo/nvflare/custom/controller.py b/demo/nvflare/horizontal/custom/controller.py similarity index 100% rename from demo/nvflare/custom/controller.py rename to demo/nvflare/horizontal/custom/controller.py diff --git a/demo/nvflare/custom/trainer.py b/demo/nvflare/horizontal/custom/trainer.py similarity index 100% rename from demo/nvflare/custom/trainer.py rename to demo/nvflare/horizontal/custom/trainer.py diff --git a/demo/nvflare/prepare_data.sh b/demo/nvflare/horizontal/prepare_data.sh similarity index 88% rename from demo/nvflare/prepare_data.sh rename to demo/nvflare/horizontal/prepare_data.sh index 1c88c65fec04..6a32008f8116 100755 --- a/demo/nvflare/prepare_data.sh +++ b/demo/nvflare/horizontal/prepare_data.sh @@ -15,8 +15,8 @@ split -n l/${world_size} --numeric-suffixes=1 -a 1 ../data/agaricus.txt.train ag split -n l/${world_size} --numeric-suffixes=1 -a 1 ../data/agaricus.txt.test agaricus.txt.test-site- nvflare poc -n 2 --prepare -mkdir -p /tmp/nvflare/poc/admin/transfer/hello-xgboost -cp -fr config custom /tmp/nvflare/poc/admin/transfer/hello-xgboost +mkdir -p /tmp/nvflare/poc/admin/transfer/horizontal-xgboost +cp -fr config custom /tmp/nvflare/poc/admin/transfer/horizontal-xgboost cp server-*.pem client-cert.pem /tmp/nvflare/poc/server/ for id in $(eval echo "{1..$world_size}"); do cp server-cert.pem client-*.pem /tmp/nvflare/poc/site-"$id"/ diff --git a/demo/nvflare/vertical/README.md b/demo/nvflare/vertical/README.md new file mode 100644 index 000000000000..83c3111b64c6 --- /dev/null +++ b/demo/nvflare/vertical/README.md @@ -0,0 +1,59 @@ +# Experimental Support of Vertical Federated XGBoost using NVFlare + +This directory contains a demo of Vertical Federated Learning using +[NVFlare](https://nvidia.github.io/NVFlare/). + +## Training with CPU only + +To run the demo, first build XGBoost with the federated learning plugin enabled (see the +[README](../../plugin/federated/README.md)). 
+ +Install NVFlare (note that currently NVFlare only supports Python 3.8): +```shell +pip install nvflare +``` + +Prepare the data (note that this step will download the HIGGS dataset, which is 2.6GB compressed, and 7.5GB +uncompressed, so make sure you have enough disk space and are on a fast internet connection): +```shell +./prepare_data.sh +``` + +Start the NVFlare federated server: +```shell +/tmp/nvflare/poc/server/startup/start.sh +``` + +In another terminal, start the first worker: +```shell +/tmp/nvflare/poc/site-1/startup/start.sh +``` + +And the second worker: +```shell +/tmp/nvflare/poc/site-2/startup/start.sh +``` + +Then start the admin CLI: +```shell +/tmp/nvflare/poc/admin/startup/fl_admin.sh +``` + +In the admin CLI, run the following command: +```shell +submit_job vertical-xgboost +``` + +Once the training finishes, the model file should be written into +`/tmp/nvflare/poc/site-1/run_1/higgs.model.federated.vertical.json` and `/tmp/nvflare/poc/site-2/run_1/higgs.model.federated.vertical.json` +respectively. + +Finally, shutdown everything from the admin CLI, using `admin` as password: +```shell +shutdown client +shutdown server +``` + +## Training with GPUs + +Currently GPUs are not yet supported by vertical federated XGBoost.
diff --git a/demo/nvflare/vertical/custom/controller.py b/demo/nvflare/vertical/custom/controller.py new file mode 100644 index 000000000000..dd3e39f46cf3 --- /dev/null +++ b/demo/nvflare/vertical/custom/controller.py @@ -0,0 +1,68 @@ +""" +Example of training controller with NVFlare +=========================================== +""" +import multiprocessing + +from nvflare.apis.client import Client +from nvflare.apis.fl_context import FLContext +from nvflare.apis.impl.controller import Controller, Task +from nvflare.apis.shareable import Shareable +from nvflare.apis.signal import Signal +from trainer import SupportedTasks + +import xgboost.federated + + +class XGBoostController(Controller): + def __init__(self, port: int, world_size: int, server_key_path: str, + server_cert_path: str, client_cert_path: str): + """Controller for federated XGBoost. + + Args: + port: the port for the gRPC server to listen on. + world_size: the number of sites. + server_key_path: the path to the server key file. + server_cert_path: the path to the server certificate file. + client_cert_path: the path to the client certificate file. 
+ """ + super().__init__() + self._port = port + self._world_size = world_size + self._server_key_path = server_key_path + self._server_cert_path = server_cert_path + self._client_cert_path = client_cert_path + self._server = None + + def start_controller(self, fl_ctx: FLContext): + self._server = multiprocessing.Process( + target=xgboost.federated.run_federated_server, + args=(self._port, self._world_size, self._server_key_path, + self._server_cert_path, self._client_cert_path)) + self._server.start() + + def stop_controller(self, fl_ctx: FLContext): + if self._server: + self._server.terminate() + + def process_result_of_unknown_task(self, client: Client, task_name: str, + client_task_id: str, result: Shareable, + fl_ctx: FLContext): + self.log_warning(fl_ctx, f"Unknown task: {task_name} from client {client.name}.") + + def control_flow(self, abort_signal: Signal, fl_ctx: FLContext): + self.log_info(fl_ctx, "XGBoost training control flow started.") + if abort_signal.triggered: + return + task = Task(name=SupportedTasks.TRAIN, data=Shareable()) + self.broadcast_and_wait( + task=task, + min_responses=self._world_size, + fl_ctx=fl_ctx, + wait_time_after_min_received=1, + abort_signal=abort_signal, + ) + if abort_signal.triggered: + return + + self.log_info(fl_ctx, "XGBoost training control flow finished.") diff --git a/demo/nvflare/vertical/custom/trainer.py b/demo/nvflare/vertical/custom/trainer.py new file mode 100644 index 000000000000..cd420129c9d8 --- /dev/null +++ b/demo/nvflare/vertical/custom/trainer.py @@ -0,0 +1,97 @@ +import os + +from nvflare.apis.executor import Executor +from nvflare.apis.fl_constant import FLContextKey, ReturnCode +from nvflare.apis.fl_context import FLContext +from nvflare.apis.shareable import Shareable, make_reply +from nvflare.apis.signal import Signal + +import xgboost as xgb +from xgboost import callback + + +class SupportedTasks(object): + TRAIN = "train" + + +class XGBoostTrainer(Executor): + def __init__(self, server_address: 
str, world_size: int, server_cert_path: str, + client_key_path: str, client_cert_path: str): + """Trainer for federated XGBoost. + + Args: + server_address: address for the gRPC server to connect to. + world_size: the number of sites. + server_cert_path: the path to the server certificate file. + client_key_path: the path to the client key file. + client_cert_path: the path to the client certificate file. + """ + super().__init__() + self._server_address = server_address + self._world_size = world_size + self._server_cert_path = server_cert_path + self._client_key_path = client_key_path + self._client_cert_path = client_cert_path + + def execute(self, task_name: str, shareable: Shareable, fl_ctx: FLContext, + abort_signal: Signal) -> Shareable: + self.log_info(fl_ctx, f"Executing {task_name}") + try: + if task_name == SupportedTasks.TRAIN: + self._do_training(fl_ctx) + return make_reply(ReturnCode.OK) + else: + self.log_error(fl_ctx, f"{task_name} is not a supported task.") + return make_reply(ReturnCode.TASK_UNKNOWN) + except BaseException as e: + self.log_exception(fl_ctx, + f"Task {task_name} failed. Exception: {e.__str__()}") + return make_reply(ReturnCode.EXECUTION_EXCEPTION) + + def _do_training(self, fl_ctx: FLContext): + client_name = fl_ctx.get_prop(FLContextKey.CLIENT_NAME) + rank = int(client_name.split('-')[1]) - 1 + communicator_env = { + 'xgboost_communicator': 'federated', + 'federated_server_address': self._server_address, + 'federated_world_size': self._world_size, + 'federated_rank': rank, + 'federated_server_cert': self._server_cert_path, + 'federated_client_key': self._client_key_path, + 'federated_client_cert': self._client_cert_path + } + with xgb.collective.CommunicatorContext(**communicator_env): + # Load file, file will not be sharded in federated mode. 
+ if rank == 0: + label = '&label_column=0' + else: + label = '' + dtrain = xgb.DMatrix(f'higgs.train.csv?format=csv{label}', data_split_mode=1) + dtest = xgb.DMatrix(f'higgs.test.csv?format=csv{label}', data_split_mode=1) + + # specify parameters via map + param = { + 'validate_parameters': True, + 'eta': 0.1, + 'gamma': 1.0, + 'max_depth': 8, + 'min_child_weight': 100, + 'tree_method': 'approx', + 'grow_policy': 'depthwise', + 'objective': 'binary:logistic', + 'eval_metric': 'auc', + } + + # specify validations set to watch performance + watchlist = [(dtest, "eval"), (dtrain, "train")] + # number of boosting rounds + num_round = 10 + + bst = xgb.train(param, dtrain, num_round, evals=watchlist, early_stopping_rounds=2) + + # Save the model. + workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) + run_number = fl_ctx.get_prop(FLContextKey.CURRENT_RUN) + run_dir = workspace.get_run_dir(run_number) + bst.save_model(os.path.join(run_dir, "higgs.model.federated.vertical.json")) + xgb.collective.communicator_print("Finished training\n") diff --git a/demo/nvflare/vertical/prepare_data.sh b/demo/nvflare/vertical/prepare_data.sh new file mode 100755 index 000000000000..86ec3dfa2c17 --- /dev/null +++ b/demo/nvflare/vertical/prepare_data.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +set -e + +rm -fr ./*.pem /tmp/nvflare/poc + +world_size=2 + +# Generate server and client certificates. +openssl req -x509 -newkey rsa:2048 -days 7 -nodes -keyout server-key.pem -out server-cert.pem -subj "/C=US/CN=localhost" +openssl req -x509 -newkey rsa:2048 -days 7 -nodes -keyout client-key.pem -out client-cert.pem -subj "/C=US/CN=localhost" + +# Download HIGGS dataset. +if [ -f "HIGGS.csv" ]; then + echo "HIGGS.csv exists, skipping download." +else + echo "Downloading HIGGS dataset." + wget https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz + gunzip HIGGS.csv.gz +fi + +# Split into train/test. 
+if [[ -f higgs.train.csv && -f higgs.test.csv ]]; then + echo "higgs.train.csv and higgs.test.csv exist, skipping split." +else + echo "Splitting HIGGS dataset into train/test." + head -n 10450000 HIGGS.csv > higgs.train.csv + tail -n 550000 HIGGS.csv > higgs.test.csv +fi + +# Split train and test files by column to simulate a federated environment. +site_files=(higgs.{train,test}.csv-site-*) +if [ ${#site_files[@]} -eq $((world_size*2)) ]; then + echo "Site files exist, skipping split." +else + echo "Splitting train/test into site files." + total_cols=28 # plus label + cols=$((total_cols/world_size)) + echo "Columns per site: $cols" + for (( site=1; site<=world_size; site++ )); do + if (( site == 1 )); then + start=$((cols*(site-1)+1)) + else + start=$((cols*(site-1)+2)) + fi + if (( site == world_size )); then + end=$((total_cols+1)) + else + end=$((cols*site+1)) + fi + echo "Site $site, columns $start-$end" + cut -d, -f${start}-${end} higgs.train.csv > higgs.train.csv-site-"${site}" + cut -d, -f${start}-${end} higgs.test.csv > higgs.test.csv-site-"${site}" + done +fi + +nvflare poc -n 2 --prepare +mkdir -p /tmp/nvflare/poc/admin/transfer/vertical-xgboost +cp -fr config custom /tmp/nvflare/poc/admin/transfer/vertical-xgboost +cp server-*.pem client-cert.pem /tmp/nvflare/poc/server/ +for (( site=1; site<=world_size; site++ )); do + cp server-cert.pem client-*.pem /tmp/nvflare/poc/site-"${site}"/ + ln -s "${PWD}"/higgs.train.csv-site-"${site}" /tmp/nvflare/poc/site-"${site}"/higgs.train.csv + ln -s "${PWD}"/higgs.test.csv-site-"${site}" /tmp/nvflare/poc/site-"${site}"/higgs.test.csv +done