Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add distributed mode for xgb algo #471

Merged
merged 3 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions federatedscope/vertical_fl/xgb_base/worker/Test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ def test_for_node(self, tree_num, node_num):
if node_num >= 2**self.client.max_tree_depth - 1:
if tree_num + 1 < self.client.num_of_trees:
# TODO: add feedback during training
self.client.state += 1
logger.info(
f'----------- Starting a new training round (Round '
f'#{self.client.state}) -------------')
logger.info(f'----------- Building a new tree (Tree '
f'#{tree_num + 1}) -------------')
# build the next tree
self.client.fs.compute_for_root(tree_num + 1)

Expand All @@ -67,8 +65,9 @@ def test_for_node(self, tree_num, node_num):
each for each in list(
self.client.comm_manager.neighbors.keys())
if each != self.client.server_id
and each != self.client.ID
],
content=None))
content='None'))
self.client.comm_manager.send(
Message(msg_type='feature_importance',
sender=self.client.ID,
Expand Down
16 changes: 14 additions & 2 deletions federatedscope/vertical_fl/xgb_base/worker/XGBClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(self,
self.num_of_trees = None
self.max_tree_depth = None

self.federate_mode = config.federate.mode

self.bin_num = config.train.optimizer.bin_num
self.batch_size = config.data.batch_size

Expand Down Expand Up @@ -102,6 +104,7 @@ def __init__(self,
self.callback_func_for_compute_next_node)
self.register_handlers('send_feature_importance',
self.callback_func_for_send_feature_importance)
self.register_handlers('finish', self.callback_func_for_finish)

# save the order of values in each feature
def order_feature(self, data):
Expand All @@ -125,15 +128,21 @@ def sample_data(self, index=None):
def callback_func_for_model_para(self, message: Message):
self.lambda_, self.gamma, self.num_of_trees, self.max_tree_depth \
= message.content

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add some explanations here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if self.federate_mode == 'distributed':
self.comm_manager.add_neighbors(neighbor_id=self.ID,
address={
'host': self.comm_manager.host,
'port': self.comm_manager.port
})
self.tree_list = [
Tree(self.max_tree_depth).tree for _ in range(self.num_of_trees)
]
if self.own_label:
self.batch_index, self.x, self.y = self.sample_data()
# init y_hat
self.y_hat = np.random.uniform(low=0.0, high=1.0, size=len(self.y))
# self.y_hat = np.zeros(len(self.y))
logger.info(f'----------- Starting a new training round (Round '
logger.info(f'---------- Building a new tree (Tree '
f'#{self.state}) -------------')
self.comm_manager.send(
Message(
Expand Down Expand Up @@ -217,3 +226,6 @@ def callback_func_for_send_feature_importance(self, message: Message):
state=self.state,
receiver=self.server_id,
content=self.feature_importance))

def callback_func_for_finish(self, message: Message):
pass
10 changes: 10 additions & 0 deletions federatedscope/vertical_fl/xgb_base/worker/XGBServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,17 @@ def callback_func_for_feature_importance(self, message: Message):
rnd=self.tree_num,
role='Server #',
forms=self._cfg.eval.report)
formatted_logs['feature_importance'] = self.feature_importance_dict
logger.info(formatted_logs)
self.comm_manager.send(
Message(msg_type='finish',
sender=self.ID,
receiver=list(
self.comm_manager.get_neighbors().keys()),
state=self.state,
content='None'))
# jump out running
self.state = self.total_round_num + 1

def callback_func_for_test_result(self, message: Message):
self.tree_num, self.metrics = message.content
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use_gpu: False
device: 0
early_stop:
patience: 5
seed: 12345
federate:
client_num: 2
mode: 'distributed'
make_global_eval: False
online_aggr: False
total_round_num: 20
distribute:
use: True
server_host: '127.0.0.1'
server_port: 50051
client_host: '127.0.0.1'
client_port: 50052
role: 'client'
data_idx: 1
data:
root: data/
type: credit
splits: [0.8, 0.2]
dataloader:
type: raw
batch_size: 2000
model:
type: lr
train:
optimizer:
bin_num: 100
lambda_: 0.1
gamma: 0
num_of_trees: 10
max_tree_depth: 3
xgb_base:
use: True
use_bin: True
dims: [5, 10]
criterion:
type: CrossEntropyLoss
trainer:
type: none
eval:
freq: 3
best_res_update_round_wise_key: test_loss
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use_gpu: False
device: 0
early_stop:
patience: 5
seed: 12345
federate:
client_num: 2
mode: 'distributed'
make_global_eval: False
online_aggr: False
total_round_num: 20
distribute:
use: True
server_host: '127.0.0.1'
server_port: 50051
client_host: '127.0.0.1'
client_port: 50053
role: 'client'
data_idx: 2
data:
root: data/
type: credit
splits: [0.8, 0.2]
dataloader:
type: raw
batch_size: 2000
model:
type: lr
train:
optimizer:
bin_num: 100
lambda_: 0.1
gamma: 0
num_of_trees: 10
max_tree_depth: 3
xgb_base:
use: True
use_bin: True
dims: [5, 10]
criterion:
type: CrossEntropyLoss
trainer:
type: none
eval:
freq: 3
best_res_update_round_wise_key: test_loss
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use_gpu: False
device: 0
early_stop:
patience: 5
seed: 12345
federate:
client_num: 2
mode: 'distributed'
make_global_eval: False
online_aggr: False
total_round_num: 20
distribute:
use: True
server_host: '127.0.0.1'
server_port: 50051
role: 'server'
data_idx: 0
data:
root: data/
type: credit
splits: [0.8, 0.2]
dataloader:
type: raw
batch_size: 2000
model:
type: lr
train:
optimizer:
bin_num: 100
lambda_: 0.1
gamma: 0
num_of_trees: 10
max_tree_depth: 3
xgb_base:
use: True
use_bin: True
dims: [5, 10]
criterion:
type: CrossEntropyLoss
trainer:
type: none
eval:
freq: 3
best_res_update_round_wise_key: test_loss
18 changes: 18 additions & 0 deletions scripts/distributed_scripts/run_distributed_xgb.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
set -e

cd ..

echo "Test distributed mode with XGB..."

### server owns global test data
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it support the situation that server owns global test data?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So shall we remove the ''redundant'' code here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified.

# python federatedscope/main.py --cfg scripts/distributed_scripts/distributed_configs/distributed_server.yaml &
### server doesn't own data
python federatedscope/main.py --cfg scripts/distributed_scripts/distributed_configs/distributed_xgb_server.yaml &
sleep 2

# clients
python federatedscope/main.py --cfg scripts/distributed_scripts/distributed_configs/distributed_xgb_client_1.yaml &
sleep 2
python federatedscope/main.py --cfg scripts/distributed_scripts/distributed_configs/distributed_xgb_client_2.yaml &
sleep 2