Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Communication efficiency optimization #19

Merged
merged 2 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,17 @@ To run with distributed mode, you only need to:

- Prepare isolated data file and set up `cfg.distribute.data_file = PATH/TO/DATA` for each participant;
- Change `cfg.federate.model = 'distributed'`, and specify the role of each participant by `cfg.distributed.role = 'server'/'client'`.
- Set up a valid address by `cfg.distribute.host = x.x.x.x` and `cfg.distribute.host = xxxx`. (Note that for a server, you need to set up server_host/server_port for listening messge, while for a client, you need to set up client_host/client_port for listening and server_host/server_port for sending join-in applications when building up an FL course)
- Set up a valid address by `cfg.distribute.host = x.x.x.x` and `cfg.distribute.port = xxxx`. (Note that for a server, you need to set up server_host/server_port for listening messge, while for a client, you need to set up client_host/client_port for listening and server_host/server_port for sending join-in applications when building up an FL course)

We prepare a synthetic example for running with distributed mode:

```bash
# For server
python main.py --cfg federatedscope/example_configs/distributed_server.yaml data_path 'PATH/TO/DATA' server.host x.x.x.x client.port xxxx
python main.py --cfg federatedscope/example_configs/distributed_server.yaml data_path 'PATH/TO/DATA' distribute.server_host x.x.x.x distribute.server_port xxxx

# For client
python main.py --cfg federatedscope/example_configs/distributed_client.yaml data_path 'PATH/TO/DATA' server.host x.x.x.x server.port xxxx client.host x.x.x.x client.port xxxx
# For clients
python main.py --cfg federatedscope/example_configs/distributed_client_1.yaml data_path 'PATH/TO/DATA' distribute.server_host x.x.x.x distribute.server_port xxxx distribute.client_host x.x.x.x distribute.client_port xxxx
python main.py --cfg federatedscope/example_configs/distributed_client_2.yaml data_path 'PATH/TO/DATA' distribute.server_host x.x.x.x distribute.server_port xxxx distribute.client_host x.x.x.x distribute.client_port xxxx
```

And you can observe the results as (the IP addresses are anonymized with 'x.x.x.x'):
Expand Down
7 changes: 4 additions & 3 deletions federatedscope/core/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ def _create_stub(receiver_address):
stub = gRPC_comm_manager_pb2_grpc.gRPCComServeFuncStub(channel)
return stub, channel
stub, channel = _create_stub(receiver_address)
request = message.msg_to_json(to_list=True)
stub.sendMessage(gRPC_comm_manager_pb2.MessageRequest(msg=request))
request = message.transform(to_list=True)
#msg_test = gRPC_comm_manager_pb2.MessageRequest(msg=request)
stub.sendMessage(request)
channel.close()

def send(self, message):
Expand All @@ -119,5 +120,5 @@ def send(self, message):
def receive(self):
received_msg = self.server_funcs.receive()
message = Message()
message.json_to_msg(received_msg.msg)
message.parse(received_msg.msg)
return message
9 changes: 5 additions & 4 deletions federatedscope/core/gRPC_server.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import queue
from collections import deque

from federatedscope.core.proto import gRPC_comm_manager_pb2, gRPC_comm_manager_pb2_grpc


class gRPCComServeFunc(gRPC_comm_manager_pb2_grpc.gRPCComServeFuncServicer):
def __init__(self):
self.msg_queue = queue.Queue()
self.msg_queue = deque()

def sendMessage(self, request, context):
self.msg_queue.put(request)
self.msg_queue.append(request)

return gRPC_comm_manager_pb2.MessageResponse(msg='ACK')

def receive(self):
while self.msg_queue.empty():
while len(self.msg_queue) == 0:
continue
msg = self.msg_queue.get()
msg = self.msg_queue.popleft()
return msg
93 changes: 93 additions & 0 deletions federatedscope/core/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys
import json
import numpy as np
from federatedscope.core.proto import gRPC_comm_manager_pb2


class Message(object):
Expand Down Expand Up @@ -109,3 +111,94 @@ def json_to_msg(self, json_string):
self.state = json_msg['state']
self.content = json_msg['content']
self.strategy = json_msg['strategy']

