Skip to content

Commit

Permalink
Implement IOWebSocketChannel as a WebSocketAdapterWebSocket subclass (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
brianquinlan authored Apr 9, 2024
1 parent 510f396 commit c0ea76f
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 99 deletions.
2 changes: 2 additions & 0 deletions pkgs/web_socket_channel/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
default implementation for `WebSocketChannel.connect`.
- **BREAKING**: Remove `WebSocketChannel` constructor.
- **BREAKING**: Make `WebSocketChannel` an `abstract interface`.
- **BREAKING**: `IOWebSocketChannel.ready` will throw
`WebSocketChannelException` instead of `WebSocketException`.

## 2.4.5

Expand Down
102 changes: 16 additions & 86 deletions pkgs/web_socket_channel/lib/io.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,15 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'dart:io' show HttpClient, WebSocket;
import 'package:web_socket/io_web_socket.dart' as io_web_socket;

import 'src/channel.dart';
import 'src/exception.dart';
import 'src/sink_completer.dart';
import 'web_socket_adapter_web_socket_channel.dart';

/// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket].
class IOWebSocketChannel extends StreamChannelMixin
implements WebSocketChannel {
/// The underlying `dart:io` [WebSocket].
///
/// If the channel was constructed with [IOWebSocketChannel.connect], this is
/// `null` until the [WebSocket.connect] future completes.
WebSocket? _webSocket;

@override
String? get protocol => _webSocket?.protocol;

@override
int? get closeCode => _webSocket?.closeCode;

@override
String? get closeReason => _webSocket?.closeReason;

@override
final Stream stream;

@override
final WebSocketSink sink;

/// Completer for [ready].
final Completer<void> _readyCompleter;

@override
Future<void> get ready => _readyCompleter.future;

/// The underlying [WebSocket], if this channel has connected.
///
/// If the future returned from [WebSocket.connect] has not yet completed, or
/// completed as an error, this will be null.
WebSocket? get innerWebSocket => _webSocket;

class IOWebSocketChannel extends WebSocketAdapterWebSocketChannel {
/// Creates a new WebSocket connection.
///
/// Connects to [url] using [WebSocket.connect] and returns a channel that can
Expand Down Expand Up @@ -76,58 +40,24 @@ class IOWebSocketChannel extends StreamChannelMixin
Duration? connectTimeout,
HttpClient? customClient,
}) {
late IOWebSocketChannel channel;
final sinkCompleter = WebSocketSinkCompleter();
var future = WebSocket.connect(
var webSocketFuture = WebSocket.connect(
url.toString(),
headers: headers,
protocols: protocols,
customClient: customClient,
);
).then((webSocket) => webSocket..pingInterval = pingInterval);

if (connectTimeout != null) {
future = future.timeout(connectTimeout);
webSocketFuture = webSocketFuture.timeout(connectTimeout);
}
final stream = StreamCompleter.fromFuture(future.then((webSocket) {
webSocket.pingInterval = pingInterval;
channel._webSocket = webSocket;
channel._readyCompleter.complete();
sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
return webSocket;
}).catchError((Object error, StackTrace stackTrace) {
channel._readyCompleter.completeError(error, stackTrace);
throw WebSocketChannelException.from(error);
}));
return channel =
IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
}

/// Creates a channel wrapping [socket].
IOWebSocketChannel(WebSocket socket)
: _webSocket = socket,
stream = socket.handleError(
(Object? error) => throw WebSocketChannelException.from(error)),
sink = _IOWebSocketSink(socket),
_readyCompleter = Completer()..complete();

/// Creates a channel without a socket.
///
/// This is used with `connect` to synchronously provide a channel that later
/// has a socket added.
IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
: _webSocket = null,
stream = stream.handleError(
(Object? error) => throw WebSocketChannelException.from(error)),
_readyCompleter = Completer();
}

/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink {
/// The underlying socket.
final WebSocket _webSocket;

_IOWebSocketSink(WebSocket super.webSocket) : _webSocket = webSocket;
return IOWebSocketChannel(webSocketFuture);
}

