Skip to content

Commit

Permalink
[Service Bus] Filling the gaps for AmqpAnnotatedMessage (Azure#14786)
Browse files Browse the repository at this point in the history
Azure#14703 missed a few things while translating Annotated message into RheaMessage.
This PR attempts to fill the gaps.
  • Loading branch information
HarshaNalluru authored and vindicatesociety committed Apr 26, 2021
1 parent 252e64c commit 0ca7ba7
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 47 deletions.
1 change: 1 addition & 0 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface AmqpAnnotatedMessage {
// @public
export const AmqpAnnotatedMessage: {
fromRheaMessage(msg: Message): AmqpAnnotatedMessage;
toRheaMessage(msg: AmqpAnnotatedMessage): Message;
};

// @public
Expand Down
15 changes: 15 additions & 0 deletions sdk/core/core-amqp/src/amqpAnnotatedMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,20 @@ export const AmqpAnnotatedMessage = {
properties: AmqpMessageProperties.fromRheaMessageProperties(msg),
body: msg.body
};
},
/**
* Takes AmqpAnnotatedMessage and returns it in the RheaMessage(`Message` type from "rhea") format.
*/
toRheaMessage(msg: AmqpAnnotatedMessage): RheaMessage {
const message = {
...AmqpMessageProperties.toRheaMessageProperties(msg.properties || {}),
...AmqpMessageHeader.toRheaMessageHeader(msg.header || {}),
body: msg.body,
message_annotations: msg.messageAnnotations,
delivery_annotations: msg.deliveryAnnotations,
application_properties: msg.applicationProperties,
footer: msg.footer
};
return message;
}
};
84 changes: 38 additions & 46 deletions sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,51 +278,52 @@ export function toRheaMessage(
msg: ServiceBusMessage | ServiceBusReceivedMessage | AmqpAnnotatedMessage,
encoder: Pick<typeof defaultDataTransformer, "encode">
): RheaMessage {
let amqpMsg: RheaMessage;
if (isAmqpAnnotatedMessage(msg)) {
const amqpMsg: RheaMessage = {
body: encoder.encode(msg.body, msg.bodyType ?? "data"),
message_annotations: {}
amqpMsg = {
...AmqpAnnotatedMessage.toRheaMessage(msg),
body: encoder.encode(msg.body, msg.bodyType ?? "data")
};

if (msg.applicationProperties != null) {
amqpMsg.application_properties = msg.applicationProperties;
} else {
let bodyType: "data" | "sequence" | "value" = "data";

if (isServiceBusReceivedMessage(msg)) {
/*
* TODO: this is a bit complicated.
*
* It seems reasonable to expect to be able to round-trip a message (ie,
* receive a message, and then send it again, possibly to another queue / topic).
* If the user does that we need to make sure to respect their original AMQP
* type so when the message is re - encoded we don't put 'body' into the wrong spot.
*
* The complication is that we need to decide if we're okay with respecting a field
* from the rawAmqpMessage, which up until now we've treated as just vestigial
* information on send. My hope is that the use case of "alter the sb message in some
* incompatible way with the underying _rawAmqpMessage.bodyType" is not common
* enough for us to try to do anything more than what I'm doing here.
*/
bodyType = msg._rawAmqpMessage.bodyType ?? "data";
}

if (msg.messageAnnotations != null) {
amqpMsg.message_annotations = msg.messageAnnotations;
}
amqpMsg = {
body: encoder.encode(msg.body, bodyType),
message_annotations: {}
};

if (msg.deliveryAnnotations != null) {
amqpMsg.delivery_annotations = msg.deliveryAnnotations;
}
amqpMsg.ttl = msg.timeToLive;
}

return amqpMsg;
if (amqpMsg.ttl != null && amqpMsg.ttl !== Constants.maxDurationValue) {
amqpMsg.creation_time = Date.now();
amqpMsg.absolute_expiry_time = Math.min(
Constants.maxAbsoluteExpiryTime,
amqpMsg.creation_time + amqpMsg.ttl
);
}

let bodyType: "data" | "sequence" | "value" = "data";

if (isServiceBusReceivedMessage(msg)) {
/*
* TODO: this is a bit complicated.
*
* It seems reasonable to expect to be able to round-trip a message (ie,
* receive a message, and then send it again, possibly to another queue / topic).
* If the user does that we need to make sure to respect their original AMQP
* type so when the message is re - encoded we don't put 'body' into the wrong spot.
*
* The complication is that we need to decide if we're okay with respecting a field
* from the rawAmqpMessage, which up until now we've treated as just vestigial
* information on send. My hope is that the use case of "alter the sb message in some
* incompatible way with the underying _rawAmqpMessage.bodyType" is not common
* enough for us to try to do anything more than what I'm doing here.
*/
bodyType = msg._rawAmqpMessage.bodyType ?? "data";
}

const amqpMsg: RheaMessage = {
body: encoder.encode(msg.body, bodyType),
message_annotations: {}
};
if (isAmqpAnnotatedMessage(msg)) {
return amqpMsg;
}

if (msg.applicationProperties != null) {
amqpMsg.application_properties = msg.applicationProperties;
Expand Down Expand Up @@ -362,15 +363,6 @@ export function toRheaMessage(
if (msg.replyToSessionId != null) {
amqpMsg.reply_to_group_id = msg.replyToSessionId;
}
if (msg.timeToLive != null && msg.timeToLive !== Constants.maxDurationValue) {
amqpMsg.ttl = msg.timeToLive;
amqpMsg.creation_time = Date.now();
if (Constants.maxAbsoluteExpiryTime - amqpMsg.creation_time > amqpMsg.ttl) {
amqpMsg.absolute_expiry_time = amqpMsg.creation_time + amqpMsg.ttl;
} else {
amqpMsg.absolute_expiry_time = Constants.maxAbsoluteExpiryTime;
}
}
if (msg.partitionKey != null) {
if (msg.partitionKey.length > Constants.maxPartitionKeyLength) {
throw new Error(
Expand Down
140 changes: 140 additions & 0 deletions sdk/servicebus/service-bus/test/public/amqpAnnotatedMessage.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import chai from "chai";
import chaiAsPromised from "chai-as-promised";
import { ServiceBusSender } from "../../src";
import { ServiceBusReceiver } from "../../src/receivers/receiver";
import {
ServiceBusClientForTests,
createServiceBusClientForTests,
testPeekMsgsLength,
EntityName,
getRandomTestClientType
} from "../public/utils/testutils2";
import { AmqpAnnotatedMessage } from "@azure/core-amqp";
import { generateUuid } from "@azure/core-http";

const should = chai.should();
chai.use(chaiAsPromised);
const assert = chai.assert;

const anyRandomTestClientType = getRandomTestClientType();

let serviceBusClient: ServiceBusClientForTests;
let entityNames: EntityName;
let sender: ServiceBusSender;
let receiver: ServiceBusReceiver;
const sessionId = "session-1";

function afterEachTest(): Promise<void> {
return serviceBusClient.test.afterEach();
}

function getSampleAmqpAnnotatedMessage(randomTag?: string): AmqpAnnotatedMessage {
if (randomTag == null) {
randomTag = Math.random().toString();
}

return {
body: `message body ${randomTag}`,
bodyType: "data",
header: {
deliveryCount: 10, // TODO: Doesn't make sense to set on the message to be sent, should this be removed for sending?
durable: false,
firstAcquirer: false,
priority: 20,
timeToLive: 100000
},
applicationProperties: {
propOne: 1,
propTwo: "two",
propThree: true,
propFour: Date()
},
// deliveryAnnotations - TODO: should this be removed for sending?
footer: {
propFooter: "foot"
},
messageAnnotations: { propMsgAnnotate: "annotation" },
properties: {
contentEncoding: "application/json; charset=utf-8",
correlationId: randomTag,
messageId: generateUuid()
}
};
}

describe("AmqpAnnotatedMessage", function(): void {
before(() => {
serviceBusClient = createServiceBusClientForTests();
});

after(() => {
return serviceBusClient.test.after();
});

afterEach(async () => {
await afterEachTest();
});

async function receiveMsg(testMessage: AmqpAnnotatedMessage) {
receiver = await serviceBusClient.test.createReceiveAndDeleteReceiver({
...entityNames,
sessionId
});
const msgs = await receiver.receiveMessages(1);

should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array");
should.equal(msgs.length, 1, "Unexpected number of messages");
should.equal(msgs[0].body, testMessage.body, "Unexpected body on the received message");
should.equal(
msgs[0]._rawAmqpMessage.messageAnnotations!["propMsgAnnotate"],
testMessage.messageAnnotations!["propMsgAnnotate"],
"Unexpected messageAnnotations on the received message"
);
assert.deepEqualExcluding(
msgs[0]._rawAmqpMessage,
testMessage,
["deliveryAnnotations", "body", "messageAnnotations", "header", "properties"],
"Unexpected on the AmqpAnnotatedMessage"
);
assert.deepEqualExcluding(
msgs[0]._rawAmqpMessage.header!,
testMessage.header!,
["deliveryCount"],
"Unexpected header on the AmqpAnnotatedMessage"
);
assert.deepEqualExcluding(
msgs[0]._rawAmqpMessage.properties!,
testMessage.properties!,
["creationTime", "absoluteExpiryTime", "groupId"],
"Unexpected properties on the AmqpAnnotatedMessage"
);
assert.equal(
msgs[0]._rawAmqpMessage.properties!.groupId,
testMessage.properties!.groupId,
"Unexpected session-id on the AmqpAnnotatedMessage"
);
}

it(
anyRandomTestClientType + ": send, receive, verify props, and complete()",
async function(): Promise<void> {
entityNames = await serviceBusClient.test.createTestEntities(anyRandomTestClientType);

const testMessage: AmqpAnnotatedMessage = getSampleAmqpAnnotatedMessage();
testMessage.properties = {
...testMessage.properties,
groupId: entityNames.usesSessions ? sessionId : undefined
};
sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
);
await sender.sendMessages(testMessage);
await receiveMsg(testMessage);

await testPeekMsgsLength(receiver, 0);
}
);
});
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/test/public/utils/testUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ export class TestMessage {
applicationProperties: {
propOne: 1,
propTwo: "two",
propThree: true
propThree: true,
propFour: Date()
},
sessionId: TestMessage.sessionId,
replyToSessionId: "some-other-session-id"
Expand Down

0 comments on commit 0ca7ba7

Please sign in to comment.