Skip to content

Commit

Permalink
Ensure that calls with a noWait=true flag complete returned Futures
Browse files Browse the repository at this point in the history
When an application ivokes any of the library methods with the noWait
flag set, the broker will not reply back with a message to confirm that the
request (e.g. declare a queue, exchange etc.) was successful. As a
result the returned Future values where never completed thus causing the
callers to hang.

When using the noWait flag, the broker will report errors asynchronously
via the channel or connection instance.
  • Loading branch information
achilleasa committed Oct 1, 2021
1 parent 41410bf commit ac86799
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 13 deletions.
25 changes: 19 additions & 6 deletions lib/src/client/impl/channel_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ class _ChannelImpl implements Channel {
{MessageProperties? properties,
Object? payloadContent,
Completer? completer,
Object? futurePayload}) {
Object? futurePayload,
bool noWait = false}) {
if (_channelClosed != null && (_channelClosed != completer)) {
throw _channelCloseException ?? StateError("Channel has been closed");
}

// If an op completer is specified add it to the queue
if (completer != null) {
// If an op completer is specified add it to the queue unless noWait is set.
if (completer != null && !noWait) {
_pendingOperations.addLast(completer);
_pendingOperationPayloads.addLast(futurePayload ?? true);
}
Expand All @@ -91,6 +92,12 @@ class _ChannelImpl implements Channel {
..writeMessage(channelId, message,
properties: properties, payloadContent: payloadContent)
..pipe(_client._socket!);

// If the noWait flag was specified, complete the future now. The broken
// will raise any errors asynchronously via the channel or connection.
if (completer != null && noWait) {
completer.complete(futurePayload ?? true);
}
}

/// Implement the handshake flow specified by the AMQP spec by
Expand Down Expand Up @@ -474,7 +481,9 @@ class _ChannelImpl implements Channel {

Completer<Queue> opCompleter = Completer<Queue>();
writeMessage(queueRequest,
completer: opCompleter, futurePayload: _QueueImpl(this, name));
completer: opCompleter,
futurePayload: _QueueImpl(this, name),
noWait: noWait);
return opCompleter.future;
}

Expand All @@ -493,7 +502,9 @@ class _ChannelImpl implements Channel {

Completer<Queue> opCompleter = Completer<Queue>();
writeMessage(queueRequest,
completer: opCompleter, futurePayload: _QueueImpl(this, ""));
completer: opCompleter,
futurePayload: _QueueImpl(this, ""),
noWait: noWait);
return opCompleter.future;
}

Expand All @@ -519,7 +530,9 @@ class _ChannelImpl implements Channel {

Completer<Exchange> opCompleter = Completer<Exchange>();
writeMessage(exchangeRequest,
completer: opCompleter, futurePayload: _ExchangeImpl(this, name, type));
completer: opCompleter,
futurePayload: _ExchangeImpl(this, name, type),
noWait: noWait);
return opCompleter.future;
}

Expand Down
2 changes: 1 addition & 1 deletion lib/src/client/impl/consumer_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class _ConsumerImpl implements Consumer {

Completer<Consumer> completer = Completer<Consumer>();
channel.writeMessage(cancelRequest,
completer: completer, futurePayload: this);
completer: completer, futurePayload: this, noWait: noWait);
completer.future.then((_) => _controller.close());
return completer.future;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/client/impl/exchange_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class _ExchangeImpl implements Exchange {

Completer<Exchange> completer = Completer<Exchange>();
channel.writeMessage(deleteRequest,
completer: completer, futurePayload: this);
completer: completer, futurePayload: this, noWait: noWait);
return completer.future;
}

Expand Down
12 changes: 7 additions & 5 deletions lib/src/client/impl/queue_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class _QueueImpl implements Queue {

Completer<Queue> completer = Completer<Queue>();
channel.writeMessage(deleteRequest,
completer: completer, futurePayload: this);
completer: completer, futurePayload: this, noWait: noWait);
return completer.future;
}

Expand All @@ -45,7 +45,7 @@ class _QueueImpl implements Queue {

Completer<Queue> completer = Completer<Queue>();
channel.writeMessage(purgeRequest,
completer: completer, futurePayload: this);
completer: completer, futurePayload: this, noWait: noWait);
return completer.future;
}

Expand Down Expand Up @@ -76,7 +76,7 @@ class _QueueImpl implements Queue {

Completer<Queue> completer = Completer<Queue>();
channel.writeMessage(bindRequest,
completer: completer, futurePayload: this);
completer: completer, futurePayload: this, noWait: noWait);
return completer.future;
}

Expand All @@ -103,7 +103,7 @@ class _QueueImpl implements Queue {

Completer<Queue> completer = Completer<Queue>();
channel.writeMessage(unbindRequest,
completer: completer, futurePayload: this);
completer: completer, futurePayload: this, noWait: noWait);
return completer.future;
}

Expand Down Expand Up @@ -150,7 +150,9 @@ class _QueueImpl implements Queue {

Completer<Consumer> completer = Completer<Consumer>();
channel.writeMessage(consumeRequest,
completer: completer, futurePayload: _ConsumerImpl(channel, this, ""));
completer: completer,
futurePayload: _ConsumerImpl(channel, this, ""),
noWait: noWait);
return completer.future;
}

Expand Down
24 changes: 24 additions & 0 deletions test/lib/queue_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ main({bool enableLogger = true}) {
return Future.wait([client.close(), client2.close()]);
});

test("queue message delivery when using noWait", () async {
Completer testCompleter = Completer();

Channel channel = await client.channel();
Queue testQueue = await channel.queue("test_2", noWait: true);
Consumer consumer = await testQueue.consume();

expect(consumer.channel, const TypeMatcher<Channel>());
expect(consumer.queue, const TypeMatcher<Queue>());
expect(consumer.tag, isNotEmpty);

consumer.listen(expectAsync1((AmqpMessage message) {
expect(message.payloadAsString, equals("Test payload"));
testCompleter.complete();
}));

// Using second client publish a message to the queue
Channel channel2 = await client2.channel();
Queue target = await channel2.queue(consumer.queue.name, noWait: true);
target.publish("Test payload");

return testCompleter.future;
});

test("queue message delivery", () async {
Completer testCompleter = Completer();

Expand Down

0 comments on commit ac86799

Please sign in to comment.