Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Final work to expose AMQP body type encoding publicly #15295

Merged
merged 7 commits into from
May 17, 2021
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.2.0-beta.1 (Unreleased)
## 7.2.0-beta.1 (2021-05-18)

### New Features

Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ export interface ServiceBusMessageBatch {
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly sizeInBytes: number;
tryAddMessage(message: ServiceBusMessage, options?: TryAddOptions): boolean;
tryAddMessage(message: ServiceBusMessage | AmqpAnnotatedMessage, options?: TryAddOptions): boolean;
}

// @public
Expand Down Expand Up @@ -458,8 +458,8 @@ export interface ServiceBusSender {
createMessageBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch>;
entityPath: string;
isClosed: boolean;
scheduleMessages(messages: ServiceBusMessage | ServiceBusMessage[], scheduledEnqueueTimeUtc: Date, options?: OperationOptionsBase): Promise<Long[]>;
sendMessages(messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch, options?: OperationOptionsBase): Promise<void>;
scheduleMessages(messages: ServiceBusMessage | ServiceBusMessage[] | AmqpAnnotatedMessage | AmqpAnnotatedMessage[], scheduledEnqueueTimeUtc: Date, options?: OperationOptionsBase): Promise<Long[]>;
sendMessages(messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch | AmqpAnnotatedMessage | AmqpAnnotatedMessage[], options?: OperationOptionsBase): Promise<void>;
}

// @public
Expand Down
76 changes: 62 additions & 14 deletions sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import {
MessagingError,
RequestResponseLink,
SendRequestOptions,
RetryOptions
RetryOptions,
AmqpAnnotatedMessage
} from "@azure/core-amqp";
import { ConnectionContext } from "../connectionContext";
import {
Expand All @@ -29,7 +30,9 @@ import {
ServiceBusMessage,
ServiceBusMessageImpl,
toRheaMessage,
fromRheaMessage
fromRheaMessage,
updateScheduledTime,
updateMessageId
} from "../serviceBusMessage";
import { LinkEntity, RequestResponseLinkOptions } from "./linkEntity";
import { managementClientLogger, receiverLogger, senderLogger, ServiceBusLogger } from "../log";
Expand Down Expand Up @@ -585,7 +588,7 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
*/
async scheduleMessages(
scheduledEnqueueTimeUtc: Date,
messages: ServiceBusMessage[],
messages: ServiceBusMessage[] | AmqpAnnotatedMessage[],
options?: OperationOptionsBase & SendManagementRequestOptions
): Promise<Long[]> {
throwErrorIfConnectionClosed(this._context);
Expand All @@ -595,21 +598,28 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const messageBody: any[] = [];
for (let i = 0; i < messages.length; i++) {
const item = messages[i];
if (!item.messageId) item.messageId = generate_uuid();
item.scheduledEnqueueTimeUtc = scheduledEnqueueTimeUtc;

try {
const amqpMessage = toRheaMessage(item, defaultDataTransformer);

const entry: any = {
message: RheaMessageUtil.encode(amqpMessage),
"message-id": item.messageId
const rheaMessage = toRheaMessage(item, defaultDataTransformer);
updateMessageId(rheaMessage, rheaMessage.message_id || generate_uuid());
updateScheduledTime(rheaMessage, scheduledEnqueueTimeUtc);

const entry: {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
message: Buffer;
["message-id"]: ServiceBusMessage["messageId"];
["partition-key"]?: ServiceBusMessage["partitionKey"];
[Constants.sessionIdMapKey]?: string | undefined;
} = {
message: RheaMessageUtil.encode(rheaMessage),
"message-id": rheaMessage.message_id
};
if (item.sessionId) {
entry[Constants.sessionIdMapKey] = item.sessionId;

if (rheaMessage.group_id) {
entry[Constants.sessionIdMapKey] = rheaMessage.group_id;
}
if (item.partitionKey) {
entry["partition-key"] = item.partitionKey;

if (rheaMessage.message_annotations?.[Constants.partitionKey]) {
entry["partition-key"] = rheaMessage.message_annotations[Constants.partitionKey];
}

// Will be required later for implementing Transactions
Expand Down Expand Up @@ -1319,3 +1329,41 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
}
}
}

/**
* Converts an AmqpAnnotatedMessage or ServiceBusMessage into a properly formatted
* message for sending to the mgmt link for scheduling.
*
* @internal
* @hidden
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
*/
export function toScheduleableMessage(
item: ServiceBusMessage | AmqpAnnotatedMessage,
scheduledEnqueueTimeUtc: Date
) {
const rheaMessage = toRheaMessage(item, defaultDataTransformer);
updateMessageId(rheaMessage, rheaMessage.message_id || generate_uuid());
updateScheduledTime(rheaMessage, scheduledEnqueueTimeUtc);

const entry: any = {
message: RheaMessageUtil.encode(rheaMessage),
"message-id": rheaMessage.message_id
};

rheaMessage.message_annotations = {
...rheaMessage.message_annotations,
[Constants.scheduledEnqueueTime]: scheduledEnqueueTimeUtc
};

if (rheaMessage.group_id) {
entry[Constants.sessionIdMapKey] = rheaMessage.group_id;
}

const partitionKey =
rheaMessage.message_annotations && rheaMessage.message_annotations[Constants.partitionKey];

if (partitionKey) {
entry["partition-key"] = partitionKey;
}
return entry;
}
29 changes: 20 additions & 9 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ export interface ServiceBusSender {
* @throws `ServiceBusError` if the service returns an error while sending messages to the service.
*/
sendMessages(
messages: ServiceBusMessage | ServiceBusMessage[] | ServiceBusMessageBatch,
messages:
| ServiceBusMessage
| ServiceBusMessage[]
| ServiceBusMessageBatch
| AmqpAnnotatedMessage
| AmqpAnnotatedMessage[],
options?: OperationOptionsBase
): Promise<void>;

Expand Down Expand Up @@ -98,7 +103,11 @@ export interface ServiceBusSender {
* @throws `ServiceBusError` if the service returns an error while scheduling messages.
*/
scheduleMessages(
messages: ServiceBusMessage | ServiceBusMessage[],
messages:
| ServiceBusMessage
| ServiceBusMessage[]
| AmqpAnnotatedMessage
| AmqpAnnotatedMessage[],
scheduledEnqueueTimeUtc: Date,
options?: OperationOptionsBase
): Promise<Long[]>;
Expand Down Expand Up @@ -242,7 +251,11 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
}

async scheduleMessages(
messages: ServiceBusMessage | ServiceBusMessage[],
messages:
| ServiceBusMessage
| ServiceBusMessage[]
| AmqpAnnotatedMessage
| AmqpAnnotatedMessage[],
scheduledEnqueueTimeUtc: Date,
options: OperationOptionsBase = {}
): Promise<Long[]> {
Expand Down Expand Up @@ -299,16 +312,14 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
? sequenceNumbers
: [sequenceNumbers];
const cancelSchedulesMessagesOperationPromise = async (): Promise<void> => {
return this._context.getManagementClient(this._entityPath).cancelScheduledMessages(
sequenceNumbersToCancel,

{
return this._context
.getManagementClient(this._entityPath)
.cancelScheduledMessages(sequenceNumbersToCancel, {
...options,
associatedLinkName: this._sender.name,
requestName: "cancelScheduledMessages",
timeoutInMs: this._retryOptions.timeoutInMs
}
);
});
};
const config: RetryConfig<void> = {
operation: cancelSchedulesMessagesOperationPromise,
Expand Down
Loading