Skip to content

Commit

Permalink
Merge pull request #61 from achilleasa/support-confirm
Browse files Browse the repository at this point in the history
Support publish confirmations
  • Loading branch information
achilleasa authored Oct 1, 2021
2 parents e869edf + 0f62a3a commit 41410bf
Show file tree
Hide file tree
Showing 15 changed files with 331 additions and 35 deletions.
79 changes: 46 additions & 33 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,34 +60,47 @@ The following table summarizes the methods available to an AMQP client. For deta

The following table summarizes the methods available to an AMQP channel obtained via the `channel()` method of a `client` instance. For detailed documentation on each method and its arguments please consult the class [documentation](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/channel.dart).

| Method | Description
|--------------------|------------------
| close() | Close the channel and abort any pending operations.
| queue() | Define a named queue.
| privateQueue() | Define a private queue with a random name that will be deleted when the channel closes.
| exchange() | Define an exchange that can be used to route messages to multiple recipients.
| qos() | Manage the QoS settings for the channel (prefetech size & count).
| ack() | Acknowledge a message by its id.
| select() | Begin a transaction.
| commit() | Commit a transaction.
| rollback() | Rollback a transaction.
| flow() | Control message flow.
| recover() | Recover unacknowledged messages.
| basicReturnListener()| Get a StreamSubscription for handling undelivered messages.
| Method | Description
|----------------------------|
| close() | Close the channel and abort any pending operations.
| queue() | Define a named queue.
| privateQueue() | Define a private queue with a random name that will be deleted when the channel closes.
| exchange() | Define an exchange that can be used to route messages to multiple recipients.
| qos() | Manage the QoS settings for the channel (prefetech size & count).
| ack() | Acknowledge a message by its id.
| select() | Begin a transaction.
| commit() | Commit a transaction.
| rollback() | Rollback a transaction.
| flow() | Control message flow.
| recover() | Recover unacknowledged messages.
| basicReturnListener() | Get a StreamSubscription for handling undelivered messages.
| confirmPublishedMessages() | Request that the broker ACKs/NACKs the handling of published messages.
| publishNotifier() | Register a listener for publish notifications emitted by the broker.

