diff --git a/.github/workflows/test_autotune.yml b/.github/workflows/test_autotune.yml new file mode 100644 index 000000000..209fe457e --- /dev/null +++ b/.github/workflows/test_autotune.yml @@ -0,0 +1,35 @@ +name: UnitTests for Autotune Module + +on: [push, pull_request] + +jobs: + run: + runs-on: ${{ matrix.os }} + timeout-minutes: 20 + strategy: + matrix: + os: [ubuntu-latest] + python-version: ['3.9'] + torch-version: ['1.10.1'] + torchvision-version: ['0.11.2'] + torchaudio-version: ['0.10.1'] + env: + OS: ${{ matrix.os }} + PYTHON: '3.9' + steps: + - uses: actions/checkout@master + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@master + with: + python-version: ${{ matrix.python-version }} + - name: Install PyTorch ${{ matrix.torch-version }}+cpu + run: | + pip install numpy typing-extensions dataclasses + pip install torch==${{ matrix.torch-version}}+cpu torchvision==${{matrix.torchvision-version}}+cpu torchaudio==${{matrix.torchaudio-version}}+cpu -f https://download.pytorch.org/whl/torch_stable.html + - name: Install FS + run: | + pip install -e .[test,hpo] + - name: Test Autotune + run: | + python federatedscope/hpo.py --cfg federatedscope/autotune/baseline/fedhpo_vfl.yaml + [ $? 
-eq 1 ] && exit 1 || echo "Passed" \ No newline at end of file diff --git a/federatedscope/autotune/__init__.py b/federatedscope/autotune/__init__.py index 194971fd8..8316f0a23 100644 --- a/federatedscope/autotune/__init__.py +++ b/federatedscope/autotune/__init__.py @@ -2,8 +2,9 @@ from federatedscope.autotune.utils import parse_search_space, \ config2cmdargs, config2str from federatedscope.autotune.algos import get_scheduler +from federatedscope.autotune.run import run_scheduler __all__ = [ 'Continuous', 'Discrete', 'parse_search_space', 'config2cmdargs', - 'config2str', 'get_scheduler' + 'config2str', 'get_scheduler', 'run_scheduler' ] diff --git a/federatedscope/autotune/algos.py b/federatedscope/autotune/algos.py index a21f2ac95..cf09c72ba 100644 --- a/federatedscope/autotune/algos.py +++ b/federatedscope/autotune/algos.py @@ -1,7 +1,6 @@ import os import logging from copy import deepcopy -from contextlib import redirect_stdout import threading import math @@ -15,7 +14,7 @@ get_server_cls from federatedscope.core.auxiliaries.runner_builder import get_runner from federatedscope.autotune.utils import parse_search_space, \ - config2cmdargs, config2str, summarize_hpo_results + config2cmdargs, config2str, summarize_hpo_results, log2wandb logger = logging.getLogger(__name__) @@ -25,14 +24,13 @@ def make_trial(trial_cfg, client_cfgs=None): data, modified_config = get_data(config=trial_cfg.clone()) trial_cfg.merge_from_other_cfg(modified_config) trial_cfg.freeze() - Fed_runner = get_runner(data=data, + fed_runner = get_runner(data=data, server_class=get_server_cls(trial_cfg), client_class=get_client_cls(trial_cfg), config=trial_cfg.clone(), - client_config=client_cfgs) - results = Fed_runner.run() - key1, key2 = trial_cfg.hpo.metric.split('.') - return results[key1][key2] + client_configs=client_cfgs) + results = fed_runner.run() + return results class TrialExecutor(threading.Thread): @@ -57,7 +55,7 @@ def run(self): server_class=get_server_cls(self._trial_cfg), 
client_class=get_client_cls(self._trial_cfg), config=self._trial_cfg.clone(), - client_config=self._client_cfgs) + client_configs=self._client_cfgs) results = Fed_runner.run() key1, key2 = self._trial_cfg.hpo.metric.split('.') self._returns['perf'] = results[key1][key2] @@ -177,6 +175,7 @@ def _evaluate(self, configs): completed_trial_results = thread_results[i] cfg_idx = completed_trial_results['cfg_idx'] perfs[cfg_idx] = completed_trial_results['perf'] + # TODO: Support num_worker in WandB logger.info( "Evaluate the {}-th config {} and get performance {}". format(cfg_idx, configs[cfg_idx], perfs[cfg_idx])) @@ -187,21 +186,24 @@ def _evaluate(self, configs): for i, config in enumerate(configs): trial_cfg = self._cfg.clone() trial_cfg.merge_from_list(config2cmdargs(config)) - perfs[i] = make_trial(trial_cfg, self._client_cfgs) + results = make_trial(trial_cfg, self._client_cfgs) + key1, key2 = trial_cfg.hpo.metric.split('.') + perfs[i] = results[key1][key2] logger.info( "Evaluate the {}-th config {} and get performance {}". 
format(i, config, perfs[i])) - + if self._cfg.wandb.use: + log2wandb(i, config, results, trial_cfg) return perfs def optimize(self): perfs = self._evaluate(self._init_configs) - results = summarize_hpo_results(self._init_configs, perfs, white_list=set( self._search_space.keys()), - desc=self._cfg.hpo.larger_better) + desc=self._cfg.hpo.larger_better, + use_wandb=self._cfg.wandb.use) logger.info( "========================== HPO Final ==========================") logger.info("\n{}".format(results)) @@ -263,7 +265,8 @@ def optimize(self): current_configs, current_perfs, white_list=set(self._search_space.keys()), - desc=self._cfg.hpo.larger_better) + desc=self._cfg.hpo.larger_better, + use_wandb=self._cfg.wandb.use) self._stage += 1 logger.info( "========================== Stage{} ==========================" diff --git a/federatedscope/autotune/baseline/fedhpo_vfl.yaml b/federatedscope/autotune/baseline/fedhpo_vfl.yaml new file mode 100644 index 000000000..27f7b529b --- /dev/null +++ b/federatedscope/autotune/baseline/fedhpo_vfl.yaml @@ -0,0 +1,54 @@ +use_gpu: False +device: 0 +backend: torch +outdir: vFL_adult +federate: + mode: standalone + client_num: 2 + total_round_num: 30 +model: + type: lr + use_bias: False +train: + optimizer: + lr: 0.5 + bin_num: 100 + lambda_: 0.1 + gamma: 0 + num_of_trees: 5 + max_tree_depth: 3 +xgb_base: + use: True + use_bin: False +data: + root: data/ + type: adult + splits: [1.0, 0.0] + args: [{normalization: False, standardization: True}] +feat_engr: + scenario: vfl +dataloader: + type: raw + batch_size: 50 +criterion: + type: CrossEntropyLoss +trainer: + type: none +vertical_dims: [7, 14] +vertical: + use: False + key_size: 256 +eval: + freq: 5 + best_res_update_round_wise_key: test_loss +hpo: + scheduler: sha + num_workers: 0 + init_cand_num: 9 + ss: 'federatedscope/autotune/baseline/vfl_ss.yaml' + sha: + budgets: [ 3, 9 ] + elim_rate: 3 + iter: 1 + metric: 'server_global_eval.test_loss' + working_folder: sha \ No newline at end of 
file diff --git a/federatedscope/autotune/baseline/vfl_ss.yaml b/federatedscope/autotune/baseline/vfl_ss.yaml new file mode 100644 index 000000000..ad10db3ad --- /dev/null +++ b/federatedscope/autotune/baseline/vfl_ss.yaml @@ -0,0 +1,25 @@ +train.optimizer.lr: + type: float + lower: 0.01 + upper: 1.0 + log: True +train.optimizer.num_of_trees: + type: int + lower: 3 + upper: 5 +vertical.use: + type: cate + choices: [True, False] +feat_engr.type: + type: cate + choices: ['', 'min_max_norm', 'instance_norm', 'standardization', 'log_transform', 'uniform_binning', 'quantile_binning', 'correlation_filter', 'variance_filter', 'iv_filter'] +condition1: + type: equal + child: train.optimizer.num_of_trees + parent: vertical.use + value: False +condition2: + type: equal + child: train.optimizer.lr + parent: vertical.use + value: True diff --git a/federatedscope/autotune/hpbandster.py b/federatedscope/autotune/hpbandster.py index 48f6513f7..a0bda9e8b 100644 --- a/federatedscope/autotune/hpbandster.py +++ b/federatedscope/autotune/hpbandster.py @@ -9,7 +9,7 @@ from hpbandster.core.worker import Worker from hpbandster.optimizers import BOHB, HyperBand, RandomSearch -from federatedscope.autotune.utils import eval_in_fs +from federatedscope.autotune.utils import eval_in_fs, log2wandb logging.basicConfig(level=logging.WARNING) logger = logging.getLogger(__name__) @@ -63,12 +63,15 @@ def __init__(self, super(MyWorker, self).__init__(**kwargs) self.sleep_interval = sleep_interval self.cfg = cfg + self.client_cfgs = client_cfgs self._ss = ss self._init_configs = [] self._perfs = [] def compute(self, config, budget, **kwargs): - res = eval_in_fs(self.cfg, config, int(budget), self.client_cfgs) + results = eval_in_fs(self.cfg, config, int(budget), self.client_cfgs) + key1, key2 = self.cfg.hpo.metric.split('.') + res = results[key1][key2] config = dict(config) config['federate.total_round_num'] = budget self._init_configs.append(config) @@ -76,6 +79,8 @@ def compute(self, config, budget, 
**kwargs): time.sleep(self.sleep_interval) logger.info(f'Evaluate the {len(self._perfs)-1}-th config ' f'{config}, and get performance {res}') + if self.cfg.wandb.use: + log2wandb(len(self._perfs) - 1, config, results, self.cfg) return {'loss': float(res), 'info': res} def summarize(self): @@ -83,7 +88,8 @@ def summarize(self): results = summarize_hpo_results(self._init_configs, self._perfs, white_list=set(self._ss.keys()), - desc=self.cfg.hpo.larger_better) + desc=self.cfg.hpo.larger_better, + use_wandb=self.cfg.wandb.use) logger.info( "========================== HPO Final ==========================") logger.info("\n{}".format(results)) diff --git a/federatedscope/autotune/run.py b/federatedscope/autotune/run.py new file mode 100644 index 000000000..bdb2d27b6 --- /dev/null +++ b/federatedscope/autotune/run.py @@ -0,0 +1,23 @@ +def run_scheduler(scheduler, cfg, client_cfgs=None): + """ + This function is to optimize FedHPO problem by scheduler. The method is + decided by `cfg.hpo.scheduler`. + Args: + scheduler: Scheduler for conducting serval FS runs, \ + see ``federatedscope.autotune.algos.Scheduler`` + cfg: The configurations of the FL course. + client_cfgs: The clients' configurations. 
+ """ + if cfg.hpo.scheduler in ['sha', 'wrap_sha']: + _ = scheduler.optimize() + elif cfg.hpo.scheduler in [ + 'rs', 'bo_kde', 'hb', 'bohb', 'wrap_rs', 'wrap_bo_kde', 'wrap_hb', + 'wrap_bohb' + ]: + from federatedscope.autotune.hpbandster import run_hpbandster + run_hpbandster(cfg, scheduler, client_cfgs) + elif cfg.hpo.scheduler in ['bo_gp', 'bo_rf', 'wrap_bo_gp', 'wrap_bo_rf']: + from federatedscope.autotune.smac import run_smac + run_smac(cfg, scheduler, client_cfgs) + else: + raise ValueError(f'No scheduler named {cfg.hpo.scheduler}') diff --git a/federatedscope/autotune/smac.py b/federatedscope/autotune/smac.py index a05734caf..067f53147 100644 --- a/federatedscope/autotune/smac.py +++ b/federatedscope/autotune/smac.py @@ -1,7 +1,7 @@ import logging import numpy as np import ConfigSpace as CS -from federatedscope.autotune.utils import eval_in_fs +from federatedscope.autotune.utils import eval_in_fs, log2wandb from smac.facade.smac_bb_facade import SMAC4BB from smac.facade.smac_hpo_facade import SMAC4HPO from smac.scenario.scenario import Scenario @@ -15,14 +15,26 @@ def run_smac(cfg, scheduler, client_cfgs=None): perfs = [] def optimization_function_wrapper(config): + """ + Used as an evaluation function for SMAC optimizer. + Args: + config: configurations of FS run. + + Returns: + Best results of server of specific FS run. 
+ """ budget = cfg.hpo.sha.budgets[-1] - res = eval_in_fs(cfg, config, budget, client_cfgs) + results = eval_in_fs(cfg, config, budget, client_cfgs) + key1, key2 = cfg.hpo.metric.split('.') + res = results[key1][key2] config = dict(config) config['federate.total_round_num'] = budget init_configs.append(config) perfs.append(res) logger.info(f'Evaluate the {len(perfs)-1}-th config ' f'{config}, and get performance {res}') + if cfg.wandb.use: + log2wandb(len(perfs) - 1, config, results, cfg) return res def summarize(): @@ -30,7 +42,8 @@ def summarize(): results = summarize_hpo_results(init_configs, perfs, white_list=set(config_space.keys()), - desc=cfg.hpo.larger_better) + desc=cfg.hpo.larger_better, + use_wandb=cfg.wandb.use) logger.info( "========================== HPO Final ==========================") logger.info("\n{}".format(results)) diff --git a/federatedscope/autotune/utils.py b/federatedscope/autotune/utils.py index 22f5b9504..271984a35 100644 --- a/federatedscope/autotune/utils.py +++ b/federatedscope/autotune/utils.py @@ -1,24 +1,80 @@ import yaml +import logging import pandas as pd import ConfigSpace as CS +logger = logging.getLogger(__name__) + + +def generate_hpo_exp_name(cfg): + return f'{cfg.hpo.scheduler}_{cfg.hpo.sha.budgets}_{cfg.hpo.metric}' + + +def parse_condition_param(condition, ss): + """ + Parse conditions param to generate ``ConfigSpace.conditions`` + + Condition parameters: EqualsCondition, NotEqualsCondition, \ + LessThanCondition, GreaterThanCondition, InCondition + + Args: + condition (dict): configspace condition dict, which is supposed to + have four keys for + ss (CS.ConfigurationSpace): configspace + + Returns: + ConfigSpace.conditions: the conditions for configspace + """ + str_func_mapping = { + 'equal': CS.EqualsCondition, + 'not_equal': CS.NotEqualsCondition, + 'less': CS.LessThanCondition, + 'greater': CS.GreaterThanCondition, + 'in': CS.InCondition, + 'and': CS.AndConjunction, + 'or': CS.OrConjunction, + } + cond_type = 
condition['type'] + assert cond_type in str_func_mapping.keys(), f'the param condition ' \ + f'should be in' \ + f' {str_func_mapping.keys()}.' + + if cond_type in ['and', 'in']: + return str_func_mapping[cond_type]( + parse_condition_param(condition['child'], ss), + parse_condition_param(condition['parent'], ss), + ) + else: + return str_func_mapping[cond_type]( + child=ss[condition['child']], + parent=ss[condition['parent']], + value=condition['value'], + ) + def parse_search_space(config_path): - """Parse yaml format configuration to generate search space + """ + Parse yaml format configuration to generate search space + Arguments: config_path (str): the path of the yaml file. - :returns: the search space. - :rtype: ConfigSpace object + Return: + ConfigSpace object: the search space. + """ ss = CS.ConfigurationSpace() + conditions = [] with open(config_path, 'r') as ips: raw_ss_config = yaml.load(ips, Loader=yaml.FullLoader) - for k in raw_ss_config.keys(): - name = k - v = raw_ss_config[k] + # Add hyperparameters + for name in raw_ss_config.keys(): + if name.startswith('condition'): + # Deal with condition later + continue + v = raw_ss_config[name] hyper_type = v['type'] del v['type'] v['name'] = name @@ -33,16 +89,21 @@ def parse_search_space(config_path): raise ValueError("Unsupported hyper type {}".format(hyper_type)) ss.add_hyperparameter(hyper_config) + # Add conditions + for name in raw_ss_config.keys(): + if name.startswith('condition'): + conditions.append(parse_condition_param(raw_ss_config[name], ss)) + ss.add_conditions(conditions) return ss def config2cmdargs(config): - ''' + """ Arguments: config (dict): key is cfg node name, value is the specified value. Returns: results (list): cmd args - ''' + """ results = [] for k, v in config.items(): @@ -52,13 +113,13 @@ def config2cmdargs(config): def config2str(config): - ''' + """ Arguments: config (dict): key is cfg node name, value is the choice of hyper-parameter. 
Returns: name (str): the string representation of this config - ''' + """ vals = [] for k in config: @@ -69,17 +130,35 @@ def config2str(config): return name -def summarize_hpo_results(configs, perfs, white_list=None, desc=False): - cols = [k for k in configs[0] if (white_list is None or k in white_list) - ] + ['performance'] - d = [[ - trial_cfg[k] - for k in trial_cfg if (white_list is None or k in white_list) - ] + [result] for trial_cfg, result in zip(configs, perfs)] +def summarize_hpo_results(configs, + perfs, + white_list=None, + desc=False, + use_wandb=False): + if white_list is not None: + cols = list(white_list) + ['performance'] + else: + cols = [k for k in configs[0]] + ['performance'] + + d = [] + for trial_cfg, result in zip(configs, perfs): + if white_list is not None: + d.append([ + trial_cfg[k] if k in trial_cfg.keys() else None + for k in white_list + ] + [result]) + else: + d.append([trial_cfg[k] for k in trial_cfg] + [result]) d = sorted(d, key=lambda ele: ele[-1], reverse=desc) df = pd.DataFrame(d, columns=cols) pd.set_option('display.max_colwidth', None) pd.set_option('display.max_columns', None) + + if use_wandb: + import wandb + table = wandb.Table(dataframe=df) + wandb.log({'ConfigurationRank': table}) + return df @@ -90,9 +169,9 @@ def parse_logs(file_list): FONTSIZE = 40 MARKSIZE = 25 - def process(file): + def process(file_path): history = [] - with open(file, 'r') as F: + with open(file_path, 'r') as F: for line in F: try: state, line = line.split('INFO: ') @@ -134,6 +213,17 @@ def process(file): def eval_in_fs(cfg, config, budget, client_cfgs=None): + """ + + Args: + cfg: fs cfg + config: sampled trial CS.Configuration + budget: budget round for this trial + client_cfgs: client-wise cfg + + Returns: + The best results returned from FedRunner + """ import ConfigSpace as CS from federatedscope.core.auxiliaries.utils import setup_seed from federatedscope.core.auxiliaries.data_builder import get_data @@ -166,11 +256,32 @@ def 
eval_in_fs(cfg, config, budget, client_cfgs=None): data, modified_config = get_data(config=trial_cfg.clone()) trial_cfg.merge_from_other_cfg(modified_config) trial_cfg.freeze() - Fed_runner = get_runner(data=data, + fed_runner = get_runner(data=data, server_class=get_server_cls(trial_cfg), client_class=get_client_cls(trial_cfg), config=trial_cfg.clone(), - client_config=client_cfgs) - results = Fed_runner.run() + client_configs=client_cfgs) + results = fed_runner.run() + + return results + + +def config_bool2int(config): + # TODO: refactor bool/str to int + import copy + new_dict = copy.deepcopy(config) + for key, value in new_dict.items(): + if isinstance(new_dict[key], bool): + new_dict[key] = int(value) + return new_dict + + +def log2wandb(trial, config, results, trial_cfg): + import wandb key1, key2 = trial_cfg.hpo.metric.split('.') - return results[key1][key2] + log_res = { + 'Trial_index': trial, + 'Config': config_bool2int(config), + trial_cfg.hpo.metric: results[key1][key2], + } + wandb.log(log_res) diff --git a/federatedscope/core/auxiliaries/feat_engr_builder.py b/federatedscope/core/auxiliaries/feat_engr_builder.py new file mode 100644 index 000000000..38385b756 --- /dev/null +++ b/federatedscope/core/auxiliaries/feat_engr_builder.py @@ -0,0 +1,35 @@ +import logging +from importlib import import_module + +logger = logging.getLogger(__name__) + + +def dummy_wrapper(worker): + return worker + + +def get_feat_engr_wrapper(config): + """ + Args: + config: configurations for FL, see ``federatedscope.core.configs`` + + Returns: + wrapper for client and wrapper for server. + """ + if config.feat_engr.type == '': + return dummy_wrapper, dummy_wrapper + + logger.info(f'Feature engineering only works on tabular data, please ' + f'check your `data.type` {config.data.type}.') + + wrap_client = \ + getattr(import_module(f'federatedscope.core.feature.' 
+ f'{config.feat_engr.scenario}'), + f'wrap_{config.feat_engr.type}_client') + + wrap_server = \ + getattr(import_module(f'federatedscope.core.feature.' + f'{config.feat_engr.scenario}'), + f'wrap_{config.feat_engr.type}_server') + + return wrap_client, wrap_server diff --git a/federatedscope/core/auxiliaries/logging.py b/federatedscope/core/auxiliaries/logging.py index a0c0bd546..94e7565f3 100644 --- a/federatedscope/core/auxiliaries/logging.py +++ b/federatedscope/core/auxiliaries/logging.py @@ -4,9 +4,10 @@ import os import re import time -from datetime import datetime +import yaml import numpy as np +from datetime import datetime logger = logging.getLogger(__name__) @@ -151,7 +152,6 @@ def init_wandb(cfg): tmp_cfg.clear_aux_info( ) # in most cases, no need to save the cfg_check_funcs via wandb tmp_cfg.de_arguments() - import yaml cfg_yaml = yaml.safe_load(tmp_cfg.dump()) wandb.init(project=cfg.wandb.name_project, diff --git a/federatedscope/core/configs/README.md b/federatedscope/core/configs/README.md index b741cfdc5..98b809268 100644 --- a/federatedscope/core/configs/README.md +++ b/federatedscope/core/configs/README.md @@ -1,5 +1,6 @@ ## Configurations We summarize all the customizable configurations: +- [config.py](#config) - [cfg_data.py](#data) - [cfg_model.py](#model) - [cfg_fl_algo.py](#federated-algorithms) @@ -11,6 +12,23 @@ We summarize all the customizable configurations: - [cfg_hpo.py](#auto-tuning-components) - [cfg_attack.py](#attack) +### config +The configurations related to environment of running experiment. 
+ +| Name | (Type) Default Value | Description | Note | +| ---------------------- | -------------------- | ------------------------------------------------------------ | ---- | +| `backend` | (string) 'torch' | The backend for local training | - | +| `use_gpu` | (bool) False | Whether to use GPU | - | +| `check_completeness` | (bool) False | Whether to check the completeness of msg_handler | - | +| `verbose` | (int) 1 | Whether to print verbose logging info | - | +| `print_decimal_digits` | (int) 6 | How many decimal places we print out using logger | - | +| `device` | (int) -1 | Specify the device for training | - | +| `seed` | (int) 0 | Random seed | - | +| `outdir` | (string) '' | The dir used to save log, exp_config, models, etc,. | - | +| `expname` | (string) '' | Detailed exp name to distinguish different sub-exp | - | +| `expname_tag` | (string) '' | Detailed exp tag to distinguish different sub-exp with the same expname | - | + + ### Data The configurations related to the data/dataset are defined in `cfg_data.py`. 
diff --git a/federatedscope/core/configs/cfg_data.py b/federatedscope/core/configs/cfg_data.py index d061a1ff8..5c50d10b0 100644 --- a/federatedscope/core/configs/cfg_data.py +++ b/federatedscope/core/configs/cfg_data.py @@ -64,6 +64,23 @@ def extend_data_cfg(cfg): cfg.data.quadratic.min_curv = 0.02 cfg.data.quadratic.max_curv = 12.5 + # feature engineering + cfg.feat_engr = CN() + cfg.feat_engr.type = '' + cfg.feat_engr.scenario = 'hfl' + cfg.feat_engr.num_bins = 5 # Used for binning + cfg.feat_engr.selec_threshold = 0.05 # Used for feature selection + cfg.feat_engr.selec_woe_binning = 'quantile' + + cfg.feat_engr.secure = CN() + cfg.feat_engr.secure.type = 'encrypt' + cfg.feat_engr.secure.key_size = 3072 + + cfg.feat_engr.secure.encrypt = CN() + cfg.feat_engr.secure.encrypt.type = 'dummy' + + cfg.feat_engr.secure.dp = CN() # under dev + # --------------- outdated configs --------------- # TODO: delete this code block cfg.data.loader = '' diff --git a/federatedscope/core/configs/cfg_fl_setting.py b/federatedscope/core/configs/cfg_fl_setting.py index 38ac2427c..e64e35185 100644 --- a/federatedscope/core/configs/cfg_fl_setting.py +++ b/federatedscope/core/configs/cfg_fl_setting.py @@ -67,10 +67,10 @@ def extend_fl_setting_cfg(cfg): # ---------------------------------------------------------------------- # # Vertical FL related options (for demo) # ---------------------------------------------------------------------- # + cfg.vertical_dims = [5, 10] # Avoid to be removed when `use` is False cfg.vertical = CN() cfg.vertical.use = False cfg.vertical.encryption = 'paillier' - cfg.vertical.dims = [5, 10] cfg.vertical.key_size = 3072 # ---------------------------------------------------------------------- # @@ -79,7 +79,6 @@ def extend_fl_setting_cfg(cfg): cfg.xgb_base = CN() cfg.xgb_base.use = False cfg.xgb_base.use_bin = False - cfg.xgb_base.dims = [5, 10] # --------------- register corresponding check function ---------- 
cfg.register_cfg_check_fun(assert_fl_setting_cfg) diff --git a/federatedscope/core/feature/__init__.py b/federatedscope/core/feature/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/__init__.py b/federatedscope/core/feature/hfl/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/preprocess/__init__.py b/federatedscope/core/feature/hfl/preprocess/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/preprocess/log_transform.py b/federatedscope/core/feature/hfl/preprocess/log_transform.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/preprocess/min_max_norm.py b/federatedscope/core/feature/hfl/preprocess/min_max_norm.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/preprocess/quantile_binning.py b/federatedscope/core/feature/hfl/preprocess/quantile_binning.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/preprocess/standard.py b/federatedscope/core/feature/hfl/preprocess/standard.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/preprocess/uniform_binning.py b/federatedscope/core/feature/hfl/preprocess/uniform_binning.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/selection/__init__.py b/federatedscope/core/feature/hfl/selection/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/selection/correlation_filter.py b/federatedscope/core/feature/hfl/selection/correlation_filter.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/hfl/selection/variance_filter.py b/federatedscope/core/feature/hfl/selection/variance_filter.py new file mode 100644 index 000000000..e69de29bb diff --git 
a/federatedscope/core/feature/utils.py b/federatedscope/core/feature/utils.py new file mode 100644 index 000000000..1d96b7f02 --- /dev/null +++ b/federatedscope/core/feature/utils.py @@ -0,0 +1,80 @@ +import logging +import numpy as np + +logger = logging.getLogger(__name__) + + +def merge_splits_feat(data): + """ + + Args: + data: ``federatedscope.core.data.ClientData`` with Tabular format. + + Returns: + Merged data feature/x. + """ + merged_feat = None + merged_y = None + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(data, split): + split_data = getattr(data, split) + if split_data is not None and 'x' in split_data: + if merged_feat is None: + merged_feat = split_data['x'] + else: + merged_feat = \ + np.concatenate((merged_feat, split_data['x']), axis=0) + if split_data is not None and 'y' in split_data: + if merged_y is None: + merged_y = split_data['y'] + else: + merged_y = \ + np.concatenate((merged_y, split_data['y']), axis=0) + return merged_feat, merged_y + + +def vfl_binning(feat, num_bins, strategy='uniform'): + """ + + Args: + feat: feature to be binned, which must be 2D numpy.array + num_bins: list for bins + strategy: binning strategy, ``'uniform'`` or ``'quantile'`` + + Returns: + Bin edges for binning + """ + num_features = feat.shape[1] + bin_edges = np.zeros(num_features, dtype=object) + + for i in range(num_features): + col = feat[:, i] + col_min, col_max = np.min(col), np.max(col) + if col_min == col_max: + logger.warning( + f'Feature {i} is constant and will be replaced with 0.') + bin_edges[i] = np.array([-np.inf, np.inf]) + continue + if strategy == 'uniform': + bin_edges[i] = np.linspace(col_min, col_max, num_bins[i] + 1) + elif strategy == 'quantile': + quantiles = np.linspace(0, 100, num_bins[i] + 1) + bin_edges[i] = np.asarray(np.percentile(col, quantiles)) + + return bin_edges + + +def secure_builder(cfg): + if cfg.feat_engr.secure.type == 'encrypt': + if cfg.feat_engr.secure.encrypt.type == 'dummy': + from 
federatedscope.core.secure.encrypt.dummy_encrypt import \ + DummyEncryptKeypair + keypair_generator = DummyEncryptKeypair( + cfg.feat_engr.secure.key_size) + else: + raise NotImplementedError(f'Not implemented encrypt method' + f' {cfg.feat_engr.secure.encrypt.type}.') + return keypair_generator + else: + raise NotImplementedError(f'Not implemented secure method' + f' {cfg.feat_engr.secure.type}.') diff --git a/federatedscope/core/feature/vfl/__init__.py b/federatedscope/core/feature/vfl/__init__.py new file mode 100644 index 000000000..64cfa38d6 --- /dev/null +++ b/federatedscope/core/feature/vfl/__init__.py @@ -0,0 +1,30 @@ +from federatedscope.core.feature.vfl.preprocess.instance_norm import \ + wrap_instance_norm_client, wrap_instance_norm_server +from federatedscope.core.feature.vfl.preprocess.min_max_norm import \ + wrap_min_max_norm_client, wrap_min_max_norm_server +from federatedscope.core.feature.vfl.preprocess.log_transform import \ + wrap_log_transform_client, wrap_log_transform_server +from federatedscope.core.feature.vfl.preprocess.standardization import \ + wrap_standardization_client, wrap_standardization_server +from federatedscope.core.feature.vfl.preprocess.uniform_binning import \ + wrap_uniform_binning_client, wrap_uniform_binning_server +from federatedscope.core.feature.vfl.preprocess.quantile_binning import \ + wrap_quantile_binning_client, wrap_quantile_binning_server +from federatedscope.core.feature.vfl.selection.variance_filter import \ + wrap_variance_filter_client, wrap_variance_filter_server +from federatedscope.core.feature.vfl.selection.correlation_filter import \ + wrap_correlation_filter_client, wrap_correlation_filter_server +from federatedscope.core.feature.vfl.selection.iv_filter import \ + wrap_iv_filter_client, wrap_iv_filter_server + +__all__ = [ + 'wrap_instance_norm_client', 'wrap_instance_norm_server', + 'wrap_min_max_norm_client', 'wrap_min_max_norm_server', + 'wrap_log_transform_client', 'wrap_log_transform_server', + 
'wrap_standardization_client', 'wrap_standardization_server', + 'wrap_uniform_binning_client', 'wrap_uniform_binning_server', + 'wrap_quantile_binning_client', 'wrap_quantile_binning_server', + 'wrap_variance_filter_client', 'wrap_variance_filter_server', + 'wrap_correlation_filter_client', 'wrap_correlation_filter_server', + 'wrap_iv_filter_client', 'wrap_iv_filter_server' +] diff --git a/federatedscope/core/feature/vfl/preprocess/__init__.py b/federatedscope/core/feature/vfl/preprocess/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/vfl/preprocess/instance_norm.py b/federatedscope/core/feature/vfl/preprocess/instance_norm.py new file mode 100644 index 000000000..1d6779d01 --- /dev/null +++ b/federatedscope/core/feature/vfl/preprocess/instance_norm.py @@ -0,0 +1,305 @@ +import types +import logging +import numpy as np + +from federatedscope.core.message import Message +from federatedscope.core.secret_sharing import AdditiveSecretSharing + +logger = logging.getLogger(__name__) + + +def wrap_instance_norm_server(worker): + """ + This function is to perform instance norm on vfl tabular data for server. + Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap vfl server with instance norm. 
+ """ + def trigger_for_feat_engr(self, + trigger_train_func, + kwargs_for_trigger_train_func={}): + # broadcast_model_para_func after feature engineering finishing + self.trigger_train_func = trigger_train_func + self.kwargs_for_trigger_train_func = \ + kwargs_for_trigger_train_func + + logger.info('Start to execute instance norm with secret sharing.') + + # Broadcast client address for ss + self.broadcast_client_address() + self.ss_manager = AdditiveSecretSharing( + shared_party_num=int(self._cfg.federate.client_num)) + + # Ask for instance statistics + self.msg_buffer['ss_instance_sum'] = [] + self.msg_buffer['ss_instance_sum_norm_square'] = [] + self.comm_manager.send( + Message(msg_type='ask_for_instance_sum', + sender=self.ID, + receiver=list(self.comm_manager.neighbors.keys()), + timestamp=self.cur_timestamp)) + + def callback_func_for_ss_instance_sum(self, message: Message): + """ + The handling function for aggregating ss_instance_sum from all clients. + """ + content = message.content + self.msg_buffer['ss_instance_sum'].append(content) + + if len(self.msg_buffer['ss_instance_sum']) == self._client_num: + logger.info('Sever secret reconstruct ss_instance_sum.') + # Aggregate ss_instance_sum + instance_mean = {} + for split in self.msg_buffer['ss_instance_sum'][0]: + instance_mean[split] = self.ss_manager.secret_reconstruct([ + content[split] + for content in self.msg_buffer['ss_instance_sum'] + ]) + instance_mean[split] = instance_mean[split][ + 'sum'] / self._cfg.vertical_dims[-1] + logger.info('Server send instance_mean to clients') + self.global_mean = instance_mean + self.comm_manager.send( + Message(msg_type='instance_mean', + sender=self.ID, + receiver=list(self.comm_manager.neighbors.keys()), + timestamp=self.cur_timestamp, + content=instance_mean)) + + def callback_func_for_ss_instance_sum_norm_square(self, message: Message): + """ + The handling function for aggregating instance_var from all clients. 
+ """ + content = message.content + self.msg_buffer['ss_instance_sum_norm_square'].append(content) + + if len(self.msg_buffer['ss_instance_sum_norm_square']) == \ + self._client_num: + logger.info('Sever secret reconstruct ' + 'ss_instance_sum_norm_square.') + # Aggregate ss_instance_sum_norm_square + instance_var = {} + for split in self.msg_buffer['ss_instance_sum'][0]: + instance_var[split] = self.ss_manager.secret_reconstruct([ + content[split] for content in + self.msg_buffer['ss_instance_sum_norm_square'] + ]) + instance_var[split] = instance_var[split][ + 'sum_norm_square'] / self._cfg.vertical_dims[-1] + + # Apply instance norm + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(self.data, split): + split_data = getattr(self.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = ( + (split_data['x'].T - self.global_mean[split]) / + (instance_var[split]**0.5)).T + + self.comm_manager.send( + Message(msg_type='instance_var', + sender=self.ID, + receiver=list(self.comm_manager.neighbors.keys()), + timestamp=self.cur_timestamp, + content=instance_var)) + self._init_data_related_var() + self.trigger_train_func(**self.kwargs_for_trigger_train_func) + + # Bind method to instance + worker.trigger_for_feat_engr = types.MethodType(trigger_for_feat_engr, + worker) + worker.callback_func_for_ss_instance_sum = types.MethodType( + callback_func_for_ss_instance_sum, worker) + worker.callback_func_for_ss_instance_sum_norm_square = types.MethodType( + callback_func_for_ss_instance_sum_norm_square, worker) + + # Register handlers functions + worker.register_handlers('ss_instance_sum', + worker.callback_func_for_ss_instance_sum) + worker.register_handlers( + 'ss_instance_sum_norm_square', + worker.callback_func_for_ss_instance_sum_norm_square) + return worker + + +def wrap_instance_norm_client(worker): + """ + This function is to perform instance norm vfl tabular data for client. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap vfl client with instance norm. + """ + def callback_func_for_ask_for_instance_sum(self, message: Message): + self.ss_manager = AdditiveSecretSharing( + shared_party_num=int(self._cfg.federate.client_num)) + self.msg_buffer['ss_instance_sum'] = {} + content = {} + # Calculate sum + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(self.data, split): + split_data = getattr(self.data, split) + if split_data is not None and 'x' in split_data: + content[split] = self.ss_manager.secret_split( + {'sum': np.sum(split_data['x'], axis=1)}) + # Self-hosted ss_instance_sum + self.msg_buffer['ss_instance_sum'][self.ID] = { + key: value[self.ID - 1] + for key, value in content.items() + } + + # Send ss split to neighbors + for neighbor in self.comm_manager.neighbors: + content_frame = { + key: value[neighbor - 1] + for key, value in content.items() + } + + self.comm_manager.send( + Message(msg_type='ss_instance_sum', + sender=self.ID, + receiver=[neighbor], + content=content_frame)) + + def callback_func_ss_instance_sum(self, message: Message): + sender = message.sender + content_frame = message.content + self.msg_buffer['ss_instance_sum'][sender] = content_frame + logger.info(f'Client {self.ID} receive Client {sender} ' + f'ss_instance_sum') + + if len(self.msg_buffer['ss_instance_sum'].keys()) == \ + self._cfg.federate.client_num: + # Sum up ss_instance_sum + content = {} + for sender in self.msg_buffer['ss_instance_sum'].keys(): + content_frame = self.msg_buffer['ss_instance_sum'][sender] + for key, value in content_frame.items(): + if key not in content: + content[key] = value + else: + for sub_key in content[key].keys(): + content[key][sub_key] += content_frame[key][ + sub_key] + + self.comm_manager.send( + Message(msg_type='ss_instance_sum', + sender=self.ID, + receiver=[self.server_id], + content=content)) + + def callback_func_for_instance_mean(self, message: 
Message): + """ + The handling function for calculate instance_norm after receiving \ + var and mean. + """ + sender = message.sender + feat_mean = message.content + self.global_mean = feat_mean + + self.msg_buffer['ss_instance_sum_norm_square'] = {} # For variance + content = {} + # Send norm_square + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(self.data, split): + split_data = getattr(self.data, split) + if split_data is not None and 'x' in split_data: + content[split] = self.ss_manager.secret_split({ + 'sum_norm_square': np.sum( + (split_data['x'].T - feat_mean[split]).T**2, + axis=1) + }) + + # Self-hosted ss_instance_sum_norm_square + self.msg_buffer['ss_instance_sum_norm_square'][self.ID] = { + key: value[self.ID - 1] + for key, value in content.items() + } + + # Send ss split to neighbors + for neighbor in self.comm_manager.neighbors: + content_frame = { + key: value[neighbor - 1] + for key, value in content.items() + } + + self.comm_manager.send( + Message(msg_type='ss_instance_sum_norm_square', + sender=self.ID, + receiver=[neighbor], + content=content_frame)) + + def callback_func_for_ss_instance_sum_norm_square(self, message: Message): + sender = message.sender + content_frame = message.content + self.msg_buffer['ss_instance_sum_norm_square'][sender] = content_frame + logger.info(f'Client {self.ID} receive Client {sender} ' + f'ss_instance_sum_norm_square') + + if len(self.msg_buffer['ss_instance_sum_norm_square'].keys()) == \ + self._cfg.federate.client_num: + # Sum up ss_instance_sum_norm_square + content = {} + for sender in self.msg_buffer['ss_instance_sum_norm_square'].keys( + ): + content_frame = self.msg_buffer['ss_instance_sum_norm_square'][ + sender] + for key, value in content_frame.items(): + if key not in content: + content[key] = value + else: + for sub_key in content[key].keys(): + content[key][sub_key] += content_frame[key][ + sub_key] + + self.comm_manager.send( + Message(msg_type='ss_instance_sum_norm_square', + 
sender=self.ID, + receiver=[self.server_id], + content=content)) + + def callback_func_for_instance_var(self, message: Message): + """ + The handling function for performing instance_norm after receiving \ + var and mean. + """ + feat_var = message.content + # Apply instance norm + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(self.data, split): + split_data = getattr(self.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = ( + (split_data['x'].T - self.global_mean[split]) / + (feat_var[split]**0.5)).T + logger.info('Instance norm finished.') + self._init_data_related_var() + + # Bind method to instance + worker.callback_func_for_ask_for_instance_sum = types.MethodType( + callback_func_for_ask_for_instance_sum, worker) + worker.callback_func_ss_instance_sum = types.MethodType( + callback_func_ss_instance_sum, worker) + worker.callback_func_for_instance_mean = types.MethodType( + callback_func_for_instance_mean, worker) + worker.callback_func_for_ss_instance_sum_norm_square = types.MethodType( + callback_func_for_ss_instance_sum_norm_square, worker) + worker.callback_func_for_instance_var = types.MethodType( + callback_func_for_instance_var, worker) + + # Register handlers functions + worker.register_handlers('ask_for_instance_sum', + worker.callback_func_for_ask_for_instance_sum) + worker.register_handlers('ss_instance_sum', + worker.callback_func_ss_instance_sum) + worker.register_handlers('instance_mean', + worker.callback_func_for_instance_mean) + worker.register_handlers( + 'ss_instance_sum_norm_square', + worker.callback_func_for_ss_instance_sum_norm_square) + worker.register_handlers('instance_var', + worker.callback_func_for_instance_var) + return worker diff --git a/federatedscope/core/feature/vfl/preprocess/log_transform.py b/federatedscope/core/feature/vfl/preprocess/log_transform.py new file mode 100644 index 000000000..d22b79e21 --- /dev/null +++ 
b/federatedscope/core/feature/vfl/preprocess/log_transform.py @@ -0,0 +1,32 @@ +import logging +import numpy as np + +logger = logging.getLogger(__name__) + + +def wrap_log_transform(worker): + """ + This function is to perform log transform for data; + Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap worker with log transformed data. + """ + logger.info('Start to execute log-transform scaling .') + + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = np.log(split_data['x']) + worker._init_data_related_var() + return worker + + +def wrap_log_transform_client(worker): + return wrap_log_transform(worker) + + +def wrap_log_transform_server(worker): + return wrap_log_transform(worker) diff --git a/federatedscope/core/feature/vfl/preprocess/min_max_norm.py b/federatedscope/core/feature/vfl/preprocess/min_max_norm.py new file mode 100644 index 000000000..fa5afe0d0 --- /dev/null +++ b/federatedscope/core/feature/vfl/preprocess/min_max_norm.py @@ -0,0 +1,46 @@ +import logging +import numpy as np + +from federatedscope.core.feature.utils import merge_splits_feat + +logger = logging.getLogger(__name__) + + +def wrap_min_max_norm(worker): + """ + This function is to perform min-max scale vfl tabular data. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap worker with min-max scaled data + """ + logger.info('Start to execute min-max scaling.') + + # Merge train & val & test + merged_feat, _ = merge_splits_feat(worker.data) + + feat_min = np.min(merged_feat, axis=0) + feat_max = np.max(merged_feat, axis=0) + + # If max == min, it will be replaced with 0.0 + for col_i in range(len(feat_min)): + if feat_min[col_i] == feat_max[col_i]: + feat_max[col_i] = np.inf + + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + (split_data['x'] - feat_min) / (feat_max - feat_min) + worker._init_data_related_var() + return worker + + +def wrap_min_max_norm_client(worker): + return wrap_min_max_norm(worker) + + +def wrap_min_max_norm_server(worker): + return wrap_min_max_norm(worker) diff --git a/federatedscope/core/feature/vfl/preprocess/quantile_binning.py b/federatedscope/core/feature/vfl/preprocess/quantile_binning.py new file mode 100644 index 000000000..c5228457e --- /dev/null +++ b/federatedscope/core/feature/vfl/preprocess/quantile_binning.py @@ -0,0 +1,47 @@ +import logging +import numpy as np + +from federatedscope.core.feature.utils import merge_splits_feat, vfl_binning + +logger = logging.getLogger(__name__) + + +def wrap_quantile_binning(worker): + """ + This function is to perform quantile_binning for vfl tabular data. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap worker with quantile_binning data + """ + logger.info('Start to execute quantile binning.') + + # Merge train & val & test + merged_feat, _ = merge_splits_feat(worker.data) + + # Get bin edges + if merged_feat is not None: + num_features = merged_feat.shape[1] + num_bins = [worker._cfg.feat_engr.num_bins] * num_features + bin_edges = vfl_binning(merged_feat, num_bins, 'quantile') + + # Transform + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + for i in range(split_data['x'].shape[1]): + split_data['x'][:, i] = \ + np.searchsorted(bin_edges[i][1:-1], + split_data['x'][:, i], side="right") + worker._init_data_related_var() + return worker + + +def wrap_quantile_binning_client(worker): + return wrap_quantile_binning(worker) + + +def wrap_quantile_binning_server(worker): + return wrap_quantile_binning(worker) diff --git a/federatedscope/core/feature/vfl/preprocess/standardization.py b/federatedscope/core/feature/vfl/preprocess/standardization.py new file mode 100644 index 000000000..03bb894fb --- /dev/null +++ b/federatedscope/core/feature/vfl/preprocess/standardization.py @@ -0,0 +1,40 @@ +import logging +import numpy as np + +from federatedscope.core.feature.utils import merge_splits_feat + +logger = logging.getLogger(__name__) + + +def wrap_standardization(worker): + """ + This function is to perform z-norm/standardization for vfl tabular data. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap worker z-norm/standardization data + """ + logger.info('Start to execute standardization.') + + # Merge train & val & test + merged_feat, _ = merge_splits_feat(worker.data) + + feat_mean = np.mean(merged_feat, axis=0) + feat_std = np.std(merged_feat, axis=0) + + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = (split_data['x'] - feat_mean) / feat_std + worker._init_data_related_var() + return worker + + +def wrap_standardization_client(worker): + return wrap_standardization(worker) + + +def wrap_standardization_server(worker): + return wrap_standardization(worker) diff --git a/federatedscope/core/feature/vfl/preprocess/uniform_binning.py b/federatedscope/core/feature/vfl/preprocess/uniform_binning.py new file mode 100644 index 000000000..a94dfc6ee --- /dev/null +++ b/federatedscope/core/feature/vfl/preprocess/uniform_binning.py @@ -0,0 +1,47 @@ +import logging +import numpy as np + +from federatedscope.core.feature.utils import merge_splits_feat, vfl_binning + +logger = logging.getLogger(__name__) + + +def wrap_uniform_binning(worker): + """ + This function is to perform uniform_binning for vfl tabular data. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap worker with uniform_binning data + """ + logger.info('Start to execute uniform binning.') + + # Merge train & val & test + merged_feat, _ = merge_splits_feat(worker.data) + + # Get bin edges + if merged_feat is not None: + num_features = merged_feat.shape[1] + num_bins = [worker._cfg.feat_engr.num_bins] * num_features + bin_edges = vfl_binning(merged_feat, num_bins, 'uniform') + + # Transform + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + for i in range(split_data['x'].shape[1]): + split_data['x'][:, i] = \ + np.searchsorted(bin_edges[i][1:-1], + split_data['x'][:, i], side="right") + worker._init_data_related_var() + return worker + + +def wrap_uniform_binning_client(worker): + return wrap_uniform_binning(worker) + + +def wrap_uniform_binning_server(worker): + return wrap_uniform_binning(worker) diff --git a/federatedscope/core/feature/vfl/selection/__init__.py b/federatedscope/core/feature/vfl/selection/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/feature/vfl/selection/correlation_filter.py b/federatedscope/core/feature/vfl/selection/correlation_filter.py new file mode 100644 index 000000000..5a86c8361 --- /dev/null +++ b/federatedscope/core/feature/vfl/selection/correlation_filter.py @@ -0,0 +1,267 @@ +import types +import logging +import numpy as np + +from federatedscope.core.message import Message +from federatedscope.core.feature.utils import secure_builder, merge_splits_feat + +logger = logging.getLogger(__name__) + + +def wrap_correlation_filter_server(worker): + """ + This function is to perform feature selection with correlation_filter \ + to data for server. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap vfl server with correlation_filter. + """ + def trigger_for_feat_engr(self, + trigger_train_func, + kwargs_for_trigger_train_func={}): + logger.info('Start to execute correlation_filter, which requires FHE.') + + self.msg_buffer['feat_dim'] = {} + # broadcast_model_para_func after feature engineering finishing + self.trigger_train_func = trigger_train_func + self.kwargs_for_trigger_train_func = \ + kwargs_for_trigger_train_func + + # Broadcast client address and feat_engr_public_key + self.broadcast_client_address() + self.feat_engr_public_key, self.feat_engr_private_key = \ + secure_builder(worker._cfg).generate_keypair() + logger.info('Sending feat_engr_public_keys to clients.') + self.comm_manager.send( + Message(msg_type='feat_engr_public_keys', + sender=self.ID, + receiver=list(self.comm_manager.get_neighbors().keys()), + state=self.state, + content=self.feat_engr_public_key)) + + def callback_funcs_for_en_feat_corrcoef(self, message: Message): + receiver, en_feat_corrcoef = message.content + + feat_corrcoef = [ + self.feat_engr_private_key.decrypt(x) for x in en_feat_corrcoef + ] + logger.info('Sending correlation coefficient to clients.') + self.comm_manager.send( + Message(msg_type='feat_corrcoef', + sender=self.ID, + receiver=receiver, + state=self.state, + content=feat_corrcoef)) + + def callbacks_funcs_for_feat_dim(self, message: Message): + feat_dim, filtered_col = message.content + sender = message.sender + + self.msg_buffer['feat_dim'][sender] = (feat_dim, filtered_col) + if len(self.msg_buffer['feat_dim']) == self._client_num: + server_filtered_col = None + vertical_dims = [0] + for i in sorted(list(self.msg_buffer['feat_dim'].keys())): + vertical_dims.append( + sum(vertical_dims) + self.msg_buffer['feat_dim'][i][0]) + if server_filtered_col is None: + server_filtered_col = self.msg_buffer['feat_dim'][i][1] + else: + server_filtered_col = \ + 
np.hstack((server_filtered_col, self.msg_buffer[ + 'feat_dim'][i][1]+self.vertical_dims[i-2])) + # Filter feature + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], server_filtered_col, + axis=1) + + vertical_dims.pop(0) + self.comm_manager.send( + Message(msg_type='vertical_dims', + sender=self.ID, + receiver=list( + self.comm_manager.get_neighbors().keys()), + state=self.state, + content=vertical_dims)) + self.vertical_dims = vertical_dims + if hasattr(self, '_init_data_related_var'): + self._init_data_related_var() + self.trigger_train_func(**self.kwargs_for_trigger_train_func) + + # Bind method to instance + worker.trigger_for_feat_engr = types.MethodType(trigger_for_feat_engr, + worker) + worker.callback_funcs_for_en_feat_corrcoef = types.MethodType( + callback_funcs_for_en_feat_corrcoef, worker) + worker.callbacks_funcs_for_feat_dim = types.MethodType( + callbacks_funcs_for_feat_dim, worker) + + # Register handlers functions + worker.register_handlers('en_feat_corrcoef', + worker.callback_funcs_for_en_feat_corrcoef) + worker.register_handlers('feat_dim', worker.callbacks_funcs_for_feat_dim) + return worker + + +def wrap_correlation_filter_client(worker): + """ + This function is to perform feature selection with correlation_filter \ + to data for client. + Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap vfl client with correlation_filter. 
+ """ + def callback_funcs_for_feat_engr_public_keys(self, message: Message): + self.feat_engr_public_key = message.content + if self.own_label: + self.msg_buffer['encrypted_norm_feat'] = {} + logger.info(f'Client {self.ID} ask_for_encrypted_norm_feat.') + self.comm_manager.send( + Message(msg_type='ask_for_encrypted_norm_feat', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=None)) + + def callback_funcs_for_ask_for_encrypted_norm_feat(self, message: Message): + sender = message.sender + merged_feat, _ = merge_splits_feat(worker.data) + norm_feat = (merged_feat - np.mean(merged_feat, axis=0)) / ( + np.std(merged_feat, axis=0) * merged_feat.shape[0]) + + en_norm_feat = [[self.feat_engr_public_key.encrypt(j) for j in i] + for i in norm_feat] + logger.info( + f'Client {self.ID} sending encrypted_norm_feat to {sender}.') + self.comm_manager.send( + Message(msg_type='encrypted_norm_feat', + sender=self.ID, + receiver=[sender], + state=self.state, + content=en_norm_feat)) + + def callback_funcs_for_encrypted_norm_feat(self, message: Message): + if not self.own_label: + raise NotImplementedError(f'Client {self.ID} do not have y.') + + en_norm_feat = message.content + sender = message.sender + self.msg_buffer['encrypted_norm_feat'][sender] = en_norm_feat + + if len(self.msg_buffer['encrypted_norm_feat'].keys()) == \ + len([each for each in self.comm_manager.neighbors if each != + self.server_id]): + threshold = worker._cfg.feat_engr.selec_threshold + merged_feat, merged_y = merge_splits_feat(worker.data) + + # Filter local feature + if merged_feat is not None: + feat_corrcoef = [] + for i in range(merged_feat.shape[1]): + feat_corrcoef.append( + np.sum( + (merged_feat[:, i] - np.mean(merged_feat[:, i])) * + (merged_y - np.mean(merged_y)) / + merged_feat.shape[0] / + (np.std(merged_feat[:, i]) * np.std(merged_y)))) + filtered_col = (np.array(feat_corrcoef) < + 
threshold).nonzero()[0] + logger.info(f'The eliminated feature of Client {self.ID} is' + f' {filtered_col}') + # Filter feature + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], filtered_col, + axis=1) + self.comm_manager.send( + Message(msg_type='feat_dim', + sender=self.ID, + receiver=[self.server_id], + content=(split_data['x'].shape[1], filtered_col))) + + norm_y = (merged_y - np.mean(merged_y)) / np.std(merged_y) + # Calculate correlation coefficient + for sender, en_norm_feat in \ + self.msg_buffer['encrypted_norm_feat'].items(): + en_feat_corrcoef = [] + for i in range(np.array(en_norm_feat).shape[1]): + en_feat_corrcoef.append( + np.sum(np.array(en_norm_feat)[:, i] * norm_y)) + + # Send to server for decryption + logger.info(f'Client {self.ID} send en_feat_corrcoef to' + f' {self.server_id}.') + self.comm_manager.send( + Message(msg_type='en_feat_corrcoef', + sender=self.ID, + receiver=[self.server_id], + content=(sender, en_feat_corrcoef))) + + def callbacks_funcs_for_feat_corrcoef(self, message: Message): + feat_corrcoef = message.content + + threshold = worker._cfg.feat_engr.selec_threshold + filtered_col = (np.array(feat_corrcoef) < threshold).nonzero()[0] + logger.info(f'The eliminated feature of Client {self.ID} is' + f' {filtered_col}') + + # Filter feature + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], filtered_col, axis=1) + + self.comm_manager.send( + Message(msg_type='feat_dim', + sender=self.ID, + receiver=[self.server_id], + content=(split_data['x'].shape[1], filtered_col))) + + def callback_funcs_for_vertical_dims(self, message: Message): + vertical_dims = message.content + 
self.vertical_dims = vertical_dims + if hasattr(self, '_init_data_related_var'): + self._init_data_related_var() + + # Bind method to instance + worker.callback_funcs_for_feat_engr_public_keys = types.MethodType( + callback_funcs_for_feat_engr_public_keys, worker) + worker.callback_funcs_for_ask_for_encrypted_norm_feat = types.MethodType( + callback_funcs_for_ask_for_encrypted_norm_feat, worker) + worker.callback_funcs_for_encrypted_norm_feat = types.MethodType( + callback_funcs_for_encrypted_norm_feat, worker) + worker.callbacks_funcs_for_feat_corrcoef = types.MethodType( + callbacks_funcs_for_feat_corrcoef, worker) + worker.callback_funcs_for_vertical_dims = types.MethodType( + callback_funcs_for_vertical_dims, worker) + + # Register handlers functions + worker.register_handlers('feat_engr_public_keys', + worker.callback_funcs_for_feat_engr_public_keys) + worker.register_handlers( + 'ask_for_encrypted_norm_feat', + worker.callback_funcs_for_ask_for_encrypted_norm_feat) + worker.register_handlers('encrypted_norm_feat', + worker.callback_funcs_for_encrypted_norm_feat) + worker.register_handlers('feat_corrcoef', + worker.callbacks_funcs_for_feat_corrcoef) + worker.register_handlers('vertical_dims', + worker.callback_funcs_for_vertical_dims) + + return worker diff --git a/federatedscope/core/feature/vfl/selection/iv_filter.py b/federatedscope/core/feature/vfl/selection/iv_filter.py new file mode 100644 index 000000000..8b00be81a --- /dev/null +++ b/federatedscope/core/feature/vfl/selection/iv_filter.py @@ -0,0 +1,263 @@ +import types +import logging +import numpy as np + +from federatedscope.core.message import Message +from federatedscope.core.feature.utils import secure_builder, \ + merge_splits_feat, vfl_binning + +logger = logging.getLogger(__name__) + + +def wrap_iv_filter_server(worker): + """ + This function is to perform feature selection with iv_filter \ + to data for server. 
+ Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap vfl server with iv_filter. + """ + def trigger_for_feat_engr(self, + trigger_train_func, + kwargs_for_trigger_train_func={}): + logger.info('Start to execute woe_filter, which requires HE.') + self.trigger_train_func = trigger_train_func + self.kwargs_for_trigger_train_func = \ + kwargs_for_trigger_train_func + self.msg_buffer['feat_dim'] = {} + + # Broadcast client address and feat_engr_public_key + self.broadcast_client_address() + self.comm_manager.send( + Message(msg_type='binning', + sender=self.ID, + receiver=list(self.comm_manager.get_neighbors().keys()), + state=self.state, + content=self._cfg.feat_engr.selec_woe_binning)) + + def callbacks_funcs_for_feat_dim(self, message: Message): + feat_dim, filtered_col = message.content + sender = message.sender + + self.msg_buffer['feat_dim'][sender] = (feat_dim, filtered_col) + if len(self.msg_buffer['feat_dim']) == self._client_num: + server_filtered_col = None + vertical_dims = [0] + for i in sorted(list(self.msg_buffer['feat_dim'].keys())): + vertical_dims.append( + sum(vertical_dims) + self.msg_buffer['feat_dim'][i][0]) + if server_filtered_col is None: + server_filtered_col = self.msg_buffer['feat_dim'][i][1] + else: + server_filtered_col = \ + np.hstack((server_filtered_col, self.msg_buffer[ + 'feat_dim'][i][1] + self.vertical_dims[i - 2])) + # Filter feature + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], server_filtered_col, + axis=1) + + vertical_dims.pop(0) + self.comm_manager.send( + Message(msg_type='vertical_dims', + sender=self.ID, + receiver=list( + self.comm_manager.get_neighbors().keys()), + state=self.state, + content=vertical_dims)) + self.vertical_dims = vertical_dims + if hasattr(self, '_init_data_related_var'): + 
self._init_data_related_var() + self.trigger_train_func(**self.kwargs_for_trigger_train_func) + + # Bind method to instance + worker.trigger_for_feat_engr = types.MethodType(trigger_for_feat_engr, + worker) + worker.callbacks_funcs_for_feat_dim = types.MethodType( + callbacks_funcs_for_feat_dim, worker) + + # Register handlers functions + worker.register_handlers('feat_dim', worker.callbacks_funcs_for_feat_dim) + return worker + + +def wrap_iv_filter_client(worker): + """ + This function is to perform feature selection with iv_filter \ + to data for client. + Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap vfl client with iv_filter. + """ + def callback_funcs_for_binning(self, message: Message): + # Merge train & val & test + merged_feat, merged_y = merge_splits_feat(worker.data) + + if merged_feat is not None: + num_features = merged_feat.shape[1] + num_bins = [worker._cfg.feat_engr.num_bins] * num_features + bin_edges = vfl_binning(merged_feat, num_bins, message.content) + + for i in range(merged_feat.shape[1]): + merged_feat[:, i] = np.searchsorted(bin_edges[i][1:-1], + merged_feat[:, i], + side="right") + self.merged_feat, self.merged_y = merged_feat, merged_y + self.bin_edges = bin_edges + self.sum_en_y_cnt = 1 + + if self.own_label: + assert len(set(merged_y)) == 2, \ + 'IV filter only available for binary classification tasks' + # Local woe_filter + iv_list = [] + good_t = np.sum(merged_y) + bad_t = merged_feat.shape[0] - good_t + for j in range(merged_feat.shape[1]): + feat = merged_feat[:, j] + iv_value = 0 + for binning in set(feat): + good_rate = np.sum( + merged_y[np.where(feat == binning)[0]]) / good_t + bad_rate = (np.sum(feat == binning) - good_t * good_rate) \ + / bad_t + iv_value += np.log(bad_rate/good_rate) * \ + (bad_rate - good_rate) + iv_list.append(iv_value) + threshold = worker._cfg.feat_engr.selec_threshold + filtered_col = (np.array(iv_list) < threshold).nonzero()[0] + # Filter local feature + for 
split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], filtered_col, axis=1) + worker._init_data_related_var() + + self.comm_manager.send( + Message(msg_type='feat_dim', + sender=self.ID, + receiver=[self.server_id], + content=(split_data['x'].shape[1], filtered_col))) + + # Generate HE keys + self.feat_engr_public_key, self.feat_engr_private_key = \ + secure_builder(worker._cfg).generate_keypair() + en_y = [self.feat_engr_public_key.encrypt(y_i) for y_i in merged_y] + + # Send en_y_dict to client without label + self.comm_manager.send( + Message(msg_type='en_y', + sender=self.ID, + receiver=[ + each for each in self.comm_manager.neighbors + if each != self.server_id + ], + state=self.state, + content=en_y)) + + def callback_func_for_en_y(self, message: Message): + en_y = message.content + sender = message.sender + + sum_en_y = [] + for j in range(self.merged_feat.shape[1]): + feat = self.merged_feat[:, j] + sum_en_y_j = [] + for binning in set(feat): + index = np.where(feat == binning)[0] + sum_en_y_jj = np.sum([en_y[x] for x in index]) + binning_num = len(index) + sum_en_y_j.append((sum_en_y_jj, binning_num)) + sum_en_y.append(sum_en_y_j) + + self.comm_manager.send( + Message(msg_type='sum_en_y', + sender=self.ID, + receiver=[sender], + state=self.state, + content=sum_en_y)) + + def callback_func_for_sum_en_y(self, message: Message): + sum_en_y = message.content + sender = message.sender + + good_t = np.sum(self.merged_y) + bad_t = len(self.merged_y) - good_t + + iv_list = [] + for j in range(len(sum_en_y)): + sum_en_y_j = sum_en_y[j] + iv_value = 0 + for binning in sum_en_y_j: + sum_en_y_jj, binning_num = binning + good_rate = self.feat_engr_private_key.decrypt( + sum_en_y_jj) / good_t + bad_rate = (binning_num - good_t * good_rate) / bad_t + iv_value += np.log(bad_rate / good_rate) * \ + 
(bad_rate - good_rate) + iv_list.append(iv_value) + + self.comm_manager.send( + Message(msg_type='iv_list', + sender=self.ID, + receiver=[sender], + state=self.state, + content=iv_list)) + + def callback_func_for_iv_list(self, message: Message): + iv_list = message.content + + threshold = worker._cfg.feat_engr.selec_threshold + filtered_col = (np.array(iv_list) < threshold).nonzero()[0] + # Filter local feature + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], filtered_col, axis=1) + worker._init_data_related_var() + + self.comm_manager.send( + Message(msg_type='feat_dim', + sender=self.ID, + receiver=[self.server_id], + content=(split_data['x'].shape[1], filtered_col))) + + def callback_funcs_for_vertical_dims(self, message: Message): + vertical_dims = message.content + self.vertical_dims = vertical_dims + if hasattr(self, '_init_data_related_var'): + self._init_data_related_var() + + # Bind method to instance + worker.callback_funcs_for_binning = types.MethodType( + callback_funcs_for_binning, worker) + worker.callback_func_for_en_y = types.MethodType(callback_func_for_en_y, + worker) + worker.callback_func_for_sum_en_y = types.MethodType( + callback_func_for_sum_en_y, worker) + worker.callback_func_for_iv_list = types.MethodType( + callback_func_for_iv_list, worker) + worker.callback_funcs_for_vertical_dims = types.MethodType( + callback_funcs_for_vertical_dims, worker) + + # Register handlers functions + worker.register_handlers('binning', worker.callback_funcs_for_binning) + worker.register_handlers('en_y', worker.callback_func_for_en_y) + worker.register_handlers('sum_en_y', worker.callback_func_for_sum_en_y) + worker.register_handlers('iv_list', worker.callback_func_for_iv_list) + worker.register_handlers('vertical_dims', + worker.callback_funcs_for_vertical_dims) + + return 
worker diff --git a/federatedscope/core/feature/vfl/selection/variance_filter.py b/federatedscope/core/feature/vfl/selection/variance_filter.py new file mode 100644 index 000000000..327af58da --- /dev/null +++ b/federatedscope/core/feature/vfl/selection/variance_filter.py @@ -0,0 +1,44 @@ +import logging +import numpy as np + +from federatedscope.core.feature.utils import merge_splits_feat + +logger = logging.getLogger(__name__) + + +def wrap_variance_filter(worker): + """ + This function is to perform variance-threshold feature filtering for \
vfl tabular data. + Args: + worker: ``federatedscope.core.workers.Worker`` to be wrapped + + Returns: + Wrap worker with variance-filtered data + """ + logger.info('Start to execute variance filtering.') + + # Merge train & val & test + merged_feat, _ = merge_splits_feat(worker.data) + + # If variance is smaller than threshold, the feature will be removed + feat_var = np.var(merged_feat, axis=0) + threshold = worker._cfg.feat_engr.selec_threshold + filtered_col = (feat_var < threshold).nonzero()[0] + + # Filter feature + for split in ['train_data', 'val_data', 'test_data']: + if hasattr(worker.data, split): + split_data = getattr(worker.data, split) + if split_data is not None and 'x' in split_data: + split_data['x'] = \ + np.delete(split_data['x'], filtered_col, axis=1) + worker._init_data_related_var() + return worker + + +def wrap_variance_filter_client(worker): + return wrap_variance_filter(worker) + + +def wrap_variance_filter_server(worker): + return wrap_variance_filter(worker) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index 914a249f7..322736446 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -10,6 +10,8 @@ from federatedscope.core.gpu_manager import GPUManager from federatedscope.core.auxiliaries.model_builder import get_model from federatedscope.core.auxiliaries.utils import get_resource_info +from federatedscope.core.auxiliaries.feat_engr_builder
import \ + get_feat_engr_wrapper logger = logging.getLogger(__name__) @@ -63,6 +65,8 @@ def __init__(self, specified_device=self.cfg.device) self.unseen_clients_id = [] + self.feat_engr_wrapper_client, self.feat_engr_wrapper_server = \ + get_feat_engr_wrapper(config) if self.cfg.federate.unseen_clients_rate > 0: self.unseen_clients_id = np.random.choice( np.arange(1, self.cfg.federate.client_num + 1), @@ -160,7 +164,7 @@ def _setup_server(self, resource_info=None, client_resource_info=None): wrap_nbafl_server wrap_nbafl_server(server) logger.info('Server has been set up ... ') - return server + return self.feat_engr_wrapper_server(server) def _setup_client(self, client_id=-1, @@ -209,7 +213,7 @@ def _setup_client(self, else: logger.info(f'Client {client_id} has been set up ... ') - return client + return self.feat_engr_wrapper_client(client) def check(self): """ diff --git a/federatedscope/core/secure/__init__.py b/federatedscope/core/secure/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/core/secure/encrypt/__init__.py b/federatedscope/core/secure/encrypt/__init__.py new file mode 100644 index 000000000..ec59509a8 --- /dev/null +++ b/federatedscope/core/secure/encrypt/__init__.py @@ -0,0 +1,4 @@ +from federatedscope.core.secure.encrypt.dummy_encrypt import \ + DummyEncryptKeypair + +__all__ = ['DummyEncryptKeypair'] diff --git a/federatedscope/core/secure/encrypt/dummy_encrypt.py b/federatedscope/core/secure/encrypt/dummy_encrypt.py new file mode 100644 index 000000000..125015198 --- /dev/null +++ b/federatedscope/core/secure/encrypt/dummy_encrypt.py @@ -0,0 +1,60 @@ +DEFAULT_KEY_SIZE = 3072 + + +class DummyEncryptKeypair(object): + def __init__(self, n_length=DEFAULT_KEY_SIZE): + self.n_length = n_length + + @staticmethod + def generate_keypair(): + n = p = q = None + public_key = DummyEncryptPublicKey(n) + private_key = DummyEncryptPrivateKey(public_key, p, q) + return public_key, private_key + + +class 
DummyEncryptPublicKey(object): + def __init__(self, n): + self.n = n + + def __eq__(self, other): + return self.n == other.n + + def encrypt(self, value): + ciphertext = value + encrypted_num = DummyEncryptNumber(self, ciphertext) + return encrypted_num + + +class DummyEncryptPrivateKey(object): + def __init__(self, public_key, p, q): + self.public_key = public_key + self.p = p + self.q = q + + def __eq__(self, other): + return self.p == other.p and self.q == other.q + + def decrypt(self, encrypted_num): + assert self.public_key == encrypted_num.public_key, \ + 'public_key not matched!' + decrypt_val = encrypted_num.ciphertext + return decrypt_val + + +class DummyEncryptNumber(object): + def __init__(self, public_key, ciphertext): + self.public_key = public_key + self.ciphertext = ciphertext + + def __add__(self, other): + if isinstance(other, DummyEncryptNumber): + return DummyEncryptNumber(self.public_key, + self.ciphertext + other.ciphertext) + return DummyEncryptNumber(self.public_key, self.ciphertext + other) + + def __mul__(self, other): + if isinstance(other, DummyEncryptNumber): + return DummyEncryptNumber(self.public_key, + self.ciphertext * other.ciphertext) + return DummyEncryptNumber(self.public_key, self.ciphertext * other) diff --git a/federatedscope/core/trainers/trainer_Ditto.py b/federatedscope/core/trainers/trainer_Ditto.py index f30b50f46..39bbcd488 100644 --- a/federatedscope/core/trainers/trainer_Ditto.py +++ b/federatedscope/core/trainers/trainer_Ditto.py @@ -311,7 +311,7 @@ def _hook_on_fit_end_free_cuda(ctx): Attribute Operation ================================== =========================== ``ctx.global_model`` Move to ``cpu`` - ``ctx.locol_model`` Move to ``cpu`` + ``ctx.locol_model`` Move to ``cpu`` ================================== =========================== """ ctx.global_model.to(torch.device("cpu")) diff --git a/federatedscope/core/workers/server.py b/federatedscope/core/workers/server.py index 227615397..f34e13963 100644 ---
a/federatedscope/core/workers/server.py +++ b/federatedscope/core/workers/server.py @@ -794,11 +794,24 @@ def trigger_for_start(self): self.deadline_for_cur_round = self.cur_timestamp + \ self._cfg.asyn.time_budget + # start feature engineering + self.trigger_for_feat_engr( + self.broadcast_model_para, { + 'msg_type': 'model_para', + 'sample_client_num': self.sample_client_num + }) + logger.info( '----------- Starting training (Round #{:d}) -------------'. format(self.state)) - self.broadcast_model_para(msg_type='model_para', - sample_client_num=self.sample_client_num) + + def trigger_for_feat_engr(self, + trigger_train_func, + kwargs_for_trigger_train_func={}): + """ + Interface for feature engineering, the default operation is none + """ + trigger_train_func(**kwargs_for_trigger_train_func) def trigger_for_time_up(self, check_timestamp=None): """ diff --git a/federatedscope/cv/trainer/__init__.py b/federatedscope/cv/trainer/__init__.py index 880c20a1e..f270e493e 100644 --- a/federatedscope/cv/trainer/__init__.py +++ b/federatedscope/cv/trainer/__init__.py @@ -1,25 +1,23 @@ -""" -Copyright (c) 2021 Matthias Fey, Jiaxuan You - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -""" +# Copyright (c) 2021 Matthias Fey, Jiaxuan You +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. 
from os.path import dirname, basename, isfile, join import glob diff --git a/federatedscope/hpo.py b/federatedscope/hpo.py index dfbd4ee7d..789c897e7 100644 --- a/federatedscope/hpo.py +++ b/federatedscope/hpo.py @@ -11,7 +11,7 @@ from federatedscope.core.auxiliaries.logging import update_logger from federatedscope.core.cmd_args import parse_args, parse_client_cfg from federatedscope.core.configs.config import global_cfg, CfgNode -from federatedscope.autotune import get_scheduler +from federatedscope.autotune import get_scheduler, run_scheduler if os.environ.get('https_proxy'): del os.environ['https_proxy'] @@ -26,6 +26,11 @@ cfg_opt, client_cfg_opt = parse_client_cfg(args.opts) init_cfg.merge_from_list(cfg_opt) + # Update Exp_name for hpo + if init_cfg.expname == '': + from federatedscope.autotune.utils import generate_hpo_exp_name + init_cfg.expname = generate_hpo_exp_name(init_cfg) + update_logger(init_cfg, clear_before_add=True) setup_seed(init_cfg.seed) @@ -38,21 +43,4 @@ client_cfgs = None scheduler = get_scheduler(init_cfg, client_cfgs) - - if init_cfg.hpo.scheduler in ['sha', 'wrap_sha']: - _ = scheduler.optimize() - elif init_cfg.hpo.scheduler in [ - 'rs', 'bo_kde', 'hb', 'bohb', 'wrap_rs', 'wrap_bo_kde', 'wrap_hb', - 'wrap_bohb' - ]: - from federatedscope.autotune.hpbandster import run_hpbandster - run_hpbandster(init_cfg, scheduler, client_cfgs) - elif init_cfg.hpo.scheduler in [ - 'bo_gp', 'bo_rf', 'wrap_bo_gp', 'wrap_bo_rf' - ]: - from federatedscope.autotune.smac import run_smac - run_smac(init_cfg, scheduler, client_cfgs) - else: - raise ValueError(f'No scheduler named {init_cfg.hpo.scheduler}') - - # logger.info(results) + run_scheduler(scheduler, init_cfg, client_cfgs) diff --git a/federatedscope/vertical_fl/vertical_fl.yaml b/federatedscope/vertical_fl/baseline/vertical_fl.yaml similarity index 100% rename from federatedscope/vertical_fl/vertical_fl.yaml rename to federatedscope/vertical_fl/baseline/vertical_fl.yaml diff --git 
a/federatedscope/vertical_fl/vertical_on_adult.yaml b/federatedscope/vertical_fl/baseline/vertical_on_adult.yaml similarity index 95% rename from federatedscope/vertical_fl/vertical_on_adult.yaml rename to federatedscope/vertical_fl/baseline/vertical_on_adult.yaml index fcbca5993..c5f7bd379 100644 --- a/federatedscope/vertical_fl/vertical_on_adult.yaml +++ b/federatedscope/vertical_fl/baseline/vertical_on_adult.yaml @@ -23,10 +23,10 @@ criterion: type: CrossEntropyLoss trainer: type: none +vertical_dims: [7, 14] vertical: use: True key_size: 256 - dims: [7, 14] eval: freq: 5 best_res_update_round_wise_key: test_loss diff --git a/federatedscope/vertical_fl/dataloader/dataloader.py b/federatedscope/vertical_fl/dataloader/dataloader.py index 804574901..9b44393ae 100644 --- a/federatedscope/vertical_fl/dataloader/dataloader.py +++ b/federatedscope/vertical_fl/dataloader/dataloader.py @@ -23,10 +23,10 @@ def load_vertical_data(config=None, generate=False): name = config.data.type.lower() # TODO: merge the following later if config.vertical.use: - feature_partition = config.vertical.dims + feature_partition = config.vertical_dims algo = 'lr' elif config.xgb_base.use: - feature_partition = config.xgb_base.dims + feature_partition = config.vertical_dims algo = 'xgb' else: raise ValueError('You must provide the data partition') @@ -101,7 +101,7 @@ def load_vertical_data(config=None, generate=False): INSTANCE_NUM = 1000 TRAIN_SPLIT = 0.9 - total_dims = np.sum(config.vertical.dims) + total_dims = np.sum(config.vertical_dims) theta = np.random.uniform(low=-1.0, high=1.0, size=(total_dims, 1)) x = np.random.choice([-1.0, 1.0, -2.0, 2.0, -3.0, 3.0], size=(INSTANCE_NUM, total_dims)) @@ -122,14 +122,14 @@ def load_vertical_data(config=None, generate=False): # For Client #1 data[1] = dict() - data[1]['train'] = {'x': x[:train_num, :config.vertical.dims[0]]} + data[1]['train'] = {'x': x[:train_num, :config.vertical_dims[0]]} data[1]['val'] = None data[1]['test'] = test_data # For 
Client #2 data[2] = dict() data[2]['train'] = { - 'x': x[:train_num, config.vertical.dims[0]:], + 'x': x[:train_num, config.vertical_dims[0]:], 'y': y[:train_num] } data[2]['val'] = None diff --git a/federatedscope/vertical_fl/dataset/__init__.py b/federatedscope/vertical_fl/dataset/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/federatedscope/vertical_fl/worker/vertical_client.py b/federatedscope/vertical_fl/worker/vertical_client.py index 8e9bba9d0..0114d5a6a 100644 --- a/federatedscope/vertical_fl/worker/vertical_client.py +++ b/federatedscope/vertical_fl/worker/vertical_client.py @@ -35,10 +35,7 @@ def __init__(self, self.public_key = None self.theta = None self.batch_index = None - self.own_label = ('y' in self.data['train']) - self.dataloader = batch_iter(self.data['train'], - self._cfg.dataloader.batch_size, - shuffled=True) + self._init_data_related_var() self.register_handlers('public_keys', self.callback_funcs_for_public_keys) @@ -49,6 +46,12 @@ def __init__(self, self.register_handlers('encryped_gradient_v', self.callback_funcs_for_encryped_gradient_v) + def _init_data_related_var(self): + self.own_label = ('y' in self.data['train']) + self.dataloader = batch_iter(self.data['train'], + self._cfg.dataloader.batch_size, + shuffled=True) + def sample_data(self, index=None): if index is None: assert self.own_label diff --git a/federatedscope/vertical_fl/worker/vertical_server.py b/federatedscope/vertical_fl/worker/vertical_server.py index 27b57573a..39330a16f 100644 --- a/federatedscope/vertical_fl/worker/vertical_server.py +++ b/federatedscope/vertical_fl/worker/vertical_server.py @@ -4,6 +4,7 @@ from federatedscope.core.workers import Server from federatedscope.core.message import Message from federatedscope.vertical_fl.Paillier import abstract_paillier +from federatedscope.core.auxiliaries.model_builder import get_model logger = logging.getLogger(__name__) @@ -35,18 +36,24 @@ def __init__(self, cfg_key_size = 
config.vertical.key_size self.public_key, self.private_key = \ abstract_paillier.generate_paillier_keypair(n_length=cfg_key_size) - self.dims = [0] + config.vertical.dims - self.theta = self.model.state_dict()['fc.weight'].numpy().reshape(-1) + self.vertical_dims = config.vertical_dims + self._init_data_related_var() + self.lr = config.train.optimizer.lr self.register_handlers('encryped_gradient', self.callback_funcs_for_encryped_gradient) + def _init_data_related_var(self): + self.dims = [0] + self.vertical_dims + self.model = get_model(self._cfg.model, self.data) + self.theta = self.model.state_dict()['fc.weight'].numpy().reshape(-1) + def trigger_for_start(self): if self.check_client_join_in(): self.broadcast_public_keys() self.broadcast_client_address() - self.broadcast_model_para() + self.trigger_for_feat_engr(self.broadcast_model_para) def broadcast_public_keys(self): self.comm_manager.send( diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml index c320418b7..5941730d5 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_abalone.yaml @@ -24,10 +24,10 @@ train: gamma: 0 num_of_trees: 10 max_tree_depth: 3 +vertical_dims: [4, 8] xgb_base: use: True use_bin: True - dims: [4, 8] eval: freq: 5 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml index 49f64aa92..50bd5d744 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_adult.yaml @@ -9,10 +9,10 @@ model: data: root: data/ type: adult - batch_size: 2000 splits: [1.0, 0.0] dataloader: type: raw + batch_size: 2000 criterion: type: CrossEntropyLoss trainer: @@ -24,10 
+24,10 @@ train: gamma: 0 num_of_trees: 10 max_tree_depth: 3 +vertical_dims: [7, 14] xgb_base: use: True use_bin: True - dims: [7, 14] eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml index 7a34f92fd..b0587d1db 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_blogfeedback.yaml @@ -24,10 +24,10 @@ train: gamma: 0 num_of_trees: 9 max_tree_depth: 3 +vertical_dims: [10, 20] xgb_base: use: True use_bin: True - dims: [10, 20] eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml index b40838cd1..4c433d209 100644 --- a/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml +++ b/federatedscope/vertical_fl/xgb_base/baseline/xgb_base_on_givemesomecredit.yaml @@ -24,10 +24,10 @@ train: gamma: 0 num_of_trees: 10 max_tree_depth: 3 +vertical_dims: [5, 10] xgb_base: use: True use_bin: True - dims: [5, 10] eval: freq: 3 best_res_update_round_wise_key: test_loss \ No newline at end of file diff --git a/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py b/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py index ce6b717aa..d94174e72 100644 --- a/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py +++ b/federatedscope/vertical_fl/xgb_base/worker/XGBClient.py @@ -42,6 +42,7 @@ def __init__(self, self.gamma = None self.num_of_trees = None self.max_tree_depth = None + self.vertical_dims = self._cfg.vertical_dims self.federate_mode = config.federate.mode @@ -50,7 +51,18 @@ def __init__(self, self.data = data self.own_label = ('y' in self.data['train']) + 
self._init_data_related_var() + self.register_handlers('model_para', self.callback_func_for_model_para) + self.register_handlers('data_sample', + self.callback_func_for_data_sample) + self.register_handlers('compute_next_node', + self.callback_func_for_compute_next_node) + self.register_handlers('send_feature_importance', + self.callback_func_for_send_feature_importance) + self.register_handlers('finish', self.callback_func_for_finish) + + def _init_data_related_var(self): self.test_x = self.data['test']['x'] if self.own_label: self.test_y = self.data['test']['y'] @@ -59,7 +71,7 @@ def __init__(self, self.y_hat = None self.y = None - self.num_of_parties = config.federate.client_num + self.num_of_parties = self._cfg.federate.client_num self.dataloader = batch_iter(self.data['train'], self._cfg.data.batch_size, @@ -69,7 +81,7 @@ def __init__(self, self.z = 0 - self.feature_list = [0] + config.xgb_base.dims + self.feature_list = [0] + self.vertical_dims self.feature_partition = [ self.feature_list[i] - self.feature_list[i - 1] for i in range(1, len(self.feature_list)) @@ -84,28 +96,19 @@ def __init__(self, # the following two lines are the two alogs, where # the first one corresponding to sending the whole feature order # the second one corresponding to sending the bins of feature order - if config.xgb_base.use_bin: + if self._cfg.xgb_base.use_bin: self.fs = Feature_sort_by_bin(self, bin_num=self.bin_num) else: self.fs = Feature_sort_base(self) self.ts = Test_base(self) - self.criterion_type = config.criterion.type + self.criterion_type = self._cfg.criterion.type if self.criterion_type == 'CrossEntropyLoss': self.ls = TwoClassificationloss() elif self.criterion_type == 'Regression': self.ls = Regression_by_mseloss() - self.register_handlers('model_para', self.callback_func_for_model_para) - self.register_handlers('data_sample', - self.callback_func_for_data_sample) - self.register_handlers('compute_next_node', - self.callback_func_for_compute_next_node) - 
self.register_handlers('send_feature_importance', - self.callback_func_for_send_feature_importance) - self.register_handlers('finish', self.callback_func_for_finish) - # save the order of values in each feature def order_feature(self, data): for j in range(data.shape[1]): diff --git a/federatedscope/vertical_fl/xgb_base/worker/XGBServer.py b/federatedscope/vertical_fl/xgb_base/worker/XGBServer.py index 80eb3f275..e694afd13 100644 --- a/federatedscope/vertical_fl/xgb_base/worker/XGBServer.py +++ b/federatedscope/vertical_fl/xgb_base/worker/XGBServer.py @@ -30,16 +30,8 @@ def __init__(self, self.max_tree_depth = config.train.optimizer.max_tree_depth self.num_of_parties = config.federate.client_num - - self.batch_size = config.data.batch_size - - self.feature_list = [0] + config.xgb_base.dims - self.feature_partition = [ - self.feature_list[i + 1] - self.feature_list[i] - for i in range(len(self.feature_list) - 1) - ] - self.total_num_of_feature = self.feature_list[-1] - + self.vertical_dims = self._cfg.vertical_dims + self._init_data_related_var() self.data = data self.tree_list = [ @@ -52,10 +44,19 @@ def __init__(self, self.register_handlers('feature_importance', self.callback_func_for_feature_importance) + def _init_data_related_var(self): + self.batch_size = self._cfg.data.batch_size + self.feature_list = [0] + self.vertical_dims + self.feature_partition = [ + self.feature_list[i + 1] - self.feature_list[i] + for i in range(len(self.feature_list) - 1) + ] + self.total_num_of_feature = self.feature_list[-1] + def trigger_for_start(self): if self.check_client_join_in(): self.broadcast_client_address() - self.broadcast_model_para() + self.trigger_for_feat_engr(self.broadcast_model_para) def broadcast_model_para(self): self.comm_manager.send( diff --git a/tests/test_yaml.py b/tests/test_yaml.py index faf3a109e..3ab8d1c55 100644 --- a/tests/test_yaml.py +++ b/tests/test_yaml.py @@ -10,7 +10,7 @@ class YAMLTest(unittest.TestCase): def setUp(self): - self.exclude_all = 
['benchmark', 'scripts'] + self.exclude_all = ['benchmark', 'scripts', 'federatedscope/autotune'] self.exclude_file = [ '.pre-commit-config.yaml', 'meta.yaml', 'federatedscope/gfl/baseline/isolated_gin_minibatch_on_cikmcup_per_client.yaml', @@ -29,6 +29,7 @@ def setUp(self): def test_yaml(self): init_cfg = global_cfg.clone() sign, cont = False, False + error_file = [] for dirpath, _, filenames in os.walk(self.root): for prefix in self.exclude_all: if dirpath.startswith(prefix): @@ -40,22 +41,25 @@ def test_yaml(self): filenames = [f for f in filenames if f.endswith('.yaml')] for f in filenames: yaml_file = os.path.join(dirpath, f) - if yaml_file in self.exclude_file: + # Ignore `ss` search space and yaml file in `exclude_file` + if yaml_file in self.exclude_file or 'ss' in yaml_file: continue try: init_cfg.merge_from_file(yaml_file) except KeyError as error: + error_file.append(yaml_file.removeprefix(self.root)) logger.error( f"KeyError: {error} in file: {yaml_file.removeprefix(self.root)}" ) sign = True except ValueError as error: + error_file.append(yaml_file.removeprefix(self.root)) logger.error( f"ValueError: {error} in file: {yaml_file.removeprefix(self.root)}" ) sign = True init_cfg = global_cfg.clone() - self.assertIs(sign, False, "Yaml check failed.") + self.assertIs(sign, False, f'Yaml check failed in {error_file}.') if __name__ == '__main__':