Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an adapter for package:web_socket #339

Merged
merged 7 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
## 3.0.0-wip

- Provide an adapter around `package:web_socket` `WebSocket`s and make it the
default implementation for `WebSocketChannel.connect`.

## 2.4.5

* use secure random number generator for frame masking.
- use secure random number generator for frame masking.

## 2.4.4

* Require Dart `^3.3`
* Require `package:web` `^0.5.0`.
- Require Dart `^3.3`
- Require `package:web` `^0.5.0`.

## 2.4.3

Expand Down
15 changes: 0 additions & 15 deletions lib/src/_connect_api.dart

This file was deleted.

16 changes: 0 additions & 16 deletions lib/src/_connect_html.dart

This file was deleted.

15 changes: 0 additions & 15 deletions lib/src/_connect_io.dart

This file was deleted.

6 changes: 2 additions & 4 deletions lib/src/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import 'package:async/async.dart';
import 'package:crypto/crypto.dart';
import 'package:stream_channel/stream_channel.dart';

import '_connect_api.dart'
if (dart.library.io) '_connect_io.dart'
if (dart.library.js_interop) '_connect_html.dart' as platform;
import '../web_socket_adapter_web_socket_channel.dart';
import 'copy/web_socket_impl.dart';
import 'exception.dart';

Expand Down Expand Up @@ -141,7 +139,7 @@ class WebSocketChannel extends StreamChannelMixin {
/// If there are errors creating the connection the [ready] future will
/// complete with an error.
factory WebSocketChannel.connect(Uri uri, {Iterable<String>? protocols}) =>
platform.connect(uri, protocols: protocols);
WebSocketAdapterWebSocketChannel.connect(uri, protocols: protocols);
}

/// The sink exposed by a [WebSocketChannel].
Expand Down
136 changes: 136 additions & 0 deletions lib/web_socket_adapter_web_socket_channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket/web_socket.dart';

import 'src/channel.dart';
import 'src/exception.dart';

/// A [WebSocketChannel] implemented using [WebSocket].
class WebSocketAdapterWebSocketChannel extends StreamChannelMixin
implements WebSocketChannel {
@override
String? get protocol => _protocol;
String? _protocol;

@override
int? get closeCode => _closeCode;
int? _closeCode;

@override
String? get closeReason => _closeReason;
String? _closeReason;

/// The close code set by the local user.
///
/// To ensure proper ordering, this is stored until we get a done event on
/// [_controller.local.stream].
int? _localCloseCode;

/// The close reason set by the local user.
///
/// To ensure proper ordering, this is stored until we get a done event on
/// [_controller.local.stream].
String? _localCloseReason;

/// Completer for [ready].
final _readyCompleter = Completer<void>();

@override
Future<void> get ready => _readyCompleter.future;

@override
Stream get stream => _controller.foreign.stream;

final _controller =
StreamChannelController<Object?>(sync: true, allowForeignErrors: false);

@override
late final WebSocketSink sink = _WebSocketSink(this);

/// Creates a new WebSocket connection.
///
/// If provided, the [protocols] argument indicates that subprotocols that
/// the peer is able to select. See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
///
/// After construction, the [WebSocketAdapterWebSocketChannel] may not be
/// connected to the peer. The [ready] future will complete after the channel
/// is connected. If there are errors creating the connection the [ready]
/// future will complete with an error.
factory WebSocketAdapterWebSocketChannel.connect(Uri url,
{Iterable<String>? protocols}) =>
WebSocketAdapterWebSocketChannel._(
WebSocket.connect(url, protocols: protocols));

// Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket].
factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) =>
WebSocketAdapterWebSocketChannel._(Future.value(webSocket));

WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) {
webSocketFuture.then((webSocket) {
var remoteClosed = false;
webSocket.events.listen((event) {
switch (event) {
case TextDataReceived(text: final text):
_controller.local.sink.add(text);
case BinaryDataReceived(data: final data):
_controller.local.sink.add(data);
case CloseReceived(code: final code, reason: final reason):
remoteClosed = true;
_closeCode = code;
_closeReason = reason;
_controller.local.sink.close();
}
});
_controller.local.stream.listen((obj) {
try {
switch (obj) {
case final String s:
webSocket.sendText(s);
case final Uint8List b:
webSocket.sendBytes(b);
case final List<int> b:
webSocket.sendBytes(Uint8List.fromList(b));
default:
throw UnsupportedError('Cannot send ${obj.runtimeType}');
}
} on WebSocketConnectionClosed catch (_) {
// There is nowhere to surface this error; `_controller.local.sink`
// has already been closed.
}
}, onDone: () {
if (!remoteClosed) {
webSocket.close(_localCloseCode, _localCloseReason);
}
});
_protocol = webSocket.protocol;
_readyCompleter.complete();
}, onError: (Object e) {
_readyCompleter.completeError(WebSocketChannelException.from(e));
});
}
}

