diff --git a/LICENSE b/LICENSE index a71c7c2b8..a21b48af8 100644 --- a/LICENSE +++ b/LICENSE @@ -335,25 +335,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. --------------------------------------------------------------------------------- - -Code in federatedscope/core/communication.py is adapted from -https://github.com/FedML-AI/FedML - -Copyright [FedML] [Chaoyang He, Salman Avestimehr] - -Licensed under the Apache License, Version 2.0 (the "License"); - -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -------------------------------------------------------------------------------- diff --git a/federatedscope/core/communication.py b/federatedscope/core/communication.py index 180f905db..0ef1c474a 100644 --- a/federatedscope/core/communication.py +++ b/federatedscope/core/communication.py @@ -1,18 +1,3 @@ -# Copyright [FedML] [Chaoyang He, Salman Avestimehr] -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - import grpc from concurrent import futures @@ -56,10 +41,12 @@ def send(self, message): class gRPCCommManager(object): """ - The implementation of gRPCCommManager is referred to https://github.com/FedML-AI/FedML/tree/master/fedml_core/distributed/communication/gRPC + The implementation of gRPCCommManager is referred to the tutorial on https://grpc.io/docs/languages/python/ """ def __init__(self, host='0.0.0.0', port='50050', client_num=2): - grpc_opts = [ + self.host = host + self.port = port + options = [ ("grpc.max_send_message_length", global_cfg.distribute.grpc_max_send_message_length), ("grpc.max_receive_message_length", @@ -67,18 +54,21 @@ def __init__(self, host='0.0.0.0', port='50050', client_num=2): ("grpc.enable_http_proxy", global_cfg.distribute.grpc_enable_http_proxy), ] - self.grpc_server = grpc.server( - futures.ThreadPoolExecutor(max_workers=client_num), - options=grpc_opts) - self.servicer = gRPCComServeFunc() + self.server_funcs = gRPCComServeFunc() + self.grpc_server = self.serve(max_workers=client_num, host=host, port=port, options=options) + self.neighbors = dict() + + def serve(self, max_workers, host, port, options): + """ + This function is referred to https://grpc.io/docs/languages/python/basics/#starting-the-server + """ + server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers), options=options) gRPC_comm_manager_pb2_grpc.add_gRPCComServeFuncServicer_to_server( - servicer=self.servicer, server=self.grpc_server) - self.grpc_server.add_insecure_port("{}:{}".format(host, port)) - self.host = host - self.port = port - self.grpc_server.start() + self.server_funcs, server) + server.add_insecure_port("{}:{}".format(host, port)) + server.start() - self.neighbors = dict() + return server def add_neighbors(self, neighbor_id, address): self.neighbors[neighbor_id] = '{}:{}'.format(address['host'], @@ -98,10 +88,16 @@ def get_neighbors(self, neighbor_id=None): return self.neighbors def _send(self, receiver_address, message): - channel = grpc.insecure_channel(receiver_address, + def _create_stub(receiver_address): + """ + This part is referred to https://grpc.io/docs/languages/python/basics/#creating-a-stub + """ + channel = grpc.insecure_channel(receiver_address, options=(('grpc.enable_http_proxy', 0), )) - stub = gRPC_comm_manager_pb2_grpc.gRPCComServeFuncStub(channel) + stub = gRPC_comm_manager_pb2_grpc.gRPCComServeFuncStub(channel) + return stub, channel + stub, channel = _create_stub(receiver_address) request = message.msg_to_json(to_list=True) stub.sendMessage(gRPC_comm_manager_pb2.MessageRequest(msg=request)) channel.close() @@ -121,7 +117,7 @@ def send(self, message): self._send(receiver_address, message) def receive(self): - received_msg = self.servicer.receive() + received_msg = self.server_funcs.receive() message = Message() message.json_to_msg(received_msg.msg) return message diff --git a/federatedscope/core/proto/gRPC_comm_manager.proto b/federatedscope/core/proto/gRPC_comm_manager.proto index efc9d65e6..e8f685b7d 100644 --- a/federatedscope/core/proto/gRPC_comm_manager.proto +++ b/federatedscope/core/proto/gRPC_comm_manager.proto @@ -1,7 +1,7 @@ syntax = "proto3"; service gRPCComServeFunc { - rpc sendMessage (MessageRequest) returns (MessageResponse); + rpc sendMessage (MessageRequest) returns (MessageResponse) {}; } message MessageRequest{