Skip to content

Commit

Permalink
Issue #49: Close socket on error
Browse files Browse the repository at this point in the history
  • Loading branch information
ra1u committed Nov 2, 2021
1 parent 6812f8d commit 053dc90
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions lib/lazystream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,29 @@
part of redis;

// like Stream but has method next for simple reading
class StreamNext<T> {
late Queue<Completer<T>> _queue;
class StreamNext {
late Queue<Completer<List<int>>> _queue;
late int _nfut;
late int _npack;
late bool done;
StreamNext.fromstream(Stream<T> stream) {
_queue = Queue<Completer<T>>();
late Socket socket;
late StreamSubscription<List<int>> _sub;
StreamNext.fromstream(Socket _socket) {
_queue = Queue<Completer<List<int>>>();
_nfut = 0;
_npack = 0;
done = false;
stream.listen(onData, onError: this.onError, onDone: this.onDone);
socket = _socket;
socket.listen(onData, onError: this.onError, onDone: this.onDone);
}

void onData(T event) {
void onData(List<int> event) {
if (_nfut >= 1) {
Completer c = _queue.removeFirst();
c.complete(event);
_nfut -= 1;
} else {
Completer<T> c = Completer<T>();
Completer<List<int>> c = Completer<List<int>>();
c.complete(event);
_queue.addLast(c);
_npack += 1;
Expand All @@ -44,28 +47,34 @@ class StreamNext<T> {

void onError(error) {
done = true;
if (_nfut >= 1) {
_nfut = 0;
for (Completer<T> e in _queue) {
e.completeError(error);
// close socket on error
// follow bug https://github.com/ra1u/redis-dart/issues/49
// and bug https://github.com/dart-lang/sdk/issues/47538
socket.close().then<void>((_){
if (_nfut >= 1) {
_nfut = 0;
for (Completer<List<int>> e in _queue) {
print("complete error");
e.completeError(error);
}
}
}
});
}

void onDone() {
onError("stream is closed");
}

Future<T> next() {
Future<List<int>> next() {
if (_npack == 0) {
if (done) {
return Future<T>.error("stream closed");
return Future<List<int>>.error("stream closed");
}
_nfut += 1;
_queue.addLast(Completer<T>());
_queue.addLast(Completer<List<int>>());
return _queue.last.future;
} else {
Completer<T> c = _queue.removeFirst();
Completer<List<int>> c = _queue.removeFirst();
_npack -= 1;
return c.future;
}
Expand All @@ -74,12 +83,12 @@ class StreamNext<T> {

// it
class LazyStream {
late StreamNext<List<int>> _stream;
late StreamNext _stream;
late List<int> _remainder;
late List<int> _return;
late Iterator<int> _iter;
LazyStream.fromstream(Stream<List<int>> stream) {
_stream = StreamNext<List<int>>.fromstream(stream);
LazyStream.fromstream(Socket socket) {
_stream = StreamNext.fromstream(socket);
_return = <int>[];
_remainder = <int>[];
_iter = _remainder.iterator;
Expand Down

0 comments on commit 053dc90

Please sign in to comment.