/// A [WebSocketSink] that tracks the close code and reason passed to [close].
class _WebSocketSink extends DelegatingStreamSink implements WebSocketSink {
/// The channel to which this sink belongs.
final WebSocketAdapterWebSocketChannel _channel;

_WebSocketSink(WebSocketAdapterWebSocketChannel channel)
: _channel = channel,
super(channel._controller.foreign.sink);

@override
Future close([int? closeCode, String? closeReason]) {
_channel._localCloseCode = closeCode;
_channel._localCloseReason = closeReason;
return super.close();
}
}
9 changes: 8 additions & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: web_socket_channel
version: 2.4.5
version: 3.0.0-wip
description: >-
StreamChannel wrappers for WebSockets. Provides a cross-platform
WebSocketChannel API, a cross-platform implementation of that API that
Expand All @@ -14,7 +14,14 @@ dependencies:
crypto: ^3.0.0
stream_channel: ^2.1.0
web: ^0.5.0
web_socket: ^0.1.0

dev_dependencies:
dart_flutter_team_lints: ^2.0.0
test: ^1.16.0

# Remove this when versions of `package:test` and `shelf_web_socket` that support
# channel_web_socket 3.0 are released.
dependency_overrides:
shelf_web_socket: 1.0.4
test: 1.25.2
34 changes: 34 additions & 0 deletions test/echo_server_vm.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';

import 'package:stream_channel/stream_channel.dart';

Future<void> hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;

server = (await HttpServer.bind('localhost', 0))
..transform(WebSocketTransformer())
.listen((WebSocket webSocket) => webSocket.listen((data) {
if (data == 'close') {
webSocket.close(3001, 'you asked me to');
} else {
webSocket.add(data);
}
}));

channel.sink.add(server.port);
await channel
.stream.first; // Any writes indicates that the server should exit.
unawaited(server.close());
}

/// Starts an WebSocket server that echos the payload of the request.
Future<StreamChannel<Object?>> startServer() async {
final controller = StreamChannelController<Object?>(sync: true);
unawaited(hybridMain(controller.foreign));
return controller.local;
}
35 changes: 35 additions & 0 deletions test/echo_server_web.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

/// Starts an WebSocket server that echos the payload of the request.
/// Copied from `echo_server_vm.dart`.
Future<StreamChannel<Object?>> startServer() async => spawnHybridCode(r'''
import 'dart:async';
import 'dart:io';

import 'package:stream_channel/stream_channel.dart';

/// Starts an WebSocket server that echos the payload of the request.
Future<void> hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;

server = (await HttpServer.bind('localhost', 0))
..transform(WebSocketTransformer())
.listen((WebSocket webSocket) => webSocket.listen((data) {
if (data == 'close') {
webSocket.close(3001, 'you asked me to');
} else {
webSocket.add(data);
}
}));

channel.sink.add(server.port);
await channel
.stream.first; // Any writes indicates that the server should exit.
unawaited(server.close());
}
''');
16 changes: 8 additions & 8 deletions test/io_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void main() {
channel.stream.listen((request) {
expect(request, equals('ping'));
channel.sink.add('pong');
channel.sink.close(5678, 'raisin');
channel.sink.close(3678, 'raisin');
});
});

Expand All @@ -45,7 +45,7 @@ void main() {
}
n++;
}, onDone: expectAsync0(() {
expect(channel.closeCode, equals(5678));
expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}));
});
Expand All @@ -70,7 +70,7 @@ void main() {
channel.stream.listen(
expectAsync1((message) {
expect(message, equals('pong'));
channel.sink.close(5678, 'raisin');
channel.sink.close(3678, 'raisin');
}, count: 1),
onDone: expectAsync0(() {}));
});
Expand All @@ -97,7 +97,7 @@ void main() {
channel.stream.listen(
expectAsync1((message) {
expect(message, equals('pong'));
channel.sink.close(5678, 'raisin');
channel.sink.close(3678, 'raisin');
}, count: 1),
onDone: expectAsync0(() {}));
});
Expand All @@ -109,7 +109,7 @@ void main() {
expect(() async {
final channel = IOWebSocketChannel(webSocket);
await channel.stream.drain<void>();
expect(channel.closeCode, equals(5678));
expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}(), completes);
});
Expand All @@ -118,7 +118,7 @@ void main() {

expect(channel.ready, completes);

await channel.sink.close(5678, 'raisin');
await channel.sink.close(3678, 'raisin');
});

test('.connect wraps a connection error in WebSocketChannelException',
Expand Down Expand Up @@ -192,7 +192,7 @@ void main() {
expect(() async {
final channel = IOWebSocketChannel(webSocket);
await channel.stream.drain<void>();
expect(channel.closeCode, equals(5678));
expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}(), completes);
});
Expand All @@ -202,7 +202,7 @@ void main() {
connectTimeout: const Duration(milliseconds: 1000),
);
expect(channel.ready, completes);
await channel.sink.close(5678, 'raisin');
await channel.sink.close(3678, 'raisin');
});

test('.respects timeout parameter when trying to connect', () async {
Expand Down
Loading