diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 99c1422a21cf..2a266dbb4412 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -4,7 +4,6 @@ ```ts -import { AmqpMessage } from '@azure/core-amqp'; import { delay } from '@azure/core-amqp'; import { Delivery } from 'rhea-promise'; import { HttpOperationResponse } from '@azure/core-http'; @@ -22,6 +21,51 @@ import { UserAgentOptions } from '@azure/core-http'; import { WebSocketImpl } from 'rhea-promise'; import { WebSocketOptions } from '@azure/core-amqp'; +// @public +export interface AmqpAnnotatedMessage { + applicationProperties?: { + [key: string]: any; + }; + body: any; + deliveryAnnotations?: { + [key: string]: any; + }; + footer?: { + [key: string]: any; + }; + header?: AmqpMessageHeader; + messageAnnotations?: { + [key: string]: any; + }; + properties?: AmqpMessageProperties; +} + +// @public +export interface AmqpMessageHeader { + deliveryCount?: number; + durable?: boolean; + firstAcquirer?: boolean; + priority?: number; + timeToLive?: number; +} + +// @public +export interface AmqpMessageProperties { + absoluteExpiryTime?: number; + contentEncoding?: string; + contentType?: string; + correlationId?: string | number | Buffer; + creationTime?: number; + groupId?: string; + groupSequence?: number; + messageId?: string | number | Buffer; + replyTo?: string; + replyToGroupId?: string; + subject?: string; + to?: string; + userId?: string; +} + // @public export type AuthorizationRule = { claimType: string; @@ -226,7 +270,7 @@ export interface QueueRuntimePropertiesResponse extends QueueRuntimeProperties, // @public export interface ReceivedMessage extends ServiceBusMessage { - readonly _amqpMessage: AmqpMessage; + readonly _amqpAnnotatedMessage: AmqpAnnotatedMessage; readonly deadLetterErrorDescription?: string; readonly deadLetterReason?: string; readonly deadLetterSource?: string; diff --git a/sdk/servicebus/service-bus/src/index.ts b/sdk/servicebus/service-bus/src/index.ts index e1776c50e3e4..53cdfbb5be3e 100644 --- a/sdk/servicebus/service-bus/src/index.ts +++ b/sdk/servicebus/service-bus/src/index.ts @@ -67,6 +67,9 @@ export { } from "./serviceBusAtomManagementClient"; export { ServiceBusClient } from "./serviceBusClient"; export { + AmqpAnnotatedMessage, + AmqpMessageHeader, + AmqpMessageProperties, DeadLetterOptions, ReceivedMessage, ReceivedMessageWithLock, diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index c0071fecf16b..fb0d863ba7e5 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -3,7 +3,14 @@ import Long from "long"; import { Delivery, DeliveryAnnotations, MessageAnnotations, uuid_to_string } from "rhea-promise"; -import { AmqpMessage, Constants, ErrorNameConditionMapper, translate } from "@azure/core-amqp"; +import { + AmqpMessage, + Constants, + ErrorNameConditionMapper, + MessageHeader, + MessageProperties, + translate +} from "@azure/core-amqp"; import * as log from "./log"; import { ConnectionContext } from "./connectionContext"; import { reorderLockToken } from "./util/utils"; @@ -225,6 +232,133 @@ export interface ServiceBusMessage { properties?: { [key: string]: number | boolean | string | Date }; } +/** + * Describes the AmqpAnnotatedMessage, part of the ReceivedMessage(as `amqpAnnotatedMessage` property). + */ +export interface AmqpAnnotatedMessage { + /** + * Describes the defined set of standard header properties of the message. + */ + header?: AmqpMessageHeader; + /** + * Describes set of footer properties of the message. + */ + footer?: { [key: string]: any }; + /** + * A dictionary containing message attributes that will be held in the message header + */ + messageAnnotations?: { [key: string]: any }; + /** + * A dictionary used for delivery-specific + * non-standard properties at the head of the message. + */ + deliveryAnnotations?: { [key: string]: any }; + /** + * A dictionary containing application specific message properties. + */ + applicationProperties?: { [key: string]: any }; + /** + * Describes the defined set of standard properties of the message. + */ + properties?: AmqpMessageProperties; + /** + * The message body. + */ + body: any; +} + +/** + * Describes the defined set of standard header properties of the message. + */ +export interface AmqpMessageHeader { + /** + * If this value is true, then this message has not been + * acquired by any other link. Ifthis value is false, then this message MAY have previously + * been acquired by another link or links. + */ + firstAcquirer?: boolean; + /** + * The number of prior unsuccessful delivery attempts. + */ + deliveryCount?: number; + /** + * Time to live in milli seconds. + */ + timeToLive?: number; + /** + * Specifies durability requirements. + */ + durable?: boolean; + /** + * The relative message priority. Higher numbers indicate higher + * priority messages. + */ + priority?: number; +} + +/** + * Describes the defined set of standard properties of the message. + */ +export interface AmqpMessageProperties { + /** + * The application message identifier that uniquely idenitifes a message. + * The user is responsible for making sure that this is unique in + * the given context. Guids usually make a good fit. + */ + messageId?: string | number | Buffer; + /** + * The address of the node the message is destined for. + */ + to?: string; + /** + * The id that can be used to mark or + * identify messages between clients. + */ + correlationId?: string | number | Buffer; + /** + * MIME type for the message. + */ + contentType?: string; + /** + * The content-encoding property is used as a modifier to the content-type. + * When present, its valueindicates what additional content encodings have + * been applied to theapplication-data. + */ + contentEncoding?: string; + /** + * The time when this message is considered expired. + */ + absoluteExpiryTime?: number; + /** + * The time this message was created. + */ + creationTime?: number; + /** + * The group this message belongs to. + */ + groupId?: string; + /** + * The sequence number of this message with its group. + */ + groupSequence?: number; + /** + * The address of the node to send replies to. + */ + replyTo?: string; + /** + * The group the reply message belongs to. + */ + replyToGroupId?: string; + /** + * A common field for summary information about the message content and purpose. + */ + subject?: string; + /** + * The identity of the user responsible for producing the message. + */ + userId?: string; +} + /** * @internal * @ignore @@ -445,7 +579,7 @@ export interface ReceivedMessage extends ServiceBusMessage { * @property {AmqpMessage} _amqpMessage The underlying raw amqp message. * @readonly */ - readonly _amqpMessage: AmqpMessage; + readonly _amqpAnnotatedMessage: AmqpAnnotatedMessage; } /** @@ -661,20 +795,20 @@ export function fromAmqpMessage( } const rcvdsbmsg: ReceivedMessage = { - _amqpMessage: msg, + _amqpAnnotatedMessage: toAmqpAnnotatedMessage(msg), _delivery: delivery, deliveryCount: msg.delivery_count, lockToken: delivery && delivery.tag && delivery.tag.length !== 0 ? uuid_to_string( - shouldReorderLockToken === true - ? reorderLockToken( - typeof delivery.tag === "string" ? Buffer.from(delivery.tag) : delivery.tag - ) - : typeof delivery.tag === "string" + shouldReorderLockToken === true + ? reorderLockToken( + typeof delivery.tag === "string" ? Buffer.from(delivery.tag) : delivery.tag + ) + : typeof delivery.tag === "string" ? Buffer.from(delivery.tag) : delivery.tag - ) + ) : undefined, ...sbmsg, ...props, @@ -686,6 +820,26 @@ export function fromAmqpMessage( return rcvdsbmsg; } +/** + * Takes AmqpMessage(type from "rhea") and returns it in the AmqpAnnotatedMessage format. + * + * @export + * @param {AmqpMessage} msg + * @returns {AmqpAnnotatedMessage} + */ +export function toAmqpAnnotatedMessage(msg: AmqpMessage): AmqpAnnotatedMessage { + const messageHeader = MessageHeader.fromAmqpMessageHeader(msg); + return { + header: { ...messageHeader, timeToLive: messageHeader.ttl }, + footer: (msg as any).footer, + messageAnnotations: msg.message_annotations, + deliveryAnnotations: msg.delivery_annotations, + applicationProperties: msg.application_properties, + properties: MessageProperties.fromAmqpMessageProperties(msg), + body: msg.body + }; +} + /** * @internal * @ignore @@ -871,10 +1025,10 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { */ readonly delivery: Delivery; /** - * @property {AmqpMessage} _amqpMessage The underlying raw amqp message. + * @property {AmqpMessage} _amqpAnnotatedMessage The underlying raw amqp annotated message. * @readonly */ - readonly _amqpMessage: AmqpMessage; + readonly _amqpAnnotatedMessage: AmqpAnnotatedMessage; /** * @property The reason for deadlettering the message. * @readonly @@ -913,7 +1067,7 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock { if (msg.body) { this.body = this._context.dataTransformer.decode(msg.body); } - this._amqpMessage = msg; + this._amqpAnnotatedMessage = toAmqpAnnotatedMessage(msg); this.delivery = delivery; } diff --git a/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts b/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts new file mode 100644 index 000000000000..6149830dc873 --- /dev/null +++ b/sdk/servicebus/service-bus/test/internal/serviceBusMessage.spec.ts @@ -0,0 +1,209 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { ServiceBusMessageImpl, InternalReceiveMode } from "../../src/serviceBusMessage"; +import { ConnectionContext } from "../../src/connectionContext"; +import { Delivery, uuid_to_string, MessageAnnotations, DeliveryAnnotations } from "rhea-promise"; +import chai from "chai"; +import { AmqpMessage, Constants } from "@azure/core-amqp"; +const assert = chai.assert; + +const fakeContext = { + dataTransformer: { + encode: (data) => data, + decode: (data) => data + } +} as ConnectionContext; +const fakeEntityPath = "dummy"; +const fakeDelivery = {} as Delivery; + +describe("ServiceBusMessageImpl LockToken unit tests", () => { + const message_annotations: MessageAnnotations = {}; + message_annotations[Constants.enqueuedTime] = Date.now(); + const amqpMessage: AmqpMessage = { + body: "hello", + message_annotations + }; + + const fakeDeliveryTag = new Buffer(16); + for (let i = 0; i < fakeDeliveryTag.length; i++) { + fakeDeliveryTag[i] = Math.floor(Math.random() * 255); + } + const expectedLockToken = uuid_to_string(fakeDeliveryTag); + + it("Lock token in peekLock mode", () => { + const sbMessage = new ServiceBusMessageImpl( + fakeContext, + fakeEntityPath, + amqpMessage, + { tag: fakeDeliveryTag } as Delivery, + false, + InternalReceiveMode.peekLock + ); + + assert.equal(sbMessage.lockToken, expectedLockToken, "Unexpected lock token found"); + }); + + it("Lock token in receiveAndDelete mode", () => { + const sbMessage = new ServiceBusMessageImpl( + fakeContext, + fakeEntityPath, + amqpMessage, + { tag: fakeDeliveryTag } as Delivery, + false, + InternalReceiveMode.receiveAndDelete + ); + + assert.equal(!!sbMessage.lockToken, false, "Unexpected lock token found"); + }); +}); + +describe("ServiceBusMessageImpl AmqpAnnotations unit tests", () => { + const message_annotations: MessageAnnotations = {}; + message_annotations[Constants.enqueuedTime] = Date.now(); + message_annotations[Constants.partitionKey] = "dummy-partition-key"; + message_annotations[Constants.viaPartitionKey] = "dummy-via-partition-key"; + message_annotations["random-msg-annotation-key"] = "random-msg-annotation-value"; + + const delivery_annotations: DeliveryAnnotations = { + delivery_annotations_one: "delivery_annotations_one_value", + delivery_annotations_two: "delivery_annotations_two_value", + delivery_annotations_three: "delivery_annotations_three_value" + }; + + const amqpMessage: AmqpMessage = { + body: "hello", + message_annotations, + delivery_annotations, + delivery_count: 2, + first_acquirer: true, + ttl: 123456, + durable: true, + priority: 9876, + message_id: "random_messageId", + reply_to: "random_replyTo", + to: "random_to", + correlation_id: "random_correlationId", + content_type: "random_contentType", + content_encoding: "random_contentEncoding", + absolute_expiry_time: 123908745, + creation_time: 45612349, + group_id: "random_groupId", + group_sequence: 98723560, + reply_to_group_id: "random_replyToGroupId", + subject: "random_subject", + user_id: "random_user_id" + }; + + const sbMessage = new ServiceBusMessageImpl( + fakeContext, + fakeEntityPath, + amqpMessage, + fakeDelivery, + false, + InternalReceiveMode.peekLock + ); + + it("headers match", () => { + assert.equal(sbMessage._amqpAnnotatedMessage.header?.firstAcquirer, amqpMessage.first_acquirer); + assert.equal(sbMessage._amqpAnnotatedMessage.header?.timeToLive, amqpMessage.ttl); + assert.equal(sbMessage._amqpAnnotatedMessage.header?.durable, amqpMessage.durable); + assert.equal(sbMessage._amqpAnnotatedMessage.header?.priority, amqpMessage.priority); + assert.equal(sbMessage._amqpAnnotatedMessage.header?.deliveryCount, amqpMessage.delivery_count); + + assert.equal(sbMessage.deliveryCount, amqpMessage.delivery_count); + assert.equal(sbMessage.timeToLive, amqpMessage.ttl); + }); + + it("message annotations match", () => { + if (!sbMessage._amqpAnnotatedMessage.messageAnnotations) { + throw new Error("Message Annotations should not be empty"); + } + + for (const key in message_annotations) { + if (Object.prototype.hasOwnProperty.call(message_annotations, key)) { + assert.equal( + sbMessage._amqpAnnotatedMessage.messageAnnotations[key], + message_annotations[key], + `Unexpected value for key: ${key}` + ); + } + } + + assert.equal( + sbMessage.partitionKey, + message_annotations[Constants.partitionKey], + "Unexpected Partition Key" + ); + assert.equal( + sbMessage.viaPartitionKey, + message_annotations[Constants.viaPartitionKey], + "Unexpected Via Partition Key" + ); + }); + + it("delivery annotations match", () => { + if (!sbMessage._amqpAnnotatedMessage.deliveryAnnotations) { + throw new Error("Delivery Annotations should not be empty"); + } + + for (const key in delivery_annotations) { + if (Object.prototype.hasOwnProperty.call(delivery_annotations, key)) { + assert.equal( + sbMessage._amqpAnnotatedMessage.deliveryAnnotations[key], + delivery_annotations[key], + `Unexpected value for key: ${key}` + ); + } + } + }); + + it("properties match", () => { + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.messageId, amqpMessage.message_id); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.replyTo, amqpMessage.reply_to); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.to, amqpMessage.to); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.correlationId, + amqpMessage.correlation_id + ); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.contentType, amqpMessage.content_type); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.contentEncoding, + amqpMessage.content_encoding + ); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.absoluteExpiryTime, + amqpMessage.absolute_expiry_time + ); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.creationTime, + amqpMessage.creation_time + ); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.groupId, amqpMessage.group_id); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.replyToGroupId, + amqpMessage.reply_to_group_id + ); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.groupSequence, + amqpMessage.group_sequence + ); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.subject, amqpMessage.subject); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.userId, amqpMessage.user_id); + + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.messageId, sbMessage.messageId); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.replyTo, sbMessage.replyTo); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.to, sbMessage.to); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.correlationId, + sbMessage.correlationId + ); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.contentType, sbMessage.contentType); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.groupId, sbMessage.sessionId); + assert.equal( + sbMessage._amqpAnnotatedMessage.properties?.replyToGroupId, + sbMessage.replyToSessionId + ); + assert.equal(sbMessage._amqpAnnotatedMessage.properties?.subject, sbMessage.label); + }); +});