From c0ea76fa245d5226651be570c90708bd8b73307f Mon Sep 17 00:00:00 2001 From: Brian Quinlan Date: Tue, 9 Apr 2024 12:07:39 -0700 Subject: [PATCH] Implement IOWebSocketChannel as a WebSocketAdapterWebSocket subclass (dart-lang/web_socket_channel#342) --- pkgs/web_socket_channel/CHANGELOG.md | 2 + pkgs/web_socket_channel/lib/io.dart | 102 +++--------------- ...web_socket_adapter_web_socket_channel.dart | 26 +++-- pkgs/web_socket_channel/pubspec.yaml | 2 +- pkgs/web_socket_channel/test/io_test.dart | 7 +- .../web_socket_adapter_web_socket_test.dart | 21 +++- 6 files changed, 61 insertions(+), 99 deletions(-) diff --git a/pkgs/web_socket_channel/CHANGELOG.md b/pkgs/web_socket_channel/CHANGELOG.md index 139ef22c4b..5026c83c5e 100644 --- a/pkgs/web_socket_channel/CHANGELOG.md +++ b/pkgs/web_socket_channel/CHANGELOG.md @@ -4,6 +4,8 @@ default implementation for `WebSocketChannel.connect`. - **BREAKING**: Remove `WebSocketChannel` constructor. - **BREAKING**: Make `WebSocketChannel` an `abstract interface`. +- **BREAKING**: `IOWebSocketChannel.ready` will throw + `WebSocketChannelException` instead of `WebSocketException`. ## 2.4.5 diff --git a/pkgs/web_socket_channel/lib/io.dart b/pkgs/web_socket_channel/lib/io.dart index 9f71d812f9..cfba457af6 100644 --- a/pkgs/web_socket_channel/lib/io.dart +++ b/pkgs/web_socket_channel/lib/io.dart @@ -3,51 +3,15 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:io'; - -import 'package:async/async.dart'; -import 'package:stream_channel/stream_channel.dart'; +import 'dart:io' show HttpClient, WebSocket; +import 'package:web_socket/io_web_socket.dart' as io_web_socket; import 'src/channel.dart'; import 'src/exception.dart'; -import 'src/sink_completer.dart'; +import 'web_socket_adapter_web_socket_channel.dart'; /// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket]. -class IOWebSocketChannel extends StreamChannelMixin - implements WebSocketChannel { - /// The underlying `dart:io` [WebSocket]. - /// - /// If the channel was constructed with [IOWebSocketChannel.connect], this is - /// `null` until the [WebSocket.connect] future completes. - WebSocket? _webSocket; - - @override - String? get protocol => _webSocket?.protocol; - - @override - int? get closeCode => _webSocket?.closeCode; - - @override - String? get closeReason => _webSocket?.closeReason; - - @override - final Stream stream; - - @override - final WebSocketSink sink; - - /// Completer for [ready]. - final Completer _readyCompleter; - - @override - Future get ready => _readyCompleter.future; - - /// The underlying [WebSocket], if this channel has connected. - /// - /// If the future returned from [WebSocket.connect] has not yet completed, or - /// completed as an error, this will be null. - WebSocket? get innerWebSocket => _webSocket; - +class IOWebSocketChannel extends WebSocketAdapterWebSocketChannel { /// Creates a new WebSocket connection. /// /// Connects to [url] using [WebSocket.connect] and returns a channel that can @@ -76,58 +40,24 @@ class IOWebSocketChannel extends StreamChannelMixin Duration? connectTimeout, HttpClient? customClient, }) { - late IOWebSocketChannel channel; - final sinkCompleter = WebSocketSinkCompleter(); - var future = WebSocket.connect( + var webSocketFuture = WebSocket.connect( url.toString(), headers: headers, protocols: protocols, customClient: customClient, - ); + ).then((webSocket) => webSocket..pingInterval = pingInterval); + if (connectTimeout != null) { - future = future.timeout(connectTimeout); + webSocketFuture = webSocketFuture.timeout(connectTimeout); } - final stream = StreamCompleter.fromFuture(future.then((webSocket) { - webSocket.pingInterval = pingInterval; - channel._webSocket = webSocket; - channel._readyCompleter.complete(); - sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket)); - return webSocket; - }).catchError((Object error, StackTrace stackTrace) { - channel._readyCompleter.completeError(error, stackTrace); - throw WebSocketChannelException.from(error); - })); - return channel = - IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink); - } - - /// Creates a channel wrapping [socket]. - IOWebSocketChannel(WebSocket socket) - : _webSocket = socket, - stream = socket.handleError( - (Object? error) => throw WebSocketChannelException.from(error)), - sink = _IOWebSocketSink(socket), - _readyCompleter = Completer()..complete(); - /// Creates a channel without a socket. - /// - /// This is used with `connect` to synchronously provide a channel that later - /// has a socket added. - IOWebSocketChannel._withoutSocket(Stream stream, this.sink) - : _webSocket = null, - stream = stream.handleError( - (Object? error) => throw WebSocketChannelException.from(error)), - _readyCompleter = Completer(); -} - -/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket]. -class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink { - /// The underlying socket. - final WebSocket _webSocket; - - _IOWebSocketSink(WebSocket super.webSocket) : _webSocket = webSocket; + return IOWebSocketChannel(webSocketFuture); + } - @override - Future close([int? closeCode, String? closeReason]) => - _webSocket.close(closeCode, closeReason); + /// Creates a channel wrapping [webSocket]. + IOWebSocketChannel(FutureOr webSocket) + : super(webSocket is Future + ? webSocket.then(io_web_socket.IOWebSocket.fromWebSocket) + as FutureOr + : io_web_socket.IOWebSocket.fromWebSocket(webSocket)); } diff --git a/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart b/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart index 28780ffee6..2a1242a871 100644 --- a/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart +++ b/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart @@ -66,14 +66,19 @@ class WebSocketAdapterWebSocketChannel extends StreamChannelMixin /// future will complete with an error. factory WebSocketAdapterWebSocketChannel.connect(Uri url, {Iterable? protocols}) => - WebSocketAdapterWebSocketChannel._( + WebSocketAdapterWebSocketChannel( WebSocket.connect(url, protocols: protocols)); - // Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket]. - factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) => - WebSocketAdapterWebSocketChannel._(Future.value(webSocket)); + // Construct a [WebSocketWebSocketChannelAdapter] from an existing + // [WebSocket]. + WebSocketAdapterWebSocketChannel(FutureOr webSocket) { + Future webSocketFuture; + if (webSocket is WebSocket) { + webSocketFuture = Future.value(webSocket); + } else { + webSocketFuture = webSocket; + } - WebSocketAdapterWebSocketChannel._(Future webSocketFuture) { webSocketFuture.then((webSocket) { var remoteClosed = false; webSocket.events.listen((event) { @@ -113,7 +118,16 @@ class WebSocketAdapterWebSocketChannel extends StreamChannelMixin _protocol = webSocket.protocol; _readyCompleter.complete(); }, onError: (Object e) { - _readyCompleter.completeError(WebSocketChannelException.from(e)); + Exception error; + if (e is TimeoutException) { + // Required for backwards compatibility with `IOWebSocketChannel`. + error = e; + } else { + error = WebSocketChannelException.from(e); + } + _readyCompleter.completeError(error); + _controller.local.sink.addError(error); + _controller.local.sink.close(); }); } } diff --git a/pkgs/web_socket_channel/pubspec.yaml b/pkgs/web_socket_channel/pubspec.yaml index 2dfbc7f738..d198922290 100644 --- a/pkgs/web_socket_channel/pubspec.yaml +++ b/pkgs/web_socket_channel/pubspec.yaml @@ -14,7 +14,7 @@ dependencies: crypto: ^3.0.0 stream_channel: ^2.1.0 web: ^0.5.0 - web_socket: ^0.1.0 + web_socket: ^0.1.1 dev_dependencies: dart_flutter_team_lints: ^2.0.0 diff --git a/pkgs/web_socket_channel/test/io_test.dart b/pkgs/web_socket_channel/test/io_test.dart index ac1ffcc0c7..1b7ae35611 100644 --- a/pkgs/web_socket_channel/test/io_test.dart +++ b/pkgs/web_socket_channel/test/io_test.dart @@ -131,7 +131,7 @@ void main() { }); final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}'); - expect(channel.ready, throwsA(isA())); + expect(channel.ready, throwsA(isA())); expect(channel.stream.drain(), throwsA(isA())); }); @@ -154,7 +154,7 @@ void main() { 'ws://localhost:${server.port}', protocols: [failedProtocol], ); - expect(channel.ready, throwsA(isA())); + expect(channel.ready, throwsA(isA())); expect( channel.stream.drain(), throwsA(isA()), @@ -230,8 +230,7 @@ void main() { ); expect(channel.ready, throwsA(isA())); - expect(channel.stream.drain(), - throwsA(isA())); + expect(channel.stream.drain(), throwsA(anything)); }); test('.custom client is passed through', () async { diff --git a/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart b/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart index 24f9907d96..c66133b6e2 100644 --- a/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart +++ b/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart @@ -112,9 +112,26 @@ void main() { expect(channel.closeReason, null); }); - test('fromWebSocket', () async { + test('constructor with WebSocket', () async { final webSocket = await WebSocket.connect(uri); - final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket); + final channel = WebSocketAdapterWebSocketChannel(webSocket); + + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + [4, 5, 6] + ]); + }); + + test('constructor with Future', () async { + final webSocketFuture = WebSocket.connect(uri); + final channel = WebSocketAdapterWebSocketChannel(webSocketFuture); await expectLater(channel.ready, completes); channel.sink.add('This count says:');