From 29bcace002134d29600820a6dba7cd15b8f66559 Mon Sep 17 00:00:00 2001 From: Chaoyang He Date: Mon, 22 Aug 2022 13:13:57 -0700 Subject: [PATCH 01/13] Update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 3018245c5c..23382088ff 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,10 @@ FedML Blog: https://medium.com/@FedML \ FedML Research: https://fedml.ai/research-papers/ \ FedML Product Overview: https://medium.com/@FedML/fedml-ai-platform-releases-the-worlds-federated-learning-open-platform-on-public-cloud-with-an-8024e68a70b6 +Join the Community: \ +Slack: https://join.slack.com/t/fedml/shared_invite/zt-havwx1ee-a1xfOUrATNfc9DFqU~r34w \ +Discord: https://discord.gg/9xkW8ae6RV + [//]: # () [//]: # (
) From 7fcc52e31cbbc1a3090d5d8ef4e2938b6dec51ca Mon Sep 17 00:00:00 2001 From: Ray Sun Date: Tue, 23 Aug 2022 14:35:02 +0100 Subject: [PATCH 02/13] Update datasets.py remove unnecessary if --- python/fedml/data/cifar100/datasets.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/python/fedml/data/cifar100/datasets.py b/python/fedml/data/cifar100/datasets.py index 54e4a9734a..82c283589e 100644 --- a/python/fedml/data/cifar100/datasets.py +++ b/python/fedml/data/cifar100/datasets.py @@ -70,14 +70,8 @@ def __build_truncated_dataset__(self): self.root, self.train, self.transform, self.target_transform, self.download ) - if self.train: - # print("train member of the class: {}".format(self.train)) - # data = cifar_dataobj.train_data - data = cifar_dataobj.data - target = np.array(cifar_dataobj.targets) - else: - data = cifar_dataobj.data - target = np.array(cifar_dataobj.targets) + data = cifar_dataobj.data + target = np.array(cifar_dataobj.targets) if self.dataidxs is not None: data = data[self.dataidxs] From e529ced1cedf32eee468e58391c9bcfb9a39b567 Mon Sep 17 00:00:00 2001 From: Ray Sun Date: Wed, 24 Aug 2022 11:10:09 +0100 Subject: [PATCH 03/13] Update requirements.txt remove loguru --- devops/scripts/requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/devops/scripts/requirements.txt b/devops/scripts/requirements.txt index 1de7b3f694..132b809fee 100644 --- a/devops/scripts/requirements.txt +++ b/devops/scripts/requirements.txt @@ -1,8 +1,7 @@ protobuf grpcio grpcio-tools -loguru dill multiprocess nvidia-ml-py3 -matplotlib \ No newline at end of file +matplotlib From 9149455609611037c27880d691ad9ee2d59b3aed Mon Sep 17 00:00:00 2001 From: Ray Sun Date: Wed, 24 Aug 2022 11:11:55 +0100 Subject: [PATCH 04/13] Update attack_defense_data_loader.py remove loguru --- python/fedml/core/security/common/attack_defense_data_loader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/fedml/core/security/common/attack_defense_data_loader.py b/python/fedml/core/security/common/attack_defense_data_loader.py index 796cce87b2..e2dcf88dd4 100644 --- a/python/fedml/core/security/common/attack_defense_data_loader.py +++ b/python/fedml/core/security/common/attack_defense_data_loader.py @@ -50,7 +50,6 @@ def load_data_loader_from_file(cls, filename): """ Loads DataLoader object from a file if available. 
- :param logger: loguru.Logger :param filename: string """ print("Loading data loader from file: {}".format(filename)) From 48986e135b27584f489289036e394e708791b2d8 Mon Sep 17 00:00:00 2001 From: ranyide Date: Sun, 28 Aug 2022 22:28:29 +0800 Subject: [PATCH 05/13] bugfix synthetic data url split data --- .../torch_client.py | 111 +++++++++--------- .../torch_server.py | 87 +++++++------- python/fedml/data/cifar10/efficient_loader.py | 18 ++- python/fedml/data/data_loader.py | 59 ++++------ 4 files changed, 136 insertions(+), 139 deletions(-) diff --git a/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py b/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py index 82b416832f..44a8e08b93 100644 --- a/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py +++ b/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py @@ -3,59 +3,60 @@ import fedml from fedml import FedMLRunner from fedml.data.MNIST.data_loader import download_mnist, load_partition_data_mnist -from .trainer.classification_aggregator import ClassificationAggregator -from .trainer.classification_trainer import ClassificationTrainer +from trainer.classification_aggregator import ClassificationAggregator +from trainer.classification_trainer import ClassificationTrainer +from fedml.data.data_loader import load -def load_data(args): - download_mnist(args.data_cache_dir) - fedml.logging.info("load_data. dataset_name = %s" % args.dataset) - - """ - Please read through the data loader at to see how to customize the dataset for FedML framework. - """ - ( - client_num, - train_data_num, - test_data_num, - train_data_global, - test_data_global, - train_data_local_num_dict, - train_data_local_dict, - test_data_local_dict, - class_num, - ) = load_partition_data_mnist( - args, - args.batch_size, - train_path=args.data_cache_dir + "/MNIST/train", - test_path=args.data_cache_dir + "/MNIST/test", - ) - """ - For shallow NN or linear models, - we uniformly sample a fraction of clients each round (as the original FedAvg paper) - """ - args.client_num_in_total = client_num - dataset = [ - train_data_num, - test_data_num, - train_data_global, - test_data_global, - train_data_local_num_dict, - train_data_local_dict, - test_data_local_dict, - class_num, - ] - return dataset, class_num - - -class LogisticRegression(torch.nn.Module): - def __init__(self, input_dim, output_dim): - super(LogisticRegression, self).__init__() - self.linear = torch.nn.Linear(input_dim, output_dim) - - def forward(self, x): - outputs = torch.sigmoid(self.linear(x)) - return outputs +# def load_data(args): +# download_mnist(args.data_cache_dir) +# fedml.logging.info("load_data. dataset_name = %s" % args.dataset) +# +# """ +# Please read through the data loader at to see how to customize the dataset for FedML framework. 
+# """ +# ( +# client_num, +# train_data_num, +# test_data_num, +# train_data_global, +# test_data_global, +# train_data_local_num_dict, +# train_data_local_dict, +# test_data_local_dict, +# class_num, +# ) = load_partition_data_mnist( +# args, +# args.batch_size, +# train_path=args.data_cache_dir + "/MNIST/train", +# test_path=args.data_cache_dir + "/MNIST/test", +# ) +# """ +# For shallow NN or linear models, +# we uniformly sample a fraction of clients each round (as the original FedAvg paper) +# """ +# args.client_num_in_total = client_num +# dataset = [ +# train_data_num, +# test_data_num, +# train_data_global, +# test_data_global, +# train_data_local_num_dict, +# train_data_local_dict, +# test_data_local_dict, +# class_num, +# ] +# return dataset, class_num +# +# +# class LogisticRegression(torch.nn.Module): +# def __init__(self, input_dim, output_dim): +# super(LogisticRegression, self).__init__() +# self.linear = torch.nn.Linear(input_dim, output_dim) +# +# def forward(self, x): +# outputs = torch.sigmoid(self.linear(x)) +# return outputs if __name__ == "__main__": @@ -66,13 +67,13 @@ def forward(self, x): device = fedml.device.get_device(args) # load data - dataset, class_num = load_data(args) + dataset, class_num = load(args) # create model and trainer model = fedml.model.create(args, output_dim=class_num) - trainer = ClassificationTrainer(model=model, args=args) - aggregator = ClassificationAggregator(model=model, args=args) + # trainer = ClassificationTrainer(model=model, args=args) + # aggregator = ClassificationAggregator(model=model, args=args) # start training - fedml_runner = FedMLRunner(args, device, dataset, model, trainer, aggregator) + fedml_runner = FedMLRunner(args, device, dataset, model) fedml_runner.run() diff --git a/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_server.py b/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_server.py index 82b416832f..294e04f1d6 100644 --- a/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_server.py +++ b/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_server.py @@ -2,50 +2,51 @@ import fedml from fedml import FedMLRunner -from fedml.data.MNIST.data_loader import download_mnist, load_partition_data_mnist -from .trainer.classification_aggregator import ClassificationAggregator -from .trainer.classification_trainer import ClassificationTrainer +# from fedml.data.MNIST.data_loader import download_mnist, load_partition_data_mnist +from trainer.classification_aggregator import ClassificationAggregator +from trainer.classification_trainer import ClassificationTrainer +from fedml.data.data_loader import load -def load_data(args): - download_mnist(args.data_cache_dir) - fedml.logging.info("load_data. dataset_name = %s" % args.dataset) - - """ - Please read through the data loader at to see how to customize the dataset for FedML framework. 
- """ - ( - client_num, - train_data_num, - test_data_num, - train_data_global, - test_data_global, - train_data_local_num_dict, - train_data_local_dict, - test_data_local_dict, - class_num, - ) = load_partition_data_mnist( - args, - args.batch_size, - train_path=args.data_cache_dir + "/MNIST/train", - test_path=args.data_cache_dir + "/MNIST/test", - ) - """ - For shallow NN or linear models, - we uniformly sample a fraction of clients each round (as the original FedAvg paper) - """ - args.client_num_in_total = client_num - dataset = [ - train_data_num, - test_data_num, - train_data_global, - test_data_global, - train_data_local_num_dict, - train_data_local_dict, - test_data_local_dict, - class_num, - ] - return dataset, class_num +# def load_data(args): +# download_mnist(args.data_cache_dir) +# fedml.logging.info("load_data. dataset_name = %s" % args.dataset) +# +# """ +# Please read through the data loader at to see how to customize the dataset for FedML framework. +# """ +# ( +# client_num, +# train_data_num, +# test_data_num, +# train_data_global, +# test_data_global, +# train_data_local_num_dict, +# train_data_local_dict, +# test_data_local_dict, +# class_num, +# ) = load_partition_data_mnist( +# args, +# args.batch_size, +# train_path=args.data_cache_dir + "/MNIST/train", +# test_path=args.data_cache_dir + "/MNIST/test", +# ) +# """ +# For shallow NN or linear models, +# we uniformly sample a fraction of clients each round (as the original FedAvg paper) +# """ +# args.client_num_in_total = client_num +# dataset = [ +# train_data_num, +# test_data_num, +# train_data_global, +# test_data_global, +# train_data_local_num_dict, +# train_data_local_dict, +# test_data_local_dict, +# class_num, +# ] +# return dataset, class_num class LogisticRegression(torch.nn.Module): @@ -66,7 +67,7 @@ def forward(self, x): device = fedml.device.get_device(args) # load data - dataset, class_num = load_data(args) + dataset, class_num = load(args) # create model and trainer model = fedml.model.create(args, output_dim=class_num) diff --git a/python/fedml/data/cifar10/efficient_loader.py b/python/fedml/data/cifar10/efficient_loader.py index 9e7a38689d..a8130cda6d 100644 --- a/python/fedml/data/cifar10/efficient_loader.py +++ b/python/fedml/data/cifar10/efficient_loader.py @@ -94,15 +94,19 @@ def _data_transforms_cifar10(): return train_transform, valid_transform -def load_cifar10_data(datadir, resize=32, augmentation=True, data_efficient_load=False): +def load_cifar10_data(datadir, process_id, synthetic_data_url, private_local_data, resize=32, augmentation=True, data_efficient_load=False): train_transform, test_transform = _data_transforms_cifar10() + is_download = True; + if process_id != 0: + is_download = False if (len(synthetic_data_url) != 0 or len(private_local_data) != 0) else True; + if data_efficient_load: cifar10_train_ds = CIFAR10(datadir, train=True, download=True, transform=train_transform) cifar10_test_ds = CIFAR10(datadir, train=False, download=True, transform=test_transform) else: - cifar10_train_ds = CIFAR10_truncated(datadir, train=True, download=True, transform=train_transform) - cifar10_test_ds = CIFAR10_truncated(datadir, train=False, download=True, transform=test_transform) + cifar10_train_ds = CIFAR10_truncated(datadir, train=True, download=is_download, transform=train_transform) + cifar10_test_ds = CIFAR10_truncated(datadir, train=False, download=is_download, transform=test_transform) X_train, y_train = cifar10_train_ds.data, cifar10_train_ds.targets X_test, y_test = 
cifar10_test_ds.data, cifar10_test_ds.targets @@ -110,10 +114,10 @@ def load_cifar10_data(datadir, resize=32, augmentation=True, data_efficient_load return (X_train, y_train, X_test, y_test, cifar10_train_ds, cifar10_test_ds) -def partition_data(dataset, datadir, partition, n_nets, alpha): +def partition_data(dataset, datadir, partition, n_nets, alpha, process_id, synthetic_data_url, private_local_data): np.random.seed(10) logging.info("*********partition data***************") - X_train, y_train, X_test, y_test, cifar10_train_ds, cifar10_test_ds = load_cifar10_data(datadir) + X_train, y_train, X_test, y_test, cifar10_train_ds, cifar10_test_ds = load_cifar10_data(datadir, process_id, synthetic_data_url, private_local_data) n_train = X_train.shape[0] # n_test = X_test.shape[0] @@ -311,6 +315,8 @@ def efficient_load_partition_data_cifar10( client_number, batch_size, process_id, + synthetic_data_url, + private_local_data, n_proc_in_silo=0, data_efficient_load=True, ): @@ -323,7 +329,7 @@ def efficient_load_partition_data_cifar10( traindata_cls_counts, cifar10_train_ds, cifar10_test_ds, - ) = partition_data(dataset, data_dir, partition_method, client_number, partition_alpha, process_id) + ) = partition_data(dataset, data_dir, partition_method, client_number, partition_alpha, process_id, synthetic_data_url, private_local_data) class_num = len(np.unique(y_train)) logging.info("traindata_cls_counts = " + str(traindata_cls_counts)) train_data_num = sum([len(net_dataidx_map[r]) for r in range(client_number)]) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index 26ba0a1e28..81c349795b 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -31,8 +31,8 @@ _config = Config(retries={"max_attempts": 4, "mode": "standard"}) CN_REGION_NAME = "us-east-1" -CN_S3_AKI = "" -CN_S3_SAK = "" +CN_S3_AKI = "AKIAY7HWPQWRMFNCM6GW" +CN_S3_SAK = "5QilWTvlC7aX1kEtvrC0T51DiEwscuI+/I5Jhs0u" BUCKET_NAME = "fedmls3" @@ -125,8 +125,8 @@ def on_message(client, userdata, msg): ), ) - # topic = "data_svr/dataset/%s" % (15 + int(args.client_id_list[1])) - topic = "data_svr/dataset/%s" % args.process_id + topic = "data_svr/dataset/%s" % (15 + int(args.client_id_list[1])) + # topic = "data_svr/dataset/%s" % args.process_id client.subscribe(topic) client.on_message = on_message @@ -137,43 +137,27 @@ def disconnect(client: mqtt_client): def data_server_preprocess(args): + args.run_id = 1378 + args.synthetic_data_url = "" + args.private_local_data = "" if args.process_id == 0: pass else: client = connect_mqtt() subscribe(client, args) if args.dataset == "cifar10": - # Local Simulation Run - if args.run_id == "0": - # split_status = 0 (unsplit), 1(splitting), 2(split_finished), 3(split failed, Interruption occurs) - split_status = check_rundata(args) - if split_status == 0 or split_status == 3: - logging.info("Data Server Start Splitting Dataset") - split_edge_data(args) - elif split_status == 1: - logging.info("Data Server Is Splitting Dataset, Waiting For Mqtt Message") - elif split_status == 2: - logging.info("Data Server Splitted Dataset Complete") - query_data_server(args, int(args.client_id_list[1])) - disconnect(client) - args.data_cache_dir = args.data_cache_dir = os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (int(args.client_id_list[1])), - ) # Mlops Run - else: + if args.run_id > 0: # check mlops run_status - private_local_dir, split_status, edgeids = 
check_rundata(args) + private_local_dir, split_status, edgeids, dataset_s3_key = check_rundata(args) + args.private_local_data = private_local_dir + args.synthetic_data_url = dataset_s3_key # MLOPS Run. User supply the local data dir - if len(private_local_dir) != 0: + if len(args.private_local_data) != 0: logging.info("User has set the private local data dir") - args.data_cache_dir = private_local_dir disconnect(client) # MLOPS Run need to Split Data - else: + elif len(args.synthetic_data_url) != 0: if split_status == 0 or split_status == 3: logging.info("Data Server Start Splitting Dataset") split_edge_data(args, edgeids) @@ -183,7 +167,10 @@ def data_server_preprocess(args): logging.info("Data Server Splitted Dataset Complete") query_data_server(args, 15 + int(args.client_id_list[1])) disconnect(client) - args.data_cache_dir = args.data_cache_dir = os.path.join( + elif len(args.data_cache_dir) != 0: + logging.info("No synthetic data url and private local data dir") + return + args.data_cache_dir = os.path.join( args.data_cache_dir, "run_Id_%s" % args.run_id, "edgeNums_%s" % (args.client_num_in_total), @@ -255,7 +242,7 @@ def check_rundata(args): verify=True, headers={"content-type": "application/json", "Connection": "keep-alive"}, ) - return response.json()["private_local_dir"], response.json()["split_status"], response.json()["edgeids"] + return response.json()["private_local_dir"], response.json()["split_status"], response.json()["edgeids"], response.json()["dataset_s3_key"] except requests.exceptions.SSLError as err: print(err) @@ -329,7 +316,7 @@ def combine_batches(batches): def load_synthetic_data(args): - # data_server_preprocess(args) + data_server_preprocess(args) dataset_name = args.dataset # check if the centralized training is enabled centralized = True if (args.client_num_in_total == 1 and args.training_type != "cross_silo") else False @@ -529,7 +516,7 @@ def load_synthetic_data(args): else: if dataset_name == "cifar10": - if hasattr(args, "using_cloud_data") and args.using_cloud_data: + # if hasattr(args, "using_cloud_data") and args.using_cloud_data: ( train_data_num, test_data_num, @@ -547,6 +534,8 @@ def load_synthetic_data(args): args.client_num_in_total, args.batch_size, args.process_id, + args.synthetic_data_url, + args.private_local_data ) if centralized: @@ -585,8 +574,8 @@ def load_synthetic_data(args): ] return dataset, class_num - else: - data_loader = load_partition_data_cifar10 + # else: + # data_loader = load_partition_data_cifar10 elif dataset_name == "cifar100": data_loader = load_partition_data_cifar100 From 63fe2e47ce7780829b3cf942f3c295ee74148d03 Mon Sep 17 00:00:00 2001 From: alex-liang-kh <98797487+alex-liang-kh@users.noreply.github.com> Date: Sun, 28 Aug 2022 23:33:00 +0800 Subject: [PATCH 06/13] Update data_loader.py --- python/fedml/data/data_loader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index 81c349795b..f53ce9c716 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -31,8 +31,8 @@ _config = Config(retries={"max_attempts": 4, "mode": "standard"}) CN_REGION_NAME = "us-east-1" -CN_S3_AKI = "AKIAY7HWPQWRMFNCM6GW" -CN_S3_SAK = "5QilWTvlC7aX1kEtvrC0T51DiEwscuI+/I5Jhs0u" +CN_S3_AKI = "" +CN_S3_SAK = "" BUCKET_NAME = "fedmls3" From a8416306ddf6382be423f12524a7548246ed233d Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 00:04:26 +0800 Subject: [PATCH 07/13] bugfix no s3 info and mqtt info --- 
.../torch_client.py | 6 +- python/fedml/data/data_loader.py | 72 +++++++++++-------- python/fedml/data/file_operation.py | 28 +------- 3 files changed, 45 insertions(+), 61 deletions(-) diff --git a/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py b/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py index 44a8e08b93..09a1acc230 100644 --- a/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py +++ b/python/examples/cross_silo/mqtt_s3_fedavg_cifar10_lr_example/torch_client.py @@ -71,9 +71,9 @@ # create model and trainer model = fedml.model.create(args, output_dim=class_num) - # trainer = ClassificationTrainer(model=model, args=args) - # aggregator = ClassificationAggregator(model=model, args=args) + trainer = ClassificationTrainer(model=model, args=args) + aggregator = ClassificationAggregator(model=model, args=args) # start training - fedml_runner = FedMLRunner(args, device, dataset, model) + fedml_runner = FedMLRunner(args, device, dataset, model, trainer, aggregator) fedml_runner.run() diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index 81c349795b..a1978d2fc9 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -21,45 +21,29 @@ from .file_operation import * from .shakespeare.data_loader import load_partition_data_shakespeare from .stackoverflow_nwp.data_loader import load_partition_data_federated_stackoverflow_nwp +from ..core.mlops import MLOpsConfigs -broker = "mqtt.fedml.ai" -port = 1883 -username = "admin" -password = "password" -# generate client ID with pub prefix randomly -client_id = f"python-mqtt-{random.randint(0, 1000)}" -_config = Config(retries={"max_attempts": 4, "mode": "standard"}) -CN_REGION_NAME = "us-east-1" -CN_S3_AKI = "AKIAY7HWPQWRMFNCM6GW" -CN_S3_SAK = "5QilWTvlC7aX1kEtvrC0T51DiEwscuI+/I5Jhs0u" -BUCKET_NAME = "fedmls3" +import boto3 +from botocore.config import Config -# s3 client -s3 = boto3.client( - "s3", region_name=CN_REGION_NAME, aws_access_key_id=CN_S3_AKI, aws_secret_access_key=CN_S3_SAK, config=_config -) -# s3 resource -s3_resource = boto3.resource( - "s3", region_name=CN_REGION_NAME, config=_config, aws_access_key_id=CN_S3_AKI, aws_secret_access_key=CN_S3_SAK -) - - -def connect_mqtt() -> mqtt_client: +def connect_mqtt(mqtt_config) -> mqtt_client: def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Host!") else: print("Failed to connect, return code %d\n", rc) + # generate client ID with pub prefix randomly + client_id = f"python-mqtt-{random.randint(0, 1000)}" client = mqtt_client.Client(client_id, clean_session=False) - client.username_pw_set(username, password) - client.connect(broker, port) + client.username_pw_set(mqtt_config["MQTT_USER"], mqtt_config["MQTT_PWD"]) + client.connect(mqtt_config["BROKER_HOST"], mqtt_config["BROKER_PORT"]) return client -def subscribe(client: mqtt_client, args): +def subscribe(s3_obj, BUCKET_NAME, client: mqtt_client, args): def on_message(client, userdata, msg): logging.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") if msg.payload.decode(): @@ -76,6 +60,8 @@ def on_message(client, userdata, msg): ) # start download the file download_s3_file( + s3_obj, + BUCKET_NAME, json.loads(msg.payload.decode())["edge_id"], json.loads(msg.payload.decode())["dataset"], os.path.join( @@ -106,6 +92,8 @@ def on_message(client, userdata, msg): ) # start download the file download_s3_file( + s3_obj, + BUCKET_NAME, 
json.loads(msg.payload.decode())["edge_id"], json.loads(msg.payload.decode())["dataset"], os.path.join( @@ -136,15 +124,31 @@ def disconnect(client: mqtt_client): logging.info(f"Received message, Mqtt stop listen.") +def setup_s3_service(s3_config): + _config = Config( + retries={ + 'max_attempts': 4, + 'mode': 'standard' + } + ) + # s3 client + s3 = boto3.client('s3', region_name=s3_config["CN_REGION_NAME"], aws_access_key_id=s3_config["CN_S3_AKI"], + aws_secret_access_key=s3_config["CN_S3_SAK"], config=_config) + BUCKET_NAME = s3_config["BUCKET_NAME"] + return s3, BUCKET_NAME + + def data_server_preprocess(args): - args.run_id = 1378 + mqtt_config, s3_config = MLOpsConfigs.get_instance(args).fetch_configs() + s3_obj, BUCKET_NAME = setup_s3_service(s3_config) + args.synthetic_data_url = "" args.private_local_data = "" if args.process_id == 0: pass else: - client = connect_mqtt() - subscribe(client, args) + client = connect_mqtt(mqtt_config) + subscribe(s3_obj, BUCKET_NAME, client, args) if args.dataset == "cifar10": # Mlops Run if args.run_id > 0: @@ -165,7 +169,7 @@ def data_server_preprocess(args): logging.info("Data Server Is Splitting Dataset, Waiting For Mqtt Message") elif split_status == 2: logging.info("Data Server Splitted Dataset Complete") - query_data_server(args, 15 + int(args.client_id_list[1])) + query_data_server(args, 15 + int(args.client_id_list[1]), s3_obj, BUCKET_NAME) disconnect(client) elif len(args.data_cache_dir) != 0: logging.info("No synthetic data url and private local data dir") @@ -247,7 +251,7 @@ def check_rundata(args): print(err) -def query_data_server(args, edgeId): +def query_data_server(args, edgeId, s3_obj, BUCKET_NAME): try: url = "http://127.0.0.1:5000/get_edge_dataset" json_params = {"runId": args.run_id, "edgeId": edgeId} @@ -276,6 +280,8 @@ def query_data_server(args, edgeId): ) # start download the file download_s3_file( + s3_obj, + BUCKET_NAME, edgeId, response.json()["dataset_key"], os.path.join( @@ -316,7 +322,11 @@ def combine_batches(batches): def load_synthetic_data(args): - data_server_preprocess(args) + print("***************") + print(args.__dict__) + args.run_id = 1378 + if args.training_type == "cross_silo" and args.run_id > 0: + data_server_preprocess(args) dataset_name = args.dataset # check if the centralized training is enabled centralized = True if (args.client_num_in_total == 1 and args.training_type != "cross_silo") else False diff --git a/python/fedml/data/file_operation.py b/python/fedml/data/file_operation.py index d13383d9ba..954a9e68cf 100644 --- a/python/fedml/data/file_operation.py +++ b/python/fedml/data/file_operation.py @@ -6,32 +6,6 @@ import zipfile -import boto3 -from botocore.config import Config - - - -_config = Config( - retries={ - 'max_attempts': 4, - 'mode': 'standard' - } -) - - -CN_REGION_NAME = "us-east-1" -CN_S3_AKI = "AKIAY7HWPQWRHEZQDVGS" -CN_S3_SAK = "chnPTIfUYxLbGCChXqFCTdvcz3AGWqsX3zTeynnL" -BUCKET_NAME = "fedmls3" - - -# s3 client -s3 = boto3.client('s3', region_name=CN_REGION_NAME, aws_access_key_id=CN_S3_AKI, - aws_secret_access_key=CN_S3_SAK, config=_config) -# s3 resource -s3_resource = boto3.resource('s3', region_name=CN_REGION_NAME, config=_config, - aws_access_key_id=CN_S3_AKI, aws_secret_access_key=CN_S3_SAK) - def make_dir(file_path): """ package tar.gz file @@ -47,7 +21,7 @@ def make_dir(file_path): return False -def download_s3_file(edge_id, path_s3, root, path_local): +def download_s3_file(s3, BUCKET_NAME, edge_id, path_s3, root, path_local): """ download file :param path_s3: s3 key From 
ef18213e73a2414077f7030c8ad394414f046aea Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 00:38:50 +0800 Subject: [PATCH 08/13] bugfix no run_id == 0 --- python/fedml/data/data_loader.py | 160 +++++++++---------------------- 1 file changed, 47 insertions(+), 113 deletions(-) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index a1978d2fc9..06375775b5 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -48,70 +48,37 @@ def on_message(client, userdata, msg): logging.info(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") if msg.payload.decode(): disconnect(client) - if args.run_id == "0": - make_dir( - os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (args.process_id), - ) - ) - # start download the file - download_s3_file( - s3_obj, - BUCKET_NAME, - json.loads(msg.payload.decode())["edge_id"], - json.loads(msg.payload.decode())["dataset"], - os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (args.process_id), - ), - os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (args.process_id), - "cifar-10-python.tar.gz", - ), - ) - else: - make_dir( - os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), - ) - ) - # start download the file - download_s3_file( - s3_obj, - BUCKET_NAME, - json.loads(msg.payload.decode())["edge_id"], - json.loads(msg.payload.decode())["dataset"], - os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), - ), - os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), - "cifar-10-python.tar.gz", - ), + make_dir( + os.path.join( + args.data_cache_dir, + "run_Id_%s" % args.run_id, + "edgeNums_%s" % (args.client_num_in_total), + args.dataset, + "edgeId_%s" % (15 + int(args.client_id_list[1])), ) + ) + # start download the file + download_s3_file( + s3_obj, + BUCKET_NAME, + json.loads(msg.payload.decode())["edge_id"], + json.loads(msg.payload.decode())["dataset"], + os.path.join( + args.data_cache_dir, + "run_Id_%s" % args.run_id, + "edgeNums_%s" % (args.client_num_in_total), + args.dataset, + "edgeId_%s" % (15 + int(args.client_id_list[1])), + ), + os.path.join( + args.data_cache_dir, + "run_Id_%s" % args.run_id, + "edgeNums_%s" % (args.client_num_in_total), + args.dataset, + "edgeId_%s" % (15 + int(args.client_id_list[1])), + "cifar-10-python.tar.gz", + ), + ) topic = "data_svr/dataset/%s" % (15 + int(args.client_id_list[1])) # topic = "data_svr/dataset/%s" % args.process_id @@ -187,19 +154,8 @@ def data_server_preprocess(args): def split_edge_data(args, edge_list=None): try: url = "http://127.0.0.1:5000/split_dataset" - if args.run_id == "0": - edge_li = [] - for i in range(1, args.client_num_in_total + 1): - edge_li.append(i) - json_params = { - "runId": args.run_id, - "edgeIds": edge_li, - "deviceId": args.device_id, - "dataset": args.dataset, - } - else: - edge_list = json.loads(edge_list) - json_params = {"runId": args.run_id, 
"edgeIds": edge_list, "dataset": args.dataset} + edge_list = json.loads(edge_list) + json_params = {"runId": args.run_id, "edgeIds": edge_list, "dataset": args.dataset} response = requests.post( url, json=json_params, verify=True, headers={"content-type": "application/json", "Connection": "keep-alive"} ) @@ -212,43 +168,21 @@ def split_edge_data(args, edge_list=None): def check_rundata(args): # local simulation run logging.info("Checking Run Data") - edge_li = [] - if args.run_id == "0": - for i in range(1, args.client_num_in_total + 1): - edge_li.append(i) - try: - url = "http://127.0.0.1:5000/check_rundata" - json_params = { - "runId": args.run_id, - "deviceId": args.device_id, - "edgeIds": edge_li, - "dataset": args.dataset, - } - response = requests.post( - url, - json=json_params, - verify=True, - headers={"content-type": "application/json", "Connection": "keep-alive"}, - ) - return response.json()["split_status"] - except requests.exceptions.SSLError as err: - print(err) - else: - # mlops run - try: - url = "http://127.0.0.1:5000/check_rundata" - json_params = { - "runId": args.run_id, - } - response = requests.post( - url, - json=json_params, - verify=True, - headers={"content-type": "application/json", "Connection": "keep-alive"}, - ) - return response.json()["private_local_dir"], response.json()["split_status"], response.json()["edgeids"], response.json()["dataset_s3_key"] - except requests.exceptions.SSLError as err: - print(err) + # mlops run + try: + url = "http://127.0.0.1:5000/check_rundata" + json_params = { + "runId": args.run_id, + } + response = requests.post( + url, + json=json_params, + verify=True, + headers={"content-type": "application/json", "Connection": "keep-alive"}, + ) + return response.json()["private_local_dir"], response.json()["split_status"], response.json()["edgeids"], response.json()["dataset_s3_key"] + except requests.exceptions.SSLError as err: + print(err) def query_data_server(args, edgeId, s3_obj, BUCKET_NAME): From 7eaa8a989b138d0a630d4ccc208c8432179b982e Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 01:05:25 +0800 Subject: [PATCH 09/13] bugfix no default mock run_id --- python/fedml/data/data_loader.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index 06375775b5..cd6b5bc454 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -256,10 +256,8 @@ def combine_batches(batches): def load_synthetic_data(args): - print("***************") - print(args.__dict__) - args.run_id = 1378 - if args.training_type == "cross_silo" and args.run_id > 0: + # args.run_id = 1378 + if args.training_type == "cross_silo" and args.run_id != '0': data_server_preprocess(args) dataset_name = args.dataset # check if the centralized training is enabled From 953a4d07bef82cb4b549a711dba54e1ed4209da8 Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 20:20:48 +0800 Subject: [PATCH 10/13] no run_id limit --- python/fedml/data/data_loader.py | 60 +++++++++++++++----------------- 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index cd6b5bc454..7f988e58f3 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -109,7 +109,6 @@ def data_server_preprocess(args): mqtt_config, s3_config = MLOpsConfigs.get_instance(args).fetch_configs() s3_obj, BUCKET_NAME = setup_s3_service(s3_config) - args.synthetic_data_url = "" 
args.private_local_data = "" if args.process_id == 0: pass @@ -118,36 +117,34 @@ def data_server_preprocess(args): subscribe(s3_obj, BUCKET_NAME, client, args) if args.dataset == "cifar10": # Mlops Run - if args.run_id > 0: - # check mlops run_status - private_local_dir, split_status, edgeids, dataset_s3_key = check_rundata(args) - args.private_local_data = private_local_dir - args.synthetic_data_url = dataset_s3_key - # MLOPS Run. User supply the local data dir - if len(args.private_local_data) != 0: - logging.info("User has set the private local data dir") + # check mlops run_status + private_local_dir, split_status, edgeids, dataset_s3_key = check_rundata(args) + args.private_local_data = private_local_dir + # MLOPS Run. User supply the local data dir + if len(args.private_local_data) != 0: + logging.info("User has set the private local data dir") + disconnect(client) + # MLOPS Run need to Split Data + elif len(args.synthetic_data_url) != 0: + if split_status == 0 or split_status == 3: + logging.info("Data Server Start Splitting Dataset") + split_edge_data(args, edgeids) + elif split_status == 1: + logging.info("Data Server Is Splitting Dataset, Waiting For Mqtt Message") + elif split_status == 2: + logging.info("Data Server Splitted Dataset Complete") + query_data_server(args, 15 + int(args.client_id_list[1]), s3_obj, BUCKET_NAME) disconnect(client) - # MLOPS Run need to Split Data - elif len(args.synthetic_data_url) != 0: - if split_status == 0 or split_status == 3: - logging.info("Data Server Start Splitting Dataset") - split_edge_data(args, edgeids) - elif split_status == 1: - logging.info("Data Server Is Splitting Dataset, Waiting For Mqtt Message") - elif split_status == 2: - logging.info("Data Server Splitted Dataset Complete") - query_data_server(args, 15 + int(args.client_id_list[1]), s3_obj, BUCKET_NAME) - disconnect(client) - elif len(args.data_cache_dir) != 0: - logging.info("No synthetic data url and private local data dir") - return - args.data_cache_dir = os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), - ) + elif len(args.data_cache_dir) != 0: + logging.info("No synthetic data url and private local data dir") + return + args.data_cache_dir = os.path.join( + args.data_cache_dir, + "run_Id_%s" % args.run_id, + "edgeNums_%s" % (args.client_num_in_total), + args.dataset, + "edgeId_%s" % (15 + int(args.client_id_list[1])), + ) client.loop_forever() @@ -256,8 +253,7 @@ def combine_batches(batches): def load_synthetic_data(args): - # args.run_id = 1378 - if args.training_type == "cross_silo" and args.run_id != '0': + if args.training_type == "cross_silo" and args.synthetic_data_url.find("https") != -1: data_server_preprocess(args) dataset_name = args.dataset # check if the centralized training is enabled From cdec673d8b983c445fef3acf3f2025ce4d68ed46 Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 20:26:14 +0800 Subject: [PATCH 11/13] bugfix no client_id_list[1] --- python/fedml/data/data_loader.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index 7f988e58f3..2cab58bbb6 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -54,7 +54,7 @@ def on_message(client, userdata, msg): "run_Id_%s" % args.run_id, "edgeNums_%s" % (args.client_num_in_total), args.dataset, - "edgeId_%s" % (15 + 
int(args.client_id_list[1])), + "edgeId_%s" % args.client_id, ) ) # start download the file @@ -68,20 +68,19 @@ def on_message(client, userdata, msg): "run_Id_%s" % args.run_id, "edgeNums_%s" % (args.client_num_in_total), args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), + "edgeId_%s" % args.client_id, ), os.path.join( args.data_cache_dir, "run_Id_%s" % args.run_id, "edgeNums_%s" % (args.client_num_in_total), args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), + "edgeId_%s" % args.client_id, "cifar-10-python.tar.gz", ), ) - topic = "data_svr/dataset/%s" % (15 + int(args.client_id_list[1])) - # topic = "data_svr/dataset/%s" % args.process_id + topic = "data_svr/dataset/%s" % args.client_id client.subscribe(topic) client.on_message = on_message @@ -133,7 +132,7 @@ def data_server_preprocess(args): logging.info("Data Server Is Splitting Dataset, Waiting For Mqtt Message") elif split_status == 2: logging.info("Data Server Splitted Dataset Complete") - query_data_server(args, 15 + int(args.client_id_list[1]), s3_obj, BUCKET_NAME) + query_data_server(args, args.client_id, s3_obj, BUCKET_NAME) disconnect(client) elif len(args.data_cache_dir) != 0: logging.info("No synthetic data url and private local data dir") @@ -143,7 +142,7 @@ def data_server_preprocess(args): "run_Id_%s" % args.run_id, "edgeNums_%s" % (args.client_num_in_total), args.dataset, - "edgeId_%s" % (15 + int(args.client_id_list[1])), + "edgeId_%s" % args.client_id, ) client.loop_forever() From 1811e9c39b7997d108cfd95a02ee0c7ab658877c Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 20:52:29 +0800 Subject: [PATCH 12/13] bugfix no args attribute synthetic data --- python/fedml/data/cifar10/efficient_loader.py | 4 ++-- python/fedml/data/data_loader.py | 12 ++++++++---- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/fedml/data/cifar10/efficient_loader.py b/python/fedml/data/cifar10/efficient_loader.py index a8130cda6d..2793fc5aad 100644 --- a/python/fedml/data/cifar10/efficient_loader.py +++ b/python/fedml/data/cifar10/efficient_loader.py @@ -315,8 +315,8 @@ def efficient_load_partition_data_cifar10( client_number, batch_size, process_id, - synthetic_data_url, - private_local_data, + synthetic_data_url="", + private_local_data="", n_proc_in_silo=0, data_efficient_load=True, ): diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index 2cab58bbb6..fb9456c7ce 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -252,7 +252,7 @@ def combine_batches(batches): def load_synthetic_data(args): - if args.training_type == "cross_silo" and args.synthetic_data_url.find("https") != -1: + if args.training_type == "cross_silo" and hasattr(args, 'synthetic_data_url') and args.synthetic_data_url.find("https") != -1: data_server_preprocess(args) dataset_name = args.dataset # check if the centralized training is enabled @@ -453,7 +453,11 @@ def load_synthetic_data(args): else: if dataset_name == "cifar10": - # if hasattr(args, "using_cloud_data") and args.using_cloud_data: + if hasattr(args, "synthetic_data_url") or hasattr(args, "private_local_data"): + if hasattr(args, "synthetic_data_url"): + args.private_local_data = "" + else: + args.synthetic_data_url = "" ( train_data_num, test_data_num, @@ -511,8 +515,8 @@ def load_synthetic_data(args): ] return dataset, class_num - # else: - # data_loader = load_partition_data_cifar10 + else: + data_loader = load_partition_data_cifar10 elif dataset_name == "cifar100": data_loader = 
load_partition_data_cifar100 From f31812d0742fd5e6ac1c2c898fb94f2f5720fd2d Mon Sep 17 00:00:00 2001 From: ranyide Date: Mon, 29 Aug 2022 21:45:18 +0800 Subject: [PATCH 13/13] bugfix client path and server path --- python/fedml/data/data_loader.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/fedml/data/data_loader.py b/python/fedml/data/data_loader.py index fb9456c7ce..694e364299 100644 --- a/python/fedml/data/data_loader.py +++ b/python/fedml/data/data_loader.py @@ -137,13 +137,6 @@ def data_server_preprocess(args): elif len(args.data_cache_dir) != 0: logging.info("No synthetic data url and private local data dir") return - args.data_cache_dir = os.path.join( - args.data_cache_dir, - "run_Id_%s" % args.run_id, - "edgeNums_%s" % (args.client_num_in_total), - args.dataset, - "edgeId_%s" % args.client_id, - ) client.loop_forever() @@ -458,6 +451,14 @@ def load_synthetic_data(args): args.private_local_data = "" else: args.synthetic_data_url = "" + if args.process_id != 0: + args.data_cache_dir = os.path.join( + args.data_cache_dir, + "run_Id_%s" % args.run_id, + "edgeNums_%s" % (args.client_num_in_total), + args.dataset, + "edgeId_%s" % args.client_id, + ) ( train_data_num, test_data_num,
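
A few pieces of logic in this series are easier to evaluate outside the hunks that touch them. PATCH 05 threads `synthetic_data_url` and `private_local_data` into `load_cifar10_data` and guards the torchvision download with `is_download = True;` followed by a conditional expression. A minimal sketch of the same decision with the redundancy folded away (the helper name is ours; behavior is assumed identical to the committed flag):

```python
def should_download(process_id, synthetic_data_url, private_local_data):
    """Decide whether this process downloads CIFAR-10 itself.

    The server (process_id == 0) always downloads; a client skips the
    download when a synthetic-data URL or a private local data dir is
    configured, since it will receive pre-split data instead.
    """
    if process_id == 0:
        return True
    # empty strings are falsy, matching the original len(...) != 0 checks
    return not (synthetic_data_url or private_local_data)
```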
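PATCH 06 blanks the S3 key pair that PATCH 05 committed (a pair that should be treated as compromised either way), and PATCH 07 removes the remaining hardcoded credentials from `data_loader.py` and `file_operation.py` in favor of configs fetched at runtime. A usage sketch of the entry points PATCH 07 adds; only the wrapper function is ours:

```python
from fedml.core.mlops import MLOpsConfigs

def build_transports(args):
    # both configs come from the MLOps backend rather than source constants
    mqtt_config, s3_config = MLOpsConfigs.get_instance(args).fetch_configs()
    s3_obj, bucket_name = setup_s3_service(s3_config)  # boto3 client + bucket name
    mqtt_conn = connect_mqtt(mqtt_config)              # paho-mqtt client
    return s3_obj, bucket_name, mqtt_conn
```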
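Several hunks between PATCH 05 and PATCH 13 rebuild the same per-edge cache path inline, first keyed by `15 + int(args.client_id_list[1])` and, after PATCH 11, by `args.client_id`; PATCH 13 then moves the construction into `load_synthetic_data`, applied only when `args.process_id != 0`. The layout those hunks agree on, as a sketch (the helper name is ours):

```python
import os

def edge_cache_dir(args):
    # <data_cache_dir>/run_Id_<run_id>/edgeNums_<total>/<dataset>/edgeId_<client_id>
    return os.path.join(
        args.data_cache_dir,
        "run_Id_%s" % args.run_id,
        "edgeNums_%s" % args.client_num_in_total,
        args.dataset,
        "edgeId_%s" % args.client_id,
    )
```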
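PATCH 10 gates the preprocessing on `args.synthetic_data_url.find("https") != -1`, and PATCH 12 adds a `hasattr` check so runs without the attribute do not crash. Note that `find(...) != -1` matches "https" anywhere in the string; assuming a URL-prefix test is what is intended, the combined guard reduces to:

```python
def needs_data_server(args):
    # run the MQTT/S3 preprocessing only for cross-silo runs that point at
    # an https synthetic-data URL (prefix test assumed here, vs. the
    # committed substring test)
    url = getattr(args, "synthetic_data_url", "")
    return args.training_type == "cross_silo" and url.startswith("https")
```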
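Separately, the attribute-defaulting that PATCH 12 adds under `dataset_name == "cifar10"` sets `args.private_local_data = ""` whenever `args.synthetic_data_url` exists, which wipes a private local path that was supplied alongside a URL. If the intent is only to fill in whichever attribute is missing, defaulting each one independently avoids that edge case:

```python
# default each attribute on its own; never overwrite one that is already set
args.synthetic_data_url = getattr(args, "synthetic_data_url", "")
args.private_local_data = getattr(args, "private_local_data", "")
```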