def create_by_type(self, value, nested=False):
if isinstance(value, dict):
m_dict = gRPC_comm_manager_pb2.mDict()
for key in value.keys():
m_dict.dict_value[key].MergeFrom(
self.create_by_type(value[key], nested=True))
if nested:
msg_value = gRPC_comm_manager_pb2.MsgValue()
msg_value.dict_msg.MergeFrom(m_dict)
return msg_value
else:
return m_dict
elif isinstance(value, list) or isinstance(value, tuple):
m_list = gRPC_comm_manager_pb2.mList()
for each in value:
m_list.list_value.append(self.create_by_type(each,
nested=True))
if nested:
msg_value = gRPC_comm_manager_pb2.MsgValue()
msg_value.list_msg.MergeFrom(m_list)
return msg_value
else:
return m_list
else:
m_single = gRPC_comm_manager_pb2.mSingle()
if type(value) in [int, np.int32]:
m_single.int_value = value
elif type(value) in [str]:
m_single.str_value = value
elif type(value) in [float, np.float32]:
m_single.float_value = value
else:
raise ValueError(
'The data type {} has not been supported.'.format(
type(value)))

if nested:
msg_value = gRPC_comm_manager_pb2.MsgValue()
msg_value.single_msg.MergeFrom(m_single)
return msg_value
else:
return m_single

def build_msg_value(self, value):
msg_value = gRPC_comm_manager_pb2.MsgValue()

if isinstance(value, list) or isinstance(value, tuple):
msg_value.list_msg.MergeFrom(self.create_by_type(value))
elif isinstance(value, dict):
msg_value.dict_msg.MergeFrom(self.create_by_type(value))
else:
msg_value.single_msg.MergeFrom(self.create_by_type(value))

return msg_value

def transform(self, to_list=False):
if to_list:
self.content = self.transform_to_list(self.content)

splited_msg = gRPC_comm_manager_pb2.MessageRequest() # map/dict
splited_msg.msg['sender'].MergeFrom(self.build_msg_value(self.sender))
splited_msg.msg['receiver'].MergeFrom(
self.build_msg_value(self.receiver))
splited_msg.msg['state'].MergeFrom(self.build_msg_value(self.state))
splited_msg.msg['msg_type'].MergeFrom(
self.build_msg_value(self.msg_type))
splited_msg.msg['content'].MergeFrom(self.build_msg_value(
self.content))
return splited_msg

def _parse_msg(self, value):
if isinstance(value, gRPC_comm_manager_pb2.MsgValue) or isinstance(
value, gRPC_comm_manager_pb2.mSingle):
return self._parse_msg(getattr(value, value.WhichOneof("type")))
elif isinstance(value, gRPC_comm_manager_pb2.mList):
return [self._parse_msg(each) for each in value.list_value]
elif isinstance(value, gRPC_comm_manager_pb2.mDict):
return {
k: self._parse_msg(value.dict_value[k])
for k in value.dict_value
}
else:
return value

def parse(self, received_msg):
self.sender = self._parse_msg(received_msg['sender'])
self.receiver = self._parse_msg(received_msg['receiver'])
self.msg_type = self._parse_msg(received_msg['msg_type'])
self.state = self._parse_msg(received_msg['state'])
self.content = self._parse_msg(received_msg['content'])
28 changes: 26 additions & 2 deletions federatedscope/core/proto/gRPC_comm_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,33 @@ service gRPCComServeFunc {
}

message MessageRequest{
string msg = 1;
map<string, MsgValue> msg = 1;
}

message MsgValue{
oneof type {
mSingle single_msg = 1;
mList list_msg = 2;
mDict dict_msg = 3;
}
}

message mSingle{
oneof type {
float float_value = 1;
int32 int_value = 2;
string str_value = 3;
}
}

message mList{
repeated MsgValue list_value = 1;
}

message mDict{
map<string, MsgValue> dict_value = 1;
}

message MessageResponse{
string msg = 1;
}
}
Loading