diff --git a/lib/src/client/impl/channel_impl.dart b/lib/src/client/impl/channel_impl.dart index 8a6d0f6..96e20e7 100644 --- a/lib/src/client/impl/channel_impl.dart +++ b/lib/src/client/impl/channel_impl.dart @@ -68,13 +68,14 @@ class _ChannelImpl implements Channel { {MessageProperties? properties, Object? payloadContent, Completer? completer, - Object? futurePayload}) { + Object? futurePayload, + bool noWait = false}) { if (_channelClosed != null && (_channelClosed != completer)) { throw _channelCloseException ?? StateError("Channel has been closed"); } - // If an op completer is specified add it to the queue - if (completer != null) { + // If an op completer is specified add it to the queue unless noWait is set. + if (completer != null && !noWait) { _pendingOperations.addLast(completer); _pendingOperationPayloads.addLast(futurePayload ?? true); } @@ -91,6 +92,12 @@ class _ChannelImpl implements Channel { ..writeMessage(channelId, message, properties: properties, payloadContent: payloadContent) ..pipe(_client._socket!); + + // If the noWait flag was specified, complete the future now. The broken + // will raise any errors asynchronously via the channel or connection. + if (completer != null && noWait) { + completer.complete(futurePayload ?? true); + } } /// Implement the handshake flow specified by the AMQP spec by @@ -474,7 +481,9 @@ class _ChannelImpl implements Channel { Completer opCompleter = Completer(); writeMessage(queueRequest, - completer: opCompleter, futurePayload: _QueueImpl(this, name)); + completer: opCompleter, + futurePayload: _QueueImpl(this, name), + noWait: noWait); return opCompleter.future; } @@ -493,7 +502,9 @@ class _ChannelImpl implements Channel { Completer opCompleter = Completer(); writeMessage(queueRequest, - completer: opCompleter, futurePayload: _QueueImpl(this, "")); + completer: opCompleter, + futurePayload: _QueueImpl(this, ""), + noWait: noWait); return opCompleter.future; } @@ -519,7 +530,9 @@ class _ChannelImpl implements Channel { Completer opCompleter = Completer(); writeMessage(exchangeRequest, - completer: opCompleter, futurePayload: _ExchangeImpl(this, name, type)); + completer: opCompleter, + futurePayload: _ExchangeImpl(this, name, type), + noWait: noWait); return opCompleter.future; } diff --git a/lib/src/client/impl/consumer_impl.dart b/lib/src/client/impl/consumer_impl.dart index 9768406..d288a44 100644 --- a/lib/src/client/impl/consumer_impl.dart +++ b/lib/src/client/impl/consumer_impl.dart @@ -36,7 +36,7 @@ class _ConsumerImpl implements Consumer { Completer completer = Completer(); channel.writeMessage(cancelRequest, - completer: completer, futurePayload: this); + completer: completer, futurePayload: this, noWait: noWait); completer.future.then((_) => _controller.close()); return completer.future; } diff --git a/lib/src/client/impl/exchange_impl.dart b/lib/src/client/impl/exchange_impl.dart index 767ed68..7b0a255 100644 --- a/lib/src/client/impl/exchange_impl.dart +++ b/lib/src/client/impl/exchange_impl.dart @@ -22,7 +22,7 @@ class _ExchangeImpl implements Exchange { Completer completer = Completer(); channel.writeMessage(deleteRequest, - completer: completer, futurePayload: this); + completer: completer, futurePayload: this, noWait: noWait); return completer.future; } diff --git a/lib/src/client/impl/queue_impl.dart b/lib/src/client/impl/queue_impl.dart index f6a9c7a..738ec2f 100644 --- a/lib/src/client/impl/queue_impl.dart +++ b/lib/src/client/impl/queue_impl.dart @@ -32,7 +32,7 @@ class _QueueImpl implements Queue { Completer completer = Completer(); channel.writeMessage(deleteRequest, - completer: completer, futurePayload: this); + completer: completer, futurePayload: this, noWait: noWait); return completer.future; } @@ -45,7 +45,7 @@ class _QueueImpl implements Queue { Completer completer = Completer(); channel.writeMessage(purgeRequest, - completer: completer, futurePayload: this); + completer: completer, futurePayload: this, noWait: noWait); return completer.future; } @@ -76,7 +76,7 @@ class _QueueImpl implements Queue { Completer completer = Completer(); channel.writeMessage(bindRequest, - completer: completer, futurePayload: this); + completer: completer, futurePayload: this, noWait: noWait); return completer.future; } @@ -103,7 +103,7 @@ class _QueueImpl implements Queue { Completer completer = Completer(); channel.writeMessage(unbindRequest, - completer: completer, futurePayload: this); + completer: completer, futurePayload: this, noWait: noWait); return completer.future; } @@ -150,7 +150,9 @@ class _QueueImpl implements Queue { Completer completer = Completer(); channel.writeMessage(consumeRequest, - completer: completer, futurePayload: _ConsumerImpl(channel, this, "")); + completer: completer, + futurePayload: _ConsumerImpl(channel, this, ""), + noWait: noWait); return completer.future; } diff --git a/test/lib/queue_test.dart b/test/lib/queue_test.dart index c5ba9aa..22bba0c 100644 --- a/test/lib/queue_test.dart +++ b/test/lib/queue_test.dart @@ -82,6 +82,30 @@ main({bool enableLogger = true}) { return Future.wait([client.close(), client2.close()]); }); + test("queue message delivery when using noWait", () async { + Completer testCompleter = Completer(); + + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_2", noWait: true); + Consumer consumer = await testQueue.consume(); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsString, equals("Test payload")); + testCompleter.complete(); + })); + + // Using second client publish a message to the queue + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue(consumer.queue.name, noWait: true); + target.publish("Test payload"); + + return testCompleter.future; + }); + test("queue message delivery", () async { Completer testCompleter = Completer();