Skip to content

Commit

Permalink
[Service Bus] Add AMQPAnnotatedMessage to the ReceivedMessage (#10957)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru authored Sep 3, 2020
1 parent 33ffbb3 commit fcca834
Show file tree
Hide file tree
Showing 4 changed files with 424 additions and 14 deletions.
48 changes: 46 additions & 2 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
```ts

import { AmqpMessage } from '@azure/core-amqp';
import { delay } from '@azure/core-amqp';
import { Delivery } from 'rhea-promise';
import { HttpOperationResponse } from '@azure/core-http';
Expand All @@ -22,6 +21,51 @@ import { UserAgentOptions } from '@azure/core-http';
import { WebSocketImpl } from 'rhea-promise';
import { WebSocketOptions } from '@azure/core-amqp';

// @public
export interface AmqpAnnotatedMessage {
applicationProperties?: {
[key: string]: any;
};
body: any;
deliveryAnnotations?: {
[key: string]: any;
};
footer?: {
[key: string]: any;
};
header?: AmqpMessageHeader;
messageAnnotations?: {
[key: string]: any;
};
properties?: AmqpMessageProperties;
}

// @public
export interface AmqpMessageHeader {
deliveryCount?: number;
durable?: boolean;
firstAcquirer?: boolean;
priority?: number;
timeToLive?: number;
}

// @public
export interface AmqpMessageProperties {
absoluteExpiryTime?: number;
contentEncoding?: string;
contentType?: string;
correlationId?: string | number | Buffer;
creationTime?: number;
groupId?: string;
groupSequence?: number;
messageId?: string | number | Buffer;
replyTo?: string;
replyToGroupId?: string;
subject?: string;
to?: string;
userId?: string;
}

// @public
export type AuthorizationRule = {
claimType: string;
Expand Down Expand Up @@ -226,7 +270,7 @@ export interface QueueRuntimePropertiesResponse extends QueueRuntimeProperties,

// @public
export interface ReceivedMessage extends ServiceBusMessage {
readonly _amqpMessage: AmqpMessage;
readonly _amqpAnnotatedMessage: AmqpAnnotatedMessage;
readonly deadLetterErrorDescription?: string;
readonly deadLetterReason?: string;
readonly deadLetterSource?: string;
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ export {
} from "./serviceBusAtomManagementClient";
export { ServiceBusClient } from "./serviceBusClient";
export {
AmqpAnnotatedMessage,
AmqpMessageHeader,
AmqpMessageProperties,
DeadLetterOptions,
ReceivedMessage,
ReceivedMessageWithLock,
Expand Down
178 changes: 166 additions & 12 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@

import Long from "long";
import { Delivery, DeliveryAnnotations, MessageAnnotations, uuid_to_string } from "rhea-promise";
import { AmqpMessage, Constants, ErrorNameConditionMapper, translate } from "@azure/core-amqp";
import {
AmqpMessage,
Constants,
ErrorNameConditionMapper,
MessageHeader,
MessageProperties,
translate
} from "@azure/core-amqp";
import * as log from "./log";
import { ConnectionContext } from "./connectionContext";
import { reorderLockToken } from "./util/utils";
Expand Down Expand Up @@ -225,6 +232,133 @@ export interface ServiceBusMessage {
properties?: { [key: string]: number | boolean | string | Date };
}

/**
* Describes the AmqpAnnotatedMessage, part of the ReceivedMessage(as `amqpAnnotatedMessage` property).
*/
export interface AmqpAnnotatedMessage {
/**
* Describes the defined set of standard header properties of the message.
*/
header?: AmqpMessageHeader;
/**
* Describes set of footer properties of the message.
*/
footer?: { [key: string]: any };
/**
* A dictionary containing message attributes that will be held in the message header
*/
messageAnnotations?: { [key: string]: any };
/**
* A dictionary used for delivery-specific
* non-standard properties at the head of the message.
*/
deliveryAnnotations?: { [key: string]: any };
/**
* A dictionary containing application specific message properties.
*/
applicationProperties?: { [key: string]: any };
/**
* Describes the defined set of standard properties of the message.
*/
properties?: AmqpMessageProperties;
/**
* The message body.
*/
body: any;
}

/**
* Describes the defined set of standard header properties of the message.
*/
export interface AmqpMessageHeader {
/**
* If this value is true, then this message has not been
* acquired by any other link. Ifthis value is false, then this message MAY have previously
* been acquired by another link or links.
*/
firstAcquirer?: boolean;
/**
* The number of prior unsuccessful delivery attempts.
*/
deliveryCount?: number;
/**
* Time to live in milli seconds.
*/
timeToLive?: number;
/**
* Specifies durability requirements.
*/
durable?: boolean;
/**
* The relative message priority. Higher numbers indicate higher
* priority messages.
*/
priority?: number;
}

/**
* Describes the defined set of standard properties of the message.
*/
export interface AmqpMessageProperties {
/**
* The application message identifier that uniquely idenitifes a message.
* The user is responsible for making sure that this is unique in
* the given context. Guids usually make a good fit.
*/
messageId?: string | number | Buffer;
/**
* The address of the node the message is destined for.
*/
to?: string;
/**
* The id that can be used to mark or
* identify messages between clients.
*/
correlationId?: string | number | Buffer;
/**
* MIME type for the message.
*/
contentType?: string;
/**
* The content-encoding property is used as a modifier to the content-type.
* When present, its valueindicates what additional content encodings have
* been applied to theapplication-data.
*/
contentEncoding?: string;
/**
* The time when this message is considered expired.
*/
absoluteExpiryTime?: number;
/**
* The time this message was created.
*/
creationTime?: number;
/**
* The group this message belongs to.
*/
groupId?: string;
/**
* The sequence number of this message with its group.
*/
groupSequence?: number;
/**
* The address of the node to send replies to.
*/
replyTo?: string;
/**
* The group the reply message belongs to.
*/
replyToGroupId?: string;
/**
* A common field for summary information about the message content and purpose.
*/
subject?: string;
/**
* The identity of the user responsible for producing the message.
*/
userId?: string;
}

/**
* @internal
* @ignore
Expand Down Expand Up @@ -445,7 +579,7 @@ export interface ReceivedMessage extends ServiceBusMessage {
* @property {AmqpMessage} _amqpMessage The underlying raw amqp message.
* @readonly
*/
readonly _amqpMessage: AmqpMessage;
readonly _amqpAnnotatedMessage: AmqpAnnotatedMessage;
}

/**
Expand Down Expand Up @@ -661,20 +795,20 @@ export function fromAmqpMessage(
}

const rcvdsbmsg: ReceivedMessage = {
_amqpMessage: msg,
_amqpAnnotatedMessage: toAmqpAnnotatedMessage(msg),
_delivery: delivery,
deliveryCount: msg.delivery_count,
lockToken:
delivery && delivery.tag && delivery.tag.length !== 0
? uuid_to_string(
shouldReorderLockToken === true
? reorderLockToken(
typeof delivery.tag === "string" ? Buffer.from(delivery.tag) : delivery.tag
)
: typeof delivery.tag === "string"
shouldReorderLockToken === true
? reorderLockToken(
typeof delivery.tag === "string" ? Buffer.from(delivery.tag) : delivery.tag
)
: typeof delivery.tag === "string"
? Buffer.from(delivery.tag)
: delivery.tag
)
)
: undefined,
...sbmsg,
...props,
Expand All @@ -686,6 +820,26 @@ export function fromAmqpMessage(
return rcvdsbmsg;
}

/**
* Takes AmqpMessage(type from "rhea") and returns it in the AmqpAnnotatedMessage format.
*
* @export
* @param {AmqpMessage} msg
* @returns {AmqpAnnotatedMessage}
*/
export function toAmqpAnnotatedMessage(msg: AmqpMessage): AmqpAnnotatedMessage {
const messageHeader = MessageHeader.fromAmqpMessageHeader(msg);
return {
header: { ...messageHeader, timeToLive: messageHeader.ttl },
footer: (msg as any).footer,
messageAnnotations: msg.message_annotations,
deliveryAnnotations: msg.delivery_annotations,
applicationProperties: msg.application_properties,
properties: MessageProperties.fromAmqpMessageProperties(msg),
body: msg.body
};
}

/**
* @internal
* @ignore
Expand Down Expand Up @@ -871,10 +1025,10 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
*/
readonly delivery: Delivery;
/**
* @property {AmqpMessage} _amqpMessage The underlying raw amqp message.
* @property {AmqpMessage} _amqpAnnotatedMessage The underlying raw amqp annotated message.
* @readonly
*/
readonly _amqpMessage: AmqpMessage;
readonly _amqpAnnotatedMessage: AmqpAnnotatedMessage;
/**
* @property The reason for deadlettering the message.
* @readonly
Expand Down Expand Up @@ -913,7 +1067,7 @@ export class ServiceBusMessageImpl implements ReceivedMessageWithLock {
if (msg.body) {
this.body = this._context.dataTransformer.decode(msg.body);
}
this._amqpMessage = msg;
this._amqpAnnotatedMessage = toAmqpAnnotatedMessage(msg);
this.delivery = delivery;
}

Expand Down
Loading

0 comments on commit fcca834

Please sign in to comment.