From cec765141bf0ec4e63cd6785fee64de519975c5f Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Wed, 28 Sep 2022 19:42:34 +0800 Subject: [PATCH 1/8] add first func for complt --- federatedscope/core/fed_runner.py | 22 ++++++++++++++++++++++ federatedscope/core/workers/client.py | 13 +++++++++---- federatedscope/core/workers/server.py | 13 +++++++++---- federatedscope/main.py | 3 ++- 4 files changed, 42 insertions(+), 9 deletions(-) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index 795cf0d32..7ab99831c 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -63,6 +63,7 @@ def __init__(self, self.resource_info = get_resource_info( config.federate.resource_info_file) + def setup(self): if self.mode == 'standalone': self.shared_comm_queue = deque() self._setup_for_standalone() @@ -184,6 +185,7 @@ def run(self): For the standalone mode, a shared message queue will be set up to simulate ``receiving message``. """ + self.setup() if self.mode == 'standalone': # trigger the FL course for each_client in self.client: @@ -427,3 +429,23 @@ def _handle_msg(self, msg, rcv=-1): self.client[each_receiver].msg_handlers[msg.msg_type](msg) self.client[each_receiver]._monitor.track_download_bytes( download_bytes) + + def check(self): + """ + Check the completeness of Server and Client. + + Returns: + + """ + # try: + client_msg_handler_dict = self.client_class.get_msg_handler_dict() + server_msg_handler_dict = self.server_class.get_msg_handler_dict() + # except: + # logger.warning('Completeness check failed for ' + # 'NotImplementedError.') + # return True + + logger.info(client_msg_handler_dict) + logger.info(server_msg_handler_dict) + # import networkx as nx + # TODO: Check the connectivity of the Digraph diff --git a/federatedscope/core/workers/client.py b/federatedscope/core/workers/client.py index 2dbc4bdd7..ce7f7112c 100644 --- a/federatedscope/core/workers/client.py +++ b/federatedscope/core/workers/client.py @@ -45,7 +45,12 @@ def __init__(self, is_unseen_client=False, *args, **kwargs): + # Register message handlers + self.msg_handlers = dict() + self._register_default_handlers() + if config is None: + return super(Client, self).__init__(ID, state, config, model, strategy) # the unseen_client indicates that whether this client contributes to @@ -91,10 +96,6 @@ def __init__(self, )) if self._cfg.federate.use_ss else None self.msg_buffer = {'train': dict(), 'eval': dict()} - # Register message handlers - self.msg_handlers = dict() - self._register_default_handlers() - # Communication and communication ability if 'resource_info' in kwargs and kwargs['resource_info'] is not None: self.comp_speed = float( @@ -534,3 +535,7 @@ def callback_funcs_for_converged(self, message: Message): """ self._monitor.global_converged() + + @classmethod + def get_msg_handler_dict(cls): + return cls().msg_handlers diff --git a/federatedscope/core/workers/server.py b/federatedscope/core/workers/server.py index 38ceb6435..564a04435 100644 --- a/federatedscope/core/workers/server.py +++ b/federatedscope/core/workers/server.py @@ -50,7 +50,12 @@ def __init__(self, strategy=None, unseen_clients_id=None, **kwargs): + # Register message handlers + self.msg_handlers = dict() + self._register_default_handlers() + if config is None: + return super(Server, self).__init__(ID, state, config, model, strategy) self.data = data @@ -165,10 +170,6 @@ def __init__(self, self.client_resource_info = kwargs['client_resource_info'] \ if 'client_resource_info' in kwargs else None - # Register message handlers - self.msg_handlers = dict() - self._register_default_handlers() - # Initialize communication manager and message buffer self.msg_buffer = {'train': dict(), 'eval': dict()} self.staled_msg_buffer = list() @@ -981,3 +982,7 @@ def callback_funcs_for_metrics(self, message: Message): self.msg_buffer['eval'][round][sender] = content return self.check_and_move_on(check_eval_result=True) + + @classmethod + def get_msg_handler_dict(cls): + return cls().msg_handlers diff --git a/federatedscope/main.py b/federatedscope/main.py index 40d8a0e94..17730426e 100644 --- a/federatedscope/main.py +++ b/federatedscope/main.py @@ -48,4 +48,5 @@ client_class=get_client_cls(init_cfg), config=init_cfg.clone(), client_config=client_cfg) - _ = runner.run() + runner.check() + # _ = runner.run() From fbbc37aca7c69b3177de4c3cdee80e7c3c1f94df Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Wed, 28 Sep 2022 19:58:18 +0800 Subject: [PATCH 2/8] fix minor bug --- federatedscope/core/workers/client.py | 6 +++--- federatedscope/core/workers/server.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/federatedscope/core/workers/client.py b/federatedscope/core/workers/client.py index ce7f7112c..71c6df744 100644 --- a/federatedscope/core/workers/client.py +++ b/federatedscope/core/workers/client.py @@ -162,7 +162,7 @@ def _calculate_model_delta(self, init_model, updated_model): else: return model_deltas[0] - def register_handlers(self, msg_type, callback_func): + def register_handlers(self, msg_type, callback_func, send_msg=None): """ To bind a message type with a handling function. @@ -171,7 +171,7 @@ def register_handlers(self, msg_type, callback_func): callback_func: The handling functions to handle the received message """ - self.msg_handlers[msg_type] = callback_func + self.msg_handlers[msg_type] = (callback_func, send_msg) def _register_default_handlers(self): self.register_handlers('assign_client_id', @@ -206,7 +206,7 @@ def run(self): while True: msg = self.comm_manager.receive() if self.state <= msg.state: - self.msg_handlers[msg.msg_type](msg) + self.msg_handlers[msg.msg_type][0](msg) if msg.msg_type == 'finish': break diff --git a/federatedscope/core/workers/server.py b/federatedscope/core/workers/server.py index 564a04435..4980abfcb 100644 --- a/federatedscope/core/workers/server.py +++ b/federatedscope/core/workers/server.py @@ -207,7 +207,7 @@ def total_round_num(self, value): def register_noise_injector(self, func): self._noise_injector = func - def register_handlers(self, msg_type, callback_func): + def register_handlers(self, msg_type, callback_func, send_msg=None): """ To bind a message type with a handling function. @@ -216,7 +216,7 @@ def register_handlers(self, msg_type, callback_func): callback_func: The handling functions to handle the received message """ - self.msg_handlers[msg_type] = callback_func + self.msg_handlers[msg_type] = (callback_func, send_msg) def _register_default_handlers(self): self.register_handlers('join_in', self.callback_funcs_for_join_in) @@ -233,7 +233,7 @@ def run(self): # Begin: Broadcast model parameters and start to FL train while self.join_in_client_num < self.client_num: msg = self.comm_manager.receive() - self.msg_handlers[msg.msg_type](msg) + self.msg_handlers[msg.msg_type][0](msg) # Running: listen for message (updates from clients), # aggregate and broadcast feedbacks (aggregated model parameters) @@ -245,7 +245,7 @@ def run(self): while self.state <= self.total_round_num: try: msg = self.comm_manager.receive() - move_on_flag = self.msg_handlers[msg.msg_type](msg) + move_on_flag = self.msg_handlers[msg.msg_type][0](msg) if move_on_flag: time_counter.reset() except TimeoutError: From 0cc6169e852bc84ebcf9b25808d40579566ea04e Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Thu, 29 Sep 2022 17:11:40 +0800 Subject: [PATCH 3/8] add check for completeness --- federatedscope/core/configs/config.py | 3 ++ federatedscope/core/fed_runner.py | 67 ++++++++++++++++++++++----- federatedscope/core/workers/client.py | 24 ++++++---- federatedscope/core/workers/server.py | 28 +++++++---- federatedscope/main.py | 3 +- setup.py | 2 +- 6 files changed, 94 insertions(+), 33 deletions(-) diff --git a/federatedscope/core/configs/config.py b/federatedscope/core/configs/config.py index 48b71b421..ade104c5b 100644 --- a/federatedscope/core/configs/config.py +++ b/federatedscope/core/configs/config.py @@ -262,6 +262,9 @@ def init_global_cfg(cfg): # Whether to use GPU cfg.use_gpu = False + # Whether to check the completeness of msg_handler + cfg.check_completeness = False + # Whether to print verbose logging info cfg.verbose = 1 diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index 7ab99831c..b036dfb09 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -63,6 +63,9 @@ def __init__(self, self.resource_info = get_resource_info( config.federate.resource_info_file) + # Check the completeness of msg_handler. + self.check() + def setup(self): if self.mode == 'standalone': self.shared_comm_queue = deque() @@ -437,15 +440,55 @@ def check(self): Returns: """ - # try: - client_msg_handler_dict = self.client_class.get_msg_handler_dict() - server_msg_handler_dict = self.server_class.get_msg_handler_dict() - # except: - # logger.warning('Completeness check failed for ' - # 'NotImplementedError.') - # return True - - logger.info(client_msg_handler_dict) - logger.info(server_msg_handler_dict) - # import networkx as nx - # TODO: Check the connectivity of the Digraph + if self.cfg.check_completeness: + try: + import os + import networkx as nx + import matplotlib.pyplot as plt + # Build check graph + G = nx.DiGraph() + flags = {0: 'Client', 1: 'Server'} + msg_handler_dicts = [ + self.client_class.get_msg_handler_dict(), + self.server_class.get_msg_handler_dict() + ] + for flag, msg_handler_dict in zip(flags.keys(), + msg_handler_dicts): + role, oppo = flags[flag], flags[(flag + 1) % 2] + for msg_in, (handler, msgs_out) in \ + msg_handler_dict.items(): + for msg_out in msgs_out: + msg_in_key = f'{oppo}_{msg_in}' + handler_key = f'{role}_{handler}' + msg_out_key = f'{role}_{msg_out}' + G.add_node(msg_in_key, subset=1) + G.add_node(handler_key, subset=0 if flag else 2) + G.add_node(msg_out_key, subset=1) + G.add_edge(msg_in_key, handler_key) + G.add_edge(handler_key, msg_out_key) + pos = nx.multipartite_layout(G) + plt.figure(figsize=(20, 15)) + nx.draw(G, + pos, + with_labels=True, + node_color='white', + node_size=800) + fig_path = os.path.join(self.cfg.outdir, 'msg_handler.png') + plt.savefig(fig_path) + if not nx.is_weakly_connected(G): + if nx.has_path(G, 'Client_join_in', 'Server_finish'): + logger.warning( + f'Completeness check raises warning for ' + f'some handlers not in FL process! Save ' + f'check results in {fig_path}.') + else: + logger.error( + f'Completeness check fails for there is no' + f'path from `join_in` to `finish`! Save ' + f'check results in {fig_path}.') + else: + logger.info(f'Completeness check passes! Save check ' + f'results in {fig_path}.') + except Exception as error: + logger.warning(f'Completeness check failed for {error}!') + return diff --git a/federatedscope/core/workers/client.py b/federatedscope/core/workers/client.py index 71c6df744..1698dd938 100644 --- a/federatedscope/core/workers/client.py +++ b/federatedscope/core/workers/client.py @@ -47,6 +47,7 @@ def __init__(self, **kwargs): # Register message handlers self.msg_handlers = dict() + self.msg_handlers_str = dict() self._register_default_handlers() if config is None: @@ -162,7 +163,7 @@ def _calculate_model_delta(self, init_model, updated_model): else: return model_deltas[0] - def register_handlers(self, msg_type, callback_func, send_msg=None): + def register_handlers(self, msg_type, callback_func, send_msg=[None]): """ To bind a message type with a handling function. @@ -171,19 +172,24 @@ def register_handlers(self, msg_type, callback_func, send_msg=None): callback_func: The handling functions to handle the received message """ - self.msg_handlers[msg_type] = (callback_func, send_msg) + self.msg_handlers[msg_type] = callback_func + self.msg_handlers_str[msg_type] = (callback_func.__name__, send_msg) def _register_default_handlers(self): self.register_handlers('assign_client_id', - self.callback_funcs_for_assign_id) + self.callback_funcs_for_assign_id, [None]) self.register_handlers('ask_for_join_in_info', - self.callback_funcs_for_join_in_info) + self.callback_funcs_for_join_in_info, + ['join_in_info']) self.register_handlers('address', self.callback_funcs_for_address) self.register_handlers('model_para', - self.callback_funcs_for_model_para) + self.callback_funcs_for_model_para, + ['model_para']) self.register_handlers('ss_model_para', - self.callback_funcs_for_model_para) - self.register_handlers('evaluate', self.callback_funcs_for_evaluate) + self.callback_funcs_for_model_para, + ['ss_model_para']) + self.register_handlers('evaluate', self.callback_funcs_for_evaluate, + ['metric']) self.register_handlers('finish', self.callback_funcs_for_finish) self.register_handlers('converged', self.callback_funcs_for_converged) @@ -206,7 +212,7 @@ def run(self): while True: msg = self.comm_manager.receive() if self.state <= msg.state: - self.msg_handlers[msg.msg_type][0](msg) + self.msg_handlers[msg.msg_type](msg) if msg.msg_type == 'finish': break @@ -538,4 +544,4 @@ def callback_funcs_for_converged(self, message: Message): @classmethod def get_msg_handler_dict(cls): - return cls().msg_handlers + return cls().msg_handlers_str diff --git a/federatedscope/core/workers/server.py b/federatedscope/core/workers/server.py index 4980abfcb..e43c282b3 100644 --- a/federatedscope/core/workers/server.py +++ b/federatedscope/core/workers/server.py @@ -52,6 +52,7 @@ def __init__(self, **kwargs): # Register message handlers self.msg_handlers = dict() + self.msg_handlers_str = dict() self._register_default_handlers() if config is None: @@ -207,7 +208,7 @@ def total_round_num(self, value): def register_noise_injector(self, func): self._noise_injector = func - def register_handlers(self, msg_type, callback_func, send_msg=None): + def register_handlers(self, msg_type, callback_func, send_msg=[None]): """ To bind a message type with a handling function. @@ -216,13 +217,22 @@ def register_handlers(self, msg_type, callback_func, send_msg=None): callback_func: The handling functions to handle the received message """ - self.msg_handlers[msg_type] = (callback_func, send_msg) + self.msg_handlers[msg_type] = callback_func + self.msg_handlers_str[msg_type] = (callback_func.__name__, send_msg) def _register_default_handlers(self): - self.register_handlers('join_in', self.callback_funcs_for_join_in) - self.register_handlers('join_in_info', self.callback_funcs_for_join_in) - self.register_handlers('model_para', self.callback_funcs_model_para) - self.register_handlers('metrics', self.callback_funcs_for_metrics) + self.register_handlers('join_in', self.callback_funcs_for_join_in, [ + 'assign_client_id', 'ask_for_join_in_info', 'address', 'model_para' + ]) + self.register_handlers('join_in_info', self.callback_funcs_for_join_in, + [ + 'assign_client_id', 'ask_for_join_in_info', + 'address', 'model_para' + ]) + self.register_handlers('model_para', self.callback_funcs_model_para, + ['model_para', 'finish']) + self.register_handlers('metrics', self.callback_funcs_for_metrics, + ['converged']) def run(self): """ @@ -233,7 +243,7 @@ def run(self): # Begin: Broadcast model parameters and start to FL train while self.join_in_client_num < self.client_num: msg = self.comm_manager.receive() - self.msg_handlers[msg.msg_type][0](msg) + self.msg_handlers[msg.msg_type](msg) # Running: listen for message (updates from clients), # aggregate and broadcast feedbacks (aggregated model parameters) @@ -245,7 +255,7 @@ def run(self): while self.state <= self.total_round_num: try: msg = self.comm_manager.receive() - move_on_flag = self.msg_handlers[msg.msg_type][0](msg) + move_on_flag = self.msg_handlers[msg.msg_type](msg) if move_on_flag: time_counter.reset() except TimeoutError: @@ -985,4 +995,4 @@ def callback_funcs_for_metrics(self, message: Message): @classmethod def get_msg_handler_dict(cls): - return cls().msg_handlers + return cls().msg_handlers_str diff --git a/federatedscope/main.py b/federatedscope/main.py index 17730426e..40d8a0e94 100644 --- a/federatedscope/main.py +++ b/federatedscope/main.py @@ -48,5 +48,4 @@ client_class=get_client_cls(init_cfg), config=init_cfg.clone(), client_config=client_cfg) - runner.check() - # _ = runner.run() + _ = runner.run() diff --git a/setup.py b/setup.py index 4fb9a7407..2ff502220 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ test_requires = [] -dev_requires = test_requires + ['pre-commit'] +dev_requires = test_requires + ['pre-commit', 'networkx', 'matplotlib'] org_requires = ['paramiko==2.11.0', 'celery[redis]', 'cmd2'] From 1eb8b24b906032c5147eb05de6bb9572ef9db6fe Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Thu, 29 Sep 2022 17:33:02 +0800 Subject: [PATCH 4/8] minor fix --- federatedscope/core/fed_runner.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index b036dfb09..fc0494a06 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -475,20 +475,19 @@ def check(self): node_size=800) fig_path = os.path.join(self.cfg.outdir, 'msg_handler.png') plt.savefig(fig_path) - if not nx.is_weakly_connected(G): - if nx.has_path(G, 'Client_join_in', 'Server_finish'): + if nx.has_path(G, 'Client_join_in', 'Server_finish'): + if nx.is_weakly_connected(G): + logger.info(f'Completeness check passes! Save check ' + f'results in {fig_path}.') + else: logger.warning( f'Completeness check raises warning for ' f'some handlers not in FL process! Save ' f'check results in {fig_path}.') - else: - logger.error( - f'Completeness check fails for there is no' - f'path from `join_in` to `finish`! Save ' - f'check results in {fig_path}.') else: - logger.info(f'Completeness check passes! Save check ' - f'results in {fig_path}.') + logger.error(f'Completeness check fails for there is no' + f'path from `join_in` to `finish`! Save ' + f'check results in {fig_path}.') except Exception as error: logger.warning(f'Completeness check failed for {error}!') return From ac1f81643f8f789bb27a79a965dfa8027157ce72 Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Sat, 8 Oct 2022 15:34:13 +0800 Subject: [PATCH 5/8] avoid indent --- federatedscope/core/fed_runner.py | 101 +++++++++++++++--------------- 1 file changed, 50 insertions(+), 51 deletions(-) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index fc0494a06..caa525639 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -437,57 +437,56 @@ def check(self): """ Check the completeness of Server and Client. - Returns: - """ - if self.cfg.check_completeness: - try: - import os - import networkx as nx - import matplotlib.pyplot as plt - # Build check graph - G = nx.DiGraph() - flags = {0: 'Client', 1: 'Server'} - msg_handler_dicts = [ - self.client_class.get_msg_handler_dict(), - self.server_class.get_msg_handler_dict() - ] - for flag, msg_handler_dict in zip(flags.keys(), - msg_handler_dicts): - role, oppo = flags[flag], flags[(flag + 1) % 2] - for msg_in, (handler, msgs_out) in \ - msg_handler_dict.items(): - for msg_out in msgs_out: - msg_in_key = f'{oppo}_{msg_in}' - handler_key = f'{role}_{handler}' - msg_out_key = f'{role}_{msg_out}' - G.add_node(msg_in_key, subset=1) - G.add_node(handler_key, subset=0 if flag else 2) - G.add_node(msg_out_key, subset=1) - G.add_edge(msg_in_key, handler_key) - G.add_edge(handler_key, msg_out_key) - pos = nx.multipartite_layout(G) - plt.figure(figsize=(20, 15)) - nx.draw(G, - pos, - with_labels=True, - node_color='white', - node_size=800) - fig_path = os.path.join(self.cfg.outdir, 'msg_handler.png') - plt.savefig(fig_path) - if nx.has_path(G, 'Client_join_in', 'Server_finish'): - if nx.is_weakly_connected(G): - logger.info(f'Completeness check passes! Save check ' - f'results in {fig_path}.') - else: - logger.warning( - f'Completeness check raises warning for ' - f'some handlers not in FL process! Save ' - f'check results in {fig_path}.') + if not self.cfg.check_completeness: + return + try: + import os + import networkx as nx + import matplotlib.pyplot as plt + # Build check graph + G = nx.DiGraph() + flags = {0: 'Client', 1: 'Server'} + msg_handler_dicts = [ + self.client_class.get_msg_handler_dict(), + self.server_class.get_msg_handler_dict() + ] + for flag, msg_handler_dict in zip(flags.keys(), + msg_handler_dicts): + role, oppo = flags[flag], flags[(flag + 1) % 2] + for msg_in, (handler, msgs_out) in \ + msg_handler_dict.items(): + for msg_out in msgs_out: + msg_in_key = f'{oppo}_{msg_in}' + handler_key = f'{role}_{handler}' + msg_out_key = f'{role}_{msg_out}' + G.add_node(msg_in_key, subset=1) + G.add_node(handler_key, subset=0 if flag else 2) + G.add_node(msg_out_key, subset=1) + G.add_edge(msg_in_key, handler_key) + G.add_edge(handler_key, msg_out_key) + pos = nx.multipartite_layout(G) + plt.figure(figsize=(20, 15)) + nx.draw(G, + pos, + with_labels=True, + node_color='white', + node_size=800) + fig_path = os.path.join(self.cfg.outdir, 'msg_handler.png') + plt.savefig(fig_path) + if nx.has_path(G, 'Client_join_in', 'Server_finish'): + if nx.is_weakly_connected(G): + logger.info(f'Completeness check passes! Save check ' + f'results in {fig_path}.') else: - logger.error(f'Completeness check fails for there is no' - f'path from `join_in` to `finish`! Save ' - f'check results in {fig_path}.') - except Exception as error: - logger.warning(f'Completeness check failed for {error}!') + logger.warning( + f'Completeness check raises warning for ' + f'some handlers not in FL process! Save ' + f'check results in {fig_path}.') + else: + logger.error(f'Completeness check fails for there is no' + f'path from `join_in` to `finish`! Save ' + f'check results in {fig_path}.') + except Exception as error: + logger.warning(f'Completeness check failed for {error}!') return From 53330d340f6ed30235a52d0a526ef3900dbcca07 Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Sat, 8 Oct 2022 15:34:57 +0800 Subject: [PATCH 6/8] avoid indent --- federatedscope/core/fed_runner.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index caa525639..61f8dda86 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -451,8 +451,7 @@ def check(self): self.client_class.get_msg_handler_dict(), self.server_class.get_msg_handler_dict() ] - for flag, msg_handler_dict in zip(flags.keys(), - msg_handler_dicts): + for flag, msg_handler_dict in zip(flags.keys(), msg_handler_dicts): role, oppo = flags[flag], flags[(flag + 1) % 2] for msg_in, (handler, msgs_out) in \ msg_handler_dict.items(): @@ -479,10 +478,9 @@ def check(self): logger.info(f'Completeness check passes! Save check ' f'results in {fig_path}.') else: - logger.warning( - f'Completeness check raises warning for ' - f'some handlers not in FL process! Save ' - f'check results in {fig_path}.') + logger.warning(f'Completeness check raises warning for ' + f'some handlers not in FL process! Save ' + f'check results in {fig_path}.') else: logger.error(f'Completeness check fails for there is no' f'path from `join_in` to `finish`! Save ' From e48ae8dc9d64c6405ebc6264150f092ca5866afe Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Mon, 10 Oct 2022 16:58:13 +0800 Subject: [PATCH 7/8] fix send_msg type --- federatedscope/contrib/worker/example.py | 30 +++++++++++++++++++----- federatedscope/core/fed_runner.py | 5 +++- federatedscope/core/workers/client.py | 6 ++--- federatedscope/core/workers/server.py | 7 ++---- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/federatedscope/contrib/worker/example.py b/federatedscope/contrib/worker/example.py index 7f0ed2e7e..e55b66f1c 100644 --- a/federatedscope/contrib/worker/example.py +++ b/federatedscope/contrib/worker/example.py @@ -3,18 +3,36 @@ # Build your worker here. -class MyClient(Client): - pass +class MyServer(Server): + def _register_default_handlers(self): + self.register_handlers('join_in', self.callback_funcs_for_join_in, + ['assign_client_id', 'address', 'model_para']) + self.register_handlers('join_in_info', self.callback_funcs_for_join_in, + ['address', 'model_para']) + self.register_handlers('model_para', self.callback_funcs_model_para, + ['model_para', 'evaluate', 'finish']) + self.register_handlers('metrics', self.callback_funcs_for_metrics, + ['converged']) -class MyServer(Server): - pass +class MyClient(Client): + def _register_default_handlers(self): + self.register_handlers('assign_client_id', + self.callback_funcs_for_assign_id, [None]) + self.register_handlers('address', self.callback_funcs_for_address) + self.register_handlers('model_para', + self.callback_funcs_for_model_para, + ['model_para', 'ss_model_para']) + self.register_handlers('evaluate', self.callback_funcs_for_evaluate, + ['metrics']) + self.register_handlers('finish', self.callback_funcs_for_finish) + self.register_handlers('converged', self.callback_funcs_for_converged) def call_my_worker(method): - if method == 'mymethod': + if method == 'myfedavg': worker_builder = {'client': MyClient, 'server': MyServer} return worker_builder -register_worker('mymethod', call_my_worker) +register_worker('myfedavg', call_my_worker) diff --git a/federatedscope/core/fed_runner.py b/federatedscope/core/fed_runner.py index 61f8dda86..770743c5c 100644 --- a/federatedscope/core/fed_runner.py +++ b/federatedscope/core/fed_runner.py @@ -470,7 +470,10 @@ def check(self): pos, with_labels=True, node_color='white', - node_size=800) + node_size=800, + width=1.0, + arrowsize=25, + arrowstyle='->') fig_path = os.path.join(self.cfg.outdir, 'msg_handler.png') plt.savefig(fig_path) if nx.has_path(G, 'Client_join_in', 'Server_finish'): diff --git a/federatedscope/core/workers/client.py b/federatedscope/core/workers/client.py index 1698dd938..cb90b16b5 100644 --- a/federatedscope/core/workers/client.py +++ b/federatedscope/core/workers/client.py @@ -184,12 +184,12 @@ def _register_default_handlers(self): self.register_handlers('address', self.callback_funcs_for_address) self.register_handlers('model_para', self.callback_funcs_for_model_para, - ['model_para']) + ['model_para', 'ss_model_para']) self.register_handlers('ss_model_para', self.callback_funcs_for_model_para, - ['ss_model_para']) + ['ss_model_para', 'model_para']) self.register_handlers('evaluate', self.callback_funcs_for_evaluate, - ['metric']) + ['metrics']) self.register_handlers('finish', self.callback_funcs_for_finish) self.register_handlers('converged', self.callback_funcs_for_converged) diff --git a/federatedscope/core/workers/server.py b/federatedscope/core/workers/server.py index e43c282b3..0569ccf84 100644 --- a/federatedscope/core/workers/server.py +++ b/federatedscope/core/workers/server.py @@ -225,12 +225,9 @@ def _register_default_handlers(self): 'assign_client_id', 'ask_for_join_in_info', 'address', 'model_para' ]) self.register_handlers('join_in_info', self.callback_funcs_for_join_in, - [ - 'assign_client_id', 'ask_for_join_in_info', - 'address', 'model_para' - ]) + ['address', 'model_para']) self.register_handlers('model_para', self.callback_funcs_model_para, - ['model_para', 'finish']) + ['model_para', 'evaluate', 'finish']) self.register_handlers('metrics', self.callback_funcs_for_metrics, ['converged']) From 7df38c9c6036ee26fe827f98f422ca14b6ca348c Mon Sep 17 00:00:00 2001 From: rayrayraykk <18007356109@163.com> Date: Thu, 24 Nov 2022 18:14:03 +0800 Subject: [PATCH 8/8] add time for test --- .github/workflows/test_distribute.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_distribute.yml b/.github/workflows/test_distribute.yml index 957c241fc..c3bee4b77 100644 --- a/.github/workflows/test_distribute.yml +++ b/.github/workflows/test_distribute.yml @@ -5,7 +5,7 @@ on: [push, pull_request] jobs: run: runs-on: ${{ matrix.os }} - timeout-minutes: 10 + timeout-minutes: 20 strategy: matrix: os: [ubuntu-latest]