diff --git a/lib/http_connection.dart b/lib/http_connection.dart index b2a70ff..80a9308 100644 --- a/lib/http_connection.dart +++ b/lib/http_connection.dart @@ -133,7 +133,7 @@ class TransportSendQueue { Future stop() { _executing = false; - _sendBufferedData.complete(); + if (!_sendBufferedData.isCompleted) _sendBufferedData.complete(); return _sendLoopPromise; } @@ -149,7 +149,7 @@ class TransportSendQueue { } _buffer.add(data); - _sendBufferedData.complete(); + if (!_sendBufferedData.isCompleted) _sendBufferedData.complete(); } Future _sendLoop() async { @@ -158,7 +158,9 @@ class TransportSendQueue { if (!_executing) { if (_transportResult != null) { - _transportResult.completeError("Connection stopped."); + if (!_transportResult.isCompleted) { + _transportResult.completeError('Connection stopped.'); + } } break; @@ -177,11 +179,9 @@ class TransportSendQueue { try { await this.transport.send(data); - if (!transportResult.isCompleted) { - transportResult.complete(); - } + if (!transportResult.isCompleted) transportResult.complete(); } catch (error) { - transportResult.completeError(error); + if (!transportResult.isCompleted) transportResult.completeError(error); } } } @@ -648,7 +648,7 @@ class HttpConnection implements IConnection { if (_connectionState == ConnectionState.Disconnecting) { // A call to stop() induced this call to stopConnection and needs to be completed. // Any stop() awaiters will be scheduled to continue after the onclose callback fires. - _stopPromiseCompleter.complete(); + if (!_stopPromiseCompleter.isCompleted) _stopPromiseCompleter.complete(); } if (error != null) { diff --git a/lib/hub_connection.dart b/lib/hub_connection.dart index f8f648b..3bc6f05 100644 --- a/lib/hub_connection.dart +++ b/lib/hub_connection.dart @@ -397,25 +397,31 @@ class HubConnection { _callbacks[invocationDescriptor.invocationId] = (HubMessageBase invocationEvent, Exception error) { if (error != null) { - completer.completeError(error); + if (!completer.isCompleted) completer.completeError(error); return; } else if (invocationEvent != null) { if (invocationEvent is CompletionMessage) { if (invocationEvent.error != null) { - completer.completeError(new GeneralError(invocationEvent.error)); + if (!completer.isCompleted) { + completer.completeError(new GeneralError(invocationEvent.error)); + } } else { - completer.complete(invocationEvent.result); + if (!completer.isCompleted) { + completer.complete(invocationEvent.result); + } } } else { - completer.completeError(new GeneralError( - "Unexpected message type: ${invocationEvent.type}")); + if (!completer.isCompleted) { + completer.completeError(new GeneralError( + "Unexpected message type: ${invocationEvent.type}")); + } } } }; final promiseQueue = _sendWithProtocol(invocationDescriptor).catchError((e) { - completer.completeError(e); + if (!completer.isCompleted) completer.completeError(e); // invocationId will always have a value for a non-blocking invocation _callbacks.remove(invocationDescriptor.invocationId); }); @@ -586,7 +592,9 @@ class HubConnection { final error = GeneralError(message); - _handshakeCompleter?.completeError(error); + if (!_handshakeCompleter.isCompleted) { + _handshakeCompleter?.completeError(error); + } _handshakeCompleter = null; throw error; } @@ -597,14 +605,16 @@ class HubConnection { final error = GeneralError(message); - _handshakeCompleter?.completeError(error); + if (!_handshakeCompleter.isCompleted) { + _handshakeCompleter?.completeError(error); + } _handshakeCompleter = null; throw error; } else { _logger?.finer("Server handshake complete."); } - _handshakeCompleter?.complete(); + if (!_handshakeCompleter.isCompleted) _handshakeCompleter?.complete(); _handshakeCompleter = null; return handshakeResult.remainingData; } @@ -677,7 +687,7 @@ class HubConnection { // If the handshake is in progress, start will be waiting for the handshake promise, so we complete it. // If it has already completed, this should just noop. if (_handshakeCompleter != null) { - _handshakeCompleter.complete(); + if (!_handshakeCompleter.isCompleted) _handshakeCompleter.complete(); } _cancelCallbacksWithError(error ?? diff --git a/lib/web_socket_transport.dart b/lib/web_socket_transport.dart index 7d3f751..303391b 100644 --- a/lib/web_socket_transport.dart +++ b/lib/web_socket_transport.dart @@ -52,7 +52,7 @@ class WebSocketTransport implements ITransport { _logger?.finest("WebSocket try connecting to '$url'."); _webSocket = WebSocketChannel.connect(Uri.parse(url)); opened = true; - websocketCompleter.complete(); + if (!websocketCompleter.isCompleted) websocketCompleter.complete(); _logger?.info("WebSocket connected to '$url'."); _webSocketListenSub = _webSocket.stream.listen( // onData @@ -77,7 +77,9 @@ class WebSocketTransport implements ITransport { // onError onError: (Object error) { var e = error != null ? error : "Unknown websocket error"; - websocketCompleter.completeError(e); + if (!websocketCompleter.isCompleted) { + websocketCompleter.completeError(e); + } }, // onDone @@ -89,8 +91,10 @@ class WebSocketTransport implements ITransport { onClose(); } } else { - websocketCompleter - .completeError("There was an error with the transport."); + if (!websocketCompleter.isCompleted) { + websocketCompleter + .completeError("There was an error with the transport."); + } } }, ); diff --git a/lib/web_supporting_http_client.dart b/lib/web_supporting_http_client.dart index 02e2124..e382984 100644 --- a/lib/web_supporting_http_client.dart +++ b/lib/web_supporting_http_client.dart @@ -48,8 +48,9 @@ class WebSupportingHttpClient extends SignalRHttpClient { final abortFuture = Future(() { final completer = Completer(); if (request.abortSignal != null) { - request.abortSignal.onabort = - () => completer.completeError(AbortError()); + request.abortSignal.onabort = () { + if (!completer.isCompleted) completer.completeError(AbortError()); + }; } return completer.future; });