Skip to content
This repository has been archived by the owner on Jan 6, 2025. It is now read-only.
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: dart-archive/sse
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 283568d
Choose a base ref
..
head repository: dart-archive/sse
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 8d018dd
Choose a head ref
Showing with 110 additions and 57 deletions.
  1. +25 −22 CHANGELOG.md
  2. +9 −3 CONTRIBUTING.md
  3. +8 −8 README.md
  4. +64 −21 lib/client/sse_client.dart
  5. +3 −2 lib/src/server/sse_handler.dart
  6. +1 −1 pubspec.yaml
47 changes: 25 additions & 22 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 4.1.2-dev

- Send `fetch` requests instead of `XHR` requests.
- Add an optional `debugKey` parameter to `SseClient` to include in logging.

## 4.1.1

- Apply `keepAlive` logic to `SocketException`s.
@@ -25,8 +30,8 @@

## 3.8.1

- Fix an issue where closing the `SseConnection` stream would result in
an error.
- Fix an issue where closing the `SseConnection` stream would result in an
error.

## 3.8.0

@@ -36,8 +41,8 @@

## 3.7.0

- Deprecate the client's `onOpen` getter. Messages will now be buffered until
a connection is established.
- Deprecate the client's `onOpen` getter. Messages will now be buffered until a
connection is established.

## 3.6.1

@@ -48,15 +53,14 @@
- Improve performance by buffering out of order messages in the server instead
of the client.

** Note ** This is not modelled as a breaking change as the server can handle
messages from older clients. However, clients should be using the latest server
if they require order guarantees.

\*\* Note \*\* This is not modelled as a breaking change as the server can
handle messages from older clients. However, clients should be using the latest
server if they require order guarantees.

## 3.5.0

- Add new `shutdown` methods on `SseHandler` and `SseConnection` to allow closing
connections immediately, ignoring any keep-alive periods.
- Add new `shutdown` methods on `SseHandler` and `SseConnection` to allow
closing connections immediately, ignoring any keep-alive periods.

## 3.4.0

@@ -65,14 +69,14 @@ if they require order guarantees.

## 3.3.0

- Add an `onClose` event to the `SseConnection`. This allows consumers to
listen to this event in lue of `sseConnection.sink.done` as that is not
guaranteed to fire.
- Add an `onClose` event to the `SseConnection`. This allows consumers to listen
to this event in lue of `sseConnection.sink.done` as that is not guaranteed to
fire.

## 3.2.2

- Fix an issue where `keepAlive` may cause state errors when attempting to
send messages on a closed stream.
- Fix an issue where `keepAlive` may cause state errors when attempting to send
messages on a closed stream.

## 3.2.1

@@ -94,21 +98,21 @@ if they require order guarantees.
- Make `isInKeepAlive` on `SseConnection` private.

**Note that this is a breaking change but in actuality no one should be
depending on this API.**
depending on this API.**

## 3.1.0

- Add optional `keepAlive` parameter to the `SseHandler`. If `keepAlive` is
supplied, the connection will remain active for this period after a
disconnect and can be reconnected transparently. If there is no reconnect
within that period, the connection will be closed normally.
supplied, the connection will remain active for this period after a disconnect
and can be reconnected transparently. If there is no reconnect within that
period, the connection will be closed normally.

## 3.0.0

- Add retry logic.

**Possible Breaking Change Error messages may now be delayed up to 5 seconds
in the client.**
**Possible Breaking Change Error messages may now be delayed up to 5 seconds in
the client.**

## 2.1.2

@@ -145,7 +149,6 @@ if they require order guarantees.

- Internal cleanup.


## 0.0.1

- Initial commit.
12 changes: 9 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ Want to contribute? Great! First, read this page (including the small print at
the end).

### Before you contribute

