Skip to content

Commit

Permalink
Enhanced monitor with system metrics logging and wandb support (#90)
Browse files Browse the repository at this point in the history
* added system metrics to be monitored
  • Loading branch information
yxdyc authored May 24, 2022
1 parent e625a0d commit e9b4b38
Show file tree
Hide file tree
Showing 30 changed files with 823 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ RUN conda install -y numpy=1.21.2 scikit-learn=1.0.2 scipy=1.7.3 pandas=1.4.1 -c
RUN conda install -y pytorch=1.10.1 torchvision=0.11.2 torchaudio=0.10.1 cudatoolkit=11.3 -c pytorch -c conda-forge \
&& conda clean -a -y

# torch helper package
RUN conda install -y fvcore iopath -c fvcore -c iopath -c conda-forge \
&& conda clean -a -y

# auxiliaries (communications, monitoring, etc.)
RUN conda install -y wandb tensorboard tensorboardX -c conda-forge \
RUN conda install -y wandb tensorboard tensorboardX pympler -c conda-forge \
&& pip install grpcio grpcio-tools protobuf==3.19.4 setuptools==61.2.0 \
&& conda clean -a -y

Expand Down
6 changes: 5 additions & 1 deletion enviroment/docker_files/federatedscope-torch1.10.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ RUN conda install -y numpy=1.21.2 scikit-learn=1.0.2 scipy=1.7.3 pandas=1.4.1 -c
RUN conda install -y pytorch=1.10.1 torchvision=0.11.2 torchaudio=0.10.1 cudatoolkit=11.3 -c pytorch -c conda-forge \
&& conda clean -a -y

# torch helper package
RUN conda install -y fvcore iopath -c fvcore -c iopath -c conda-forge \
&& conda clean -a -y

# auxiliaries (communications, monitoring, etc.)
RUN conda install -y wandb tensorboard tensorboardX -c conda-forge \
RUN conda install -y wandb tensorboard tensorboardX pympler -c conda-forge \
&& pip install grpcio grpcio-tools protobuf==3.19.4 setuptools==61.2.0 \
&& conda clean -a -y
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ RUN conda install -y numpy=1.21.2 scikit-learn=1.0.2 scipy=1.7.3 pandas=1.4.1 -c
RUN conda install -y pytorch==1.8.0 torchvision==0.9.0 torchaudio==0.8.0 cudatoolkit=10.2 -c pytorch \
&& conda clean -a -y

# torch helper package
RUN conda install -y fvcore iopath -c fvcore -c iopath -c conda-forge \
&& conda clean -a -y

# for graph
RUN conda install -y pyg==2.0.1 -c pyg \
&& conda install -y rdkit=2021.09.4 -c conda-forge \
Expand All @@ -52,6 +56,6 @@ RUN conda install -y sentencepiece textgrid typeguard -c conda-forge \
&& conda clean -a -y

# auxiliaries (communications, monitoring, etc.)
RUN conda install -y wandb tensorboard tensorboardX -c conda-forge \
RUN conda install -y wandb tensorboard tensorboardX pympler -c conda-forge \
&& pip install grpcio grpcio-tools protobuf==3.19.4 setuptools==61.2.0 \
&& conda clean -a -y
3 changes: 3 additions & 0 deletions enviroment/requirements-torch1.10-application.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ transformers==4.16.2
tokenizers==0.10.3
torchtext
datasets
fvcore
pympler
iopath

5 changes: 4 additions & 1 deletion enviroment/requirements-torch1.10.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ tensorboardX
grpcio
grpcio-tools
protobuf==3.19.4
setuptools==61.2.0
setuptools==61.2.0
fvcore
pympler
iopath
3 changes: 3 additions & 0 deletions enviroment/requirements-torch1.8-application.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,8 @@ transformers==4.16.2
tokenizers==0.10.3
torchtext
datasets
fvcore
pympler
iopath


15 changes: 10 additions & 5 deletions federatedscope/core/auxiliaries/trainer_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,25 @@ def get_trainer(model=None,
device=None,
config=None,
only_for_eval=False,
is_attacker=False):
is_attacker=False,
monitor=None):
if config.trainer.type == 'general':
if config.backend == 'torch':
from federatedscope.core.trainers import GeneralTorchTrainer
trainer = GeneralTorchTrainer(model=model,
data=data,
device=device,
config=config,
only_for_eval=only_for_eval)
only_for_eval=only_for_eval,
monitor=monitor)
elif config.backend == 'tensorflow':
from federatedscope.core.trainers.tf_trainer import GeneralTFTrainer
trainer = GeneralTFTrainer(model=model,
data=data,
device=device,
config=config,
only_for_eval=only_for_eval)
only_for_eval=only_for_eval,
monitor=monitor)
else:
raise ValueError
elif config.trainer.type == 'none':
Expand Down Expand Up @@ -78,7 +81,8 @@ def get_trainer(model=None,
data=data,
device=device,
config=config,
only_for_eval=only_for_eval)
only_for_eval=only_for_eval,
monitor=monitor)
else:
# try to find user registered trainer
trainer = None
Expand All @@ -89,7 +93,8 @@ def get_trainer(model=None,
data=data,
device=device,
config=config,
only_for_eval=only_for_eval)
only_for_eval=only_for_eval,
monitor=monitor)
if trainer is None:
raise ValueError('Trainer {} is not provided'.format(
config.trainer.type))
Expand Down
106 changes: 104 additions & 2 deletions federatedscope/core/auxiliaries/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy
import json
import logging
import math
import os
Expand Down Expand Up @@ -61,6 +63,8 @@ def update_logger(cfg, clear_before_add=False):
# ================ create outdir to save log, exp_config, models, etc,.
if cfg.outdir == "":
cfg.outdir = os.path.join(os.getcwd(), "exp")
if cfg.expname == "":
cfg.expname = f"{cfg.federate.method}_{cfg.model.type}_on_{cfg.data.type}"
cfg.outdir = os.path.join(cfg.outdir, cfg.expname)

