Skip to content

Commit

Permalink
Protection against double completion of completers
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeesouth committed Jun 22, 2021
1 parent d0236a0 commit 669084f
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 24 deletions.
16 changes: 8 additions & 8 deletions lib/http_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class TransportSendQueue {

Future<void> stop() {
_executing = false;
_sendBufferedData.complete();
if (!_sendBufferedData.isCompleted) _sendBufferedData.complete();
return _sendLoopPromise;
}

Expand All @@ -149,7 +149,7 @@ class TransportSendQueue {
}

_buffer.add(data);
_sendBufferedData.complete();
if (!_sendBufferedData.isCompleted) _sendBufferedData.complete();
}

Future<void> _sendLoop() async {
Expand All @@ -158,7 +158,9 @@ class TransportSendQueue {

if (!_executing) {
if (_transportResult != null) {
_transportResult.completeError("Connection stopped.");
if (!_transportResult.isCompleted) {
_transportResult.completeError('Connection stopped.');
}
}

break;
Expand All @@ -177,11 +179,9 @@ class TransportSendQueue {

try {
await this.transport.send(data);
if (!transportResult.isCompleted) {
transportResult.complete();
}
if (!transportResult.isCompleted) transportResult.complete();
} catch (error) {
transportResult.completeError(error);
if (!transportResult.isCompleted) transportResult.completeError(error);
}
}
}
Expand Down Expand Up @@ -648,7 +648,7 @@ class HttpConnection implements IConnection {
if (_connectionState == ConnectionState.Disconnecting) {
// A call to stop() induced this call to stopConnection and needs to be completed.
// Any stop() awaiters will be scheduled to continue after the onclose callback fires.
_stopPromiseCompleter.complete();
if (!_stopPromiseCompleter.isCompleted) _stopPromiseCompleter.complete();
}

if (error != null) {
Expand Down
30 changes: 20 additions & 10 deletions lib/hub_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -397,25 +397,31 @@ class HubConnection {
_callbacks[invocationDescriptor.invocationId] =
(HubMessageBase invocationEvent, Exception error) {
if (error != null) {
completer.completeError(error);
if (!completer.isCompleted) completer.completeError(error);
return;
} else if (invocationEvent != null) {
if (invocationEvent is CompletionMessage) {
if (invocationEvent.error != null) {
completer.completeError(new GeneralError(invocationEvent.error));
if (!completer.isCompleted) {
completer.completeError(new GeneralError(invocationEvent.error));
}
} else {
completer.complete(invocationEvent.result);
if (!completer.isCompleted) {
completer.complete(invocationEvent.result);
}
}
} else {
completer.completeError(new GeneralError(
"Unexpected message type: ${invocationEvent.type}"));
if (!completer.isCompleted) {
completer.completeError(new GeneralError(
"Unexpected message type: ${invocationEvent.type}"));
}
}
}
};

final promiseQueue =
_sendWithProtocol(invocationDescriptor).catchError((e) {
completer.completeError(e);
if (!completer.isCompleted) completer.completeError(e);
// invocationId will always have a value for a non-blocking invocation
_callbacks.remove(invocationDescriptor.invocationId);
});
Expand Down Expand Up @@ -586,7 +592,9 @@ class HubConnection {

final error = GeneralError(message);

_handshakeCompleter?.completeError(error);
if (!_handshakeCompleter.isCompleted) {
_handshakeCompleter?.completeError(error);
}
_handshakeCompleter = null;
throw error;
}
Expand All @@ -597,14 +605,16 @@ class HubConnection {

final error = GeneralError(message);

_handshakeCompleter?.completeError(error);
if (!_handshakeCompleter.isCompleted) {
_handshakeCompleter?.completeError(error);
}
_handshakeCompleter = null;
throw error;
} else {
_logger?.finer("Server handshake complete.");
}

_handshakeCompleter?.complete();
if (!_handshakeCompleter.isCompleted) _handshakeCompleter?.complete();
_handshakeCompleter = null;
return handshakeResult.remainingData;
}
Expand Down Expand Up @@ -677,7 +687,7 @@ class HubConnection {
// If the handshake is in progress, start will be waiting for the handshake promise, so we complete it.
// If it has already completed, this should just noop.
if (_handshakeCompleter != null) {
_handshakeCompleter.complete();
if (!_handshakeCompleter.isCompleted) _handshakeCompleter.complete();
}

_cancelCallbacksWithError(error ??
Expand Down
12 changes: 8 additions & 4 deletions lib/web_socket_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class WebSocketTransport implements ITransport {
_logger?.finest("WebSocket try connecting to '$url'.");
_webSocket = WebSocketChannel.connect(Uri.parse(url));
opened = true;
websocketCompleter.complete();
if (!websocketCompleter.isCompleted) websocketCompleter.complete();
_logger?.info("WebSocket connected to '$url'.");
_webSocketListenSub = _webSocket.stream.listen(
// onData
Expand All @@ -77,7 +77,9 @@ class WebSocketTransport implements ITransport {
// onError
onError: (Object error) {
var e = error != null ? error : "Unknown websocket error";
websocketCompleter.completeError(e);
if (!websocketCompleter.isCompleted) {
websocketCompleter.completeError(e);
}
},

// onDone
Expand All @@ -89,8 +91,10 @@ class WebSocketTransport implements ITransport {
onClose();
}
} else {
websocketCompleter
.completeError("There was an error with the transport.");
if (!websocketCompleter.isCompleted) {
websocketCompleter
.completeError("There was an error with the transport.");
}
}
},
);
Expand Down
5 changes: 3 additions & 2 deletions lib/web_supporting_http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ class WebSupportingHttpClient extends SignalRHttpClient {
final abortFuture = Future<void>(() {
final completer = Completer<void>();
if (request.abortSignal != null) {
request.abortSignal.onabort =
() => completer.completeError(AbortError());
request.abortSignal.onabort = () {
if (!completer.isCompleted) completer.completeError(AbortError());
};
}
return completer.future;
});
Expand Down

0 comments on commit 669084f

Please sign in to comment.