Before we can use your code, you must sign the
[Google Individual Contributor License Agreement](https://cla.developers.google.com/about/google-individual)
(CLA), which you can do online. The CLA is necessary mainly because you own the
@@ -18,16 +19,21 @@ possibly guide you. Coordinating up front makes it much easier to avoid
frustration later on.

### Code reviews

All submissions, including submissions by project members, require review.

### File headers

All files in the project must start with the following header.

// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
```
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
```

### The small print

Contributions made by corporations are covered by a different agreement than the
one above, the
[Software Grant and Corporate Contributor License Agreement](https://developers.google.com/open-source/cla/corporate).
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -2,13 +2,13 @@
[![pub package](https://img.shields.io/pub/v/sse.svg)](https://pub.dev/packages/sse)
[![package publisher](https://img.shields.io/pub/publisher/sse.svg)](https://pub.dev/packages/sse/publisher)

This package provides support for bi-directional communication through
Server Sent Events and corresponding POST requests.
This package provides support for bi-directional communication through Server
Sent Events and corresponding POST requests.

This package is not intended to be a general purpose SSE package, but instead
is a bidirectional protocol for use when Websockets are unavailable.
That is, both the client and the server expose a `sink` and `stream` on which to send
and receive messages respectively.
This package is not intended to be a general purpose SSE package, but instead is
a bidirectional protocol for use when Websockets are unavailable. That is, both
the client and the server expose a `sink` and `stream` on which to send and
receive messages respectively.

Both the server and client have implicit assumptions on each other and therefore a
client from this package must be paired with a server from this package.
Both the server and client have implicit assumptions on each other and therefore
a client from this package must be paired with a server from this package.
85 changes: 64 additions & 21 deletions lib/client/sse_client.dart
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import 'dart:async';
import 'dart:convert';
import 'dart:html';

import 'package:js/js.dart';
import 'package:logging/logging.dart';
import 'package:pool/pool.dart';
import 'package:stream_channel/stream_channel.dart';
@@ -25,6 +26,8 @@ final _requestPool = Pool(1000);
/// The client can send any JSON-encodable messages to the server by adding
/// them to the [sink] and listen to messages from the server on the [stream].
class SseClient extends StreamChannelMixin<String?> {
final String _clientId;

final _incomingController = StreamController<String>();

final _outgoingController = StreamController<String>();
@@ -42,12 +45,14 @@ class SseClient extends StreamChannelMixin<String?> {
Timer? _errorTimer;

/// [serverUrl] is the URL under which the server is listening for
/// incoming bi-directional SSE connections.
SseClient(String serverUrl) {
var clientId = generateUuidV4();
_eventSource =
EventSource('$serverUrl?sseClientId=$clientId', withCredentials: true);
_serverUrl = '$serverUrl?sseClientId=$clientId';
/// incoming bi-directional SSE connections. [debugKey] is an optional key
/// that can be used to identify the SSE connection.
SseClient(String serverUrl, {String? debugKey})
: _clientId = debugKey == null
? generateUuidV4()
: '$debugKey-${generateUuidV4()}' {
_serverUrl = '$serverUrl?sseClientId=$_clientId';
_eventSource = EventSource(_serverUrl, withCredentials: true);
_eventSource.onOpen.first.whenComplete(() {
_onConnected.complete();
_outgoingController.stream
@@ -64,13 +69,7 @@ class SseClient extends StreamChannelMixin<String?> {
// By default the SSE client uses keep-alive.
// Allow for a retry to connect before giving up.
_errorTimer = Timer(const Duration(seconds: 5), () {
_incomingController.addError(error);
close();
if (!_onConnected.isCompleted) {
// This call must happen after the call to close() which checks
// whether the completer was completed earlier.
_onConnected.completeError(error);
}
_closeWithError(error);
});
}
});
@@ -103,12 +102,22 @@ class SseClient extends StreamChannelMixin<String?> {
_outgoingController.close();
}

void _closeWithError(Object error) {
_incomingController.addError(error);
close();
if (!_onConnected.isCompleted) {
// This call must happen after the call to close() which checks
// whether the completer was completed earlier.
_onConnected.completeError(error);
}
}

void _onIncomingControlMessage(Event message) {
var data = (message as MessageEvent).data;
if (data == 'close') {
close();
} else {
throw UnsupportedError('Illegal Control Message "$data"');
throw UnsupportedError('[$_clientId] Illegal Control Message "$data"');
}
}

@@ -128,17 +137,51 @@ class SseClient extends StreamChannelMixin<String?> {
try {
encodedMessage = jsonEncode(message);
} on JsonUnsupportedObjectError catch (e) {
_logger.warning('Unable to encode outgoing message: $e');
_logger.warning('[$_clientId] Unable to encode outgoing message: $e');
} on ArgumentError catch (e) {
_logger.warning('Invalid argument: $e');
_logger.warning('[$_clientId] Invalid argument: $e');
}
try {
await HttpRequest.request('$_serverUrl&messageId=${++_lastMessageId}',
method: 'POST', sendData: encodedMessage, withCredentials: true);
} catch (e) {
_logger.severe('Failed to send $message:\n $e');
close();
final url = '$_serverUrl&messageId=${++_lastMessageId}';
await _fetch(
url,
_FetchOptions(
method: 'POST',
body: encodedMessage,
credentialsOptions:
_CredentialsOptions(credentials: 'include')));
} catch (error) {
final augmentedError =
'[$_clientId] SSE client failed to send $message:\n $error';
_logger.severe(augmentedError);
_closeWithError(augmentedError);
}
});
}
}

// Custom implementation of Fetch API until Dart supports GET vs. POST,
// credentials, etc. See https://github.com/dart-lang/http/issues/595.
@JS('fetch')
external Object _nativeJsFetch(String resourceUrl, _FetchOptions options);

Future<dynamic> _fetch(String resourceUrl, _FetchOptions options) =>
promiseToFuture(_nativeJsFetch(resourceUrl, options));

@JS()
@anonymous
class _FetchOptions {
external factory _FetchOptions({
required String method, // e.g., 'GET', 'POST'
required _CredentialsOptions credentialsOptions,
required String? body,
});
}

@JS()
@anonymous
class _CredentialsOptions {
external factory _CredentialsOptions({
required String credentials, // e.g., 'omit', 'same-origin', 'include'
});
}
5 changes: 3 additions & 2 deletions lib/src/server/sse_handler.dart
Original file line number Diff line number Diff line change
@@ -264,14 +264,15 @@ class SseHandler {

Future<shelf.Response> _handleIncomingMessage(
shelf.Request req, String path) async {
String? clientId;
try {
var clientId = req.url.queryParameters['sseClientId'];
clientId = req.url.queryParameters['sseClientId'];
var messageId = int.parse(req.url.queryParameters['messageId'] ?? '0');
var message = await req.readAsString();
var jsonObject = json.decode(message) as String;
_connections[clientId]?._addIncomingMessage(messageId, jsonObject);
} catch (e, st) {
_logger.fine('Failed to handle incoming message. $e $st');
_logger.fine('[$clientId] Failed to handle incoming message. $e $st');
}
return shelf.Response.ok('', headers: {
'access-control-allow-credentials': 'true',
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: sse
version: 4.1.1
version: 4.1.2-dev
description: >-
Provides client and server functionality for setting up bi-directional
communication through Server Sent Events (SSE) and corresponding POST