From 5113b798815c4320480fe045f3ace8a0b78a235d Mon Sep 17 00:00:00 2001 From: Alexandre Ardhuin Date: Wed, 20 Mar 2019 16:23:03 +0100 Subject: [PATCH 1/5] enable some lints --- analysis_options.yaml | 28 ++++++++++++++++ example/routing/emit_log_direct.dart | 2 +- example/routing/receive_logs_direct.dart | 3 +- example/workers/new_task.dart | 2 +- lib/src/client/impl/channel_impl.dart | 32 ++++++------------- lib/src/client/impl/client_impl.dart | 9 +++--- lib/src/client/impl/consumer_impl.dart | 4 +-- lib/src/client/impl/queue_impl.dart | 2 +- lib/src/logging.dart | 2 +- lib/src/protocol/io/tuning_settings.dart | 2 +- lib/src/protocol/messages/bindings/basic.dart | 4 +-- .../protocol/messages/bindings/channel.dart | 2 +- .../messages/bindings/connection.dart | 2 +- .../protocol/messages/bindings/exchange.dart | 4 +-- lib/src/protocol/messages/bindings/queue.dart | 4 +-- lib/src/protocol/messages/bindings/tx.dart | 6 ++-- lib/src/protocol/stream/type_decoder.dart | 2 +- lib/src/protocol/stream/type_encoder.dart | 2 +- test/lib/auth_test.dart | 8 ++--- test/lib/enum_test.dart | 6 ++-- test/lib/exception_handling_test.dart | 5 +-- test/lib/mocks/mocks.dart | 6 ++-- test/run_all.dart | 2 +- tool/generate_bindings.dart | 6 ++-- 24 files changed, 77 insertions(+), 68 deletions(-) create mode 100644 analysis_options.yaml diff --git a/analysis_options.yaml b/analysis_options.yaml new file mode 100644 index 0000000..34dfe28 --- /dev/null +++ b/analysis_options.yaml @@ -0,0 +1,28 @@ +linter: + rules: + - avoid_empty_else + - avoid_init_to_null + - avoid_null_checks_in_equality_operators + - avoid_relative_lib_imports + - avoid_return_types_on_setters + - avoid_returning_null_for_future + - avoid_returning_null_for_void + - avoid_shadowing_type_parameters + - avoid_types_as_parameter_names + - empty_constructor_bodies + - no_duplicate_case_values + - null_closures + - prefer_conditional_assignment + - prefer_const_constructors + - prefer_contains + - prefer_equal_for_default_values + - prefer_is_empty + - prefer_is_not_empty + - prefer_null_aware_operators + - prefer_void_to_null + - recursive_getters + - slash_for_doc_comments + - unawaited_futures + - unrelated_type_equality_checks + - use_rethrow_when_possible + - valid_regexps \ No newline at end of file diff --git a/example/routing/emit_log_direct.dart b/example/routing/emit_log_direct.dart index 6c7409d..e26e28c 100644 --- a/example/routing/emit_log_direct.dart +++ b/example/routing/emit_log_direct.dart @@ -2,7 +2,7 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; void main(List args) { - if (args.length < 2 || ["info", "warning", "error"].indexOf(args[0]) == -1) { + if (args.length < 2 || !["info", "warning", "error"].contains(args[0])) { print(""" Error: invalid arguments. Please invoke as: diff --git a/example/routing/receive_logs_direct.dart b/example/routing/receive_logs_direct.dart index be2ac2d..5fe1939 100644 --- a/example/routing/receive_logs_direct.dart +++ b/example/routing/receive_logs_direct.dart @@ -3,8 +3,7 @@ import "package:dart_amqp/dart_amqp.dart"; void main(List args) { if (args.isEmpty || - !args.every( - (String arg) => ["info", "warning", "error"].indexOf(arg) != -1)) { + !args.every((String arg) => ["info", "warning", "error"].contains(arg))) { print(""" Error: invalid arguments. Please invoke as: diff --git a/example/workers/new_task.dart b/example/workers/new_task.dart index 9d087a8..9b6c49d 100644 --- a/example/workers/new_task.dart +++ b/example/workers/new_task.dart @@ -6,7 +6,7 @@ void main(List args) { .channel() .then((Channel channel) => channel.queue("task_queue", durable: true)) .then((Queue queue) { - String message = args.length == 0 ? "Hello World!" : args.join(" "); + String message = args.isEmpty ? "Hello World!" : args.join(" "); queue.publish(message, properties: MessageProperties.persistentMessage()); print(" [x] Sent ${message}"); return client.close(); diff --git a/lib/src/client/impl/channel_impl.dart b/lib/src/client/impl/channel_impl.dart index 55cc483..92301b9 100644 --- a/lib/src/client/impl/channel_impl.dart +++ b/lib/src/client/impl/channel_impl.dart @@ -93,9 +93,8 @@ class _ChannelImpl implements Channel { (serverMessage.message as ConnectionStart); // Check if the currently supplied authentication provider is supported by the server. - if (serverResponse.mechanisms - .indexOf(_client.settings.authProvider.saslType) == - -1) { + if (!serverResponse.mechanisms + .contains(_client.settings.authProvider.saslType)) { _client._handleException(FatalException( "Selected authentication provider '${_client.settings.authProvider.saslType}' is unsupported by the server (server supports: ${serverResponse.mechanisms})")); return; @@ -136,7 +135,7 @@ class _ChannelImpl implements Channel { ..maxChannels = _client.tuningSettings.maxChannels > 0 ? _client.tuningSettings.maxChannels : serverResponse.channelMax - ..heartbeatPeriod = Duration(seconds: 0); + ..heartbeatPeriod = Duration.zero; // Respond with the mirrored tuning settings ConnectionTuneOk clientResponse = ConnectionTuneOk() @@ -183,12 +182,8 @@ class _ChannelImpl implements Channel { _channelClosed = Completer(); - if (classId == null) { - classId = 0; - } - if (methodId == null) { - methodId = 0; - } + classId ??= 0; + methodId ??= 0; // Channel #0 should close the connection instead of closing the channel Message closeRequest; @@ -359,9 +354,7 @@ class _ChannelImpl implements Channel { } // Mark the channel as closed - if (_channelClosed == null) { - _channelClosed = Completer(); - } + _channelClosed ??= Completer(); if (!_channelClosed.isCompleted) { _channelClosed.complete(); } @@ -406,10 +399,7 @@ class _ChannelImpl implements Channel { // Mark the channel as closed if we need to if (flagChannelAsClosed) { - if (_channelClosed == null) { - _channelClosed = Completer(); - } - + _channelClosed ??= Completer(); if (!_channelClosed.isCompleted) { _channelClosed.complete(); } @@ -514,12 +504,8 @@ class _ChannelImpl implements Channel { Future qos(int prefetchSize, int prefetchCount, {bool global = true}) { - if (prefetchSize == null) { - prefetchSize = 0; - } - if (prefetchCount == null) { - prefetchCount = 0; - } + prefetchSize ??= 0; + prefetchCount ??= 0; BasicQos qosRequest = BasicQos() ..prefetchSize = prefetchSize ..prefetchCount = prefetchCount diff --git a/lib/src/client/impl/client_impl.dart b/lib/src/client/impl/client_impl.dart index 5bc7f2d..84dcee0 100644 --- a/lib/src/client/impl/client_impl.dart +++ b/lib/src/client/impl/client_impl.dart @@ -31,9 +31,7 @@ class _ClientImpl implements Client { /// fail, then the [_connected] [Future] returned by a call to [open[ will also fail Future _reconnect() { - if (_connected == null) { - _connected = Completer(); - } + _connected ??= Completer(); connectionLogger.info( "Trying to connect to ${settings.host}:${settings.port} [attempt ${_connectionAttempt + 1}/${settings.maxConnectionAttempts}]"); @@ -46,7 +44,8 @@ class _ClientImpl implements Client { .transform(AmqpMessageDecoder().transformer) .listen(_handleMessage, onError: _handleException, - onDone: () => _handleException(SocketException("Socket closed"))); + onDone: () => + _handleException(const SocketException("Socket closed"))); // Allocate channel 0 for handshaking and transmit the AMQP header to bootstrap the handshake _channels.clear(); @@ -255,7 +254,7 @@ class _ClientImpl implements Client { } // Find next available channel - _ChannelImpl userChannel = null; + _ChannelImpl userChannel; int nextChannelId = 0; while (nextChannelId < 65536) { if (!_channels.containsKey(++nextChannelId)) { diff --git a/lib/src/client/impl/consumer_impl.dart b/lib/src/client/impl/consumer_impl.dart index 15f78ec..437e1ec 100644 --- a/lib/src/client/impl/consumer_impl.dart +++ b/lib/src/client/impl/consumer_impl.dart @@ -30,9 +30,7 @@ class _ConsumerImpl implements Consumer { void onMessage(DecodedMessageImpl serverMessage) { // Ensure that messate contains a non-null property object - if (serverMessage.properties == null) { - serverMessage.properties = MessageProperties(); - } + serverMessage.properties ??= MessageProperties(); _controller.add(_AmqpMessageImpl.fromDecodedMessage(this, serverMessage)); } diff --git a/lib/src/client/impl/queue_impl.dart b/lib/src/client/impl/queue_impl.dart index 5081449..f772e2e 100644 --- a/lib/src/client/impl/queue_impl.dart +++ b/lib/src/client/impl/queue_impl.dart @@ -124,7 +124,7 @@ class _QueueImpl implements Queue { Map arguments}) { // If a consumer with the requested tag exists, return that if (consumerTag != null && - !consumerTag.isEmpty && + consumerTag.isNotEmpty && channel._consumers.containsKey(consumerTag)) { return Future.value(channel._consumers[consumerTag]); } diff --git a/lib/src/logging.dart b/lib/src/logging.dart index 4e5d349..9f9c876 100644 --- a/lib/src/logging.dart +++ b/lib/src/logging.dart @@ -7,4 +7,4 @@ import "package:logging/logging.dart"; part "logging/logger.dart"; // A indenting json encoder used by the toString() method of messages -JsonEncoder indentingJsonEncoder = JsonEncoder.withIndent(" "); +const JsonEncoder indentingJsonEncoder = JsonEncoder.withIndent(" "); diff --git a/lib/src/protocol/io/tuning_settings.dart b/lib/src/protocol/io/tuning_settings.dart index 85b5b2b..91713a9 100644 --- a/lib/src/protocol/io/tuning_settings.dart +++ b/lib/src/protocol/io/tuning_settings.dart @@ -5,5 +5,5 @@ class TuningSettings { int maxFrameSize = 4096; - Duration heartbeatPeriod = Duration(seconds: 0); + Duration heartbeatPeriod = Duration.zero; } diff --git a/lib/src/protocol/messages/bindings/basic.dart b/lib/src/protocol/messages/bindings/basic.dart index e9c6656..cb0972e 100644 --- a/lib/src/protocol/messages/bindings/basic.dart +++ b/lib/src/protocol/messages/bindings/basic.dart @@ -35,7 +35,7 @@ class BasicQosOk implements Message { // Message arguments - BasicQosOk.fromStream(TypeDecoder decoder) {} + BasicQosOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } @@ -255,7 +255,7 @@ class BasicRecoverOk implements Message { // Message arguments - BasicRecoverOk.fromStream(TypeDecoder decoder) {} + BasicRecoverOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/bindings/channel.dart b/lib/src/protocol/messages/bindings/channel.dart index 25e0f36..7f026c1 100644 --- a/lib/src/protocol/messages/bindings/channel.dart +++ b/lib/src/protocol/messages/bindings/channel.dart @@ -120,7 +120,7 @@ class ChannelCloseOk implements Message { // Message arguments ChannelCloseOk(); - ChannelCloseOk.fromStream(TypeDecoder decoder) {} + ChannelCloseOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) { encoder..writeUInt16(msgClassId)..writeUInt16(msgMethodId); } diff --git a/lib/src/protocol/messages/bindings/connection.dart b/lib/src/protocol/messages/bindings/connection.dart index 27779bd..2a9d192 100644 --- a/lib/src/protocol/messages/bindings/connection.dart +++ b/lib/src/protocol/messages/bindings/connection.dart @@ -201,7 +201,7 @@ class ConnectionCloseOk implements Message { // Message arguments ConnectionCloseOk(); - ConnectionCloseOk.fromStream(TypeDecoder decoder) {} + ConnectionCloseOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) { encoder..writeUInt16(msgClassId)..writeUInt16(msgMethodId); } diff --git a/lib/src/protocol/messages/bindings/exchange.dart b/lib/src/protocol/messages/bindings/exchange.dart index b04a0d7..e971489 100644 --- a/lib/src/protocol/messages/bindings/exchange.dart +++ b/lib/src/protocol/messages/bindings/exchange.dart @@ -43,7 +43,7 @@ class ExchangeDeclareOk implements Message { // Message arguments - ExchangeDeclareOk.fromStream(TypeDecoder decoder) {} + ExchangeDeclareOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } @@ -78,7 +78,7 @@ class ExchangeDeleteOk implements Message { // Message arguments - ExchangeDeleteOk.fromStream(TypeDecoder decoder) {} + ExchangeDeleteOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/bindings/queue.dart b/lib/src/protocol/messages/bindings/queue.dart index bdd8b8c..2becdb5 100644 --- a/lib/src/protocol/messages/bindings/queue.dart +++ b/lib/src/protocol/messages/bindings/queue.dart @@ -88,7 +88,7 @@ class QueueBindOk implements Message { // Message arguments - QueueBindOk.fromStream(TypeDecoder decoder) {} + QueueBindOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } @@ -126,7 +126,7 @@ class QueueUnbindOk implements Message { // Message arguments - QueueUnbindOk.fromStream(TypeDecoder decoder) {} + QueueUnbindOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/bindings/tx.dart b/lib/src/protocol/messages/bindings/tx.dart index 3ab3585..2b24006 100644 --- a/lib/src/protocol/messages/bindings/tx.dart +++ b/lib/src/protocol/messages/bindings/tx.dart @@ -27,7 +27,7 @@ class TxSelectOk implements Message { // Message arguments - TxSelectOk.fromStream(TypeDecoder decoder) {} + TxSelectOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } @@ -53,7 +53,7 @@ class TxCommitOk implements Message { // Message arguments - TxCommitOk.fromStream(TypeDecoder decoder) {} + TxCommitOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } @@ -79,7 +79,7 @@ class TxRollbackOk implements Message { // Message arguments - TxRollbackOk.fromStream(TypeDecoder decoder) {} + TxRollbackOk.fromStream(TypeDecoder decoder); void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/stream/type_decoder.dart b/lib/src/protocol/stream/type_decoder.dart index 2b83f20..b142ce8 100644 --- a/lib/src/protocol/stream/type_decoder.dart +++ b/lib/src/protocol/stream/type_decoder.dart @@ -112,7 +112,7 @@ class TypeDecoder { Object _readField(String fieldName) { int typeValue = readUInt8(); - FieldType type = null; + FieldType type; try { type = FieldType.valueOf(typeValue); } catch (e) {} diff --git a/lib/src/protocol/stream/type_encoder.dart b/lib/src/protocol/stream/type_encoder.dart index fc682b4..0126466 100644 --- a/lib/src/protocol/stream/type_encoder.dart +++ b/lib/src/protocol/stream/type_encoder.dart @@ -5,7 +5,7 @@ class TypeEncoder { final Endian endianess = Endian.big; - TypeEncoder({ChunkedOutputWriter withWriter = null}) { + TypeEncoder({ChunkedOutputWriter withWriter}) { _writer = withWriter == null ? ChunkedOutputWriter() : withWriter; } diff --git a/test/lib/auth_test.dart b/test/lib/auth_test.dart index 806c9ed..ec77e34 100644 --- a/test/lib/auth_test.dart +++ b/test/lib/auth_test.dart @@ -155,7 +155,7 @@ main({bool enableLogger = true}) { test("PLAIN authenticaion", () { ConnectionSettings settings = ConnectionSettings( - authProvider: PlainAuthenticator("guest", "guest")); + authProvider: const PlainAuthenticator("guest", "guest")); client = Client(settings: settings); client.connect().then(expectAsync1((_) {})); @@ -163,7 +163,7 @@ main({bool enableLogger = true}) { test("AMQPLAIN authenticaion", () { ConnectionSettings settings = ConnectionSettings( - authProvider: AmqPlainAuthenticator("guest", "guest")); + authProvider: const AmqPlainAuthenticator("guest", "guest")); client = Client(settings: settings); client.connect().then(expectAsync1((_) {})); @@ -223,8 +223,8 @@ main({bool enableLogger = true}) { }); test("invalid auth credentials", () { - ConnectionSettings settings = - ConnectionSettings(authProvider: PlainAuthenticator("foo", "foo")); + ConnectionSettings settings = ConnectionSettings( + authProvider: const PlainAuthenticator("foo", "foo")); client = Client(settings: settings); diff --git a/test/lib/enum_test.dart b/test/lib/enum_test.dart index 40a7662..e3d549e 100644 --- a/test/lib/enum_test.dart +++ b/test/lib/enum_test.dart @@ -25,9 +25,9 @@ main({bool enableLogger = true}) { for (Type enumClass in enumClasses) { ClassMirror cm = reflectClass(enumClass); - MethodMirror valueOfMirror = null; - MethodMirror nameOfMirror = null; - MethodMirror toStringMirror = null; + MethodMirror valueOfMirror; + MethodMirror nameOfMirror; + MethodMirror toStringMirror; // Run a first pass to detect which methods we can use cm.declarations.forEach((Symbol enumSymbol, declarationMirror) { diff --git a/test/lib/exception_handling_test.dart b/test/lib/exception_handling_test.dart index 611a944..130ff16 100644 --- a/test/lib/exception_handling_test.dart +++ b/test/lib/exception_handling_test.dart @@ -366,6 +366,7 @@ main({bool enableLogger = true}) { expect(ex, const TypeMatcher()); } + // ignore: unawaited_futures server .shutdown() .then((_) => @@ -376,8 +377,8 @@ main({bool enableLogger = true}) { client.errorListener((ex) => handleError(ex)); return server .shutdown() - .then((_) => - Future.delayed(Duration(seconds: 5) + server.responseDelay)) + .then((_) => Future.delayed( + const Duration(seconds: 5) + server.responseDelay)) .then((_) => fail("Expected an exception to be thrown")); }); }); diff --git a/test/lib/mocks/mocks.dart b/test/lib/mocks/mocks.dart index 3e8e12a..23b832b 100644 --- a/test/lib/mocks/mocks.dart +++ b/test/lib/mocks/mocks.dart @@ -42,8 +42,8 @@ class MockServer { client.destroy(); return Future.value(true); })) - ..add(_server.close().then( - (_) => Future.delayed(Duration(milliseconds: 20), () => true))); + ..add(_server.close().then((_) => + Future.delayed(const Duration(milliseconds: 20), () => true))); clients.clear(); _server = null; @@ -87,7 +87,7 @@ class MockServer { } void _handleClientData(Socket client, dynamic data) { - if (replayList != null && !replayList.isEmpty) { + if (replayList != null && replayList.isNotEmpty) { // Respond with the next payload in replay list Future.delayed(responseDelay).then((_) { client diff --git a/test/run_all.dart b/test/run_all.dart index 7bf2868..8f2c6aa 100644 --- a/test/run_all.dart +++ b/test/run_all.dart @@ -12,7 +12,7 @@ import "lib/client_test.dart" as client; void main(List args) { // Check if we need to disable our loggers - bool enableLogger = args.indexOf('--enable-logger') != -1; + bool enableLogger = args.contains('--enable-logger'); String allArgs = args.join("."); bool runAll = args.isEmpty || allArgs == '--enable-logger'; diff --git a/tool/generate_bindings.dart b/tool/generate_bindings.dart index 9137be5..1a3a4a6 100644 --- a/tool/generate_bindings.dart +++ b/tool/generate_bindings.dart @@ -219,10 +219,8 @@ ${String.fromCharCodes(List.filled(className.length + methodName.length + 1 }).replaceAll("-", "_"); // Retrieve Dart type for field domain - String fieldDomain = amqpMethodField.getAttribute("domain"); - if (fieldDomain == null) { - fieldDomain = amqpMethodField.getAttribute("type"); - } + String fieldDomain = amqpMethodField.getAttribute("domain") ?? + amqpMethodField.getAttribute("type"); String amqpType = _amqpCustomTypeToBasicType.containsKey(fieldDomain) ? _amqpCustomTypeToBasicType[fieldDomain] : fieldDomain; From 6f165eaebbfa53dcb8f32fbd39a4ea6ef6f824ec Mon Sep 17 00:00:00 2001 From: Alexandre Ardhuin Date: Wed, 20 Mar 2019 23:34:06 +0100 Subject: [PATCH 2/5] use async/await --- example/hello/receive.dart | 24 +- example/hello/send.dart | 15 +- example/pubsub/emit_log.dart | 20 +- example/pubsub/receive_logs.dart | 26 +- example/routing/emit_log_direct.dart | 22 +- example/routing/receive_logs_direct.dart | 32 +- example/rpc/rpc_client.dart | 62 ++- example/rpc/rpc_server.dart | 30 +- example/topics/emit_log_topic.dart | 20 +- example/topics/receive_logs_topic.dart | 27 +- example/workers/new_task.dart | 17 +- example/workers/worker.dart | 38 +- lib/src/client/impl/exchange_impl.dart | 12 +- test/lib/auth_test.dart | 33 +- test/lib/channel_test.dart | 76 ++-- test/lib/client_test.dart | 44 +- test/lib/exception_handling_test.dart | 98 +++-- test/lib/exchange_test.dart | 517 ++++++++++------------- test/lib/mocks/mocks.dart | 15 +- test/lib/queue_test.dart | 466 +++++++++----------- tool/generate_bindings.dart | 17 +- 21 files changed, 720 insertions(+), 891 deletions(-) diff --git a/example/hello/receive.dart b/example/hello/receive.dart index 80e2407..578bf10 100644 --- a/example/hello/receive.dart +++ b/example/hello/receive.dart @@ -1,24 +1,20 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main() { +void main() async { Client client = Client(); // Setup a signal handler to cleanly exit if CTRL+C is pressed - ProcessSignal.sigint.watch().listen((_) { - client.close().then((_) { - exit(0); - }); + ProcessSignal.sigint.watch().listen((_) async { + await client.close(); + exit(0); }); - client - .channel() - .then((Channel channel) => channel.queue("hello")) - .then((Queue queue) => queue.consume()) - .then((Consumer consumer) { - print(" [*] Waiting for messages. To exit, press CTRL+C"); - consumer.listen((AmqpMessage message) { - print(" [x] Received ${message.payloadAsString}"); - }); + Channel channel = await client.channel(); + Queue queue = await channel.queue("hello"); + Consumer consumer = await queue.consume(); + print(" [*] Waiting for messages. To exit, press CTRL+C"); + consumer.listen((message) { + print(" [x] Received ${message.payloadAsString}"); }); } diff --git a/example/hello/send.dart b/example/hello/send.dart index 4233ef0..ebd8f19 100644 --- a/example/hello/send.dart +++ b/example/hello/send.dart @@ -1,13 +1,10 @@ import "package:dart_amqp/dart_amqp.dart"; -void main() { +void main() async { Client client = Client(); - client - .channel() - .then((Channel channel) => channel.queue("hello")) - .then((Queue queue) { - queue.publish("Hello World!"); - print(" [x] Sent 'Hello World!'"); - return client.close(); - }); + Channel channel = await client.channel(); + Queue queue = await channel.queue("hello"); + queue.publish("Hello World!"); + print(" [x] Sent 'Hello World!'"); + await client.close(); } diff --git a/example/pubsub/emit_log.dart b/example/pubsub/emit_log.dart index 9b3febc..4d04540 100644 --- a/example/pubsub/emit_log.dart +++ b/example/pubsub/emit_log.dart @@ -1,15 +1,13 @@ import "package:dart_amqp/dart_amqp.dart"; -void main(List args) { +void main(List args) async { Client client = Client(); - client - .channel() - .then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT)) - .then((Exchange exchange) { - String message = args.join(' '); - // We dont care about the routing key as our exchange type is FANOUT - exchange.publish(message, null); - print(" [x] Sent ${message}"); - return client.close(); - }); + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("logs", ExchangeType.FANOUT); + + String message = args.join(' '); + // We dont care about the routing key as our exchange type is FANOUT + exchange.publish(message, null); + print(" [x] Sent ${message}"); + await client.close(); } diff --git a/example/pubsub/receive_logs.dart b/example/pubsub/receive_logs.dart index f890951..c1c2352 100644 --- a/example/pubsub/receive_logs.dart +++ b/example/pubsub/receive_logs.dart @@ -1,25 +1,21 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main() { +void main() async { Client client = Client(); // Setup a signal handler to cleanly exit if CTRL+C is pressed - ProcessSignal.sigint.watch().listen((_) { - client.close().then((_) { - exit(0); - }); + ProcessSignal.sigint.watch().listen((_) async { + await client.close(); + exit(0); }); - client - .channel() - .then((Channel channel) => channel.exchange("logs", ExchangeType.FANOUT)) - .then((Exchange exchange) => exchange.bindPrivateQueueConsumer(null)) - .then((Consumer consumer) { - print( - " [*] Waiting for logs on private queue ${consumer.queue.name}. To exit, press CTRL+C"); - consumer.listen((AmqpMessage message) { - print(" [x] ${message.payloadAsString}"); - }); + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("logs", ExchangeType.FANOUT); + Consumer consumer = await exchange.bindPrivateQueueConsumer(null); + print( + " [*] Waiting for logs on private queue ${consumer.queue.name}. To exit, press CTRL+C"); + consumer.listen((message) { + print(" [x] ${message.payloadAsString}"); }); } diff --git a/example/routing/emit_log_direct.dart b/example/routing/emit_log_direct.dart index e26e28c..828bbe0 100644 --- a/example/routing/emit_log_direct.dart +++ b/example/routing/emit_log_direct.dart @@ -1,7 +1,7 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main(List args) { +void main(List args) async { if (args.length < 2 || !["info", "warning", "error"].contains(args[0])) { print(""" Error: invalid arguments. Please invoke as: @@ -18,15 +18,13 @@ void main(List args) { String severity = args.first; Client client = Client(); - client - .channel() - .then((Channel channel) => - channel.exchange("direct_logs", ExchangeType.DIRECT)) - .then((Exchange exchange) { - String message = args.sublist(1).join(' '); - // Use 'severity' as our routing key - exchange.publish(message, severity); - print(" [x] Sent [${severity}] ${message}"); - return client.close(); - }); + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("direct_logs", ExchangeType.DIRECT); + + String message = args.sublist(1).join(' '); + // Use 'severity' as our routing key + exchange.publish(message, severity); + print(" [x] Sent [${severity}] ${message}"); + await client.close(); } diff --git a/example/routing/receive_logs_direct.dart b/example/routing/receive_logs_direct.dart index 5fe1939..4191b77 100644 --- a/example/routing/receive_logs_direct.dart +++ b/example/routing/receive_logs_direct.dart @@ -1,9 +1,8 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main(List args) { - if (args.isEmpty || - !args.every((String arg) => ["info", "warning", "error"].contains(arg))) { +void main(List args) async { + if (args.isEmpty || !args.every(["info", "warning", "error"].contains)) { print(""" Error: invalid arguments. Please invoke as: @@ -22,23 +21,20 @@ void main(List args) { Client client = Client(); // Setup a signal handler to cleanly exit if CTRL+C is pressed - ProcessSignal.sigint.watch().listen((_) { - client.close().then((_) { - exit(0); - }); + ProcessSignal.sigint.watch().listen((_) async { + await client.close(); + exit(0); }); - client - .channel() - .then((Channel channel) => - channel.exchange("direct_logs", ExchangeType.DIRECT)) - .then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args)) - .then((Consumer consumer) { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("direct_logs", ExchangeType.DIRECT); + Consumer consumer = await exchange.bindPrivateQueueConsumer(args); + + print( + " [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C"); + consumer.listen((message) { print( - " [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C"); - consumer.listen((AmqpMessage message) { - print( - " [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}"); - }); + " [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}"); }); } diff --git a/example/rpc/rpc_client.dart b/example/rpc/rpc_client.dart index cc97b20..5ca6529 100644 --- a/example/rpc/rpc_client.dart +++ b/example/rpc/rpc_client.dart @@ -11,21 +11,18 @@ class FibonacciRpcClient { String _replyQueueName; FibonacciRpcClient() : client = Client() { - client - .channel() - .then((Channel channel) => channel.queue("rpc_queue")) - .then((Queue rpcQueue) { - _serverQueue = rpcQueue; - - // Allocate a private queue for server responses - return rpcQueue.channel.privateQueue(); - }) - .then((Queue queue) => queue.consume()) - .then((Consumer consumer) { - _replyQueueName = consumer.queue.name; - consumer.listen(handleResponse); - connected.complete(); - }); + _init(); + } + + Future _init() async { + Channel channel = await client.channel(); + _serverQueue = await channel.queue("rpc_queue"); + // Allocate a private queue for server responses + Queue queue = await _serverQueue.channel.privateQueue(); + Consumer consumer = await queue.consume(); + _replyQueueName = consumer.queue.name; + consumer.listen(handleResponse); + connected.complete(); } void handleResponse(AmqpMessage message) { @@ -39,22 +36,22 @@ class FibonacciRpcClient { .complete(int.parse(message.payloadAsString)); } - Future call(int n) { + Future call(int n) async { // Make sure we are connected before sending the request - return connected.future.then((_) { - String uuid = "${_nextCorrelationId++}"; - Completer completer = Completer(); + await connected.future; + + String uuid = "${_nextCorrelationId++}"; + Completer completer = Completer(); - MessageProperties properties = MessageProperties() - ..replyTo = _replyQueueName - ..corellationId = uuid; + MessageProperties properties = MessageProperties() + ..replyTo = _replyQueueName + ..corellationId = uuid; - _pendingOperations[uuid] = completer; + _pendingOperations[uuid] = completer; - _serverQueue.publish({"n": n}, properties: properties); + _serverQueue.publish({"n": n}, properties: properties); - return completer.future; - }); + return completer.future; } Future close() { @@ -67,17 +64,14 @@ class FibonacciRpcClient { } } -main(List args) { +main(List args) async { FibonacciRpcClient client = FibonacciRpcClient(); int n = args.isEmpty ? 30 : num.parse(args[0]); // Make 10 parallel calls and get fib(1) to fib(10) - client - .call(n) - .then((int res) { - print(" [x] fib(${n}) = ${res}"); - }) - .then((_) => client.close()) - .then((_) => exit(0)); + int res = await client.call(n); + print(" [x] fib(${n}) = ${res}"); + await client.close(); + exit(0); } diff --git a/example/rpc/rpc_server.dart b/example/rpc/rpc_server.dart index 59f2668..71d7268 100644 --- a/example/rpc/rpc_server.dart +++ b/example/rpc/rpc_server.dart @@ -11,27 +11,23 @@ int fib(int n) { return fib(n - 1) + fib(n - 2); } -void main(List args) { +void main(List args) async { Client client = Client(); // Setup a signal handler to cleanly exit if CTRL+C is pressed - ProcessSignal.sigint.watch().listen((_) { - client.close().then((_) { - exit(0); - }); + ProcessSignal.sigint.watch().listen((_) async { + await client.close(); + exit(0); }); - client - .channel() - .then((Channel channel) => channel.qos(0, 1)) - .then((Channel channel) => channel.queue("rpc_queue")) - .then((Queue queue) => queue.consume()) - .then((Consumer consumer) { - print(" [x] Awaiting RPC request"); - consumer.listen((AmqpMessage message) { - int n = message.payloadAsJson["n"]; - print(" [.] fib(${n})"); - message.reply(fib(n).toString()); - }); + Channel channel = await client.channel(); + channel = await channel.qos(0, 1); + Queue queue = await channel.queue("rpc_queue"); + Consumer consumer = await queue.consume(); + print(" [x] Awaiting RPC request"); + consumer.listen((message) { + int n = message.payloadAsJson["n"]; + print(" [.] fib(${n})"); + message.reply(fib(n).toString()); }); } diff --git a/example/topics/emit_log_topic.dart b/example/topics/emit_log_topic.dart index 663bb9e..7f0b102 100644 --- a/example/topics/emit_log_topic.dart +++ b/example/topics/emit_log_topic.dart @@ -1,7 +1,7 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main(List args) { +void main(List args) async { if (args.length < 2) { print(""" Error: invalid arguments. Please invoke as: @@ -18,15 +18,11 @@ void main(List args) { String routingKey = args.first; Client client = Client(); - client - .channel() - .then((Channel channel) => - channel.exchange("topic_logs", ExchangeType.TOPIC)) - .then((Exchange exchange) { - String message = args.sublist(1).join(' '); - // Use 'severity' as our routing key - exchange.publish(message, routingKey); - print(" [x] Sent [${routingKey}] ${message}"); - return client.close(); - }); + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("topic_logs", ExchangeType.TOPIC); + String message = args.sublist(1).join(' '); + // Use 'severity' as our routing key + exchange.publish(message, routingKey); + print(" [x] Sent [${routingKey}] ${message}"); + await client.close(); } diff --git a/example/topics/receive_logs_topic.dart b/example/topics/receive_logs_topic.dart index e1b785d..2f1e67f 100644 --- a/example/topics/receive_logs_topic.dart +++ b/example/topics/receive_logs_topic.dart @@ -1,7 +1,7 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main(List args) { +void main(List args) async { if (args.isEmpty) { print(""" Error: invalid arguments. Please invoke as: @@ -21,23 +21,18 @@ void main(List args) { Client client = Client(); // Setup a signal handler to cleanly exit if CTRL+C is pressed - ProcessSignal.sigint.watch().listen((_) { - client.close().then((_) { - exit(0); - }); + ProcessSignal.sigint.watch().listen((_) async { + await client.close(); + exit(0); }); - client - .channel() - .then((Channel channel) => - channel.exchange("topic_logs", ExchangeType.TOPIC)) - .then((Exchange exchange) => exchange.bindPrivateQueueConsumer(args)) - .then((Consumer consumer) { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("topic_logs", ExchangeType.TOPIC); + Consumer consumer = await exchange.bindPrivateQueueConsumer(args); + print( + " [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C"); + consumer.listen((message) { print( - " [*] Waiting for [${args.join(', ')}] logs on private queue ${consumer.queue.name}. To exit, press CTRL+C"); - consumer.listen((AmqpMessage message) { - print( - " [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}"); - }); + " [x] [Exchange: ${message.exchangeName}] [${message.routingKey}] ${message.payloadAsString}"); }); } diff --git a/example/workers/new_task.dart b/example/workers/new_task.dart index 9b6c49d..63b7d7d 100644 --- a/example/workers/new_task.dart +++ b/example/workers/new_task.dart @@ -1,14 +1,11 @@ import "package:dart_amqp/dart_amqp.dart"; -void main(List args) { +void main(List args) async { Client client = Client(); - client - .channel() - .then((Channel channel) => channel.queue("task_queue", durable: true)) - .then((Queue queue) { - String message = args.isEmpty ? "Hello World!" : args.join(" "); - queue.publish(message, properties: MessageProperties.persistentMessage()); - print(" [x] Sent ${message}"); - return client.close(); - }); + Channel channel = await client.channel(); + Queue queue = await channel.queue("task_queue", durable: true); + String message = args.isEmpty ? "Hello World!" : args.join(" "); + queue.publish(message, properties: MessageProperties.persistentMessage()); + print(" [x] Sent ${message}"); + await client.close(); } diff --git a/example/workers/worker.dart b/example/workers/worker.dart index 10a54d7..7ccd791 100644 --- a/example/workers/worker.dart +++ b/example/workers/worker.dart @@ -1,32 +1,28 @@ import "dart:io"; import "package:dart_amqp/dart_amqp.dart"; -void main() { +void main() async { Client client = Client(); // Setup a signal handler to cleanly exit if CTRL+C is pressed - ProcessSignal.sigint.watch().listen((_) { - client.close().then((_) { - exit(0); - }); + ProcessSignal.sigint.watch().listen((_) async { + await client.close(); + exit(0); }); - client - .channel() - .then((Channel channel) => channel.qos(0, 1)) - .then((Channel channel) => channel.queue("task_queue", durable: true)) - .then((Queue queue) => queue.consume(noAck: false)) - .then((Consumer consumer) { - print(" [*] Waiting for messages. To exit, press CTRL+C"); - consumer.listen((AmqpMessage message) { - String payload = message.payloadAsString; - print(" [x] Received ${payload}"); - // Emulate a long task by sleeping 1 second for each '.' character in message - sleep(Duration(seconds: payload.split(".").length)); - print(" [x] Done"); + Channel channel = await client.channel(); + channel = await channel.qos(0, 1); + Queue queue = await channel.queue("task_queue", durable: true); + Consumer consumer = await queue.consume(noAck: false); + print(" [*] Waiting for messages. To exit, press CTRL+C"); + consumer.listen((message) { + String payload = message.payloadAsString; + print(" [x] Received ${payload}"); + // Emulate a long task by sleeping 1 second for each '.' character in message + sleep(Duration(seconds: payload.split(".").length)); + print(" [x] Done"); - // Ack message so it is marked as processed - message.ack(); - }); + // Ack message so it is marked as processed + message.ack(); }); } diff --git a/lib/src/client/impl/exchange_impl.dart b/lib/src/client/impl/exchange_impl.dart index 44083d2..98b33fb 100644 --- a/lib/src/client/impl/exchange_impl.dart +++ b/lib/src/client/impl/exchange_impl.dart @@ -45,7 +45,7 @@ class _ExchangeImpl implements Exchange { } Future bindPrivateQueueConsumer(List routingKeys, - {String consumerTag, bool noAck = true}) { + {String consumerTag, bool noAck = true}) async { // Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) && (routingKeys == null || routingKeys.isEmpty)) { @@ -57,10 +57,10 @@ class _ExchangeImpl implements Exchange { "One or more routing keys needs to be specified for this exchange type"); } - return channel.privateQueue().then((Queue queue) { - return Future.forEach( - routingKeys, (String routingKey) => queue.bind(this, routingKey)) - .then((_) => queue.consume(consumerTag: consumerTag, noAck: noAck)); - }); + Queue queue = await channel.privateQueue(); + for (String routingKey in routingKeys) { + await queue.bind(this, routingKey); + } + return queue.consume(consumerTag: consumerTag, noAck: noAck); } } diff --git a/test/lib/auth_test.dart b/test/lib/auth_test.dart index ec77e34..40c511b 100644 --- a/test/lib/auth_test.dart +++ b/test/lib/auth_test.dart @@ -153,20 +153,20 @@ main({bool enableLogger = true}) { return client.close(); }); - test("PLAIN authenticaion", () { + test("PLAIN authenticaion", () async { ConnectionSettings settings = ConnectionSettings( authProvider: const PlainAuthenticator("guest", "guest")); client = Client(settings: settings); - client.connect().then(expectAsync1((_) {})); + await client.connect(); }); - test("AMQPLAIN authenticaion", () { + test("AMQPLAIN authenticaion", () async { ConnectionSettings settings = ConnectionSettings( authProvider: const AmqPlainAuthenticator("guest", "guest")); client = Client(settings: settings); - client.connect().then(expectAsync1((_) {})); + await client.connect(); }); }); @@ -184,11 +184,12 @@ main({bool enableLogger = true}) { return server.listen('127.0.0.1', 9000); }); - tearDown(() { - return client.close().then((_) => server.shutdown()); + tearDown(() async { + await client.close(); + await server.shutdown(); }); - test("multiple challenge-response rounds", () { + test("multiple challenge-response rounds", () async { generateHandshakeMessages(frameWriter, server, 10); // Encode final connection close @@ -196,7 +197,7 @@ main({bool enableLogger = true}) { server.replayList.add(frameWriter.outputEncoder.writer.joinChunks()); frameWriter.outputEncoder.writer.clear(); - client.connect().then(expectAsync1((_) {})); + await client.connect(); }); }); @@ -207,31 +208,35 @@ main({bool enableLogger = true}) { return client.close(); }); - test("unsupported SASL provider", () { + test("unsupported SASL provider", () async { ConnectionSettings settings = ConnectionSettings(authProvider: FooAuthProvider()); client = Client(settings: settings); - client.connect().catchError(expectAsync1((e) { + try { + await client.connect(); + } catch (e) { expect(e, const TypeMatcher()); expect( e.message, startsWith( "Selected authentication provider 'foo' is unsupported by the server")); - })); + } }); - test("invalid auth credentials", () { + test("invalid auth credentials", () async { ConnectionSettings settings = ConnectionSettings( authProvider: const PlainAuthenticator("foo", "foo")); client = Client(settings: settings); - client.connect().catchError(expectAsync1((e) { + try { + await client.connect(); + } catch (e) { expect(e, const TypeMatcher()); expect(e.message, equals("Authentication failed")); - })); + } }); }); } diff --git a/test/lib/channel_test.dart b/test/lib/channel_test.dart index b25c0c1..a97fa88 100644 --- a/test/lib/channel_test.dart +++ b/test/lib/channel_test.dart @@ -25,72 +25,68 @@ main({bool enableLogger = true}) { return client.close(); }); - test("select() followed by commit()", () { - return client - .channel() - .then((Channel channel) => channel.select()) - .then((Channel channel) => channel.commit()); + test("select() followed by commit()", () async { + Channel channel = await client.channel(); + channel = await channel.select(); + channel = await channel.commit(); }); - test("select() followed by rollback()", () { - return client - .channel() - .then((Channel channel) => channel.select()) - .then((Channel channel) => channel.rollback()); + test("select() followed by rollback()", () async { + Channel channel = await client.channel(); + channel = await channel.select(); + channel = await channel.rollback(); }); - test("flow control: off", () { + test("flow control: off", () async { // Rabbit does not support setting flow control to on - return client.channel().then((Channel channel) => channel.flow(true)); + Channel channel = await client.channel(); + channel = await channel.flow(true); }); group("exceptions:", () { - test("sending data on a closed channel should raise an exception", () { - return client - .channel() - .then((Channel channel) => channel.close()) - .then((Channel channel) { - expect( - () => channel.privateQueue(), - throwsA((e) => - e is StateError && e.message == "Channel has been closed")); - }); + test("sending data on a closed channel should raise an exception", + () async { + Channel channel = await client.channel(); + channel = await channel.close(); + expect( + () => channel.privateQueue(), + throwsA((e) => + e is StateError && e.message == "Channel has been closed")); }); test( "commit() on a non-transactional channel should raise a precondition-failed error", - () { - client - .channel() - .then((Channel channel) => channel.commit()) - .then((_) => fail("Expected an exception to be thrown")) - .catchError(expectAsync1((e) { + () async { + try { + Channel channel = await client.channel(); + channel = await channel.commit(); + fail("Expected an exception to be thrown"); + } catch (e) { expect(e, const TypeMatcher()); expect((e as ChannelException).errorType, equals(ErrorType.PRECONDITION_FAILED)); - })); + } }); test( "rollback() on a non-transactional channel should raise a precondition-failed error", - () { - client - .channel() - .then((Channel channel) => channel.rollback()) - .then((_) => fail("Expected an exception to be thrown")) - .catchError(expectAsync1((e) { + () async { + try { + Channel channel = await client.channel(); + channel = await channel.rollback(); + fail("Expected an exception to be thrown"); + } catch (e) { expect(e, const TypeMatcher()); expect((e as ChannelException).errorType, equals(ErrorType.PRECONDITION_FAILED)); expect(e.toString(), startsWith("ChannelException(PRECONDITION_FAILED)")); - })); + } }); - test("revocer()", () { - return client - .channel() - .then((Channel channel) => channel.recover(true)); + test("revocer()", () async { + Channel channel = await client.channel(); + channel = await channel.recover(true); }); }); }); diff --git a/test/lib/client_test.dart b/test/lib/client_test.dart index eae94bb..ae808ca 100644 --- a/test/lib/client_test.dart +++ b/test/lib/client_test.dart @@ -1,6 +1,6 @@ library dart_amqp.test.client; -import "dart:async"; +import 'dart:async'; import "package:test/test.dart"; @@ -18,55 +18,53 @@ main({bool enableLogger = true}) { group("Client test:", () { Client client; - tearDown(() { - return client.close(); + tearDown(() async { + await client.close(); }); - test("fail to connect after 2 attempts", () { + test("fail to connect after 2 attempts", () async { ConnectionSettings settings = ConnectionSettings( port: 8765, maxConnectionAttempts: 2, reconnectWaitTime: const Duration(milliseconds: 10)); client = Client(settings: settings); - client.connect().catchError(expectAsync1((ex) { + try { + await client.connect(); + } catch (ex) { expect(ex, const TypeMatcher()); expect(ex.toString(), startsWith('ConnectionFailedException')); - })); + } }); - test("multiple open attampts should return the same future", () { + test("multiple open attampts should return the same future", () async { client = Client(); - Future connectFuture = client.connect(); - expect(client.connect(), equals(connectFuture)); - - return connectFuture; + await connectFuture; }); - test("multiple close attampts should return the same future", () { + test("multiple close attampts should return the same future", () async { client = Client(); - - return client.connect().then((_) { - Future closeFuture = client.close(); - - expect(client.close(), equals(closeFuture)); - - return closeFuture; - }); + await client.connect(); + Future closeFuture = client.close(); + expect(client.close(), equals(closeFuture)); + await closeFuture; }); - test("exception when exceeding negotiated channel limit", () { + test("exception when exceeding negotiated channel limit", () async { ConnectionSettings settings = ConnectionSettings(tuningSettings: TuningSettings()..maxChannels = 1); client = Client(settings: settings); - return client.channel().then((_) => client.channel()).catchError((ex) { + await client.channel(); + try { + await client.channel(); + } catch (ex) { expect(ex, const TypeMatcher()); expect(ex.message, equals("Cannot allocate channel; channel limit exceeded (max 1)")); - }); + } }); }); } diff --git a/test/lib/exception_handling_test.dart b/test/lib/exception_handling_test.dart index 130ff16..0eac16a 100644 --- a/test/lib/exception_handling_test.dart +++ b/test/lib/exception_handling_test.dart @@ -134,12 +134,13 @@ main({bool enableLogger = true}) { return server.listen('127.0.0.1', port); }); - tearDown(() { - return client.close().then((_) => server.shutdown()); + tearDown(() async { + await client.close(); + await server.shutdown(); }); group("fatal exceptions:", () { - test("protocol mismatch", () { + test("protocol mismatch", () async { TypeEncoder encoder = TypeEncoder(); ProtocolHeader() ..protocolVersion = 0 @@ -158,12 +159,15 @@ main({bool enableLogger = true}) { "Could not negotiate a valid AMQP protocol version. Server supports AMQP 0.8.0")); } - client.connect().then((_) { + try { + await client.connect(); fail("Expected a FatalException to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); - test("frame without terminator", () { + test("frame without terminator", () async { frameWriter.writeMessage( 0, ConnectionStartMock() @@ -185,12 +189,15 @@ main({bool enableLogger = true}) { "Frame did not end with the expected frame terminator (0xCE)")); } - client.connect().then((_) { + try { + await client.connect(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); - test("frame on channel > 0 while handshake in progress", () { + test("frame on channel > 0 while handshake in progress", () async { frameWriter.writeMessage( 1, ConnectionStartMock() @@ -209,12 +216,15 @@ main({bool enableLogger = true}) { "Received message for channel 1 while still handshaking")); } - client.connect().then((_) { + try { + await client.connect(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); - test("unexpected frame during handshake", () { + test("unexpected frame during handshake", () async { // Connection start frameWriter.writeMessage( 0, @@ -250,14 +260,17 @@ main({bool enableLogger = true}) { "Received unexpected message TxSelectOk during handshake")); } - client.connect().then((_) { + try { + await client.connect(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); }); group("connection exceptions:", () { - test("illegal frame size", () { + test("illegal frame size", () async { frameWriter.writeMessage( 0, ConnectionStartMock() @@ -283,12 +296,15 @@ main({bool enableLogger = true}) { "Frame size cannot be larger than ${tuningSettings.maxFrameSize} bytes. Server sent ${tuningSettings.maxFrameSize + 1} bytes")); } - client.connect().then((_) { + try { + await client.connect(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); - test("connection-class message on channel > 0 post handshake", () { + test("connection-class message on channel > 0 post handshake", () async { generateHandshakeMessages(frameWriter, server); // Add a fake connection start message at channel 1 @@ -310,12 +326,15 @@ main({bool enableLogger = true}) { "Received CONNECTION class message on a channel > 0")); } - client.channel().then((_) { + try { + await client.channel(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); - test("HEARTBEAT message on channel > 0", () { + test("HEARTBEAT message on channel > 0", () async { generateHandshakeMessages(frameWriter, server); // Add a heartbeat start message at channel 1 @@ -331,12 +350,15 @@ main({bool enableLogger = true}) { "Received HEARTBEAT message on a channel > 0")); } - client.channel().then((_) { + try { + await client.channel(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); - test("connection close message post handshake", () { + test("connection close message post handshake", () async { generateHandshakeMessages(frameWriter, server); // Add a fake connection start message at channel 1 @@ -355,9 +377,12 @@ main({bool enableLogger = true}) { equals("ConnectionException(ACCESS_REFUSED): No access")); } - client.channel().then((_) { + try { + await client.channel(); fail("Expected an exception to be thrown"); - }).catchError(expectAsync2(handleError)); + } catch (e, s) { + handleError(e, s); + } }); }); group("error stream:", () { @@ -367,20 +392,15 @@ main({bool enableLogger = true}) { } // ignore: unawaited_futures - server - .shutdown() - .then((_) => - server.listen(client.settings.host, client.settings.port)) - .then((_) { + server.shutdown().then((_) async { + await server.listen(client.settings.host, client.settings.port); generateHandshakeMessages(frameWriter, server); - return client.connect().then((_) { - client.errorListener((ex) => handleError(ex)); - return server - .shutdown() - .then((_) => Future.delayed( - const Duration(seconds: 5) + server.responseDelay)) - .then((_) => fail("Expected an exception to be thrown")); - }); + await client.connect(); + client.errorListener((ex) => handleError(ex)); + await server.shutdown(); + await Future.delayed( + const Duration(seconds: 5) + server.responseDelay); + fail("Expected an exception to be thrown"); }); }); }, skip: true); diff --git a/test/lib/exchange_test.dart b/test/lib/exchange_test.dart index f2a589d..205c1fd 100644 --- a/test/lib/exchange_test.dart +++ b/test/lib/exchange_test.dart @@ -25,362 +25,277 @@ main({bool enableLogger = true}) { client2 = Client(); }); - tearDown(() { - return client.close().then((_) => client2.close()); + tearDown(() async { + await client.close(); + await client2.close(); }); - test("check if unknown exchange exists", () { - client - .channel() - .then((Channel channel) => - channel.exchange("foo123", ExchangeType.DIRECT, passive: true)) - .then((_) => fail("Expected an exception to be thrown")) - .catchError(expectAsync1((e) { + test("check if unknown exchange exists", () async { + try { + Channel channel = await client.channel(); + await channel.exchange("foo123", ExchangeType.DIRECT, passive: true); + fail("Expected an exception to be thrown"); + } catch (e) { expect(e, const TypeMatcher()); expect((e as ExchangeNotFoundException).errorType, equals(ErrorType.NOT_FOUND)); expect( e.toString(), startsWith("ExchangeNotFoundException: NOT_FOUND")); - })); + } }); - test("declare exchange", () { - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then(expectAsync1((Exchange exchange) { - expect(exchange.channel, const TypeMatcher()); - expect(exchange.name, equals("ex_test_1")); - expect(exchange.type, equals(ExchangeType.DIRECT)); - })); + test("declare exchange", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + expect(exchange.channel, const TypeMatcher()); + expect(exchange.name, equals("ex_test_1")); + expect(exchange.type, equals(ExchangeType.DIRECT)); }); - test("declare exchange and bind private queue consumer", () { - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange exchange) => - exchange.bindPrivateQueueConsumer(["test"])) - .then(expectAsync1((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - })); + test("declare exchange and bind private queue consumer", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + Consumer consumer = await exchange.bindPrivateQueueConsumer(["test"]); + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); }); - test("declare exchange and bind multiple routing keys", () { - client - .channel() - .then((Channel channel) => channel.qos(null, 1)) - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange exchange) => - exchange.bindPrivateQueueConsumer(["test", "foo", "bar"])) - .then(expectAsync1((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - })); + test("declare exchange and bind multiple routing keys", () async { + Channel channel = await client.channel(); + channel = await channel.qos(null, 1); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + Consumer consumer = + await exchange.bindPrivateQueueConsumer(["test", "foo", "bar"]); + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); }); - test("declare exchange and publish message", () { + test("declare exchange and publish message", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange exchange) => - exchange.bindPrivateQueueConsumer(["test"])) - .then((Consumer consumer) { - // Listen for messages - consumer.listen(expectAsync1((AmqpMessage message) { - expect(message.payloadAsString, equals("Test message 1234")); - expect(message.routingKey, equals("test")); - - // Check for exception with missing reply-to property - expect( - () => message.reply(""), - throwsA((e) => - e is ArgumentError && - e.message == - "No reply-to property specified in the incoming message")); - - testCompleter.complete(); - })); - - // Connect second client and publish message to excahnge - client2 - .channel() - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange client2Exchange) => - client2Exchange.publish("Test message 1234", "test")); - }); + + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + Consumer consumer = await exchange.bindPrivateQueueConsumer(["test"]); + + // Listen for messages + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsString, equals("Test message 1234")); + expect(message.routingKey, equals("test")); + + // Check for exception with missing reply-to property + expect( + () => message.reply(""), + throwsA((e) => + e is ArgumentError && + e.message == + "No reply-to property specified in the incoming message")); + + testCompleter.complete(); + })); + + // Connect second client and publish message to exchange + Channel channel2 = await client.channel(); + Exchange client2Exchange = + await channel2.exchange("ex_test_1", ExchangeType.DIRECT); + client2Exchange.publish("Test message 1234", "test"); return testCompleter.future; }); - test("publish unrouteable message", () { + test("publish unrouteable message", () async { Completer testCompleter = Completer(); - client.channel().then((Channel channel) { - channel.basicReturnListener((BasicReturnMessage message) { - expect(message.replyCode, equals(312)); - expect(message.routingKey, equals("test")); - testCompleter.complete(); - }); - return channel.exchange("ex_test_1", ExchangeType.DIRECT); - }).then((Exchange exchange) => - exchange.publish("Test message 1234", "test", mandatory: true)); + + Channel channel = await client.channel(); + channel.basicReturnListener((BasicReturnMessage message) { + expect(message.replyCode, equals(312)); + expect(message.routingKey, equals("test")); + testCompleter.complete(); + }); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + exchange.publish("Test message 1234", "test", mandatory: true); + return testCompleter.future; }); - test("two client json conversation through an exchange", () { + test("two client json conversation through an exchange", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange exchange) => - exchange.bindPrivateQueueConsumer(["test"])) - .then((Consumer consumer) { - // Listen for messages - consumer.listen((AmqpMessage message) { - expect(message.payloadAsString, equals('{"message":"1234"}')); - expect(message.payloadAsJson, equals({"message": "1234"})); - expect(message.payload, equals(message.payloadAsString.codeUnits)); - expect(message.routingKey, equals("test")); - expect(message.properties.corellationId, equals("123")); - expect(message.exchangeName, equals("ex_test_1")); - - // Reply with echo to sender - message.reply("echo:${message.payloadAsString}"); - }); - - // Connect second client and publish message to excahnge - client2 - .channel() - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange client2Exchange) { - // Allocate private queue for response - client2Exchange.channel - .privateQueue() - .then((Queue replyQueue) => replyQueue.consume()) - .then((Consumer replyConsumer) { - // Bind reply listener - replyConsumer.listen((AmqpMessage reply) { - expect(reply.properties.corellationId, equals("123")); - expect(reply.payloadAsString, equals('echo:{"message":"1234"}')); - - // Pass! - testCompleter.complete(); - }); - - // Send initial message via exchange - client2Exchange.publish({"message": "1234"}, "test", - properties: MessageProperties() - ..corellationId = "123" - ..replyTo = replyConsumer.queue.name); - }); - }); + + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + Consumer consumer = await exchange.bindPrivateQueueConsumer(["test"]); + + // Listen for messages + consumer.listen((AmqpMessage message) { + expect(message.payloadAsString, equals('{"message":"1234"}')); + expect(message.payloadAsJson, equals({"message": "1234"})); + expect(message.payload, equals(message.payloadAsString.codeUnits)); + expect(message.routingKey, equals("test")); + expect(message.properties.corellationId, equals("123")); + expect(message.exchangeName, equals("ex_test_1")); + + // Reply with echo to sender + message.reply("echo:${message.payloadAsString}"); + }); + + // Connect second client and publish message to excahnge + Channel channel2 = await client2.channel(); + Exchange client2Exchange = + await channel2.exchange("ex_test_1", ExchangeType.DIRECT); + + // Allocate private queue for response + Queue replyQueue = await client2Exchange.channel.privateQueue(); + Consumer replyConsumer = await replyQueue.consume(); + + // Bind reply listener + replyConsumer.listen((AmqpMessage reply) { + expect(reply.properties.corellationId, equals("123")); + expect(reply.payloadAsString, equals('echo:{"message":"1234"}')); + + // Pass! + testCompleter.complete(); }); + // Send initial message via exchange + client2Exchange.publish({"message": "1234"}, "test", + properties: MessageProperties() + ..corellationId = "123" + ..replyTo = replyConsumer.queue.name); + return testCompleter.future; }); - test("declare and delete exchange", () { - client - .channel() - .then((Channel channel) => channel.qos(0, 1)) - .then((Channel channel) => - channel.exchange("ex_test_1", ExchangeType.DIRECT)) - .then((Exchange exchange) => exchange.delete()) - .then(expectAsync1((Exchange exchange) {})); + test("declare and delete exchange", () async { + Channel channel = await client.channel(); + channel = await channel.qos(0, 1); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + await exchange.delete(); }); - test("publish to FANOUT exchange without a routing key", () { - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_2", ExchangeType.FANOUT)) - .then((Exchange exchange) => exchange.publish("Hello", "")) - .then(expectAsync1((_) {})); + test("publish to FANOUT exchange without a routing key", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_2", ExchangeType.FANOUT); + exchange.publish("Hello", ""); }); - test("bind queue to FANOUT exchange without a routing key", () { - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_2", ExchangeType.FANOUT)) - .then((Exchange exchange) => exchange.bindPrivateQueueConsumer([])) - .then(expectAsync1((Consumer consumer) {})); + test("bind queue to FANOUT exchange without a routing key", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_2", ExchangeType.FANOUT); + await exchange.bindPrivateQueueConsumer([]); }); - test("unbind queue from exchange", () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => - channel.exchange("ex_test_2", ExchangeType.FANOUT)) - .then((Exchange exchange) { - exchange.channel - .privateQueue() - .then((Queue privateQueue) => privateQueue.bind(exchange, "")) - .then((Queue boundQueue) => boundQueue.unbind(exchange, "")) - .then((Queue unboundQueue) { - testCompleter.complete(); - }); - }); - - return testCompleter.future; + test("unbind queue from exchange", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_2", ExchangeType.FANOUT); + Queue privateQueue = await exchange.channel.privateQueue(); + Queue boundQueue = await privateQueue.bind(exchange, ""); + await boundQueue.unbind(exchange, ""); }); group("exceptions", () { - test("missing exchange name", () { - return client.channel().then((Channel channel) { - expect( - () => channel.exchange(null, null), - throwsA((ex) => - ex is ArgumentError && - ex.message == "The name of the exchange cannot be empty")); - }); + test("missing exchange name", () async { + Channel channel = await client.channel(); + expect( + () => channel.exchange(null, null), + throwsA((ex) => + ex is ArgumentError && + ex.message == "The name of the exchange cannot be empty")); }); - test("missing exchange type", () { - return client.channel().then((Channel channel) { - expect( - () => channel.exchange("foo", null), - throwsA((ex) => - ex is ArgumentError && - ex.message == - "The type of the exchange needs to be specified")); - }); + test("missing exchange type", () async { + Channel channel = await client.channel(); + expect( + () => channel.exchange("foo", null), + throwsA((ex) => + ex is ArgumentError && + ex.message == + "The type of the exchange needs to be specified")); }); - test("missing routing key for non-fanout exchange publish", () { - return client - .channel() - .then((Channel channel) => - channel.exchange("test", ExchangeType.DIRECT)) - .then((Exchange exchange) { - expect( - () => exchange.publish("foo", null), - throwsA((ex) => - ex is ArgumentError && - ex.message == "A valid routing key needs to be specified")); - }); + test("missing routing key for non-fanout exchange publish", () async { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("test", ExchangeType.DIRECT); + expect( + () => exchange.publish("foo", null), + throwsA((ex) => + ex is ArgumentError && + ex.message == "A valid routing key needs to be specified")); }); test("missing private queue routing key for non-fanout exchange consumer", - () { - return client - .channel() - .then((Channel channel) => - channel.exchange("test", ExchangeType.DIRECT)) - .then((Exchange exchange) { - expect( - () => exchange.bindPrivateQueueConsumer([]), - throwsA((ex) => - ex is ArgumentError && - ex.message == - "One or more routing keys needs to be specified for this exchange type")); - }); + () async { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("test", ExchangeType.DIRECT); + expect( + () => exchange.bindPrivateQueueConsumer([]), + throwsA((ex) => + ex is ArgumentError && + ex.message == + "One or more routing keys needs to be specified for this exchange type")); }); - test("bind to non-FANOUT exchange without specifying routing key", () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => - channel.exchange("test", ExchangeType.DIRECT)) - .then((Exchange exchange) { - exchange.channel.privateQueue().then((Queue queue) { - expect( - () => queue.bind(exchange, ""), - throwsA((ex) => - ex is ArgumentError && - ex.message == - "A routing key needs to be specified to bind to this exchange type")); - testCompleter.complete(); - }); - }); - - return testCompleter.future; + test("bind to non-FANOUT exchange without specifying routing key", + () async { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("test", ExchangeType.DIRECT); + Queue queue = await exchange.channel.privateQueue(); + expect( + () => queue.bind(exchange, ""), + throwsA((ex) => + ex is ArgumentError && + ex.message == + "A routing key needs to be specified to bind to this exchange type")); }); test("unbind from non-FANOUT exchange without specifying routing key", - () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => - channel.exchange("test", ExchangeType.DIRECT)) - .then((Exchange exchange) { - exchange.channel - .privateQueue() - .then((Queue queue) => queue.bind(exchange, "test")) - .then((Queue queue) { - expect( - () => queue.unbind(exchange, ""), - throwsA((ex) => - ex is ArgumentError && - ex.message == - "A routing key needs to be specified to unbind from this exchange type")); - testCompleter.complete(); - }); - }); - - return testCompleter.future; + () async { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("test", ExchangeType.DIRECT); + + Queue queue = await exchange.channel.privateQueue(); + queue = await queue.bind(exchange, "test"); + expect( + () => queue.unbind(exchange, ""), + throwsA((ex) => + ex is ArgumentError && + ex.message == + "A routing key needs to be specified to unbind from this exchange type")); }); - test("bind queue to null exchange", () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => - channel.exchange("test", ExchangeType.DIRECT)) - .then((Exchange exchange) { - exchange.channel.privateQueue().then((Queue queue) { - expect( - () => queue.bind(null, ""), - throwsA((ex) => - ex is ArgumentError && - ex.message == "Exchange cannot be null")); - testCompleter.complete(); - }); - }); - - return testCompleter.future; + test("bind queue to null exchange", () async { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("test", ExchangeType.DIRECT); + + Queue queue = await exchange.channel.privateQueue(); + expect( + () => queue.bind(null, ""), + throwsA((ex) => + ex is ArgumentError && + ex.message == "Exchange cannot be null")); }); - test("unbind queue from null exchange", () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => - channel.exchange("test", ExchangeType.DIRECT)) - .then((Exchange exchange) { - exchange.channel - .privateQueue() - .then((Queue queue) => queue.bind(exchange, "test")) - .then((Queue queue) { - expect( - () => queue.unbind(null, ""), - throwsA((ex) => - ex is ArgumentError && - ex.message == "Exchange cannot be null")); - testCompleter.complete(); - }); - }); - - return testCompleter.future; + test("unbind queue from null exchange", () async { + Channel channel = await client.channel(); + Exchange exchange = await channel.exchange("test", ExchangeType.DIRECT); + + Queue queue = await exchange.channel.privateQueue(); + queue = await queue.bind(exchange, "test"); + expect( + () => queue.unbind(null, ""), + throwsA((ex) => + ex is ArgumentError && + ex.message == "Exchange cannot be null")); }); }); }); diff --git a/test/lib/mocks/mocks.dart b/test/lib/mocks/mocks.dart index 23b832b..a9e9d1f 100644 --- a/test/lib/mocks/mocks.dart +++ b/test/lib/mocks/mocks.dart @@ -63,18 +63,13 @@ class MockServer { } } - Future listen(String host, int port) { - Completer completer = Completer(); + Future listen(String host, int port) async { mockLogger.info("Binding MockServer to $host:$port"); - ServerSocket.bind(host, port).then((ServerSocket server) { - _server = server; - mockLogger.info("[$host:$port] Listening for incoming connections"); - _server.listen(_handleConnection); - completer.complete(); - }); - - return completer.future; + _server = await ServerSocket.bind(host, port); + mockLogger.info("[$host:$port] Listening for incoming connections"); + _server.listen(_handleConnection); + ; } void _handleConnection(Socket client) { diff --git a/test/lib/queue_test.dart b/test/lib/queue_test.dart index 544db92..0ff3b17 100644 --- a/test/lib/queue_test.dart +++ b/test/lib/queue_test.dart @@ -28,55 +28,44 @@ main({bool enableLogger = true}) { return client.close(); }); - test("check if unknown queue exists", () { - client - .channel() - .then((Channel channel) => channel.queue("foo", passive: true)) - .then((_) => fail("Expected an exception to be thrown")) - .catchError(expectAsync1((e) { + test("check if unknown queue exists", () async { + try { + Channel channel = await client.channel(); + await channel.queue("foo", passive: true); + fail("Expected an exception to be thrown"); + } catch (e) { expect(e, const TypeMatcher()); expect((e as QueueNotFoundException).errorType, equals(ErrorType.NOT_FOUND)); expect(e.toString(), startsWith("QueueNotFoundException: NOT_FOUND")); - })); + } }); - test("create private queue", () { - client - .channel() - .then((Channel channel) => channel.privateQueue()) - .then(expectAsync1((Queue queue) { - expect(queue.channel, const TypeMatcher()); - expect(queue.name, isNotEmpty); - expect(queue.consumerCount, equals(0)); - expect(queue.messageCount, equals(0)); - })); + test("create private queue", () async { + Channel channel = await client.channel(); + Queue queue = await channel.privateQueue(); + expect(queue.channel, const TypeMatcher()); + expect(queue.name, isNotEmpty); + expect(queue.consumerCount, equals(0)); + expect(queue.messageCount, equals(0)); }); - test("create public queue", () { - client - .channel() - .then((Channel channel) => channel.queue("test_1")) - .then(expectAsync1((Queue queue) { - expect(queue.channel, const TypeMatcher()); - expect(queue.name, isNotEmpty); - expect(queue.consumerCount, equals(0)); - expect(queue.messageCount, equals(0)); - })); + test("create public queue", () async { + Channel channel = await client.channel(); + Queue queue = await channel.queue("test_1"); + expect(queue.channel, const TypeMatcher()); + expect(queue.name, isNotEmpty); + expect(queue.consumerCount, equals(0)); + expect(queue.messageCount, equals(0)); }); - test("Check the existance of a created queue", () { - client - .channel() - .then((Channel channel) => channel.privateQueue()) - .then(expectAsync1((Queue privateQueue) { - // Check existance - privateQueue.channel - .queue(privateQueue.name, passive: true) - .then(expectAsync1((Queue queue) { - expect(queue.name, equals(privateQueue.name)); - })); - })); + test("Check the existance of a created queue", () async { + Channel channel = await client.channel(); + Queue privateQueue = await channel.privateQueue(); + // Check existance + Queue queue = + await privateQueue.channel.queue(privateQueue.name, passive: true); + expect(queue.name, equals(privateQueue.name)); }); }); @@ -93,294 +82,251 @@ main({bool enableLogger = true}) { return Future.wait([client.close(), client2.close()]); }); - test("queue message delivery", () { + test("queue message delivery", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => channel.queue("test_2")) - .then((Queue testQueue) => testQueue.consume()) - .then((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - - consumer.listen(expectAsync1((AmqpMessage message) { - expect(message.payloadAsString, equals("Test payload")); - testCompleter.complete(); - })); - - // Using second client publish a message to the queue - client2 - .channel() - .then((Channel channel) => channel.queue(consumer.queue.name)) - .then((Queue target) => target.publish("Test payload")); - }); + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_2"); + Consumer consumer = await testQueue.consume(); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsString, equals("Test payload")); + testCompleter.complete(); + })); + + // Using second client publish a message to the queue + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue(consumer.queue.name); + target.publish("Test payload"); return testCompleter.future; }); - test("queue JSON message delivery (auto-filled content type)", () { + test("queue JSON message delivery (auto-filled content type)", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => channel.queue("test_2")) - .then((Queue testQueue) => testQueue.consume()) - .then((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - - consumer.listen(expectAsync1((AmqpMessage message) { - expect(message.payloadAsJson, equals({"message": "Test payload"})); - expect(message.properties.contentType, equals("application/json")); - testCompleter.complete(); - })); - - // Using second client publish a message to the queue - client2 - .channel() - .then((Channel channel) => channel.queue(consumer.queue.name)) - .then( - (Queue target) => target.publish({"message": "Test payload"})); - }); + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_2"); + Consumer consumer = await testQueue.consume(); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsJson, equals({"message": "Test payload"})); + expect(message.properties.contentType, equals("application/json")); + testCompleter.complete(); + })); + + // Using second client publish a message to the queue + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue(consumer.queue.name); + target.publish({"message": "Test payload"}); return testCompleter.future; }); test( "queue JSON message delivery (auto-filled content type in existing persistent message property set)", - () { + () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => channel.queue("test_2")) - .then((Queue testQueue) => testQueue.consume()) - .then((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - - // Use second accuracy - DateTime now = DateTime.now(); - now = now.subtract(Duration( - milliseconds: now.millisecond, microseconds: now.microsecond)); - - consumer.listen(expectAsync1((AmqpMessage message) { - expect(message.payloadAsJson, equals({"message": "Test payload"})); - expect(message.properties.contentType, equals("application/json")); - expect(message.properties.headers, equals({'X-HEADER': 'ok'})); - expect(message.properties.priority, equals(1)); - expect(message.properties.corellationId, equals("123")); - expect(message.properties.replyTo, equals("/dev/null")); - expect(message.properties.expiration, equals("60000")); - expect(message.properties.messageId, equals("0xf00")); - expect(message.properties.timestamp, equals(now)); - expect(message.properties.type, equals("test")); - expect(message.properties.userId, equals("guest")); - expect(message.properties.appId, equals("unit-test")); - testCompleter.complete(); - })); - - // Using second client publish a message with full properties to the queue - client2 - .channel() - .then((Channel channel) => channel.queue(consumer.queue.name)) - .then((Queue target) => target.publish({"message": "Test payload"}, - properties: MessageProperties.persistentMessage() - ..headers = {'X-HEADER': 'ok'} - ..priority = 1 - ..corellationId = "123" - ..replyTo = "/dev/null" - ..expiration = "60000" // 60 sec - ..messageId = "0xf00" - ..timestamp = now - ..type = "test" - ..userId = "guest" - ..appId = "unit-test")); - }); + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_2"); + Consumer consumer = await testQueue.consume(); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + // Use second accuracy + DateTime now = DateTime.now(); + now = now.subtract(Duration( + milliseconds: now.millisecond, microseconds: now.microsecond)); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsJson, equals({"message": "Test payload"})); + expect(message.properties.contentType, equals("application/json")); + expect(message.properties.headers, equals({'X-HEADER': 'ok'})); + expect(message.properties.priority, equals(1)); + expect(message.properties.corellationId, equals("123")); + expect(message.properties.replyTo, equals("/dev/null")); + expect(message.properties.expiration, equals("60000")); + expect(message.properties.messageId, equals("0xf00")); + expect(message.properties.timestamp, equals(now)); + expect(message.properties.type, equals("test")); + expect(message.properties.userId, equals("guest")); + expect(message.properties.appId, equals("unit-test")); + testCompleter.complete(); + })); + + // Using second client publish a message with full properties to the queue + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue(consumer.queue.name); + target.publish({"message": "Test payload"}, + properties: MessageProperties.persistentMessage() + ..headers = {'X-HEADER': 'ok'} + ..priority = 1 + ..corellationId = "123" + ..replyTo = "/dev/null" + ..expiration = "60000" // 60 sec + ..messageId = "0xf00" + ..timestamp = now + ..type = "test" + ..userId = "guest" + ..appId = "unit-test"); return testCompleter.future; }); - test("queue message delivery with ack", () { + test("queue message delivery with ack", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => channel.queue("test_3")) - .then((Queue testQueue) => testQueue.consume(noAck: false)) - .then((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - - consumer.listen(expectAsync1((AmqpMessage message) { - expect(message.payloadAsString, equals("Test payload")); - message.ack(); - testCompleter.complete(); - })); - - // Using second client publish a message to the queue (request ack) - client2 - .channel() - .then((Channel channel) => channel.queue(consumer.queue.name)) - .then((Queue target) => - target.publish("Test payload", mandatory: true)); - }); + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_3"); + Consumer consumer = await testQueue.consume(noAck: false); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsString, equals("Test payload")); + message.ack(); + testCompleter.complete(); + })); + + // Using second client publish a message to the queue (request ack) + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue(consumer.queue.name); + target.publish("Test payload", mandatory: true); return testCompleter.future; }); - test("reject delivered message", () { + test("reject delivered message", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => channel.queue("test_3")) - .then((Queue testQueue) => testQueue.consume(noAck: false)) - .then((Consumer consumer) { - expect(consumer.channel, const TypeMatcher()); - expect(consumer.queue, const TypeMatcher()); - expect(consumer.tag, isNotEmpty); - - consumer.listen(expectAsync1((AmqpMessage message) { - expect(message.payloadAsString, equals("Test payload")); - message.reject(false); - testCompleter.complete(); - })); - - // Using second client publish a message to the queue (request ack) - client2 - .channel() - .then((Channel channel) => channel.queue(consumer.queue.name)) - .then((Queue target) => - target.publish("Test payload", mandatory: true)); - }); + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_3"); + Consumer consumer = await testQueue.consume(noAck: false); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsString, equals("Test payload")); + message.reject(false); + testCompleter.complete(); + })); + + // Using second client publish a message to the queue (request ack) + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue(consumer.queue.name); + target.publish("Test payload", mandatory: true); return testCompleter.future; }); - test("queue cancel consumer", () { + test("queue cancel consumer", () async { Completer testCompleter = Completer(); - client - .channel() - .then((Channel channel) => channel.queue("test_3")) - .then((Queue testQueue) => testQueue.consume(noAck: false)) - .then((Consumer consumer) { - consumer.listen((AmqpMessage message) { - fail("Received unexpected AMQP message"); - }, onDone: () { - testCompleter.complete(); - }); - - // Cancel the consumer and wait for the stream controller to close - consumer.cancel(); + + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_3"); + Consumer consumer = await testQueue.consume(noAck: false); + + consumer.listen((AmqpMessage message) { + fail("Received unexpected AMQP message"); + }, onDone: () { + testCompleter.complete(); }); + // Cancel the consumer and wait for the stream controller to close + await consumer.cancel(); + return testCompleter.future; }); - test("delete queue", () { - client - .channel() - .then((Channel channel) => channel.queue("test_3")) - .then((Queue testQueue) => testQueue.delete()) - .then(expectAsync1((Queue queue) {})); + test("delete queue", () async { + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_3"); + await testQueue.delete(); }); test( "consuming with same consumer tag on same channel should return identical consumer", - () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => channel.queue("test_3")) - .then( - (Queue testQueue) => testQueue.consume(consumerTag: "test_tag_1")) - .then((Consumer consumer1) { - consumer1.queue - .consume(consumerTag: "test_tag_1") - .then((Consumer consumer2) { - expect(true, identical(consumer1, consumer2)); - testCompleter.complete(); - }); - }); - - return testCompleter.future; + () async { + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_3"); + Consumer consumer1 = await testQueue.consume(consumerTag: "test_tag_1"); + Consumer consumer2 = + await consumer1.queue.consume(consumerTag: "test_tag_1"); + + expect(true, identical(consumer1, consumer2)); }); - test("purge a queue", () { - return client - .channel() - .then((Channel channel) => channel.queue("test_4")) - .then((Queue queue) => queue.purge()); + test("purge a queue", () async { + Channel channel = await client.channel(); + Queue queue = await channel.queue("test_4"); + await queue.purge(); }); group("exceptions:", () { - test("unsupported message payload", () { - client - .channel() - .then((Channel channel) => channel.queue("test_99")) - .then((Queue queue) => queue.publish(StreamController())) - .catchError(expectAsync1((ex) { + test("unsupported message payload", () async { + Channel channel = await client.channel(); + Queue queue = await channel.queue("test_99"); + try { + queue.publish(StreamController()); + } catch (ex) { expect(ex, const TypeMatcher()); expect( ex.message, equals( "Message payload should be either a Map, an Iterable, a String or an UInt8List instance")); - })); + } }); test( "server closes channel after publishing message with invalid properties; next channel operation should fail", - () { - client - .channel() - .then((Channel channel) => channel.queue("test_100")) - .then((Queue queue) { + () async { + Channel channel = await client.channel(); + Queue queue = await channel.queue("test_100"); + try { queue.publish("invalid properties test", properties: MessageProperties()..expiration = "undefined"); - - return queue.channel.queue("other_queue"); - }).catchError(expectAsync1((ex) { + await queue.channel.queue("other_queue"); + } catch (ex) { expect(ex, const TypeMatcher()); expect( ex.toString(), equals( "ChannelException(PRECONDITION_FAILED): PRECONDITION_FAILED - invalid expiration 'undefined': no_integer")); - })); + } }); test( "trying to publish to a channel closed by a prior invalid published message; next publish should fail", - () { - Completer testCompleter = Completer(); - - client - .channel() - .then((Channel channel) => channel.queue("test_100")) - .then((Queue queue) { - queue.publish("invalid properties test", - properties: MessageProperties()..expiration = "undefined"); - - Future.delayed(const Duration(seconds: 1)).then((_) { - queue.publish("test"); - }).catchError(expectAsync1((ex) { - expect(ex, const TypeMatcher()); - expect( - ex.toString(), - equals( - "ChannelException(PRECONDITION_FAILED): PRECONDITION_FAILED - invalid expiration 'undefined': no_integer")); - - testCompleter.complete(); - })); - }); - - return testCompleter.future; + () async { + Channel channel = await client.channel(); + Queue queue = await channel.queue("test_100"); + queue.publish("invalid properties test", + properties: MessageProperties()..expiration = "undefined"); + try { + await Future.delayed(const Duration(seconds: 1)); + queue.publish("test"); + } catch (ex) { + expect(ex, const TypeMatcher()); + expect( + ex.toString(), + equals( + "ChannelException(PRECONDITION_FAILED): PRECONDITION_FAILED - invalid expiration 'undefined': no_integer")); + } }); }); }); diff --git a/tool/generate_bindings.dart b/tool/generate_bindings.dart index 1a3a4a6..b962122 100644 --- a/tool/generate_bindings.dart +++ b/tool/generate_bindings.dart @@ -118,17 +118,16 @@ part "protocol/io/amqp_message_decoder.dart"; }; Map _amqpCustomTypeToBasicType = {}; - Future _retrieveSchema(String schemaUrl) { + Future _retrieveSchema(String schemaUrl) async { logger.info("- Retrieving schema from ${schemaUrl}"); // Check for cached copy File cachedCopy = File(schemaUrl.split('/').last); - return cachedCopy.exists().then((bool exists) { - return exists ? cachedCopy.readAsString() : http.read(schemaUrl); - }).then((String data) { - logger.info("- Parsing schema"); - return xml.parse(data); - }); + bool exists = await cachedCopy.exists(); + String data = + exists ? await cachedCopy.readAsString() : await http.read(schemaUrl); + logger.info("- Parsing schema"); + return xml.parse(data); } String _parseMethod( @@ -475,8 +474,8 @@ part of dart_amqp.protocol; messageFile.writeAsStringSync(generatedMessageFactoryFile.toString()); } - void build(String schemaUrl) { - _retrieveSchema(schemaUrl).then(_parseSchema); + void build(String schemaUrl) async { + _parseSchema(await _retrieveSchema(schemaUrl)); } } From 1d34304b203397445a369d23f747fd3133d1a75a Mon Sep 17 00:00:00 2001 From: Alexandre Ardhuin Date: Thu, 21 Mar 2019 11:38:16 +0100 Subject: [PATCH 3/5] fix runtime type error --- tool/generate_bindings.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tool/generate_bindings.dart b/tool/generate_bindings.dart index b962122..a41ef83 100644 --- a/tool/generate_bindings.dart +++ b/tool/generate_bindings.dart @@ -448,8 +448,8 @@ part of dart_amqp.protocol; void _parseSchema(xml.XmlDocument schema) { logger.info("- Processing custom domains"); schema.descendants - .where((xml.XmlNode node) => - node is xml.XmlElement && node.name.local == "domain") + .whereType() + .where((xml.XmlElement node) => node.name.local == "domain") .forEach(_parseDomain); logger.info("- Processing amqp classes"); From 50310dba8f38f5e1e02888a320fac0a2626a35ee Mon Sep 17 00:00:00 2001 From: Alexandre Ardhuin Date: Thu, 21 Mar 2019 11:48:11 +0100 Subject: [PATCH 4/5] update tool/generate_bindings.dart --- lib/src/protocol.dart | 2 +- lib/src/protocol/messages/bindings/basic.dart | 8 +++++--- lib/src/protocol/messages/bindings/channel.dart | 6 ++++-- .../protocol/messages/bindings/connection.dart | 6 ++++-- .../protocol/messages/bindings/exchange.dart | 8 +++++--- lib/src/protocol/messages/bindings/queue.dart | 8 +++++--- lib/src/protocol/messages/bindings/tx.dart | 10 ++++++---- lib/src/protocol/messages/message.dart | 2 +- tool/generate_bindings.dart | 17 +++++++++-------- 9 files changed, 40 insertions(+), 27 deletions(-) diff --git a/lib/src/protocol.dart b/lib/src/protocol.dart index 5263ecf..7564a32 100644 --- a/lib/src/protocol.dart +++ b/lib/src/protocol.dart @@ -1,6 +1,6 @@ // The file contains AMQP binding imports // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:26.633 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:06.772009 // // Do not modify diff --git a/lib/src/protocol/messages/bindings/basic.dart b/lib/src/protocol/messages/bindings/basic.dart index cb0972e..7c7ae4e 100644 --- a/lib/src/protocol/messages/bindings/basic.dart +++ b/lib/src/protocol/messages/bindings/basic.dart @@ -1,9 +1,11 @@ // The file contains all method messages for AMQP class Basic (id: 60) // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:27.116 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:08.137286 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; class BasicQos implements Message { @@ -35,7 +37,7 @@ class BasicQosOk implements Message { // Message arguments - BasicQosOk.fromStream(TypeDecoder decoder); + BasicQosOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } @@ -255,7 +257,7 @@ class BasicRecoverOk implements Message { // Message arguments - BasicRecoverOk.fromStream(TypeDecoder decoder); + BasicRecoverOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/bindings/channel.dart b/lib/src/protocol/messages/bindings/channel.dart index 7f026c1..1e84de1 100644 --- a/lib/src/protocol/messages/bindings/channel.dart +++ b/lib/src/protocol/messages/bindings/channel.dart @@ -1,9 +1,11 @@ // The file contains all method messages for AMQP class Channel (id: 20) // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:27.101 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:08.126667 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; class ChannelOpen implements Message { @@ -120,7 +122,7 @@ class ChannelCloseOk implements Message { // Message arguments ChannelCloseOk(); - ChannelCloseOk.fromStream(TypeDecoder decoder); + ChannelCloseOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) { encoder..writeUInt16(msgClassId)..writeUInt16(msgMethodId); } diff --git a/lib/src/protocol/messages/bindings/connection.dart b/lib/src/protocol/messages/bindings/connection.dart index 2a9d192..fb7d4c8 100644 --- a/lib/src/protocol/messages/bindings/connection.dart +++ b/lib/src/protocol/messages/bindings/connection.dart @@ -1,9 +1,11 @@ // The file contains all method messages for AMQP class Connection (id: 10) // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:27.081 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:08.114155 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; class ConnectionStart implements Message { @@ -201,7 +203,7 @@ class ConnectionCloseOk implements Message { // Message arguments ConnectionCloseOk(); - ConnectionCloseOk.fromStream(TypeDecoder decoder); + ConnectionCloseOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) { encoder..writeUInt16(msgClassId)..writeUInt16(msgMethodId); } diff --git a/lib/src/protocol/messages/bindings/exchange.dart b/lib/src/protocol/messages/bindings/exchange.dart index e971489..0be3641 100644 --- a/lib/src/protocol/messages/bindings/exchange.dart +++ b/lib/src/protocol/messages/bindings/exchange.dart @@ -1,9 +1,11 @@ // The file contains all method messages for AMQP class Exchange (id: 40) // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:27.105 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:08.129595 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; class ExchangeDeclare implements Message { @@ -43,7 +45,7 @@ class ExchangeDeclareOk implements Message { // Message arguments - ExchangeDeclareOk.fromStream(TypeDecoder decoder); + ExchangeDeclareOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } @@ -78,7 +80,7 @@ class ExchangeDeleteOk implements Message { // Message arguments - ExchangeDeleteOk.fromStream(TypeDecoder decoder); + ExchangeDeleteOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/bindings/queue.dart b/lib/src/protocol/messages/bindings/queue.dart index 2becdb5..fbffc30 100644 --- a/lib/src/protocol/messages/bindings/queue.dart +++ b/lib/src/protocol/messages/bindings/queue.dart @@ -1,9 +1,11 @@ // The file contains all method messages for AMQP class Queue (id: 50) // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:27.109 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:08.132436 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; class QueueDeclare implements Message { @@ -88,7 +90,7 @@ class QueueBindOk implements Message { // Message arguments - QueueBindOk.fromStream(TypeDecoder decoder); + QueueBindOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } @@ -126,7 +128,7 @@ class QueueUnbindOk implements Message { // Message arguments - QueueUnbindOk.fromStream(TypeDecoder decoder); + QueueUnbindOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/bindings/tx.dart b/lib/src/protocol/messages/bindings/tx.dart index 2b24006..6f3cca5 100644 --- a/lib/src/protocol/messages/bindings/tx.dart +++ b/lib/src/protocol/messages/bindings/tx.dart @@ -1,9 +1,11 @@ // The file contains all method messages for AMQP class Tx (id: 90) // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:27.123 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:08.142771 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; class TxSelect implements Message { @@ -27,7 +29,7 @@ class TxSelectOk implements Message { // Message arguments - TxSelectOk.fromStream(TypeDecoder decoder); + TxSelectOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } @@ -53,7 +55,7 @@ class TxCommitOk implements Message { // Message arguments - TxCommitOk.fromStream(TypeDecoder decoder); + TxCommitOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } @@ -79,7 +81,7 @@ class TxRollbackOk implements Message { // Message arguments - TxRollbackOk.fromStream(TypeDecoder decoder); + TxRollbackOk.fromStream(TypeDecoder decoder) {} void serialize(TypeEncoder encoder) {} } diff --git a/lib/src/protocol/messages/message.dart b/lib/src/protocol/messages/message.dart index 45f1bf5..b549a63 100644 --- a/lib/src/protocol/messages/message.dart +++ b/lib/src/protocol/messages/message.dart @@ -2,7 +2,7 @@ // and a factory constructor for unserializing AMQP messages // from incoming frames // -// File was auto-generated by generate_bindings.dart at 2015-09-24 14:33:26.633 +// File was auto-generated by generate_bindings.dart at 2019-03-21 11:46:06.771985 // // Do not modify diff --git a/tool/generate_bindings.dart b/tool/generate_bindings.dart index a41ef83..63294ad 100644 --- a/tool/generate_bindings.dart +++ b/tool/generate_bindings.dart @@ -48,7 +48,7 @@ abstract class Message { } // Message decoding failed; unknown message - throw new ArgumentError("Unknown message type (class: ${msgClassId}, method: ${msgMethodId})"); + throw ArgumentError("Unknown message type (class: ${msgClassId}, method: ${msgMethodId})"); } } """); @@ -160,7 +160,7 @@ part "protocol/io/amqp_message_decoder.dart"; if (implementedByClient) { generatedMessageFactoryFile.write(""" case $methodId: - return new ${className}${methodName}.fromStream( decoder ); + return ${className}${methodName}.fromStream(decoder); """); } @@ -176,7 +176,7 @@ class ${className}${methodName} implements Message { // Message arguments """); StringBuffer serializerMethod = StringBuffer(""" - void serialize( TypeEncoder encoder ) { + void serialize(TypeEncoder encoder) { encoder ..writeUInt16(msgClassId) ..writeUInt16(msgMethodId) @@ -189,7 +189,7 @@ class ${className}${methodName} implements Message { : ""); if (implementedByClient) { ctors.write(""" - ${className}${methodName}.fromStream( TypeDecoder decoder ){ + ${className}${methodName}.fromStream(TypeDecoder decoder) { """); } StringBuffer toStringMethod = StringBuffer(""" @@ -341,8 +341,7 @@ ${String.fromCharCodes(List.filled(className.length + methodName.length + 1 } else { // Write an empty serializer stub to avoid warnings generatedClass..write("\n\n")..write(""" - void serialize( TypeEncoder encoder ) { - } + void serialize(TypeEncoder encoder) {} """); } // ..write("\n") @@ -369,8 +368,8 @@ ${String.fromCharCodes(List.filled(className.length + methodName.length + 1 // Update message factory for the new class generatedMessageFactoryFile.write(""" - case ${classId}: // Class: ${className} - switch( msgMethodId ){ + case ${classId}: // Class: ${className} + switch (msgMethodId) { """); // Begin generation of method classes for this message class @@ -381,6 +380,8 @@ ${String.fromCharCodes(List.filled(className.length + methodName.length + 1 // // Do not modify +// ignore_for_file: empty_constructor_bodies + part of dart_amqp.protocol; """); From ba4c3102d4695017833689b64fc0f5a3637b4671 Mon Sep 17 00:00:00 2001 From: Alexandre Ardhuin Date: Fri, 22 Mar 2019 14:12:37 +0100 Subject: [PATCH 5/5] use pedantic-1.5.0 --- analysis_options.yaml | 20 +------------------- pubspec.yaml | 1 + 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/analysis_options.yaml b/analysis_options.yaml index 34dfe28..3ba64fa 100644 --- a/analysis_options.yaml +++ b/analysis_options.yaml @@ -1,28 +1,10 @@ +include: package:pedantic/analysis_options.yaml linter: rules: - - avoid_empty_else - - avoid_init_to_null - avoid_null_checks_in_equality_operators - - avoid_relative_lib_imports - - avoid_return_types_on_setters - avoid_returning_null_for_future - avoid_returning_null_for_void - - avoid_shadowing_type_parameters - - avoid_types_as_parameter_names - - empty_constructor_bodies - - no_duplicate_case_values - - null_closures - prefer_conditional_assignment - prefer_const_constructors - - prefer_contains - - prefer_equal_for_default_values - - prefer_is_empty - - prefer_is_not_empty - prefer_null_aware_operators - prefer_void_to_null - - recursive_getters - - slash_for_doc_comments - - unawaited_futures - - unrelated_type_equality_checks - - use_rethrow_when_possible - - valid_regexps \ No newline at end of file diff --git a/pubspec.yaml b/pubspec.yaml index b0e434a..d1ee3da 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -15,3 +15,4 @@ dev_dependencies: xml: ^3.2.0 coverage: ^0.12.2 coveralls: ^3.2.0 + pedantic: 1.5.0