Skip to content

Commit

Permalink
update: Migrate off legacy JS/HTML apis
Browse files Browse the repository at this point in the history
  • Loading branch information
minoic committed Aug 16, 2024
1 parent c18e185 commit b96174d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 39 deletions.
4 changes: 2 additions & 2 deletions lib/grpc_or_grpcweb.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import 'src/client/grpc_or_grpcweb_channel_grpc.dart'
if (dart.library.html) 'src/client/grpc_or_grpcweb_channel_web.dart';
import 'src/client/grpc_or_grpcweb_channel_web.dart'
if (dart.library.io) 'src/client/grpc_or_grpcweb_channel_grpc.dart';
import 'src/client/http2_channel.dart';
import 'src/client/options.dart';

Expand Down
77 changes: 40 additions & 37 deletions lib/src/client/transport/xhr_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
// limitations under the License.

import 'dart:async';
import 'dart:html';
import 'dart:js_interop';
import 'dart:typed_data';

import 'package:meta/meta.dart';
import 'package:web/web.dart';

import '../../client/call.dart';
import '../../shared/message.dart';
Expand All @@ -30,7 +31,7 @@ import 'web_streams.dart';
const _contentTypeKey = 'Content-Type';

class XhrTransportStream implements GrpcTransportStream {
final HttpRequest _request;
final XMLHttpRequest _request;
final ErrorHandler _onError;
final Function(XhrTransportStream stream) _onDone;
bool _headersReceived = false;
Expand All @@ -45,23 +46,22 @@ class XhrTransportStream implements GrpcTransportStream {
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;

XhrTransportStream(this._request,
{required ErrorHandler onError, required onDone})
XhrTransportStream(this._request, {required ErrorHandler onError, required onDone})
: _onError = onError,
_onDone = onDone {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);
.listen((data) => _request.send(Int8List.fromList(data).toJS), cancelOnError: true, onError: _onError);

_request.onReadyStateChange.listen((data) {
_request.onReadyStateChange.listen((_) {
if (_incomingProcessor.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
case 2:
_onHeadersReceived();
break;
case HttpRequest.DONE:
case 4:
_onRequestDone();
_close();
break;
Expand All @@ -72,36 +72,29 @@ class XhrTransportStream implements GrpcTransportStream {
if (_incomingProcessor.isClosed) {
return;
}
_onError(GrpcError.unavailable('XhrConnection connection-error'),
StackTrace.current);
_onError(GrpcError.unavailable('XhrConnection connection-error'), StackTrace.current);
terminate();
});

_request.onProgress.listen((_) {
if (_incomingProcessor.isClosed) {
return;
}
// Use response over responseText as most browsers don't support
// using responseText during an onProgress event.
final responseString = _request.response as String;
final bytes = Uint8List.fromList(
responseString.substring(_requestBytesRead).codeUnits)
.buffer;
_requestBytesRead = responseString.length;
final responseText = _request.responseText;
final bytes = Uint8List.fromList(responseText.substring(_requestBytesRead).codeUnits).buffer;
_requestBytesRead = responseText.length;
_incomingProcessor.add(bytes);
});

_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(_incomingMessages.add,
onError: _onError, onDone: _incomingMessages.close);
.listen(_incomingMessages.add, onError: _onError, onDone: _incomingMessages.close);
}

bool _validateResponseState() {
try {
validateHttpStatusAndContentType(
_request.status, _request.responseHeaders,
validateHttpStatusAndContentType(_request.status, _parseHeaders(_request.getAllResponseHeaders()),
rawResponse: _request.responseText);
return true;
} catch (e, st) {
Expand All @@ -115,17 +108,15 @@ class XhrTransportStream implements GrpcTransportStream {
if (!_validateResponseState()) {
return;
}
_incomingMessages.add(GrpcMetadata(_request.responseHeaders));
_incomingMessages.add(GrpcMetadata(_parseHeaders(_request.getAllResponseHeaders())));
}

void _onRequestDone() {
if (!_headersReceived && !_validateResponseState()) {
return;
}
if (_request.response == null) {
_onError(
GrpcError.unavailable('XhrConnection request null response', null,
_request.responseText),
if (_request.status != 200) {
_onError(GrpcError.unavailable('Request failed with status: ${_request.status}', null, _request.responseText),
StackTrace.current);
return;
}
Expand All @@ -137,6 +128,20 @@ class XhrTransportStream implements GrpcTransportStream {
_onDone(this);
}

Map<String, String> _parseHeaders(String rawHeaders) {
final headers = <String, String>{};
final lines = rawHeaders.split('\r\n');
for (var line in lines) {
final index = line.indexOf(': ');
if (index != -1) {
final key = line.substring(0, index);
final value = line.substring(index + 2);
headers[key] = value;
}
}
return headers;
}

@override
Future<void> terminate() async {
_close();
Expand All @@ -153,24 +158,24 @@ class XhrClientConnection implements ClientConnection {

@override
String get authority => uri.authority;

@override
String get scheme => uri.scheme;

void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
for (final header in metadata.keys) {
request.setRequestHeader(header, metadata[header]!);
}
void _initializeRequest(XMLHttpRequest request, Map<String, String> metadata) {
metadata.forEach((key, value) {
request.setRequestHeader(key, value);
});
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
}

@visibleForTesting
HttpRequest createHttpRequest() => HttpRequest();
XMLHttpRequest createHttpRequest() => XMLHttpRequest();

@override
GrpcTransportStream makeRequest(String path, Duration? timeout,
Map<String, String> metadata, ErrorHandler onError,
GrpcTransportStream makeRequest(String path, Duration? timeout, Map<String, String> metadata, ErrorHandler onError,
{CallOptions? callOptions}) {
// gRPC-web headers.
if (_getContentTypeHeader(metadata) == null) {
Expand All @@ -180,8 +185,7 @@ class XhrClientConnection implements ClientConnection {
}

var requestUri = uri.resolve(path);
if (callOptions is WebCallOptions &&
callOptions.bypassCorsPreflight == true) {
if (callOptions is WebCallOptions && callOptions.bypassCorsPreflight == true) {
requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri);
}

Expand All @@ -193,8 +197,7 @@ class XhrClientConnection implements ClientConnection {
// Must set headers after calling open().
_initializeRequest(request, metadata);

final transportStream =
XhrTransportStream(request, onError: onError, onDone: _removeStream);
final transportStream = XhrTransportStream(request, onError: onError, onDone: _removeStream);
_requests.add(transportStream);
return transportStream;
}
Expand Down
1 change: 1 addition & 0 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies:
http2: ^2.2.0
protobuf: '>=2.0.0 <4.0.0'
clock: ^1.1.1
web: ^1.0.0

dev_dependencies:
build_runner: ^2.0.0
Expand Down

0 comments on commit b96174d

Please sign in to comment.