Skip to content

Commit

Permalink
use Isolation for sending and receiving chat messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksandr Sokolovskii committed Jan 29, 2019
1 parent bd95633 commit b4e9b4e
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 103 deletions.
1 change: 1 addition & 0 deletions flutter-grpc-tutorial.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
],
"settings": {
"cSpell.words": [
"Friendlychat",
"Stateful",
"amsokol",
"grpc",
Expand Down
2 changes: 1 addition & 1 deletion flutter_client/lib/chat_message_outgoing.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const String _name = "Me";
/// Outgoing message statuses
/// UNKNOWN - message just created and is not sent yet
/// SENT - message is sent to the server successfully
enum MessageOutgoingStatus { UNKNOWN, SENT }
enum MessageOutgoingStatus { UNKNOWN, SENT, FAILED }

/// MessageOutgoing is class defining message data (id and text) and status
class MessageOutgoing extends Message {
Expand Down
2 changes: 1 addition & 1 deletion flutter_client/lib/chat_screen.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ChatScreenState extends State<ChatScreen> with TickerProviderStateMixin {
onSentError: onSentError,
onReceivedSuccess: onReceivedSuccess,
onReceivedError: onReceivedError);
_service.startListening();
_service.start();
}

@override
Expand Down
285 changes: 184 additions & 101 deletions flutter_client/lib/chat_service.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:isolate';
import 'dart:io';
import 'package:grpc/grpc.dart';

import 'api/v1/chat.pbgrpc.dart' as grpc;
Expand All @@ -7,19 +9,25 @@ import 'chat_message.dart';
import 'chat_message_outgoing.dart';

/// CHANGE TO IP ADDRESS OF YOUR SERVER IF IT IS NECESSARY
const serverIP = "127.0.0.1";
const serverIP = "10.0.2.2";
const serverPort = 3000;

/// ChatService client implementation
class ChatService {
/// Flag is indicating that client is shutting down
bool _isShutdown = false;
// _isolateSending is isolate to send chat messages
Isolate _isolateSending;

/// gRPC client channel to send messages to the server
ClientChannel _clientSend;
// Port to send message
SendPort _portSending;

/// gRPC client channel to receive messages from the server
ClientChannel _clientReceive;
// Port to get status of message sending
ReceivePort _portSendStatus;

// _isolateReceiving is isolate to receive chat messages
Isolate _isolateReceiving;

// Port to receive messages
ReceivePort _portReceiving;

/// Event is raised when message has been sent to the server successfully
final void Function(MessageOutgoing message) onSentSuccess;
Expand All @@ -38,117 +46,192 @@ class ChatService {
{this.onSentSuccess,
this.onSentError,
this.onReceivedSuccess,
this.onReceivedError});

// Shutdown client
Future<void> shutdown() async {
_isShutdown = true;
_shutdownSend();
_shutdownReceive();
this.onReceivedError})
: _portSendStatus = ReceivePort(),
_portReceiving = ReceivePort();

// Start threads to send and receive messages
void start() {
_startSending();
_startReceiving();
}

// Shutdown client (send channel)
void _shutdownSend() {
if (_clientSend != null) {
_clientSend.shutdown();
_clientSend = null;
/// Start thread to send messages
void _startSending() async {
// start thread to send messages
_isolateSending =
await Isolate.spawn(_sendingIsolate, _portSendStatus.sendPort);

// listen send status
await for (var msg in _portSendStatus) {
if (msg is SendPort) {
_portSending = msg;
} else {
var message = msg[0] as MessageOutgoing;
var statusDetails = msg[1] as String;
switch (message.status) {
case MessageOutgoingStatus.SENT:
// call for success handler
if (onSentSuccess != null) {
onSentSuccess(message);
}
break;
case MessageOutgoingStatus.FAILED:
// call for error handler
if (onSentError != null) {
onSentError(message, statusDetails);
}
break;
default:
// call for error handler
if (onSentError != null) {
onSentError(message, "unexpected message status");
}
break;
}
}
}
}

// Shutdown client (receive channel)
void _shutdownReceive() {
if (_clientReceive != null) {
_clientReceive.shutdown();
_clientReceive = null;
}
}
/// Thread to send messages
static void _sendingIsolate(SendPort portSendStatus) async {
// Port to get messages to send
ReceivePort portSendMessages = ReceivePort();

// send port to send messages to the caller
portSendStatus.send(portSendMessages.sendPort);

ClientChannel client;

// waiting messages to send
await for (MessageOutgoing message in portSendMessages) {
var sent = false;
do {
if (client == null) {
// create new client
client = ClientChannel(
serverIP, // Your IP here or localhost
port: serverPort,
options: ChannelOptions(
//TODO: Change to secure with server certificates
credentials: ChannelCredentials.insecure(),
idleTimeout: Duration(seconds: 1),
),
);
}

/// Send message to the server
void send(MessageOutgoing message) {
if (_clientSend == null) {
// create new client
_clientSend = ClientChannel(
serverIP, // Your IP here or localhost
port: serverPort,
options: ChannelOptions(
//TODO: Change to secure with server certificates
credentials: ChannelCredentials.insecure(),
idleTimeout: Duration(seconds: 10),
),
);
}
MessageOutgoingStatus statusCode;
String statusDetails;

try {
// try to send
var request = StringValue.create();
request.value = message.text;
await grpc.ChatServiceClient(client).send(request);
// sent successfully
statusCode = MessageOutgoingStatus.SENT;
sent = true;
} catch (e) {
// sent failed
statusCode = MessageOutgoingStatus.FAILED;
statusDetails = e.toString();
// reset client
client.shutdown();
client = null;
} finally {
var msg = MessageOutgoing(
text: message.text, id: message.id, status: statusCode);
portSendStatus.send([
msg,
statusDetails,
]);
}

var request = StringValue.create();
request.value = message.text;

grpc.ChatServiceClient(_clientSend).send(request).then((_) {
// call for success handler
if (onSentSuccess != null) {
var sentMessage = MessageOutgoing(
text: message.text,
id: message.id,
status: MessageOutgoingStatus.SENT);
onSentSuccess(sentMessage);
}
}).catchError((e) {
if (!_isShutdown) {
// invalidate current client
_shutdownSend();
if (!sent) {
// try to send again
sleep(Duration(seconds: 5));
}
} while (!sent);
}
}

/// Start listening messages from the server
void _startReceiving() async {
// start thread to receive messages
_isolateReceiving =
await Isolate.spawn(_receivingIsolate, _portReceiving.sendPort);

// listen for incoming messages
await for (var msg in _portReceiving) {
var message = msg[0] as Message;
var error = msg[1] as String;
if (error != null) {
// call for error handler
if (onSentError != null) {
onSentError(message, e.toString());
if (onReceivedError != null) {
onReceivedError(error);
}
} else {
if (onReceivedSuccess != null) {
onReceivedSuccess(message);
}

// try to send again
Future.delayed(Duration(seconds: 30), () {
send(message);
});
}
});
}
}

/// Start listening messages from the server
void startListening() {
if (_clientReceive == null) {
// create new client
_clientReceive = ClientChannel(
serverIP, // Your IP here or localhost
port: serverPort,
options: ChannelOptions(
//TODO: Change to secure with server certificates
credentials: ChannelCredentials.insecure(),
idleTimeout: Duration(seconds: 10),
),
);
}
/// Thread to listen messages from the server
static void _receivingIsolate(SendPort portReceive) async {
ClientChannel client;

do {
if (client == null) {
// create new client
client = ClientChannel(
serverIP, // Your IP here or localhost
port: serverPort,
options: ChannelOptions(
//TODO: Change to secure with server certificates
credentials: ChannelCredentials.insecure(),
idleTimeout: Duration(seconds: 1),
),
);
}

var stream =
grpc.ChatServiceClient(_clientReceive).subscribe(Empty.create());
var stream = grpc.ChatServiceClient(client).subscribe(Empty.create());

stream.forEach((msg) {
if (onReceivedSuccess != null) {
var message = Message(msg.text);
onReceivedSuccess(message);
try {
await for (var msg in stream) {
var message = Message(msg.text);
portReceive.send([message, null]);
}
} catch (e) {
// reset client
client.shutdown();
client = null;
// notify caller
portReceive.send([null, e.toString()]);
}
}).then((_) {
// raise exception to start listening again
throw Exception("stream from the server has been closed");
}).catchError((e) {
if (!_isShutdown) {
// invalidate current client
_shutdownReceive();
// try to connect again
sleep(Duration(seconds: 5));
} while (true);
}

// call for error handler
if (onReceivedError != null) {
onReceivedError(e.toString());
}
// Shutdown client
void shutdown() {
// stop sending
_isolateSending?.kill(priority: Isolate.immediate);
_isolateSending = null;
_portSendStatus?.close();
_portSendStatus = null;

// stop receiving
_isolateReceiving?.kill(priority: Isolate.immediate);
_isolateReceiving = null;
_portReceiving?.close();
_portReceiving = null;
}

// start listening again
Future.delayed(Duration(seconds: 30), () {
startListening();
});
}
});
/// Send message to the server
void send(MessageOutgoing message) {
_portSending.send(message);
}
}

0 comments on commit b4e9b4e

Please sign in to comment.