From e4f6e2e356e383c9d541fe6cd5c7df374666b5cf Mon Sep 17 00:00:00 2001 From: Raj Maniar Date: Tue, 20 Oct 2015 15:59:31 -0700 Subject: [PATCH 1/3] Add error stream to client implementation --- lib/src/client/client.dart | 5 +++++ lib/src/client/impl/client_impl.dart | 11 +++++++++++ 2 files changed, 16 insertions(+) diff --git a/lib/src/client/client.dart b/lib/src/client/client.dart index 2374995..4b8d24d 100644 --- a/lib/src/client/client.dart +++ b/lib/src/client/client.dart @@ -30,4 +30,9 @@ abstract class Client { * the new [Channel] */ Future channel(); + + /** + * Register listener for errors + */ + StreamSubscription errorListener(void onData(Exception error), { Function onError, void onDone(), bool cancelOnError }); } diff --git a/lib/src/client/impl/client_impl.dart b/lib/src/client/impl/client_impl.dart index 035a86f..9432477 100644 --- a/lib/src/client/impl/client_impl.dart +++ b/lib/src/client/impl/client_impl.dart @@ -19,6 +19,9 @@ class _ClientImpl implements Client { Completer _connected; Completer _clientClosed; + //Error Stream + StreamController _error = new StreamController.broadcast(); + _ClientImpl({ConnectionSettings settings}) { // Use defaults if no settings specified @@ -166,6 +169,10 @@ class _ClientImpl implements Client { return; } + if (_error != null && _error.hasListener && !_error.isClosed) { + _error.add(ex); + } + switch (ex.runtimeType) { case FatalException: case ConnectionException: @@ -236,6 +243,7 @@ class _ClientImpl implements Client { _socket.destroy(); _socket = null; _connected = null; + _error.close(); _clientClosed.complete(); }); @@ -270,4 +278,7 @@ class _ClientImpl implements Client { return userChannel._channelOpened.future; }); } + + StreamSubscription errorListener(void onData(Exception error), { Function onError, void onDone(), bool cancelOnError}) => _error.stream.listen(onData, onError : onError, onDone : onDone, cancelOnError : cancelOnError); + } From 2175c476f9e244f11ffdcaa7412a9c58790f42de Mon Sep 17 00:00:00 2001 From: Raj Maniar Date: Wed, 21 Oct 2015 17:20:40 -0700 Subject: [PATCH 2/3] Add unit test for error stream --- test/lib/exception_handling_test.dart | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/test/lib/exception_handling_test.dart b/test/lib/exception_handling_test.dart index c6d4d5e..ac849f9 100644 --- a/test/lib/exception_handling_test.dart +++ b/test/lib/exception_handling_test.dart @@ -1,6 +1,7 @@ library dart_amqp.test.exceptions; import "dart:typed_data"; +import "dart:async"; import "../packages/unittest/unittest.dart"; import "../packages/mock/mock.dart"; @@ -350,6 +351,26 @@ main({bool enableLogger : true}) { }).catchError(expectAsync(handleError)); }); }); - + group("error stream:", () { + test("fatal exception", () async { + void handleError(ex) { + expect(ex, new isInstanceOf()); + } + server.shutdown() + .then((_) => server.listen("127.0.0.1", 9000)) + .then((_) { + generateHandshakeMessages(frameWriter, server); + return client + .connect() + .then((_) { + client.errorListener((ex) => handleError(ex)); + return server.shutdown() + .then((_) => new Future.delayed( + new Duration(seconds: 5) + server.responseDelay)) + .then((_) => fail("Expected an exception to be thrown")); + }); + }); + }); + }); }); } From 20956df6fd00b72780b0db068363b4053ecc4084 Mon Sep 17 00:00:00 2001 From: Raj Maniar Date: Wed, 21 Oct 2015 17:24:36 -0700 Subject: [PATCH 3/3] remove hardcoded host/port from unit test --- test/lib/exception_handling_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/lib/exception_handling_test.dart b/test/lib/exception_handling_test.dart index ac849f9..507d669 100644 --- a/test/lib/exception_handling_test.dart +++ b/test/lib/exception_handling_test.dart @@ -357,7 +357,7 @@ main({bool enableLogger : true}) { expect(ex, new isInstanceOf()); } server.shutdown() - .then((_) => server.listen("127.0.0.1", 9000)) + .then((_) => server.listen(client.settings.host, client.settings.port)) .then((_) { generateHandshakeMessages(frameWriter, server); return client