@override
Future close([int? closeCode, String? closeReason]) =>
_webSocket.close(closeCode, closeReason);
/// Creates a channel wrapping [webSocket].
IOWebSocketChannel(FutureOr<WebSocket> webSocket)
: super(webSocket is Future<WebSocket>
? webSocket.then(io_web_socket.IOWebSocket.fromWebSocket)
as FutureOr<io_web_socket.IOWebSocket>
: io_web_socket.IOWebSocket.fromWebSocket(webSocket));
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,19 @@ class WebSocketAdapterWebSocketChannel extends StreamChannelMixin
/// future will complete with an error.
factory WebSocketAdapterWebSocketChannel.connect(Uri url,
{Iterable<String>? protocols}) =>
WebSocketAdapterWebSocketChannel._(
WebSocketAdapterWebSocketChannel(
WebSocket.connect(url, protocols: protocols));

// Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket].
factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) =>
WebSocketAdapterWebSocketChannel._(Future.value(webSocket));
// Construct a [WebSocketWebSocketChannelAdapter] from an existing
// [WebSocket].
WebSocketAdapterWebSocketChannel(FutureOr<WebSocket> webSocket) {
Future<WebSocket> webSocketFuture;
if (webSocket is WebSocket) {
webSocketFuture = Future.value(webSocket);
} else {
webSocketFuture = webSocket;
}

WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) {
webSocketFuture.then((webSocket) {
var remoteClosed = false;
webSocket.events.listen((event) {
Expand Down Expand Up @@ -113,7 +118,16 @@ class WebSocketAdapterWebSocketChannel extends StreamChannelMixin
_protocol = webSocket.protocol;
_readyCompleter.complete();
}, onError: (Object e) {
_readyCompleter.completeError(WebSocketChannelException.from(e));
Exception error;
if (e is TimeoutException) {
// Required for backwards compatibility with `IOWebSocketChannel`.
error = e;
} else {
error = WebSocketChannelException.from(e);
}
_readyCompleter.completeError(error);
_controller.local.sink.addError(error);
_controller.local.sink.close();
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkgs/web_socket_channel/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies:
crypto: ^3.0.0
stream_channel: ^2.1.0
web: ^0.5.0
web_socket: ^0.1.0
web_socket: ^0.1.1

dev_dependencies:
dart_flutter_team_lints: ^2.0.0
Expand Down
7 changes: 3 additions & 4 deletions pkgs/web_socket_channel/test/io_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void main() {
});

final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
expect(channel.ready, throwsA(isA<WebSocketException>()));
expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
expect(channel.stream.drain<void>(),
throwsA(isA<WebSocketChannelException>()));
});
Expand All @@ -154,7 +154,7 @@ void main() {
'ws://localhost:${server.port}',
protocols: [failedProtocol],
);
expect(channel.ready, throwsA(isA<WebSocketException>()));
expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
expect(
channel.stream.drain<void>(),
throwsA(isA<WebSocketChannelException>()),
Expand Down Expand Up @@ -230,8 +230,7 @@ void main() {
);

expect(channel.ready, throwsA(isA<TimeoutException>()));
expect(channel.stream.drain<void>(),
throwsA(isA<WebSocketChannelException>()));
expect(channel.stream.drain<void>(), throwsA(anything));
});

test('.custom client is passed through', () async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,26 @@ void main() {
expect(channel.closeReason, null);
});

test('fromWebSocket', () async {
test('constructor with WebSocket', () async {
final webSocket = await WebSocket.connect(uri);
final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket);
final channel = WebSocketAdapterWebSocketChannel(webSocket);

await expectLater(channel.ready, completes);
channel.sink.add('This count says:');
channel.sink.add([1, 2, 3]);
channel.sink.add('And then:');
channel.sink.add([4, 5, 6]);
expect(await channel.stream.take(4).toList(), [
'This count says:',
[1, 2, 3],
'And then:',
[4, 5, 6]
]);
});

test('constructor with Future<WebSocket>', () async {
final webSocketFuture = WebSocket.connect(uri);
final channel = WebSocketAdapterWebSocketChannel(webSocketFuture);

await expectLater(channel.ready, completes);
channel.sink.add('This count says:');
Expand Down

0 comments on commit c0ea76f

Please sign in to comment.