diff --git a/sdk/core/core-amqp/review/core-amqp.api.md b/sdk/core/core-amqp/review/core-amqp.api.md index b4c78544d1d5..aa3f5a3e1152 100644 --- a/sdk/core/core-amqp/review/core-amqp.api.md +++ b/sdk/core/core-amqp/review/core-amqp.api.md @@ -45,6 +45,7 @@ export interface AmqpAnnotatedMessage { // @public export const AmqpAnnotatedMessage: { fromRheaMessage(msg: Message): AmqpAnnotatedMessage; + toRheaMessage(msg: AmqpAnnotatedMessage): Message; }; // @public diff --git a/sdk/core/core-amqp/src/amqpAnnotatedMessage.ts b/sdk/core/core-amqp/src/amqpAnnotatedMessage.ts index a910d79fe38c..24ef3472a895 100644 --- a/sdk/core/core-amqp/src/amqpAnnotatedMessage.ts +++ b/sdk/core/core-amqp/src/amqpAnnotatedMessage.ts @@ -62,5 +62,20 @@ export const AmqpAnnotatedMessage = { properties: AmqpMessageProperties.fromRheaMessageProperties(msg), body: msg.body }; + }, + /** + * Takes AmqpAnnotatedMessage and returns it in the RheaMessage(`Message` type from "rhea") format. + */ + toRheaMessage(msg: AmqpAnnotatedMessage): RheaMessage { + const message = { + ...AmqpMessageProperties.toRheaMessageProperties(msg.properties || {}), + ...AmqpMessageHeader.toRheaMessageHeader(msg.header || {}), + body: msg.body, + message_annotations: msg.messageAnnotations, + delivery_annotations: msg.deliveryAnnotations, + application_properties: msg.applicationProperties, + footer: msg.footer + }; + return message; } }; diff --git a/sdk/servicebus/service-bus/src/serviceBusMessage.ts b/sdk/servicebus/service-bus/src/serviceBusMessage.ts index b0b79b9cde7a..bfd1abaa992e 100644 --- a/sdk/servicebus/service-bus/src/serviceBusMessage.ts +++ b/sdk/servicebus/service-bus/src/serviceBusMessage.ts @@ -278,51 +278,52 @@ export function toRheaMessage( msg: ServiceBusMessage | ServiceBusReceivedMessage | AmqpAnnotatedMessage, encoder: Pick ): RheaMessage { + let amqpMsg: RheaMessage; if (isAmqpAnnotatedMessage(msg)) { - const amqpMsg: RheaMessage = { - body: encoder.encode(msg.body, msg.bodyType ?? "data"), - message_annotations: {} + amqpMsg = { + ...AmqpAnnotatedMessage.toRheaMessage(msg), + body: encoder.encode(msg.body, msg.bodyType ?? "data") }; - - if (msg.applicationProperties != null) { - amqpMsg.application_properties = msg.applicationProperties; + } else { + let bodyType: "data" | "sequence" | "value" = "data"; + + if (isServiceBusReceivedMessage(msg)) { + /* + * TODO: this is a bit complicated. + * + * It seems reasonable to expect to be able to round-trip a message (ie, + * receive a message, and then send it again, possibly to another queue / topic). + * If the user does that we need to make sure to respect their original AMQP + * type so when the message is re - encoded we don't put 'body' into the wrong spot. + * + * The complication is that we need to decide if we're okay with respecting a field + * from the rawAmqpMessage, which up until now we've treated as just vestigial + * information on send. My hope is that the use case of "alter the sb message in some + * incompatible way with the underying _rawAmqpMessage.bodyType" is not common + * enough for us to try to do anything more than what I'm doing here. + */ + bodyType = msg._rawAmqpMessage.bodyType ?? "data"; } - if (msg.messageAnnotations != null) { - amqpMsg.message_annotations = msg.messageAnnotations; - } + amqpMsg = { + body: encoder.encode(msg.body, bodyType), + message_annotations: {} + }; - if (msg.deliveryAnnotations != null) { - amqpMsg.delivery_annotations = msg.deliveryAnnotations; - } + amqpMsg.ttl = msg.timeToLive; + } - return amqpMsg; + if (amqpMsg.ttl != null && amqpMsg.ttl !== Constants.maxDurationValue) { + amqpMsg.creation_time = Date.now(); + amqpMsg.absolute_expiry_time = Math.min( + Constants.maxAbsoluteExpiryTime, + amqpMsg.creation_time + amqpMsg.ttl + ); } - let bodyType: "data" | "sequence" | "value" = "data"; - - if (isServiceBusReceivedMessage(msg)) { - /* - * TODO: this is a bit complicated. - * - * It seems reasonable to expect to be able to round-trip a message (ie, - * receive a message, and then send it again, possibly to another queue / topic). - * If the user does that we need to make sure to respect their original AMQP - * type so when the message is re - encoded we don't put 'body' into the wrong spot. - * - * The complication is that we need to decide if we're okay with respecting a field - * from the rawAmqpMessage, which up until now we've treated as just vestigial - * information on send. My hope is that the use case of "alter the sb message in some - * incompatible way with the underying _rawAmqpMessage.bodyType" is not common - * enough for us to try to do anything more than what I'm doing here. - */ - bodyType = msg._rawAmqpMessage.bodyType ?? "data"; - } - - const amqpMsg: RheaMessage = { - body: encoder.encode(msg.body, bodyType), - message_annotations: {} - }; + if (isAmqpAnnotatedMessage(msg)) { + return amqpMsg; + } if (msg.applicationProperties != null) { amqpMsg.application_properties = msg.applicationProperties; @@ -362,15 +363,6 @@ export function toRheaMessage( if (msg.replyToSessionId != null) { amqpMsg.reply_to_group_id = msg.replyToSessionId; } - if (msg.timeToLive != null && msg.timeToLive !== Constants.maxDurationValue) { - amqpMsg.ttl = msg.timeToLive; - amqpMsg.creation_time = Date.now(); - if (Constants.maxAbsoluteExpiryTime - amqpMsg.creation_time > amqpMsg.ttl) { - amqpMsg.absolute_expiry_time = amqpMsg.creation_time + amqpMsg.ttl; - } else { - amqpMsg.absolute_expiry_time = Constants.maxAbsoluteExpiryTime; - } - } if (msg.partitionKey != null) { if (msg.partitionKey.length > Constants.maxPartitionKeyLength) { throw new Error( diff --git a/sdk/servicebus/service-bus/test/public/amqpAnnotatedMessage.spec.ts b/sdk/servicebus/service-bus/test/public/amqpAnnotatedMessage.spec.ts new file mode 100644 index 000000000000..eb1f9cb15192 --- /dev/null +++ b/sdk/servicebus/service-bus/test/public/amqpAnnotatedMessage.spec.ts @@ -0,0 +1,140 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import chaiAsPromised from "chai-as-promised"; +import { ServiceBusSender } from "../../src"; +import { ServiceBusReceiver } from "../../src/receivers/receiver"; +import { + ServiceBusClientForTests, + createServiceBusClientForTests, + testPeekMsgsLength, + EntityName, + getRandomTestClientType +} from "../public/utils/testutils2"; +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; +import { generateUuid } from "@azure/core-http"; + +const should = chai.should(); +chai.use(chaiAsPromised); +const assert = chai.assert; + +const anyRandomTestClientType = getRandomTestClientType(); + +let serviceBusClient: ServiceBusClientForTests; +let entityNames: EntityName; +let sender: ServiceBusSender; +let receiver: ServiceBusReceiver; +const sessionId = "session-1"; + +function afterEachTest(): Promise { + return serviceBusClient.test.afterEach(); +} + +function getSampleAmqpAnnotatedMessage(randomTag?: string): AmqpAnnotatedMessage { + if (randomTag == null) { + randomTag = Math.random().toString(); + } + + return { + body: `message body ${randomTag}`, + bodyType: "data", + header: { + deliveryCount: 10, // TODO: Doesn't make sense to set on the message to be sent, should this be removed for sending? + durable: false, + firstAcquirer: false, + priority: 20, + timeToLive: 100000 + }, + applicationProperties: { + propOne: 1, + propTwo: "two", + propThree: true, + propFour: Date() + }, + // deliveryAnnotations - TODO: should this be removed for sending? + footer: { + propFooter: "foot" + }, + messageAnnotations: { propMsgAnnotate: "annotation" }, + properties: { + contentEncoding: "application/json; charset=utf-8", + correlationId: randomTag, + messageId: generateUuid() + } + }; +} + +describe("AmqpAnnotatedMessage", function(): void { + before(() => { + serviceBusClient = createServiceBusClientForTests(); + }); + + after(() => { + return serviceBusClient.test.after(); + }); + + afterEach(async () => { + await afterEachTest(); + }); + + async function receiveMsg(testMessage: AmqpAnnotatedMessage) { + receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver({ + ...entityNames, + sessionId + }); + const msgs = await receiver.receiveMessages(1); + + should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array"); + should.equal(msgs.length, 1, "Unexpected number of messages"); + should.equal(msgs[0].body, testMessage.body, "Unexpected body on the received message"); + should.equal( + msgs[0]._rawAmqpMessage.messageAnnotations!["propMsgAnnotate"], + testMessage.messageAnnotations!["propMsgAnnotate"], + "Unexpected messageAnnotations on the received message" + ); + assert.deepEqualExcluding( + msgs[0]._rawAmqpMessage, + testMessage, + ["deliveryAnnotations", "body", "messageAnnotations", "header", "properties"], + "Unexpected on the AmqpAnnotatedMessage" + ); + assert.deepEqualExcluding( + msgs[0]._rawAmqpMessage.header!, + testMessage.header!, + ["deliveryCount"], + "Unexpected header on the AmqpAnnotatedMessage" + ); + assert.deepEqualExcluding( + msgs[0]._rawAmqpMessage.properties!, + testMessage.properties!, + ["creationTime", "absoluteExpiryTime", "groupId"], + "Unexpected properties on the AmqpAnnotatedMessage" + ); + assert.equal( + msgs[0]._rawAmqpMessage.properties!.groupId, + testMessage.properties!.groupId, + "Unexpected session-id on the AmqpAnnotatedMessage" + ); + } + + it( + anyRandomTestClientType + ": send, receive, verify props, and complete()", + async function(): Promise { + entityNames = await serviceBusClient.test.createTestEntities(anyRandomTestClientType); + + const testMessage: AmqpAnnotatedMessage = getSampleAmqpAnnotatedMessage(); + testMessage.properties = { + ...testMessage.properties, + groupId: entityNames.usesSessions ? sessionId : undefined + }; + sender = serviceBusClient.test.addToCleanup( + serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + ); + await sender.sendMessages(testMessage); + await receiveMsg(testMessage); + + await testPeekMsgsLength(receiver, 0); + } + ); +}); diff --git a/sdk/servicebus/service-bus/test/public/utils/testUtils.ts b/sdk/servicebus/service-bus/test/public/utils/testUtils.ts index 22a76f4b8d21..0b0f816c4c46 100644 --- a/sdk/servicebus/service-bus/test/public/utils/testUtils.ts +++ b/sdk/servicebus/service-bus/test/public/utils/testUtils.ts @@ -50,7 +50,8 @@ export class TestMessage { applicationProperties: { propOne: 1, propTwo: "two", - propThree: true + propThree: true, + propFour: Date() }, sessionId: TestMessage.sessionId, replyToSessionId: "some-other-session-id"