From b4e9b4ed0288e29a75335c146783a404db3e1918 Mon Sep 17 00:00:00 2001 From: Aleksandr Sokolovskii Date: Tue, 29 Jan 2019 14:42:16 +0300 Subject: [PATCH] use Isolation for sending and receiving chat messages --- flutter-grpc-tutorial.code-workspace | 1 + flutter_client/lib/chat_message_outgoing.dart | 2 +- flutter_client/lib/chat_screen.dart | 2 +- flutter_client/lib/chat_service.dart | 285 +++++++++++------- 4 files changed, 187 insertions(+), 103 deletions(-) diff --git a/flutter-grpc-tutorial.code-workspace b/flutter-grpc-tutorial.code-workspace index 91b2da9..e9526a6 100644 --- a/flutter-grpc-tutorial.code-workspace +++ b/flutter-grpc-tutorial.code-workspace @@ -15,6 +15,7 @@ ], "settings": { "cSpell.words": [ + "Friendlychat", "Stateful", "amsokol", "grpc", diff --git a/flutter_client/lib/chat_message_outgoing.dart b/flutter_client/lib/chat_message_outgoing.dart index ac3e4ab..27070eb 100644 --- a/flutter_client/lib/chat_message_outgoing.dart +++ b/flutter_client/lib/chat_message_outgoing.dart @@ -8,7 +8,7 @@ const String _name = "Me"; /// Outgoing message statuses /// UNKNOWN - message just created and is not sent yet /// SENT - message is sent to the server successfully -enum MessageOutgoingStatus { UNKNOWN, SENT } +enum MessageOutgoingStatus { UNKNOWN, SENT, FAILED } /// MessageOutgoing is class defining message data (id and text) and status class MessageOutgoing extends Message { diff --git a/flutter_client/lib/chat_screen.dart b/flutter_client/lib/chat_screen.dart index 633a50a..3ef7b27 100644 --- a/flutter_client/lib/chat_screen.dart +++ b/flutter_client/lib/chat_screen.dart @@ -51,7 +51,7 @@ class ChatScreenState extends State with TickerProviderStateMixin { onSentError: onSentError, onReceivedSuccess: onReceivedSuccess, onReceivedError: onReceivedError); - _service.startListening(); + _service.start(); } @override diff --git a/flutter_client/lib/chat_service.dart b/flutter_client/lib/chat_service.dart index 6164d94..f120de8 100644 --- a/flutter_client/lib/chat_service.dart +++ b/flutter_client/lib/chat_service.dart @@ -1,3 +1,5 @@ +import 'dart:isolate'; +import 'dart:io'; import 'package:grpc/grpc.dart'; import 'api/v1/chat.pbgrpc.dart' as grpc; @@ -7,19 +9,25 @@ import 'chat_message.dart'; import 'chat_message_outgoing.dart'; /// CHANGE TO IP ADDRESS OF YOUR SERVER IF IT IS NECESSARY -const serverIP = "127.0.0.1"; +const serverIP = "10.0.2.2"; const serverPort = 3000; /// ChatService client implementation class ChatService { - /// Flag is indicating that client is shutting down - bool _isShutdown = false; + // _isolateSending is isolate to send chat messages + Isolate _isolateSending; - /// gRPC client channel to send messages to the server - ClientChannel _clientSend; + // Port to send message + SendPort _portSending; - /// gRPC client channel to receive messages from the server - ClientChannel _clientReceive; + // Port to get status of message sending + ReceivePort _portSendStatus; + + // _isolateReceiving is isolate to receive chat messages + Isolate _isolateReceiving; + + // Port to receive messages + ReceivePort _portReceiving; /// Event is raised when message has been sent to the server successfully final void Function(MessageOutgoing message) onSentSuccess; @@ -38,117 +46,192 @@ class ChatService { {this.onSentSuccess, this.onSentError, this.onReceivedSuccess, - this.onReceivedError}); - - // Shutdown client - Future shutdown() async { - _isShutdown = true; - _shutdownSend(); - _shutdownReceive(); + this.onReceivedError}) + : _portSendStatus = ReceivePort(), + _portReceiving = ReceivePort(); + + // Start threads to send and receive messages + void start() { + _startSending(); + _startReceiving(); } - // Shutdown client (send channel) - void _shutdownSend() { - if (_clientSend != null) { - _clientSend.shutdown(); - _clientSend = null; + /// Start thread to send messages + void _startSending() async { + // start thread to send messages + _isolateSending = + await Isolate.spawn(_sendingIsolate, _portSendStatus.sendPort); + + // listen send status + await for (var msg in _portSendStatus) { + if (msg is SendPort) { + _portSending = msg; + } else { + var message = msg[0] as MessageOutgoing; + var statusDetails = msg[1] as String; + switch (message.status) { + case MessageOutgoingStatus.SENT: + // call for success handler + if (onSentSuccess != null) { + onSentSuccess(message); + } + break; + case MessageOutgoingStatus.FAILED: + // call for error handler + if (onSentError != null) { + onSentError(message, statusDetails); + } + break; + default: + // call for error handler + if (onSentError != null) { + onSentError(message, "unexpected message status"); + } + break; + } + } } } - // Shutdown client (receive channel) - void _shutdownReceive() { - if (_clientReceive != null) { - _clientReceive.shutdown(); - _clientReceive = null; - } - } + /// Thread to send messages + static void _sendingIsolate(SendPort portSendStatus) async { + // Port to get messages to send + ReceivePort portSendMessages = ReceivePort(); + + // send port to send messages to the caller + portSendStatus.send(portSendMessages.sendPort); + + ClientChannel client; + + // waiting messages to send + await for (MessageOutgoing message in portSendMessages) { + var sent = false; + do { + if (client == null) { + // create new client + client = ClientChannel( + serverIP, // Your IP here or localhost + port: serverPort, + options: ChannelOptions( + //TODO: Change to secure with server certificates + credentials: ChannelCredentials.insecure(), + idleTimeout: Duration(seconds: 1), + ), + ); + } - /// Send message to the server - void send(MessageOutgoing message) { - if (_clientSend == null) { - // create new client - _clientSend = ClientChannel( - serverIP, // Your IP here or localhost - port: serverPort, - options: ChannelOptions( - //TODO: Change to secure with server certificates - credentials: ChannelCredentials.insecure(), - idleTimeout: Duration(seconds: 10), - ), - ); - } + MessageOutgoingStatus statusCode; + String statusDetails; + + try { + // try to send + var request = StringValue.create(); + request.value = message.text; + await grpc.ChatServiceClient(client).send(request); + // sent successfully + statusCode = MessageOutgoingStatus.SENT; + sent = true; + } catch (e) { + // sent failed + statusCode = MessageOutgoingStatus.FAILED; + statusDetails = e.toString(); + // reset client + client.shutdown(); + client = null; + } finally { + var msg = MessageOutgoing( + text: message.text, id: message.id, status: statusCode); + portSendStatus.send([ + msg, + statusDetails, + ]); + } - var request = StringValue.create(); - request.value = message.text; - - grpc.ChatServiceClient(_clientSend).send(request).then((_) { - // call for success handler - if (onSentSuccess != null) { - var sentMessage = MessageOutgoing( - text: message.text, - id: message.id, - status: MessageOutgoingStatus.SENT); - onSentSuccess(sentMessage); - } - }).catchError((e) { - if (!_isShutdown) { - // invalidate current client - _shutdownSend(); + if (!sent) { + // try to send again + sleep(Duration(seconds: 5)); + } + } while (!sent); + } + } + /// Start listening messages from the server + void _startReceiving() async { + // start thread to receive messages + _isolateReceiving = + await Isolate.spawn(_receivingIsolate, _portReceiving.sendPort); + + // listen for incoming messages + await for (var msg in _portReceiving) { + var message = msg[0] as Message; + var error = msg[1] as String; + if (error != null) { // call for error handler - if (onSentError != null) { - onSentError(message, e.toString()); + if (onReceivedError != null) { + onReceivedError(error); + } + } else { + if (onReceivedSuccess != null) { + onReceivedSuccess(message); } - - // try to send again - Future.delayed(Duration(seconds: 30), () { - send(message); - }); } - }); + } } - /// Start listening messages from the server - void startListening() { - if (_clientReceive == null) { - // create new client - _clientReceive = ClientChannel( - serverIP, // Your IP here or localhost - port: serverPort, - options: ChannelOptions( - //TODO: Change to secure with server certificates - credentials: ChannelCredentials.insecure(), - idleTimeout: Duration(seconds: 10), - ), - ); - } + /// Thread to listen messages from the server + static void _receivingIsolate(SendPort portReceive) async { + ClientChannel client; + + do { + if (client == null) { + // create new client + client = ClientChannel( + serverIP, // Your IP here or localhost + port: serverPort, + options: ChannelOptions( + //TODO: Change to secure with server certificates + credentials: ChannelCredentials.insecure(), + idleTimeout: Duration(seconds: 1), + ), + ); + } - var stream = - grpc.ChatServiceClient(_clientReceive).subscribe(Empty.create()); + var stream = grpc.ChatServiceClient(client).subscribe(Empty.create()); - stream.forEach((msg) { - if (onReceivedSuccess != null) { - var message = Message(msg.text); - onReceivedSuccess(message); + try { + await for (var msg in stream) { + var message = Message(msg.text); + portReceive.send([message, null]); + } + } catch (e) { + // reset client + client.shutdown(); + client = null; + // notify caller + portReceive.send([null, e.toString()]); } - }).then((_) { - // raise exception to start listening again - throw Exception("stream from the server has been closed"); - }).catchError((e) { - if (!_isShutdown) { - // invalidate current client - _shutdownReceive(); + // try to connect again + sleep(Duration(seconds: 5)); + } while (true); + } - // call for error handler - if (onReceivedError != null) { - onReceivedError(e.toString()); - } + // Shutdown client + void shutdown() { + // stop sending + _isolateSending?.kill(priority: Isolate.immediate); + _isolateSending = null; + _portSendStatus?.close(); + _portSendStatus = null; + + // stop receiving + _isolateReceiving?.kill(priority: Isolate.immediate); + _isolateReceiving = null; + _portReceiving?.close(); + _portReceiving = null; + } - // start listening again - Future.delayed(Duration(seconds: 30), () { - startListening(); - }); - } - }); + /// Send message to the server + void send(MessageOutgoing message) { + _portSending.send(message); } }