# if exist, make directory with given name and time
Expand All @@ -84,10 +88,37 @@ def update_logger(cfg, clear_before_add=False):
"%(asctime)s (%(module)s:%(lineno)d) %(levelname)s: %(message)s")
fh.setFormatter(logger_formatter)
root_logger.addHandler(fh)
#sys.stderr = sys.stdout
# sys.stderr = sys.stdout

root_logger.info(f"the output dir is {cfg.outdir}")

if cfg.wandb.use:
init_wandb(cfg)


def init_wandb(cfg):
try:
import wandb
except ImportError:
logger.error("cfg.wandb.use=True but not install the wandb package")
exit()
dataset_name = cfg.data.type
method_name = cfg.federate.method
exp_name = cfg.expname

tmp_cfg = copy.deepcopy(cfg)
tmp_cfg.cfg_check_funcs = []
import yaml
cfg_yaml = yaml.safe_load(tmp_cfg.dump())

wandb.init(project=cfg.wandb.name_project,
entity=cfg.wandb.name_user,
config=cfg_yaml,
group=dataset_name,
job_type=method_name,
name=exp_name,
notes=f"{method_name}, {exp_name}")


def get_dataset(type, root, transform, target_transform, download=True):
if isinstance(type, str):
Expand Down Expand Up @@ -158,7 +189,6 @@ def get_random(type, sample_shape, params, device):


def batch_iter(data, batch_size=64, shuffled=True):

assert 'x' in data and 'y' in data
data_x = data['x']
data_y = data['y']
Expand Down Expand Up @@ -264,3 +294,75 @@ def block(self):

def exceed_max_failure(self, num_failure):
return num_failure > self.max_failure


def logfile_2_wandb_dict(exp_log_f, raw_out=True):
"""
parse the logfiles [exp_print.log, eval_results.log] into wandb_dict that contains non-nested dicts
:param exp_log_f: opened exp_log file
:param raw_out: True indicates "exp_print.log", otherwise indicates "eval_results.log",
the difference is whether contains the logger header such as "2022-05-02 16:55:02,843 (client:197) INFO:"
:return: tuple including (all_log_res, exp_stop_normal, last_line, log_res_best)
"""
log_res_best = {}
exp_stop_normal = False
all_log_res = []
last_line = None
for line in exp_log_f:
last_line = line
if " Find new best result" in line:
# e.g.,
# 2022-03-22 10:48:42,562 (server:459) INFO: Find new best result for client_individual.test_acc with value 0.5911787974683544
parse_res = line.split("INFO: ")[1].split("with value")
best_key, best_val = parse_res[-2], parse_res[-1]
# client_individual.test_acc -> client_individual/test_acc
best_key = best_key.replace("Find new best result for",
"").replace(".", "/")
log_res_best[best_key.strip()] = float(best_val.strip())

if "'Role': 'Server #'" in line:
if raw_out:
line = line.split("INFO: ")[1]
res = line.replace("\'", "\"")
res = json.loads(s=res)
if res['Role'] == 'Server #':
cur_round = res['Round']
res.pop('Role')
if cur_round != "Final" and 'Results_raw' in res:
res.pop('Results_raw')

log_res = {}
for key, val in res.items():
if not isinstance(val, dict):
log_res[key] = val
else:
if cur_round != "Final":
for key_inner, val_inner in val.items():
assert not isinstance(
val_inner, dict), "Un-expected log format"
log_res[f"{key}/{key_inner}"] = val_inner

