Skip to content

Commit

Permalink
For the communication efficiency: dynamic type selection in gRPC serv…
Browse files Browse the repository at this point in the history
…icer; transformer & parser
  • Loading branch information
xieyxclack committed Apr 15, 2022
1 parent f1a6003 commit 606b11d
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 125 deletions.
7 changes: 4 additions & 3 deletions federatedscope/core/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ def _send(self, receiver_address, message):
options=(('grpc.enable_http_proxy',
0), ))
stub = gRPC_comm_manager_pb2_grpc.gRPCComServeFuncStub(channel)
request = message.msg_to_json(to_list=True)
stub.sendMessage(gRPC_comm_manager_pb2.MessageRequest(msg=request))
request = message.transform(to_list=True)
#msg_test = gRPC_comm_manager_pb2.MessageRequest(msg=request)
stub.sendMessage(request)
channel.close()

def send(self, message):
Expand All @@ -123,5 +124,5 @@ def send(self, message):
def receive(self):
received_msg = self.servicer.receive()
message = Message()
message.json_to_msg(received_msg.msg)
message.parse(received_msg.msg)
return message
9 changes: 5 additions & 4 deletions federatedscope/core/gRPC_server.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import queue
from collections import deque

from federatedscope.core.proto import gRPC_comm_manager_pb2, gRPC_comm_manager_pb2_grpc


class gRPCComServeFunc(gRPC_comm_manager_pb2_grpc.gRPCComServeFuncServicer):
def __init__(self):
self.msg_queue = queue.Queue()
self.msg_queue = deque()

def sendMessage(self, request, context):
self.msg_queue.put(request)
self.msg_queue.append(request)

return gRPC_comm_manager_pb2.MessageResponse(msg='ACK')

def receive(self):
while self.msg_queue.empty():
while len(self.msg_queue) == 0:
continue
msg = self.msg_queue.get()
msg = self.msg_queue.popleft()
return msg
93 changes: 93 additions & 0 deletions federatedscope/core/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys
import json
import numpy as np
from federatedscope.core.proto import gRPC_comm_manager_pb2


class Message(object):
Expand Down Expand Up @@ -109,3 +111,94 @@ def json_to_msg(self, json_string):
self.state = json_msg['state']
self.content = json_msg['content']
self.strategy = json_msg['strategy']

def create_by_type(self, value, nested=False):
if isinstance(value, dict):
m_dict = gRPC_comm_manager_pb2.mDict()
for key in value.keys():
m_dict.dict_value[key].MergeFrom(
self.create_by_type(value[key], nested=True))
if nested:
msg_value = gRPC_comm_manager_pb2.MsgValue()
msg_value.dict_msg.MergeFrom(m_dict)
return msg_value
else:
return m_dict
elif isinstance(value, list) or isinstance(value, tuple):
m_list = gRPC_comm_manager_pb2.mList()
for each in value:
m_list.list_value.append(self.create_by_type(each,
nested=True))
if nested:
msg_value = gRPC_comm_manager_pb2.MsgValue()
msg_value.list_msg.MergeFrom(m_list)
return msg_value
else:
return m_list
else:
m_single = gRPC_comm_manager_pb2.mSingle()
if type(value) in [int, np.int32]:
m_single.int_value = value
elif type(value) in [str]:
m_single.str_value = value
elif type(value) in [float, np.float32]:
m_single.float_value = value
else:
raise ValueError(
'The data type {} has not been supported.'.format(
type(value)))

if nested:
msg_value = gRPC_comm_manager_pb2.MsgValue()
msg_value.single_msg.MergeFrom(m_single)
return msg_value
else:
return m_single

def build_msg_value(self, value):
msg_value = gRPC_comm_manager_pb2.MsgValue()

if isinstance(value, list) or isinstance(value, tuple):
msg_value.list_msg.MergeFrom(self.create_by_type(value))
elif isinstance(value, dict):
msg_value.dict_msg.MergeFrom(self.create_by_type(value))
else:
msg_value.single_msg.MergeFrom(self.create_by_type(value))

return msg_value

def transform(self, to_list=False):
if to_list:
self.content = self.transform_to_list(self.content)

splited_msg = gRPC_comm_manager_pb2.MessageRequest() # map/dict
splited_msg.msg['sender'].MergeFrom(self.build_msg_value(self.sender))
splited_msg.msg['receiver'].MergeFrom(
self.build_msg_value(self.receiver))
splited_msg.msg['state'].MergeFrom(self.build_msg_value(self.state))
splited_msg.msg['msg_type'].MergeFrom(
self.build_msg_value(self.msg_type))
splited_msg.msg['content'].MergeFrom(self.build_msg_value(
self.content))
return splited_msg

def _parse_msg(self, value):
if isinstance(value, gRPC_comm_manager_pb2.MsgValue) or isinstance(
value, gRPC_comm_manager_pb2.mSingle):
return self._parse_msg(getattr(value, value.WhichOneof("type")))
elif isinstance(value, gRPC_comm_manager_pb2.mList):
return [self._parse_msg(each) for each in value.list_value]
elif isinstance(value, gRPC_comm_manager_pb2.mDict):
return {
k: self._parse_msg(value.dict_value[k])
for k in value.dict_value
}
else:
return value

def parse(self, received_msg):
self.sender = self._parse_msg(received_msg['sender'])
self.receiver = self._parse_msg(received_msg['receiver'])
self.msg_type = self._parse_msg(received_msg['msg_type'])
self.state = self._parse_msg(received_msg['state'])
self.content = self._parse_msg(received_msg['content'])
28 changes: 26 additions & 2 deletions federatedscope/core/proto/gRPC_comm_manager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,33 @@ service gRPCComServeFunc {
}

message MessageRequest{
string msg = 1;
map<string, MsgValue> msg = 1;
}

message MsgValue{
oneof type {
mSingle single_msg = 1;
mList list_msg = 2;
mDict dict_msg = 3;
}
}

message mSingle{
oneof type {
float float_value = 1;
int32 int_value = 2;
string str_value = 3;
}
}

message mList{
repeated MsgValue list_value = 1;
}

message mDict{
map<string, MsgValue> dict_value = 1;
}

message MessageResponse{
string msg = 1;
}
}
Loading

0 comments on commit 606b11d

Please sign in to comment.