The `confirmPublishedMessages` and `publishNotifier` methods leverage the _publisher confirms_
[extension](https://www.rabbitmq.com/confirms.html#publisher-confirms).

Notifications obtained using this mechanism only guarantee that the broker has
either processed (persisted) a published message successfully or it has dropped
it (e.g. out of disk space). Applications should never assume that receiving an
ACK from the broker for a published message means that a consumer has
successfully processed the message. As demonstrated by [examples/confirm](https://github.com/achilleasa/dart_amqp/blob/master/examples/confirm),
the broker will ACK messages published to a queue even if there are no consumers listening
on the other end.

## Exchanges

The following table summarizes the methods available to an AMQP exchange declared via the the `exchange()` method of a `channel` instance. For detailed documentation on each method and its arguments please consult the class [documentation](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/exchange.dart).

| Method | Description
|--------------------|------------------
| name() | A getter for the exchange name.
| type() | A getter for the exchange [type](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/enums/exchange_type.dart).
| channel() | A getter for the [channel](#channels) where this exchange was declared.
| delete() | Delete the exchange.
| publish() | Publish message using a routing key
| Method | Description
|----------------------------|
| name() | A getter for the exchange name.
| type() | A getter for the exchange [type](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/enums/exchange_type.dart).
| channel() | A getter for the [channel](#channels) where this exchange was declared.
| delete() | Delete the exchange.
| publish() | Publish message using a routing key
| bindPrivateQueueConsumer() | Convenience method that allocates a private [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.
| bindQueueConsumer() | Convenience method that allocates a named [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.
| bindQueueConsumer() | Convenience method that allocates a named [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.

## Queues

Expand Down Expand Up @@ -122,17 +135,17 @@ Queue consumers are essentially StreamControllers that emit a `Stream<AmqpMessag
payload of the incoming message as well as the incoming message properties
and provides helper methods for replying, ack-ing and rejecting messages. The following table summarizes the methods provided by `AmqpMessage`. For detailed documentation on each method and its arguments please consult the class [documentation](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/client/amqp_message.dart).

| Method | Description
|--------------------|------------------
| payload() | A getter for retrieving the raw message paylaod as an Uint8List.
| payloadAsString() | A getter for retrieving the message payload as an UTF8 String.
| payloadAsJson() | A getter for retrieving the message payload as a parsed JSON document.
| exchangeName() | A getter for the [exchange](#exchanges) where the message was published.
| routingKey() | A getter for the routing key used for publishing this message.
| properties() | A getter for retrieving message [properties](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/protocol/messages/message_properties.dart).
| ack() | Acknowledge this message.
| reply() | Reply to the message sender with a new message.
| reject() | Reject this message.
| Method | Description
|-------------------|
| payload() | A getter for retrieving the raw message paylaod as an Uint8List.
| payloadAsString() | A getter for retrieving the message payload as an UTF8 String.
| payloadAsJson() | A getter for retrieving the message payload as a parsed JSON document.
| exchangeName() | A getter for the [exchange](#exchanges) where the message was published.
| routingKey() | A getter for the routing key used for publishing this message.
| properties() | A getter for retrieving message [properties](https://github.com/achilleasa/dart_amqp/blob/master/lib/src/protocol/messages/message_properties.dart).
| ack() | Acknowledge this message.
| reply() | Reply to the message sender with a new message.
| reject() | Reject this message.

## Error handling

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Main features:
- supports PLAIN and AMQPLAIN authentication providers while other authentication schemes can be plugged in by implementing the appropriate interface.
- implements the entire 0.9.1 protocol specification (except basic get and recover-async)
- supports both plain-text and TLS connections
- supports publish confirmations

Things not working yet:
- the driver does not currently support recovering client topologies when re-establishing connections. This feature may be implemented in a future version.
Expand Down
30 changes: 30 additions & 0 deletions example/confirm/confirm.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import "dart:async";
import "package:dart_amqp/dart_amqp.dart";

void main() async {
Completer done = Completer();
Client client = Client();
Channel channel = await client.channel();
Queue queue = await channel.privateQueue();

// To work with publish confirmations we first need to enable support for
// confirmations on the channel used by our queue.
await queue.channel.confirmPublishedMessages();

// Then register a handler to process publish notifications.
queue.channel.publishNotifier((PublishNotification notification) {
Object? msg = notification.message;
String? corId = notification.properties?.corellationId;
bool ack = notification.published;
print(
" [!] received delivery notification: msg: '$msg', correlation ID: '$corId', ACK'd?: $ack");
done.complete();
});

MessageProperties msgProps = MessageProperties()..corellationId = "42";
queue.publish("Hello World!", properties: msgProps);
print(" [x] Sent 'Hello World!'; waiting for delivery confirmation");

await done.future;
await client.close();
}
3 changes: 2 additions & 1 deletion example/example.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,5 @@ functionality. If you need RPC support for your application you may want to cons

# Additional examples

The [example](https://github.com/achilleasa/dart_amqp/tree/master/example) folder contains implementations of the six RabbitMQ getting started [tutorials](https://www.rabbitmq.com/getstarted.html).
The [example](https://github.com/achilleasa/dart_amqp/tree/master/example) folder contains implementations of the six RabbitMQ getting started [tutorials](https://www.rabbitmq.com/getstarted.html)
plus an [extra](https://github.com/achilleasa/dart_amqp/tree/master/example/confirm) example of working with publish confirmations.
2 changes: 2 additions & 0 deletions lib/src/client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ part "client/consumer.dart";
part "client/exchange.dart";
part "client/amqp_message.dart";
part "client/basicreturn_message.dart";
part "client/publish_notification.dart";

// client implementations
part "client/impl/amqp_message_impl.dart";
Expand All @@ -31,3 +32,4 @@ part "client/impl/queue_impl.dart";
part "client/impl/consumer_impl.dart";
part "client/impl/exchange_impl.dart";
part "client/impl/basic_return_message_impl.dart";
part "client/impl/publish_notification_impl.dart";
23 changes: 23 additions & 0 deletions lib/src/client/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,27 @@ abstract class Channel {
{Function onError,
void Function() onDone,
bool cancelOnError});

/// Register a listener to be notified when the broker ACKs or NACKs
/// published messages.
///
/// When publish confirmations have been enabled on the channel via
/// [confirmPublishedMessages]), the broker will either ACK each published
/// message to indicate that it has been successfully handled/queued for
/// deliver or NACK it to indicate that the message was lost (e.g. out of
/// disk space).
///
/// Note that receiving an ACK for a message does not guarantee that it has
/// been processed by one or more consumers. For example, when publishing to
/// a queue with no consumers, the broker will still ACK the message.
StreamSubscription<PublishNotification> publishNotifier(
void Function(PublishNotification notification) onData,
{Function onError,
void Function() onDone,
bool cancelOnError});

/// Request that from this point onwards, the broker must confirm whether it
/// has processed or dropped each message published to this channel. A
/// listener for these notifications can be registered via [publishNotifier].
Future confirmPublishedMessages();
}
87 changes: 87 additions & 0 deletions lib/src/client/impl/channel_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@ class _ChannelImpl implements Channel {
Exception? _channelCloseException;
final _basicReturnStream = StreamController<BasicReturnMessage>.broadcast();

// Support for delivery confirmations
late Map<int, _PublishNotificationImpl> _pendingDeliveries;
late int _nextPublishSeqNo;
final _publishNotificationStream =
StreamController<PublishNotification>.broadcast();

_ChannelImpl(this.channelId, this._client) {
_frameWriter = FrameWriter(_client.tuningSettings);
_pendingOperations = ListQueue<Completer>();
_pendingOperationPayloads = ListQueue<Object>();
_consumers = <String, _ConsumerImpl>{};

_pendingDeliveries = <int, _PublishNotificationImpl>{};
_nextPublishSeqNo = 0; // delivery confirmations are disabled

// If we are opening a user channel signal to the server; otherwise perform connection handshake
if (channelId > 0) {
_channelOpened = Completer<Channel>();
Expand Down Expand Up @@ -70,6 +79,14 @@ class _ChannelImpl implements Channel {
_pendingOperationPayloads.addLast(futurePayload ?? true);
}

// If this is a publish request and delivery confirmations are enabled
// add it to the pending delivery list.
if (message is BasicPublish && _nextPublishSeqNo > 0) {
_pendingDeliveries[_nextPublishSeqNo] =
_PublishNotificationImpl(payloadContent, properties, false);
_nextPublishSeqNo++;
}

_frameWriter
..writeMessage(channelId, message,
properties: properties, payloadContent: payloadContent)
Expand Down Expand Up @@ -235,6 +252,12 @@ class _ChannelImpl implements Channel {
case BasicRecoverOk:
_completeOperation(serverMessage.message);
break;
case ConfirmSelectOk:
// When confirmed deliveries get enabled, we use increasing sequence
// numbers (starting from 1) to match published messages to ACKs/NACKs.
_nextPublishSeqNo = 1;
_completeOperation(serverMessage.message);
break;
// Queues
case QueueDeclareOk:
QueueDeclareOk serverResponse = serverMessage.message as QueueDeclareOk;
Expand Down Expand Up @@ -294,6 +317,17 @@ class _ChannelImpl implements Channel {
case ExchangeDeleteOk:
_completeOperation(serverMessage.message);
break;
// Confirmations
case BasicAck:
BasicAck serverResponse = (serverMessage.message as BasicAck);
_handlePublishConfirmation(
serverResponse.deliveryTag, true, serverResponse.multiple);
break;
case BasicNack:
BasicNack serverResponse = (serverMessage.message as BasicNack);
_handlePublishConfirmation(
serverResponse.deliveryTag, false, serverResponse.multiple);
break;
}
}

Expand Down Expand Up @@ -561,4 +595,57 @@ class _ChannelImpl implements Channel {
writeMessage(recoverRequest, completer: opCompleter, futurePayload: this);
return opCompleter.future;
}

@override
StreamSubscription<PublishNotification> publishNotifier(
void Function(PublishNotification notification) onData,
{Function? onError,
void Function()? onDone,
bool cancelOnError = false}) =>
_publishNotificationStream.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

@override
Future confirmPublishedMessages() {
Completer opCompleter = Completer();
if (_nextPublishSeqNo > 0) {
opCompleter.complete(); // already enabled
} else {
ConfirmSelect confirmRequest = ConfirmSelect();
writeMessage(confirmRequest, completer: opCompleter);
}
return opCompleter.future;
}

void _handlePublishConfirmation(int seqNo, bool ack, bool multipleMessages) {
if (!multipleMessages) {
// Ack/Nack specific seqNo
_PublishNotificationImpl? notification = _pendingDeliveries.remove(seqNo);
if (notification != null &&
_publishNotificationStream.hasListener &&
!_publishNotificationStream.isClosed) {
notification.published = ack;
_publishNotificationStream.add(notification);
}
return;
}

// Multi Ack/Nack; messages up to seqNo
for (var pendingSeqNo in _pendingDeliveries.keys) {
if (pendingSeqNo > seqNo) {
// only interested in keys up to pendingSeqNo
break;
}

_PublishNotificationImpl? notification = _pendingDeliveries.remove(seqNo);
if (notification != null &&
_publishNotificationStream.hasListener &&
!_publishNotificationStream.isClosed) {
notification.published = ack;
_publishNotificationStream.add(notification);
}
}

return;
}
}
12 changes: 12 additions & 0 deletions lib/src/client/impl/publish_notification_impl.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
part of dart_amqp.client;

class _PublishNotificationImpl implements PublishNotification {
@override
final Object? message;
@override
final MessageProperties? properties;
@override
bool published;

_PublishNotificationImpl(this.message, this.properties, this.published);
}
7 changes: 7 additions & 0 deletions lib/src/client/publish_notification.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
part of dart_amqp.client;

abstract class PublishNotification {
Object? get message;
MessageProperties? get properties;
bool get published;
}
1 change: 1 addition & 0 deletions lib/src/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ part "protocol/messages/bindings/exchange.dart";
part "protocol/messages/bindings/queue.dart";
part "protocol/messages/bindings/basic.dart";
part "protocol/messages/bindings/tx.dart";
part "protocol/messages/bindings/confirm.dart";
38 changes: 38 additions & 0 deletions lib/src/protocol/messages/bindings/basic.dart
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,44 @@ class BasicAck implements Message {

BasicAck();

BasicAck.fromStream(TypeDecoder decoder) {
deliveryTag = decoder.readUInt64();
int _bitmask;
_bitmask = decoder.readUInt8();
multiple = _bitmask & 0x1 != 0;
}

@override
void serialize(TypeEncoder encoder) {
encoder
..writeUInt16(msgClassId)
..writeUInt16(msgMethodId)
..writeUInt64(deliveryTag)
..writeBits([multiple]);
}
}

class BasicNack implements Message {
@override
final bool msgHasContent = false;
@override
final int msgClassId = 60;
@override
final int msgMethodId = 120;

// Message arguments
int deliveryTag = 0;
bool multiple = false;

BasicNack();

BasicNack.fromStream(TypeDecoder decoder) {
deliveryTag = decoder.readUInt64();
int _bitmask;
_bitmask = decoder.readUInt8();
multiple = _bitmask & 0x1 != 0;
}

@override
void serialize(TypeEncoder encoder) {
encoder
Expand Down
Loading

0 comments on commit 41410bf

Please sign in to comment.