else:
exp_stop_normal = True
if key == "Results_raw":
for final_type, final_type_dict in res[
"Results_raw"].items():
for inner_key, inner_val in final_type_dict.items(
):
log_res_best[
f"{final_type}/{inner_key}"] = inner_val
# log_res_best = {}
# for best_res_type, val_dict in val.items():
# for key_inner, val_inner in val_dict.items():
# assert not isinstance(val_inner, dict), "Un-expected log format"
# log_res_best[f"{best_res_type}/{key_inner}"] = val_inner
# if log_res_best is not None and "Results_weighted_avg/val_loss" in log_res and \
# log_res_best["client_summarized_weighted_avg/val_loss"] > \
# log_res["Results_weighted_avg/val_loss"]:
# print("Missing the results of last round, update best results")
# for key, val in log_res.items():
# log_res_best[key.replace("Results", "client_summarized")] = val
all_log_res.append(log_res)
return all_log_res, exp_stop_normal, last_line, log_res_best
6 changes: 5 additions & 1 deletion federatedscope/core/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ class StandaloneCommManager(object):
"""
The communicator used for standalone mode
"""
def __init__(self, comm_queue):
def __init__(self, comm_queue, monitor=None):
self.comm_queue = comm_queue
self.neighbors = dict()
self.monitor = monitor # used to track the communication related metrics

def receive(self):
# we don't need receive() in standalone
Expand All @@ -37,6 +38,8 @@ def get_neighbors(self, neighbor_id=None):

def send(self, message):
self.comm_queue.append(message)
download_bytes, upload_bytes = message.count_bytes()
self.monitor.track_upload_bytes(upload_bytes)


class gRPCCommManager(object):
Expand All @@ -60,6 +63,7 @@ def __init__(self, host='0.0.0.0', port='50050', client_num=2):
port=port,
options=options)
self.neighbors = dict()
self.monitor = None # used to track the communication related metrics

def serve(self, max_workers, host, port, options):
"""
Expand Down
2 changes: 0 additions & 2 deletions federatedscope/core/configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
if "config" in all_sub_configs:
all_sub_configs.remove('config')


from federatedscope.core.configs.config import CN, init_global_cfg
__all__ = __all__ + \
[
Expand All @@ -28,4 +27,3 @@
for base_config in base_configs:
all_sub_configs.pop(all_sub_configs.index(base_config))
all_sub_configs.insert(0, base_config)

6 changes: 4 additions & 2 deletions federatedscope/core/configs/cfg_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ def extend_data_cfg(cfg):
cfg.data.args = [] # args for external dataset, eg. [{'download': True}]
cfg.data.splitter = ''
cfg.data.splitter_args = [] # args for splitter, eg. [{'alpha': 0.5}]
cfg.data.transform = [] # transform for x, eg. [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
cfg.data.transform = [
] # transform for x, eg. [['ToTensor'], ['Normalize', {'mean': [0.1307], 'std': [0.3081]}]]
cfg.data.target_transform = [] # target_transform for y, use as above
cfg.data.pre_transform = [] # pre_transform for `torch_geometric` dataset, use as above
cfg.data.pre_transform = [
] # pre_transform for `torch_geometric` dataset, use as above
cfg.data.batch_size = 64
cfg.data.drop_last = False
cfg.data.sizes = [10, 5]
Expand Down
21 changes: 20 additions & 1 deletion federatedscope/core/configs/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import copy
import logging
import os

from yacs.config import CfgNode

import federatedscope.register as register

logger = logging.getLogger(__name__)


class CN(CfgNode):
"""
Expand Down Expand Up @@ -83,7 +86,9 @@ def clean_unused_sub_cfgs(self):

def freeze(self):
"""
make the cfg attributes immutable, and save the freezed cfg_check_funcs into "self.outdir/config.yaml" for better reproducibility
1) make the cfg attributes immutable;
2) save the frozen cfg_check_funcs into "self.outdir/config.yaml" for better reproducibility;
3) if self.wandb.use=True, update the frozen config
:return:
"""
Expand All @@ -96,6 +101,20 @@ def freeze(self):
tmp_cfg = copy.deepcopy(self)
tmp_cfg.cfg_check_funcs = []
print(tmp_cfg.dump())
if self.wandb.use:
# update the frozen config
try:
import wandb
except ImportError:
logger.error(
"cfg.wandb.use=True but not install the wandb package")
exit()

import yaml
cfg_yaml = yaml.safe_load(tmp_cfg.dump())
wandb.config.update(cfg_yaml, allow_val_change=True)

logger.info("the used configs are: \n" + str(tmp_cfg))

super(CN, self).freeze()

Expand Down
6 changes: 5 additions & 1 deletion federatedscope/core/fed_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def is_broadcast(msg):
msg = self.shared_comm_queue.popleft()
self._handle_msg(msg)

self.server._monitor.compress_raw_res_file()
self.server._monitor.finish_fed_runner(fl_mode=self.mode)

return self.server.best_results

Expand Down Expand Up @@ -245,10 +245,14 @@ def _handle_msg(self, msg, rcv=-1):
return

sender, receiver = msg.sender, msg.receiver
download_bytes, upload_bytes = msg.count_bytes()
if not isinstance(receiver, list):
receiver = [receiver]
for each_receiver in receiver:
if each_receiver == 0:
self.server.msg_handlers[msg.msg_type](msg)
self.server._monitor.track_download_bytes(download_bytes)
else:
self.client[each_receiver].msg_handlers[msg.msg_type](msg)
self.client[each_receiver]._monitor.track_download_bytes(
download_bytes)
Loading

0 comments on commit e9b4b38

Please sign in to comment.